自定义UDF,UDTF,UDAF函数

(1) 自定义UDF:继承UDF,重写 evaluate 方法

(2) 自定义 UDTF:继承自 GenericUDTF,重写 3 个方法:initialize(自定义输出的列名和类型),process(将结果返回 forward(result)),close

打包=》上传集群路径=》在hive客户端注册

为什么要自定义UDF/UDTF?

因为自定义函数,可以自己埋点 Log 打印日志,出错或者数据异常,方便调试。

依赖以及打jar包插件配置

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>flumeInterceptor</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
          <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-exec</artifactId>
            <version>3.1.2</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.3.2</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>
UDF

进一出一

方法一:继承UDF类(hive3.x 已经不推荐使用 Deprecated)
package com.ambitfly.udf;

import org.apache.hadoop.hive.ql.exec.UDF;

public class MyLength extends UDF{
    public String evaluate(String s){
        return s.length()+"";
    }
}

添加jar包

add jar /opt/module/hive/lib/hivefunction-1.0-SNAPSHOT.jar;

创建临时函数

CREATE TEMPORARY FUNCTION mylength AS ‘com.ambitfly.udf.MyLength’;

销毁临时函数

DROP TEMPORARY FUNCTION myLength;

方法二:继承GenericUDF(可以传入多个值,可变参数,结构体等复杂类型)
示例一:myToUpper

输入String,返回String

package com.ambitfly.udf;

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.lazy.LazyString;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;

import java.util.concurrent.ExecutionException;

public class MyToUpper extends GenericUDF {
    private ObjectInspector strObj;
    
    @Override
    public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
        if(arguments.length != 1){
            throw new UDFArgumentException("upper函数需要一个参数");
        }

        strObj = (ObjectInspector)arguments[0];

        return PrimitiveObjectInspectorFactory.javaStringObjectInspector;
    }

    @Override
    public Object evaluate(DeferredObject[] arguments) throws HiveException {
        LazyString lString =  (LazyString) (arguments[0].get());
        String str = ((StringObjectInspector) strObj).getPrimitiveJavaObject(lString);
        if(str == null){
            return null;
        }
        return str.toUpperCase();
    }

    @Override
    public String getDisplayString(String[] children) {
        return "";
    }
}

添加jar包

add jar /opt/module/hive/lib/hivefunction-1.0-SNAPSHOT.jar;

创建临时函数

CREATE TEMPORARY FUNCTION myToUpper AS ‘com.ambitfly.udf.MyToUpper’;

销毁临时函数

DROP TEMPORARY FUNCTION myToUpper;

示例二:MyScoreToStruct

输入math,english,history 输出 {“math”:“89”,“english”:“78”,“history”:“99”}

建表语句&插入数据:

create table score(
name string,
math string,
english string,
history string 
)
row format delimited fields terminated by ',';

-- LOAD DATA [LOCAL] INPATH 'filepath' [OVERWRITE] INTO TABLE tablename [PARTITION (partcol1=val1, partcol2=val2 ...)]
-- 数据导入
load data local inpath '/opt/data/hivefunctiondata/score.txt' into table score;

数据:

aa,89,78,99
bb,90,67,64
cc,91,98,90
dd,45,30,68

代码:

package com.ambitfly.udf;

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.lazy.LazyString;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
import org.apache.hadoop.io.Text;
import org.omg.CORBA.Object;

import java.util.ArrayList;

class MyScoreToStruct extends GenericUDF {
    //输入变量类型定义
    private ObjectInspector mathObj;
    private ObjectInspector englishObj;
    private ObjectInspector historyObj;


    /**
     * 两个作用:
     * 1.判断输入参数个数,输入类型是否正确
     * 2.定义输入类型
     * @param arguments
     * @return
     * @throws UDFArgumentException
     */
    @Override
    public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
        if(arguments.length != 3){
            throw new  UDFArgumentException("必须输入3个参数!");
        }
        mathObj = (ObjectInspector)arguments[0];
        englishObj = (ObjectInspector)arguments[1];
        historyObj = (ObjectInspector)arguments[2];

        //输出结构体定义
        ArrayList structFieldNames = new ArrayList();
        ArrayList structFieldObjectInspectors = new ArrayList();
        structFieldNames.add("math");
        structFieldNames.add("english");
        structFieldNames.add("history");
        structFieldObjectInspectors.add(PrimitiveObjectInspectorFactory.writableStringObjectInspector);
        structFieldObjectInspectors.add(PrimitiveObjectInspectorFactory.writableStringObjectInspector);
        structFieldObjectInspectors.add(PrimitiveObjectInspectorFactory.writableStringObjectInspector);


        return ObjectInspectorFactory.getStandardStructObjectInspector(structFieldNames, structFieldObjectInspectors);
    }

    @Override
    public Object[] evaluate(DeferredObject[] arguments) throws HiveException {
        LazyString lazyMath = (LazyString) (arguments[0].get());
        LazyString lazyEnglish = (LazyString) (arguments[1].get());
        LazyString lazyHistory = (LazyString) (arguments[2].get());

        String strMath =((StringObjectInspector)mathObj).getPrimitiveJavaObject(lazyMath);
        String strEnglish =((StringObjectInspector)englishObj).getPrimitiveJavaObject(lazyEnglish);
        String strHistory =((StringObjectInspector)historyObj).getPrimitiveJavaObject(lazyHistory);

        Object[] e = new Object[3];

        e[0] = (Object) new Text(strMath);
        e[1] = (Object) new Text(strEnglish);
        e[2] = (Object) new Text(strHistory);
        
        return e;
    }

    @Override
    public String getDisplayString(String[] children) {
        return "";
    }
}

添加jar包

add jar /opt/module/hive/lib/hivefunction-1.0-SNAPSHOT.jar;

创建临时函数

CREATE TEMPORARY FUNCTION myScoreToStruct AS ‘com.ambitfly.udf.MyScoreToStruct’;

销毁临时函数

DROP TEMPORARY FUNCTION myScoreToStruct ;

示例三:MyScoreStructToSumScore

输入 {“math”:“89”,“english”:“78”,“history”:“99”} 输出 266

代码:

package com.ambitfly.udf;

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.lazy.LazyStruct;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.io.Text;

import java.util.List;

public class MyScoreStructToSumScore extends GenericUDF {
    private StructObjectInspector scoreObj;
    @Override
    public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
        if(arguments.length != 1){
            throw new UDFArgumentException("传入的参数不为1!");
        }
        scoreObj = (StructObjectInspector)arguments[0];

        return PrimitiveObjectInspectorFactory.javaIntObjectInspector;
    }

    @Override
    public Object evaluate(DeferredObject[] arguments) throws HiveException {
//        LazyStruct lazyStructScore = null;
//        try {
//            lazyStructScore = (LazyStruct)(arguments[0].get);
//        } catch (Exception e) {
//            throw new HiveException(e.getMessage());
//        }

        List<Object> structFieldsDataAsList = null;
        try {
            structFieldsDataAsList = scoreObj.getStructFieldsDataAsList(arguments[0].get());
        } catch (Exception e) {

            throw new HiveException(e.getMessage());
        }

        int sumScore = 0;
        for (Object o : structFieldsDataAsList) {

            try {
                sumScore += Integer.parseInt(o.toString());
            } catch (Exception e) {
                throw new HiveException(e.getMessage());
            }
        }


        return sumScore;
    }

    @Override
    public String getDisplayString(String[] children) {
        return null;
    }
}

添加jar包

add jar /opt/module/hive/lib/hivefunction-1.0-SNAPSHOT.jar;

创建临时函数

CREATE TEMPORARY FUNCTION myScoreStructToSumScore AS ‘com.ambitfly.udf.MyScoreStructToSumScore’;

销毁临时函数

DROP TEMPORARY FUNCTION myScoreStructToSumScore ;

UDTF

进一出多

示例:MyExplodeScoreStruct

代码:

package com.ambitfly.udf;

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

import java.util.ArrayList;

public class MyExplodeScoreStruct extends GenericUDTF {
    ArrayList<String> output = new ArrayList<>();

    @Override
    public StructObjectInspector initialize(StructObjectInspector argOIs) throws UDFArgumentException {
        if(argOIs.getAllStructFieldRefs().size()!=1){
            throw new UDFArgumentException("传入的参数不为1!");
        }
        ArrayList structFieldNames = new ArrayList();
        ArrayList structFieldObjectInspectors = new ArrayList();
        structFieldNames.add("score");
        structFieldObjectInspectors.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);

        StandardStructObjectInspector standardStructObjectInspector = ObjectInspectorFactory.getStandardStructObjectInspector(structFieldNames, structFieldObjectInspectors);

        return standardStructObjectInspector;
    }

    @Override
    public void process(Object[] args) throws HiveException {
        String str = args[0].toString();
        String[] split = str.split("-");
        for (String s : split) {
            output.clear();
            output.add(s);
            forward(output);
        }

    }

    @Override
    public void close() throws HiveException {

    }
}

添加jar包

add jar /opt/module/hive/lib/hivefunction-1.0-SNAPSHOT.jar;

创建临时函数

CREATE TEMPORARY FUNCTION myExplodeScoreStruct AS ‘com.ambitfly.udf.MyExplodeScoreStruct’;

销毁临时函数

DROP TEMPORARY FUNCTION myExplodeScoreStruct;

查询:

select myExplodeScoreStruct(concat(math,'-',english,'-',history)) from score
select name,s from score LATERAL VIEW myExplodeScoreStruct(concat(math,'-',english,'-',history)) score AS s;
UDAF

即User-defined Aggregation Function(用户定义聚合函数),作用于多行记录上,返回一个结果值,多用于相同组内统计分析。

Logo

大数据从业者之家,一起探索大数据的无限可能!

更多推荐