「流式计算java」流式计算应用场景
本篇文章给大家谈谈流式计算java,以及流式计算应用场景对应的知识点,希望对各位有所帮助,不要忘了收藏本站喔。
本文目录一览:
Blink流式计算-Kafka接入demo
//定义解析Kakfa message的UDTF
CREATE FUNCTION myParse AS 'com.xxxxxx.MyKafkaUDTF';
CREATE FUNCTION myUdf AS 'com.xxxxxxx.MyWaterMarkUDTF';
//注意:kafka源表DDL字段必须与以下例子一致
create table my_input (
messageKey VARBINARY,
`message` VARBINARY,
topic varchar,
`partition` int,
`offset` bigint,
ctTime AS TO_TIMESTAMP (myUdf (`message`)),
//注意计算里的类型必须为timestamp才能在做watermark。
WATERMARK wk FOR `ctTime` AS WITHOFFSET (`ctTime`, 2000) --为rowtime定义watermark
) WITH (
type = 'KAFKA08',
topic = 'myTopic',
`group.id` = 'mGroup',
extraConfig = 'bootstrap.servers=127.0.0.1:9092',
`zookeeper.connect` = '127.0.0.1:2181',
startupMode = 'EARLISET'
);
-- 滚动窗口 group by prodId
CREATE VIEW input_view01 (
windowStart,
windowEnd,
prodId,
prodName,
prodNumber
) AS
SELECT
HOP_START (S.ctTime, INTERVAL '30' SECOND, INTERVAL '2' MINUTE),
HOP_END (S.ctTime, INTERVAL '30' SECOND, INTERVAL '2' MINUTE),
T.prodId as prodId,
T.prodName as prodName,
count (*) as prodNumber
from
my_input as S,
LATERAL TABLE (myParse (`message`)) as T (
id,
prodId,
prodName,
createdAt,
updatedAt
)
Group BY HOP (S.ctTime, INTERVAL '30' SECOND, INTERVAL '2' MINUTE), T.prodId, T.prodName;
CREATE VIEW input_view60 (
id,
prodId,
prodName,
createdAt,
updatedAt
) AS
SELECT
T.id,
T.prodId,
T.prodName,
T.createdAt,
T.updatedAt
from
my_input as S,
LATERAL TABLE (myParse (`message`)) as T (
id,
goCs,
prodId,
prodName,
createdAt,
updatedAt
);
-- 结果print
create table outprint01(
prodId bigint,
prodName varchar,
prodNumber bigint
)with(
type = 'print'
);
insert into outprint01
select prodId , prodName , prodNumber
from input_view01;
-- 结算结果写入Kafka
create table result_kafka (
messageKey VARBINARY,
`message` VARBINARY,
PRIMARY KEY (messageKey)
) with (
type = 'KAFKA08',
topic = 'myResultTopic',
extraConfig='bootstrap.servers=127.0.0.1:9092',
`zookeeper.connect` = '127.0.0.1:2181',
startupMode='EARLISET'
);
//此处的结果输出,可以考虑将结果组装成字符串,中间用|隔开,接收方再解析
INSERT INTO
result_kafka
SELECT
cast(prodId as VARBINARY) as messageKey,
cast(prodName as VARBINARY) as `message`
FROM
input_view01;
MyKafkaUDTF写法:
package com.xxxxxxxx;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.DataTypes;
import org.apache.flink.types.Row;
import java.io.UnsupportedEncodingException;
import java.sql.Timestamp;
public class MyKafkaUDTF extends TableFunctionRow {
public void eval(byte[] message) {
try {
String msg = new String(message, "UTF-8");
System.out.println("收到的消息:"+msg);
try {
JSONObject jsonObject = JSONObject.parseObject(msg);
if (jsonObject != null) {
//id
Long id = jsonObject.getLong("id");
//prodId
Long prodId = jsonObject.getLong("prodId");
//prodName
String prodName = jsonObject.getString("prodName ");
Long createAt = jsonObject.getLong("createdAt");
Long updatedAt = jsonObject.getLong("updatedAt");
//创建时间时间戳
Timestamp createAtTimeStamp = new Timestamp(createAt);
Timestamp updatedAtTimeStamp = new Timestamp(updatedAt);
Row row = new Row(8);
row.setField(0, id);
row.setField(1, prodId);
row.setField(2, prodName);
row.setField(3, createAtTimeStamp );
row.setField(4, updatedAtTimeStamp );
System.out.println("message str ==" + row.toString());
collect(row);
}
} catch (Exception e) {
e.printStackTrace();
System.out.println(" error. Input data " + msg + "is not json string");
}
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
@Override
// 如果返回值是Row,就必须重载实现这个方法,显式地告诉系统返回的字段类型
public DataType getResultType(Object[] arguments, Class[] argTypes) {
return DataTypes.createRowType(
DataTypes.LONG,
DataTypes.LONG,
DataTypes.STRING,
DataTypes.TIMESTAMP,
DataTypes.TIMESTAMP);
}
}
package xxxxxxx;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.table.functions.ScalarFunction;
import java.text.SimpleDateFormat;
import java.util.Date;
public class MyWaterMarkUDTF extends ScalarFunction {
public String eval(byte[] message) {
try {
String msg = new String(message, "UTF-8");
JSONObject data = JSONObject.parseObject(msg);
System.out.println("time:"+data.getString("createdAt"));
Long createAtLong = data.getLong("createdAt");
SimpleDateFormat parser = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String createTimeStr = parser.format(new Date(createAtLong));
return createTimeStr;
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
//可选,close方法可以不写
@Override
public void close() {
}
}
java 8 流式计算 mapToDouble 会丢失精度吗?
ListString list = Arrays.asList("11.11555", "11.225555", "11.35553");
list.stream().mapToDouble(p - Double.parseDouble(p)).forEach(System.out::println);
mapToDouble应该不会丢失精度,这一步操作只是转换而已。丢失应该会在统计计算的时候丢失
北大青鸟java培训:学大数据最快的方式是什么?
我们都会想着自己能够快速学好一项技能,早些进入相关行业,节省一些时间成本。
不少想学大数据的朋友也会有这样的想法,就留言咨询笔者说,想知道学大数据最快的方式是什么,既然大家有这方面的疑问,那么北大青鸟就详细讲一下,学大数据最快的方式是什么,这个话题吧!1:要想知道学习某项技术的捷径,那么就先要了解,它要学什么。
我们先聊聊,大数据要学什么。
合格的大数据工程师,需要熟悉Java,Scala开发,至少熟练使用一种脚本语言如Shell、Python等;有流式计算Sparkstreaming实时数据处理技术项目经验;熟练使用一种数据库开发技术:Oracle、Postgres、MySQL等,能运用SQL实现数据加工处理。
2:除此之外,大数据工程师还需要熟悉hadoop及相关组件,如Hive、Zookeeper、Flume、Kafka、Storm、Spark、Yarn、Impala等,搜索引擎等大数据框架;用过Kafka/Flume/ELK等常见的一种或多种数据收集处理技术;熟悉开源大数据存储方案(如:HBase,Cassandra,MongoDB)等等。
3:从以上就能看得出来,大数据工程师需要掌握的技术还是不少的,学好这门技术的最快方式,就是参加专业的大数据培训,没有之一。
关于流式计算java和流式计算应用场景的介绍到此就结束了,不知道你从中找到你需要的信息了吗 ?如果你还想了解更多这方面的信息,记得收藏关注本站。