Custom Hive UDF and UDTF Functions
Custom UDF, UDTF, and UDAF Functions
(1) Custom UDF: extend UDF and override the evaluate method.
(2) Custom UDTF: extend GenericUDTF and override three methods: initialize (defines the output column names and types), process (emits results via forward(result)), and close.
Then: package the jar => upload it to a path on the cluster => register it in the Hive client.
Why write a custom UDF/UDTF?
Because in a custom function you can add your own instrumentation, e.g. Log statements, which makes it much easier to debug errors and data anomalies.
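A minimal sketch of such a logging UDF, assuming the commons-logging API that Hadoop/Hive already pull in (the class name SafeParseInt and the log message are made up for illustration):
package com.ambitfly.udf;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.ql.exec.UDF;

// Hypothetical UDF: parses an int, logging (instead of failing on) dirty input.
public class SafeParseInt extends UDF {
    private static final Log LOG = LogFactory.getLog(SafeParseInt.class);

    public Integer evaluate(String s) {
        if (s == null) {
            return null;
        }
        try {
            return Integer.parseInt(s.trim());
        } catch (NumberFormatException e) {
            // Custom instrumentation: this line shows up in the task logs,
            // which is exactly the debugging benefit described above.
            LOG.warn("SafeParseInt got bad input: " + s);
            return null;
        }
    }
}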
Dependencies and jar-packaging plugin configuration
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>flumeInterceptor</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-exec</artifactId>
            <version>3.1.2</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.3.2</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
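With this configuration, mvn clean package should produce a *-jar-with-dependencies.jar under target/, built by the assembly plugin's single goal during the package phase; that is the jar to upload to the cluster. In practice hive-exec is often marked <scope>provided</scope> so it is not bundled into the assembled jar, since Hive already has it on its classpath. Also note the artifactId here (flumeInterceptor) does not match the jar name used in the add jar commands below (hivefunction-1.0-SNAPSHOT.jar), so rename the jar when copying it to /opt/module/hive/lib/, or adjust the artifactId.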
UDF
One value in, one value out.
Approach 1: extend the UDF class (deprecated since Hive 3.x)
package com.ambitfly.udf;

import org.apache.hadoop.hive.ql.exec.UDF;

public class MyLength extends UDF {
    // Returns the length of the input string, as a string; null-safe.
    public String evaluate(String s) {
        if (s == null) {
            return null;
        }
        return String.valueOf(s.length());
    }
}
Add the jar:
add jar /opt/module/hive/lib/hivefunction-1.0-SNAPSHOT.jar;
Create a temporary function:
CREATE TEMPORARY FUNCTION mylength AS 'com.ambitfly.udf.MyLength';
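Example usage (a quick sanity check; the output follows from the code above):
select mylength('hive');  -- returns the string "4"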
Drop the temporary function:
DROP TEMPORARY FUNCTION myLength;
Approach 2: extend GenericUDF (supports multiple arguments, variable numbers of arguments, and complex types such as structs)
Example 1: MyToUpper
Input: String; output: String
package com.ambitfly.udf;

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;

public class MyToUpper extends GenericUDF {
    private StringObjectInspector strOI;

    @Override
    public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
        if (arguments.length != 1) {
            throw new UDFArgumentException("myToUpper takes exactly one argument");
        }
        // Keep the inspector so evaluate() can extract the Java String,
        // whether the value arrives as a lazy object or as Text.
        strOI = (StringObjectInspector) arguments[0];
        return PrimitiveObjectInspectorFactory.javaStringObjectInspector;
    }

    @Override
    public Object evaluate(DeferredObject[] arguments) throws HiveException {
        String str = strOI.getPrimitiveJavaObject(arguments[0].get());
        if (str == null) {
            return null;
        }
        return str.toUpperCase();
    }

    @Override
    public String getDisplayString(String[] children) {
        return "myToUpper(" + children[0] + ")";
    }
}
Add the jar:
add jar /opt/module/hive/lib/hivefunction-1.0-SNAPSHOT.jar;
Create a temporary function:
CREATE TEMPORARY FUNCTION myToUpper AS 'com.ambitfly.udf.MyToUpper';
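Example usage:
select myToUpper('hive');  -- returns HIVE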
Drop the temporary function:
DROP TEMPORARY FUNCTION myToUpper;
Example 2: MyScoreToStruct
Input: math, english, history; output: {"math":"89","english":"78","history":"99"}
Table DDL and data load:
create table score(
name string,
math string,
english string,
history string
)
row format delimited fields terminated by ',';
-- LOAD DATA [LOCAL] INPATH 'filepath' [OVERWRITE] INTO TABLE tablename [PARTITION (partcol1=val1, partcol2=val2 ...)]
-- load the data
load data local inpath '/opt/data/hivefunctiondata/score.txt' into table score;
Data:
aa,89,78,99
bb,90,67,64
cc,91,98,90
dd,45,30,68
Code:
package com.ambitfly.udf;

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
import org.apache.hadoop.io.Text;
import java.util.ArrayList;

public class MyScoreToStruct extends GenericUDF {
    // Inspectors for the three input columns
    private StringObjectInspector mathOI;
    private StringObjectInspector englishOI;
    private StringObjectInspector historyOI;

    /**
     * Two responsibilities:
     * 1. Validate the number (and types) of the input arguments.
     * 2. Define the output type (here: a struct of three strings).
     */
    @Override
    public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
        if (arguments.length != 3) {
            throw new UDFArgumentException("myScoreToStruct takes exactly 3 arguments");
        }
        mathOI = (StringObjectInspector) arguments[0];
        englishOI = (StringObjectInspector) arguments[1];
        historyOI = (StringObjectInspector) arguments[2];

        // Define the output struct: field names plus one inspector per field
        ArrayList<String> structFieldNames = new ArrayList<>();
        ArrayList<ObjectInspector> structFieldObjectInspectors = new ArrayList<>();
        structFieldNames.add("math");
        structFieldNames.add("english");
        structFieldNames.add("history");
        structFieldObjectInspectors.add(PrimitiveObjectInspectorFactory.writableStringObjectInspector);
        structFieldObjectInspectors.add(PrimitiveObjectInspectorFactory.writableStringObjectInspector);
        structFieldObjectInspectors.add(PrimitiveObjectInspectorFactory.writableStringObjectInspector);
        return ObjectInspectorFactory.getStandardStructObjectInspector(structFieldNames, structFieldObjectInspectors);
    }

    @Override
    public Object evaluate(DeferredObject[] arguments) throws HiveException {
        String strMath = mathOI.getPrimitiveJavaObject(arguments[0].get());
        String strEnglish = englishOI.getPrimitiveJavaObject(arguments[1].get());
        String strHistory = historyOI.getPrimitiveJavaObject(arguments[2].get());

        // One Writable value per struct field, in the order declared above; null-safe
        Object[] result = new Object[3];
        result[0] = strMath == null ? null : new Text(strMath);
        result[1] = strEnglish == null ? null : new Text(strEnglish);
        result[2] = strHistory == null ? null : new Text(strHistory);
        return result;
    }

    @Override
    public String getDisplayString(String[] children) {
        return "myScoreToStruct(" + String.join(", ", children) + ")";
    }
}
Add the jar:
add jar /opt/module/hive/lib/hivefunction-1.0-SNAPSHOT.jar;
Create a temporary function:
CREATE TEMPORARY FUNCTION myScoreToStruct AS 'com.ambitfly.udf.MyScoreToStruct';
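Example usage against the score table above (output format matches the example description):
select name, myScoreToStruct(math, english, history) from score;
-- aa  {"math":"89","english":"78","history":"99"}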
Drop the temporary function:
DROP TEMPORARY FUNCTION myScoreToStruct;
Example 3: MyScoreStructToSumScore
Input: {"math":"89","english":"78","history":"99"}; output: 266
Code:
package com.ambitfly.udf;

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import java.util.List;

public class MyScoreStructToSumScore extends GenericUDF {
    private StructObjectInspector scoreOI;

    @Override
    public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
        if (arguments.length != 1) {
            throw new UDFArgumentException("myScoreStructToSumScore takes exactly one argument");
        }
        scoreOI = (StructObjectInspector) arguments[0];
        return PrimitiveObjectInspectorFactory.javaIntObjectInspector;
    }

    @Override
    public Object evaluate(DeferredObject[] arguments) throws HiveException {
        // Pull the struct's field values out as a flat list
        List<Object> structFieldsDataAsList;
        try {
            structFieldsDataAsList = scoreOI.getStructFieldsDataAsList(arguments[0].get());
        } catch (Exception e) {
            throw new HiveException(e.getMessage());
        }

        // Sum the fields; each one is a string holding an integer score
        int sumScore = 0;
        for (Object o : structFieldsDataAsList) {
            try {
                sumScore += Integer.parseInt(o.toString());
            } catch (Exception e) {
                throw new HiveException(e.getMessage());
            }
        }
        return sumScore;
    }

    @Override
    public String getDisplayString(String[] children) {
        return "myScoreStructToSumScore(" + children[0] + ")";
    }
}
Add the jar:
add jar /opt/module/hive/lib/hivefunction-1.0-SNAPSHOT.jar;
Create a temporary function:
CREATE TEMPORARY FUNCTION myScoreStructToSumScore AS 'com.ambitfly.udf.MyScoreStructToSumScore';
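Example usage, chaining the two functions (for row aa: 89 + 78 + 99 = 266, matching the example description):
select name, myScoreStructToSumScore(myScoreToStruct(math, english, history)) from score;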
Drop the temporary function:
DROP TEMPORARY FUNCTION myScoreStructToSumScore;
UDTF
One row in, multiple rows out.
Example: MyExplodeScoreStruct
Code:
package com.ambitfly.udf;

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import java.util.ArrayList;

public class MyExplodeScoreStruct extends GenericUDTF {
    // Reused row buffer; forward() consumes it immediately, so reuse is safe
    private final ArrayList<String> output = new ArrayList<>();

    @Override
    public StructObjectInspector initialize(StructObjectInspector argOIs) throws UDFArgumentException {
        if (argOIs.getAllStructFieldRefs().size() != 1) {
            throw new UDFArgumentException("myExplodeScoreStruct takes exactly one argument");
        }
        // A UDTF's output is always described as a struct: here one string column named "score"
        ArrayList<String> structFieldNames = new ArrayList<>();
        ArrayList<ObjectInspector> structFieldObjectInspectors = new ArrayList<>();
        structFieldNames.add("score");
        structFieldObjectInspectors.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        return ObjectInspectorFactory.getStandardStructObjectInspector(structFieldNames, structFieldObjectInspectors);
    }

    @Override
    public void process(Object[] args) throws HiveException {
        // Split the "math-english-history" string and emit one row per score
        String str = args[0].toString();
        String[] split = str.split("-");
        for (String s : split) {
            output.clear();
            output.add(s);
            forward(output);
        }
    }

    @Override
    public void close() throws HiveException {
        // Nothing to clean up
    }
}
Add the jar:
add jar /opt/module/hive/lib/hivefunction-1.0-SNAPSHOT.jar;
Create a temporary function:
CREATE TEMPORARY FUNCTION myExplodeScoreStruct AS 'com.ambitfly.udf.MyExplodeScoreStruct';
Drop the temporary function:
DROP TEMPORARY FUNCTION myExplodeScoreStruct;
Queries:
select myExplodeScoreStruct(concat(math,'-',english,'-',history)) from score;
select name, s from score LATERAL VIEW myExplodeScoreStruct(concat(math,'-',english,'-',history)) exploded AS s;
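Expected output of the LATERAL VIEW query, derived from the sample data (three rows per input row; shown for row aa only):
aa  89
aa  78
aa  99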
UDAF
Short for User-Defined Aggregation Function: it operates over multiple rows and returns a single result value, and is most often used for per-group statistics.
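The original section stops at the definition, so here is a minimal sketch of a UDAF (a sum over bigint values) using the older, reflection-based UDAF/UDAFEvaluator API, which, like UDF above, is deprecated but is the shortest way to show the aggregation lifecycle (init/iterate/terminatePartial/merge/terminate). The class name MySum and its package are made up to match the examples above:
package com.ambitfly.udf;

import org.apache.hadoop.hive.ql.exec.UDAF;
import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;

// Hypothetical UDAF: sums bigint values across the rows of a group.
public class MySum extends UDAF {
    public static class MySumEvaluator implements UDAFEvaluator {
        private long sum;
        private boolean empty;

        public MySumEvaluator() {
            init();
        }

        // Reset the aggregation state.
        @Override
        public void init() {
            sum = 0;
            empty = true;
        }

        // Called once per original row (map side).
        public boolean iterate(Long value) {
            if (value != null) {
                sum += value;
                empty = false;
            }
            return true;
        }

        // Partial result handed from map side to reduce side.
        public Long terminatePartial() {
            return empty ? null : sum;
        }

        // Merge a partial result into this evaluator's state.
        public boolean merge(Long other) {
            if (other != null) {
                sum += other;
                empty = false;
            }
            return true;
        }

        // Final result for the group.
        public Long terminate() {
            return empty ? null : sum;
        }
    }
}
It would be registered and used the same way as the functions above:
add jar /opt/module/hive/lib/hivefunction-1.0-SNAPSHOT.jar;
CREATE TEMPORARY FUNCTION mysum AS 'com.ambitfly.udf.MySum';
select mysum(cast(math as bigint)) from score;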