`
kelvinliu117
  • 浏览: 19071 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

hive UDF UDAF UDTF

    博客分类:
  • hive
阅读更多

Hive进行UDF开发十分简单,此处所说UDF为Temporary的function,所以需要hive版本在0.4.0以上才可以。

一、背景:Hive是基于Hadoop中的MapReduce,提供HQL查询的数据仓库。Hive是一个很开放的系统,很多内容都支持用户定制,包括:

a)文件格式:Text File,Sequence File

b)内存中的数据格式: Java Integer/String, Hadoop IntWritable/Text

c)用户提供的 map/reduce 脚本:不管什么语言,利用 stdin/stdout 传输数据

d)用户自定义函数: Substr, Trim, 1 – 1

e)用户自定义聚合函数: Sum, Average…… n – 1

2、定义:UDF(User-Defined-Function),用户自定义函数对数据进行处理。

二、用法

1、UDF函数可以直接应用于select语句,对查询结构做格式化处理后,再输出内容。

2、编写UDF函数的时候需要注意一下几点:

a)自定义UDF需要继承org.apache.hadoop.hive.ql.UDF。

b)需要实现evaluate函。

c)evaluate函数支持重载。

3、以下是两个数求和函数的UDF。evaluate函数代表两个整型数据相加,两个浮点型数据相加,可变长数据相加

    Hive的UDF开发只需要重构UDF类的evaluate函数即可。例:

package hive.connect;

import org.apache.hadoop.hive.ql.exec.UDF;

public final class Add extends UDF {

public Integer evaluate(Integer a, Integer b) {

               if (null == a || null == b) {

                               return null;

               } return a + b;

}

public Double evaluate(Double a, Double b) {

               if (a == null || b == null)

                               return null;

                               return a + b;

               }

public Integer evaluate(Integer... a) {

               int total = 0;

               for (int i = 0; i < a.length; i++)

                               if (a[i] != null)

                                              total += a[i];

                                              return total;

                               }

}

4、步骤

a)把程序打包放到目标机器上去;

b)进入hive客户端,添加jar包:hive>add jar /run/jar/udf_test.jar;

c)创建临时函数:hive>CREATE TEMPORARY FUNCTION add_example AS 'hive.udf.Add';

d)查询HQL语句:

SELECT add_example(8, 9) FROM scores;

SELECT add_example(scores.math, scores.art) FROM scores;

SELECT add_example(6, 7, 8, 6.8) FROM scores;

e)销毁临时函数:hive> DROP TEMPORARY FUNCTION add_example;

5、细节在使用UDF的时候,会自动进行类型转换,例如:

SELECT add_example(8,9.1) FROM scores;

注:

1.   UDF只能实现一进一出的操作,如果需要实现多进一出,则需要实现UDAF

下面来看下UDAF:

(二)、UDAF

1、Hive查询数据时,有些聚类函数在HQL没有自带,需要用户自定义实现。

2、用户自定义聚合函数: Sum, Average…… n – 1

UDAF(User- Defined Aggregation Funcation)

一、用法

1、一下两个包是必须的import org.apache.hadoop.hive.ql.exec.UDAF和 org.apache.hadoop.hive.ql.exec.UDAFEvaluator。

2、函数类需要继承UDAF类,内部类Evaluator实UDAFEvaluator接口。

3、Evaluator需要实现 init、iterate、terminatePartial、merge、terminate这几个函数。

a)init函数实现接口UDAFEvaluator的init函数。

b)iterate接收传入的参数,并进行内部的轮转。其返回类型为boolean。

c)terminatePartial无参数,其为iterate函数轮转结束后,返回轮转数据,terminatePartial类似于hadoop的Combiner。

d)merge接收terminatePartial的返回结果,进行数据merge操作,其返回类型为boolean。

e)terminate返回最终的聚集函数结果。

package hive.udaf;

import org.apache.hadoop.hive.ql.exec.UDAF;

import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;

public class Avg extends UDAF {

         public static class AvgState {

         private long mCount;

         private double mSum;

}

public static class AvgEvaluator implements UDAFEvaluator {

         AvgState state;

         public AvgEvaluator() {

                   super();

                   state = new AvgState();

                   init();

}

/** * init函数类似于构造函数,用于UDAF的初始化 */

public void init() {

         state.mSum = 0;

         state.mCount = 0;

}

/** * iterate接收传入的参数,并进行内部的轮转。其返回类型为boolean * * @param o * @return */

public boolean iterate(Double o) {

         if (o != null) {

                   state.mSum += o;

                   state.mCount++;

         } return true;

}

/** * terminatePartial无参数,其为iterate函数轮转结束后,返回轮转数据, * terminatePartial类似于hadoop的Combiner * * @return */

public AvgState terminatePartial() {

         // combiner

         return state.mCount == 0 ? null : state;

}

/** * merge接收terminatePartial的返回结果,进行数据merge操作,其返回类型为boolean * * @param o * @return */

public boolean terminatePartial(Double o) {                

         if (o != null) {

                   state.mCount += o.mCount;

                   state.mSum += o.mSum;

         }

         return true;

}

/** * terminate返回最终的聚集函数结果 * * @return */

public Double terminate() {

         return state.mCount == 0 ? null : Double.valueOf(state.mSum / state.mCount);

}

}

5、执行求平均数函数的步骤

a)将java文件编译成Avg_test.jar。

b)进入hive客户端添加jar包:

hive>add jar /run/jar/Avg_test.jar。

c)创建临时函数:

hive>create temporary function avg_test 'hive.udaf.Avg';

d)查询语句:

hive>select avg_test(scores.math) from scores;

e)销毁临时函数:

hive>drop temporary function avg_test;

五、总结

1、重载evaluate函数。

2、UDF函数中参数类型可以为Writable,也可为java中的基本数据对象。

3、UDF支持变长的参数。

4、Hive支持隐式类型转换。

5、客户端退出时,创建的临时函数自动销毁。

6、evaluate函数必须要返回类型值,空的话返回null,不能为void类型。

7、UDF是基于单条记录的列进行的计算操作,而UDFA则是用户自定义的聚类函数,是基于表的所有记录进行的计算操作。

8、UDF和UDAF都可以重载。

9、查看函数

SHOW FUNCTIONS;

1. UDTF介绍

UDTF(User-Defined Table-Generating Functions)  用来解决 输入一行输出多行(On-to-many maping) 的需求。

2. 编写自己需要的UDTF

 

继承org.apache.hadoop.hive.ql.udf.generic.GenericUDTF。

实现initialize, process, close三个方法

UDTF首先会调用initialize方法,此方法返回UDTF的返回行的信息(返回个数,类型)。初始化完成后,会调用process方法,对传入的参数进行处理,可以通过forword()方法把结果返回。最后close()方法调用,对需要清理的方法进行清理。

下面是一个用来切分”key:value;key:value;”这种字符串,返回结果为key, value两个字段。供参考:

    import java.util.ArrayList;

   

    import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;

    import org.apache.hadoop.hive.ql.exec.UDFArgumentException;

    import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;

    import org.apache.hadoop.hive.ql.metadata.HiveException;

    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;

    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;

    import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;

   import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

  

   public class ExplodeMap extends GenericUDTF{

  

       @Override

       public void close() throws HiveException {

           // TODO Auto-generated method stub    

       }

  

       @Override

       public StructObjectInspector initialize(ObjectInspector[] args)

               throws UDFArgumentException {

           if (args.length != 1) {

               throw new UDFArgumentLengthException("ExplodeMap takes only one argument");

           }

           if (args[0].getCategory() != ObjectInspector.Category.PRIMITIVE) {

               throw new UDFArgumentException("ExplodeMap takes string as a parameter");

           }

  

           ArrayList<String> fieldNames = new ArrayList<String>();

           ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();

           fieldNames.add("col1");

           fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);

           fieldNames.add("col2");

           fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);

  

           return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames,fieldOIs);

       }

  

      @Override

       public void process(Object[] args) throws HiveException {

           String input = args[0].toString();

           String[] test = input.split(";");

           for(int i=0; i<test.length; i++) {

               try {

                   String[] result = test[i].split(":");

                   forward(result);

               } catch (Exception e) {

                  continue;

              }

         }

       }

   }

 

 

 

3. 使用方法

 

 

 

 

UDTF有两种使用方法,一种直接放到select后面,一种和lateral view一起使用。

 

1:直接select中使用:select explode_map(properties) as (col1,col2) from src;

 

不可以添加其他字段使用:select a, explode_map(properties) as (col1,col2) from src

不可以嵌套调用:select explode_map(explode_map(properties)) from src

不可以和group by/cluster by/distribute by/sort by一起使用:select explode_map(properties) as (col1,col2) from src group by col1, col2

 

2:和lateral view一起使用:select src.id, mytable.col1, mytable.col2 from src lateral view explode_map(properties) mytable as col1, col2;

 

此方法更为方便日常使用。执行过程相当于单独执行了两次抽取,然后union到一个表里。

分享到:
评论

相关推荐

    spark-hive-udf:Spark Hive UDF示例

    Spark Hive UDF示例 建立项目 mvn clean package 将spark-hive-udf-1.0.0-SNAPSHOT.jar复制到边缘节点临时目录 spark-hive-udf]# cp target/spark-hive-udf-1.0.0-SNAPSHOT.jar /tmp 通过提供罐子来启动火花壳 spark...

    javasql笔试题-spark-hive-udf:展示如何在ApacheSpark中使用HiveUDF的示例项目

    Hive UDF 项目 介绍 该项目只是一个示例,包含多个 (UDF),用于 Apache Spark。 它旨在演示如何在 Scala 或 Java 中构建 Hive UDF 并在 . 为什么要使用 Hive UDF? Hive UDF 的一个特别好的用途是与 Python 和 ...

    自定义hive函数

    自定义 hive udf udaf 有url解析,获取网站主域名,根据ip获取区域码,有rownum,列聚合以及一些业务实现udf。

    mustached-hive-udfs:一些有用的 Hive UDF 和 UDAF

    这是一些有用的 Hive UDF 和 UDAF 的集合。 提供的功能 UDAF Mode ( de.frosner.hive.udaf.Mode ) - 计算组列的统计模式 从源头构建 git clone https://github.com/FRosner/mustached-hive-udfs.git cd mustached...

    hive-udf-tools:hive udf 部署工具,开发工具...

    hive-udf-hook UDF开发及发布过程 1 用户编写UDF实现类 2 编写完成后,在UDFHooks类中调用相关注册函数: 调用 FunctionRegistry.registerUDF 注册udf 调用 FunctionRegistry.registerUDAF 注册udaf 调用...

    hive:个人配置单元 UDAF

    个人 Hive UDAF 有一堆 Hive UDAF(用户定义的聚合函数)不在标准 Hive 分布中,因为它们可能会导致大型数据集的 OOM。 要使用它们,您需要加载 jar 文件,然后为每个要使用的函数创建一个临时函数: ADD JAR target...

    datasketches-hive:Hive的草图适配器

    =================适用于Apache Hive的DataSketches Java UDF / UDAF适配器请访问主要的以获取更多信息。 如果您有兴趣对此站点做出贡献,请参阅我们的页面以了解如何与我们联系。Hadoop Hive UDF / UDAF 请参阅Java...

    hive常用函数

    hive常用函数,包括时间、类型、udf、udaf等等的归纳。

    hive-udf:hive自定义函数

    hive-udfhive自定义函数主要实现hive3种自定义函数1,udf函数,主要用于处理一对一数据处理2,udtf函数,主要用于处理一对多数据处理2,udaf函数,主要用与处理多对一数据聚合处理

    赵伟:HIVE在腾讯分布式数据仓库实践

    赵伟首先介绍了他们的TDW核心架构,HIVE,MapReduce,HDFS及PostgreSQL构成。赵伟分享了最核心的HIVE模块在TDW中的实践经验;HIVE是一个在Hadoop上构建数据仓库的软件,它...实现了基本的SQL功能,可扩充UDF/UDAF...

    hive

    Apache Hive(TM)数据仓库软件有助于查询和... HiveQL还可以使用自定义标量函数(UDF),聚合(UDAF)和表函数(UDTF)进行扩展。https://mirrors.tuna.tsinghua.edu.cn/apache/hive/hive-standalone-metastore-3.0.0/

    蜂巢:Apache蜂巢

    HiveSQL也可以通过用户定义的函数(UDF),用户定义的集合(UDAF)和用户定义的表函数(UDTF)扩展为用户代码。 Hive用户在执行SQL查询时可以选择3种运行时。 用户可以选择Apache Hadoop MapReduce,Apache Tez或...

    大数据场景化解决方案.pdf

    UDTF(User-Defined Table-Generating Functions) ⽤于接收单个数据⾏,并产⽣多个数据⾏作为输出。 Hive调优 数据倾斜 数据倾斜指计算数据的时候,数据的分散度不够,导致⼤量的数据集中到了⼀台或者⼏台机器上...

    Hadoop权威指南(中文版)2015上传.rar

    1.8.2 编写UDAF 第13章 HBase 2.1 HBasics 2.1.1 背景 2.2 概念 2.2.1 数据模型的"旋风之旅" 2.2.2 实现 2.3 安装 2.3.1 测试驱动 2.4 客户机 2.4.1 Java 2.4.2 Avro,REST,以及Thrift 2.5 示例 2.5.1 模式 2.5.2 ...

    Hadoop权威指南 第二版(中文版)

     1.8.2 编写UDAF 第13章 HBase  2.1 HBasics  2.1.1 背景  2.2 概念  2.2.1 数据模型的“旋风之旅”  2.2.2 实现  2.3 安装  2.3.1 测试驱动  2.4 客户机  2.4.1 Java  2.4.2 Avro,REST,以及Thrift  ...

    Spark:Apache Spark是一个快速的内存数据处理引擎,具有优雅且富有表现力的开发API,可让数据工作者高效执行需要快速迭代访问数据集的流,机器学习或SQL工作负载。该项目将在Scala中提供Spark的示例程序语

    使用Spark-2.1实现自定义UDF,UDAF,Partitioner 使用数据框(ComplexSchema,DropDuplicates,DatasetConversion,GroupingAndAggregation) 使用数据集处理Parquet文件按特定列对数据进行分区并按分区进行存储使用...

Global site tag (gtag.js) - Google Analytics