「java编译udtf」java编译时找不到符号

博主:adminadmin 2022-12-27 23:39:13 55

今天给各位分享java编译udtf的知识,其中也会对java编译时找不到符号进行解释,如果能碰巧解决你现在面临的问题,别忘了关注本站,现在开始吧!

本文目录一览:

将Mysql中 test表同步到ES中,并且将tags(逗号分隔的字符串)转化数组同步到ES中的数组。

Mysql中test表结构

数据如下:

ES中数据结构

Flink 中

运行Flink任务脚本如下:

自定义UDTF函数参考阿里云链接,注意需要使用java8

上传jar包后,如果返回如下表明包可以上传。

查看tags的类型

查询hive中有哪些自定义函数

如何在HIVE中使用自己定义的函数,可以有几种方式:

1.在HIVE会话中add 自定义函数的jar文件,然后创建function,继而使用函数

2.在进入HIVE会话之前先自动执行创建function,不用用户手工创建

3.把自定义的函数写到系统函数中,使之成为HIVE的一个默认函数,这样就不需要create temporary function

--------------------------------------------------------------------------------

1.在HIVE会话中add 自定义函数的jar文件,然后创建function,继而使用函数

hive ADD JAR /home/hugh.wangp/UDFMd5.jar;

Added /home/hugh.wangp/UDFMd5.jar to class path

hive CREATE TEMPORARY FUNCTION udfmd5 AS 'udf.UDFMd5';

OK

Time taken: 0.014 seconds

hive select udfmd5('a') from dual;

OK

0cc175b9c0f1b6a831c399e269772661

这种方式的弊端是:每次打开新的会话,就要重新执行一遍如上的add jar和create temporary function的命令。对普通的业务分析人员未免要求太高。第二种方法可以把业务人员释放出来

--------------------------------------------------------------------------------

2.在进入HIVE会话之前先自动执行创建function

HIVE命令有个参数-i:在进入会话,待用户输入自己的HQL之前,先执行-i的参数。我们只需要把add jar和create temporary function的命令写到一个文件中,并把这个文件传到-i的参数,如此一来省去了每次要手工创建的工作。

但是这种方式也有个弊端,当我增添HIVE的gateway,就要拷贝一份这个文件,有时候遗漏真是不可避免的。第三种方法应该就是一个终极方案,只要HIVE本身代码不做修改,或者不删除特定的功能,这个函数就能一直用,而且可以把他作为HIVE的默认函数那样使用

--------------------------------------------------------------------------------

3.把自定义的函数写到系统函数中,使之成为HIVE的一个默认函数

a.编写自己的UDF/UDAF/UDTF,并把代码放到$HIVE_HOME/src/ql/src/java/org/apache/Hadoop/hive/ql/udf/路径下

b.修改$HIVE_HOME/src/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java

以HIVE自带函数Trim()举例,自定义函数操作一样。

第一步:

写UDF代码UDFTrim.java并放到$HIVE_HOME/src/ql/src/java/org/apache/hadoop/hive/ql/udf/UDFTrim.java

第二步:

修改$HIVE_HOME/src/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java文件

a.import org.apache.hadoop.hive.ql.udf.UDFTrim;

b.registerUDF("trim", UDFTrim.class, false);

虽然这种方法是一劳永逸的方法,但是一旦错了,对整个HIVE使用都是灾难性的,所以不是特别通用的自定义函数还是采用前两种,一旦通用的自定义函数累计到一定程度,再去采用第三种方法。

更多相关内容可参考

Blink流式计算-Kafka接入demo

//定义解析Kakfa message的UDTF

CREATE FUNCTION myParse AS 'com.xxxxxx.MyKafkaUDTF';

CREATE FUNCTION myUdf AS 'com.xxxxxxx.MyWaterMarkUDTF';

//注意:kafka源表DDL字段必须与以下例子一致

create table my_input (

messageKey VARBINARY,

`message` VARBINARY,

topic varchar,

`partition` int,

`offset` bigint,

ctTime AS TO_TIMESTAMP (myUdf (`message`)),

//注意计算里的类型必须为timestamp才能在做watermark。

WATERMARK wk FOR `ctTime` AS WITHOFFSET (`ctTime`, 2000) --为rowtime定义watermark

) WITH (

type = 'KAFKA08',

topic = 'myTopic',

`group.id` = 'mGroup',

extraConfig = 'bootstrap.servers=127.0.0.1:9092',

`zookeeper.connect` = '127.0.0.1:2181',

startupMode = 'EARLISET'

);

-- 滚动窗口 group by prodId

CREATE VIEW input_view01 (

windowStart,

windowEnd,

prodId,

prodName,

prodNumber

) AS

SELECT

HOP_START (S.ctTime, INTERVAL '30' SECOND, INTERVAL '2' MINUTE),

HOP_END (S.ctTime, INTERVAL '30' SECOND, INTERVAL '2' MINUTE),

T.prodId as prodId,

T.prodName as prodName,

count (*) as prodNumber

from

my_input as S,

LATERAL TABLE (myParse (`message`)) as T (

id,

prodId,

prodName,

createdAt,

updatedAt

)

Group BY HOP (S.ctTime, INTERVAL '30' SECOND, INTERVAL '2' MINUTE), T.prodId, T.prodName;

CREATE VIEW input_view60 (

id,

prodId,

prodName,

createdAt,

updatedAt

) AS

SELECT

T.id,

T.prodId,

T.prodName,

T.createdAt,

T.updatedAt

from

my_input as S,

LATERAL TABLE (myParse (`message`)) as T (

id,

goCs,

prodId,

prodName,

createdAt,

updatedAt

);

-- 结果print

create table outprint01(

prodId bigint,

prodName varchar,

prodNumber bigint

)with(

type = 'print'

);

insert into outprint01

select prodId , prodName , prodNumber

from input_view01;

-- 结算结果写入Kafka

create table result_kafka (

messageKey VARBINARY,

`message` VARBINARY,

PRIMARY KEY (messageKey)

) with (

type = 'KAFKA08',

topic = 'myResultTopic',

extraConfig='bootstrap.servers=127.0.0.1:9092',

`zookeeper.connect` = '127.0.0.1:2181',

startupMode='EARLISET'

);

//此处的结果输出,可以考虑将结果组装成字符串,中间用|隔开,接收方再解析

INSERT INTO

result_kafka

SELECT

cast(prodId as VARBINARY) as messageKey,

cast(prodName as VARBINARY) as `message`

FROM

input_view01;

MyKafkaUDTF写法:

package com.xxxxxxxx;

import com.alibaba.fastjson.JSONObject;

import org.apache.flink.table.functions.TableFunction;

import org.apache.flink.table.types.DataType;

import org.apache.flink.table.types.DataTypes;

import org.apache.flink.types.Row;

import java.io.UnsupportedEncodingException;

import java.sql.Timestamp;

public class MyKafkaUDTF extends TableFunctionRow {

public void eval(byte[] message) {

try {

String msg = new String(message, "UTF-8");

System.out.println("收到的消息:"+msg);

try {

JSONObject jsonObject = JSONObject.parseObject(msg);

if (jsonObject != null) {

//id

Long id = jsonObject.getLong("id");

//prodId

Long prodId = jsonObject.getLong("prodId");

//prodName

String prodName = jsonObject.getString("prodName ");

Long createAt = jsonObject.getLong("createdAt");

Long updatedAt = jsonObject.getLong("updatedAt");

//创建时间时间戳

Timestamp createAtTimeStamp = new Timestamp(createAt);

Timestamp updatedAtTimeStamp = new Timestamp(updatedAt);

Row row = new Row(8);

row.setField(0, id);

row.setField(1, prodId);

row.setField(2, prodName);

row.setField(3, createAtTimeStamp );

row.setField(4, updatedAtTimeStamp );

System.out.println("message str ==" + row.toString());

collect(row);

}

} catch (Exception e) {

e.printStackTrace();

System.out.println(" error. Input data " + msg + "is not json string");

}

} catch (UnsupportedEncodingException e) {

e.printStackTrace();

}

}

@Override

// 如果返回值是Row,就必须重载实现这个方法,显式地告诉系统返回的字段类型

public DataType getResultType(Object[] arguments, Class[] argTypes) {

return DataTypes.createRowType(

DataTypes.LONG,

DataTypes.LONG,

DataTypes.STRING,

DataTypes.TIMESTAMP,

DataTypes.TIMESTAMP);

}

}

package xxxxxxx;

import com.alibaba.fastjson.JSONObject;

import org.apache.flink.table.functions.ScalarFunction;

import java.text.SimpleDateFormat;

import java.util.Date;

public class MyWaterMarkUDTF extends ScalarFunction {

public String eval(byte[] message) {

try {

String msg = new String(message, "UTF-8");

JSONObject data = JSONObject.parseObject(msg);

System.out.println("time:"+data.getString("createdAt"));

Long createAtLong = data.getLong("createdAt");

SimpleDateFormat parser = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

String createTimeStr = parser.format(new Date(createAtLong));

return createTimeStr;

} catch (Exception e) {

e.printStackTrace();

}

return null;

}

//可选,close方法可以不写

@Override

public void close() {

}

}

关于java编译udtf和java编译时找不到符号的介绍到此就结束了,不知道你从中找到你需要的信息了吗 ?如果你还想了解更多这方面的信息,记得收藏关注本站。

The End

发布于:2022-12-27,除非注明,否则均为首码项目网原创文章,转载请注明出处。