「kafka集成java」kafka集成spring
今天给各位分享kafka集成java的知识,其中也会对kafka集成spring进行解释,如果能碰巧解决你现在面临的问题,别忘了关注本站,现在开始吧!
本文目录一览:
- 1、clickhouse与kafka集成
- 2、kafka集群测试正常,但是Java连接kafka出现异常,急求大神解答!!!!!!!!!!!
- 3、java客户端使用kafka时什么情况下使用kafka client和spring kafka?
- 4、kafka与Flink集成问题记录
- 5、Springboot集成Kafka
clickhouse与kafka集成
clickhouse支持与多种存储引擎集成,可以从集成的引擎里面读取消息,然后写到真正的数据存储表里。
clickhouse批量写入的性能比较好,我们的业务场景下会大批量的产生数据,如果使用clickhouse-jdbc去写的,写入时机和每批次写入的数量不好把控,最终选择了先将消息写入kafka,然后由clickhouse从kafka消费数据,clickhouse server消费到数据之后写入真正的数据表。
clickhouse集成kafka引擎见官方文档:
下面的介绍会与官方文档有重复,然后补充一些集成过程中遇到的坑。
下面介绍clickhouse与kafka集成的步骤,clickhouse版本是22.1.3.7
必要参数
可选参数
关于必选参数中的kafka_format参数,参见Formats部分,format具体解释如下
。
JSONEachRow, JSONStringsEachRow, JSONCompactEachRow, JSONCompactStringsEachRow
这几种格式,ClickHouse会将行输出为用换行符分隔的JSON值,这些输出数据作为一个整体时,由于没有分隔符(,)因而不是有效的JSON文档。
官方文档给了一些示例。
由于我的真实的数据表,有一个字段是json类型的字符串,但是一开始设置kafka_format的类型为JSONEachRow时,从kafka消费数据会报错,所以kafka_format格式设置成了JSONAsString,具体的错误后面贴出来。
创建kafka引擎表,用于从kafka消费数据
由于我的数据结构里有嵌套json,如果使用JSONEachRow,有个字段是json类型的字符串,带转义字符,导致clickhouse解析失败,没找到解决办法,所以使用了JSONAsString格式。
一个简单的MergeTree引擎的表,其中content是json格式的字符串。
创建的物化视图用于把从kafka消费到的数据,写到真实的数据表里,在这个例子里,msg_json_source从kafka消费到数据,然后通过物化视图msg_json_source_consumer将消费到的数据写到真实的数据表msg_target中。
由于从kafka消费到的数据就是一个json字符串,在这里使用JSONExtractString等json字段提取工具,提取msg里的字段,比如biz,sender_id,content等字段。
status_time原本计划用DatTime64类型的,但是这个时间格式有坑,最终选择了使用UInt64存毫秒级时间戳,具体的问题下面再介绍。
在clickhouse创建好3张表之后(kafka引擎表,真实数据表,物化视图表),往kafka发消息
本地安装一个简易的kafka服务端,然后创建topic
创建好topic之后,使用Java客户端往kafka发消息,使用confluent client发也可以。
添加kafka依赖
实体类,使用fastjson的@JSONField注解,实体类转字符串的时候,将驼峰转换为下划线
测试类
最终发送完,我们查看一下clickhouse里的数据表的数据,可以发现我们发送到kakfa里的数据,已经成功的消费,并且写入到真实的数据表里了。
当时测试环境部署的版本是21.9,但是这个版本有问题,不推荐安装,建议直接部署22以上的clickhouse
我一开始就是使用的JSONEachRow格式,但是我的消息体里还有嵌套的json,类似下面这种格式,里面有个字段还是个json,转行成字符串带转义字符。
然后消息体的string字符串贴一条在这里
然后clickhouse解析消息体报错,当时的错找不到了,现在复现不出来了,非常的难顶。。。。
后来因为赶版本的原因把kafka_format换成了JSONAsString。
clickhouse是支持DateTime64格式的,可以到毫秒级,但是实际使用过程中却有些坑在,
首先是有的客户端解析毫秒字符串有问题,其次是使用JSONExtract*的方法,会有差异,再然后是jdbc查询的时候,也会导致时间查询有问题。
拿毫秒时间戳和秒级时间戳做试验,clickhouse-server版本是22.3.1.1
把上面的kafka引擎表拿出来改一下
其中status_time这个字段的类型改成DateTime64(3, 'Asia/Shanghai'),使用JSONExtractUInt提取时间,看下效果
首先发条数据,数据内容如下
传入的是毫秒级时间戳,然后数据表存储的时候就变成了2282年
然后如果传入秒级的时间戳,真实的数据是这样
clickhouse存储的时候看着时间正常了,但是毫秒丢失了
然后修改一下物化视图的字段提取方式,之前是 JSONExtractUInt(msg,'status_time') as status_time,现在改成使用 JSONExtractString(msg,'status_time') as status_time提取时间
会发现时间类型又正常了。
这一条数据内容如下
最终使用JSONExtractString提取毫秒时间戳,得到了正确的DateTime64的时间,非常的神奇
最终我决定来了个釜底抽薪的方法,时间直接用UInt64存,因为我发送出去的数据是毫秒级时间戳,最终存时间戳,查询时间范围的时候直接用long类型的数据between好了。
这也是无奈之举,万一哪天server更新版本,导致时间出现问题,那就完蛋了,希望后面时间可以稳定一点吧。
kafka集群测试正常,但是Java连接kafka出现异常,急求大神解答!!!!!!!!!!!
首先你在链接时候检查是否代码里的IP 和端口是不是对的,端口是broker 端口,默认9092 ;
其次查看代码是生产者,看Kafka 集群里这个主题是否存在(如果不存在,默认是配置可以自动创建,看是非将该配置修改);然后检测防火墙,相应端口是否开放(防火墙直接关也可以);检测 server.properties 文件的 listeners 是否配置,若没有将其配置好
java客户端使用kafka时什么情况下使用kafka client和spring kafka?
spring-kafka 是基于 java版的 kafka client与spring的集成,提供了 KafkaTemplate,封装了各种方法,方便操作
所以你使用spring的情况下,可以用spring-kafka,当然直接用kafka client也行
kafka与Flink集成问题记录
Caused by: java.lang.ClassCastException: cannot assign instance of org.apache.commons.collections.map.LinkedMap to field org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.pendingOffsetsToCommit of type org.apache.commons.collections.map.LinkedMap in instance of org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2233)
at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1405)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2291)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:566)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:552)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:540)
at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:501)
at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:224)
... 4 more
Kafka库与Flink的反向类加载方法不兼容,修改 conf/flink-conf.yaml 并重启Flink
classloader.resolve-order: parent-first
Springboot集成Kafka
最近陆续收到不少朋友反馈,他们在计划要在springboot项目引入kafka中间件。在网上找过很多资料,但都比较零散,按照其说明进行操作后让项目正常跑起来仍是比较坎坷,听说我对kafka比较了解,希望给予一起分享。刚好最近因为疫情,实际相对于平常稍宽松,也想借此写点东西,一来作为自己的总结,二来可以给予有需要的朋友一些引导,针对此文期望对各位遇到问题的朋友有一定的帮助。
1. 安装JDK ,具体版本可以根据项目实际情况选择,目前使用最多的为jdk8
2. 安装Zookeeper ,具体版本可以根据项目实际情况选择,本项目使用的是3.5.8
3. 安装Kafka ,具体版本可以根据项目实际情况选择,本项目使用的是3.5.1
4. 安装Kafka Manage (非必要:安装主要是了对kafka项目操作提供图形化界面操作),具体版本可以根据项目实际情况选择,本项目使用的是1.3.3.7
parent
groupIdorg.springframework.boot/groupId
artifactIdspring-boot-starter-parent/artifactId
version2.3.12.RELEASE/version
relativePath/ !-- lookup parent from repository --
/parent
dependencies
!--springboot web依赖 --
dependency
groupIdorg.springframework.boot/groupId
artifactIdspring-boot-starter-web/artifactId
/dependency
!--kafka依赖 --
dependency
groupIdorg.springframework.kafka/groupId
artifactIdspring-kafka/artifactId
/dependency
/dependencies
spring:
kafka:
bootstrap-servers: 127.0.0.1:9092
producer: #生产者
# 发生错误后,消息重发的次数。重试1次,此值需要结合业务场景,重试与否各有千秋(重试,好处:尽可能的确保生产者写入block成功;坏处:有可能时带有顺序写入的数据打乱顺序
#比如:依次写入数据 1/2/3,但写入1时因网络异常,进行了重写,结果落到block的数据成了2/3/1)
retries: 1
#当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。模式时16k
batch-size: 16384 #16k
# 设置生产者内存缓冲区的大小
buffer-memory: 33554432
acks: 1
# 键的序列化方式
key-serializer: org.apache.kafka.common.serialization.StringSerializer
# 值的序列化方式
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
#group-id: default-group
# 自动提交的时间间隔 在spring boot 2.X 版本中这里采用的是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5D.此属性只有在enable-auto-commit:true时生效
auto-commit-interval: 1S
enable-auto-commit: false
# 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:
# latest(默认值)在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录)
# earliest :在偏移量无效的情况下,消费者将从起始位置读取分区的记录
auto-offset-reset: earliest
# 键的反序列化方式
key-deserializer: org.apache.kafka.common.serialization.StringSerializer
# 值的反序列化方式
value-deserializer: org.apache.kafka.common.serialization.StringSerializer
listener:
# 当每一条记录被消费者监听器(ListenerConsumer)处理之后提交
# RECORD
# 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后提交
# BATCH
# 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,距离上次提交时间大于TIME时提交
# TIME
# 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,被处理record数量大于等于COUNT时提交
# COUNT
# TIME | COUNT 有一个条件满足时提交
# COUNT_TIME
# 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后, 手动调用Acknowledgment.acknowledge()后提交
# MANUAL
# 手动调用Acknowledgment.acknowledge()后立即提交,一般使用这种
# MANUAL_IMMEDIATE
ack-mode: manual_immediate
# 在侦听器容器中运行的线程数
concurrency: 5
bin/kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 1 --topic test
-- 127.0.0.1:2181 zookeeper 服务器地址
-- replication-factor partitions 副本数量
--partitions partition数量
点击【Cluster】【Add Cluster】打开如下添加集群的配置界面:
输入集群的名字(如Kafka-Cluster-1)和 Zookeeper 服务器地址(如localhost:2181),选择最接近的Kafka版本(如0.10.1.0)
package com.charlie.cloudconsumer.service.impl.kafka;
import com.charlie.cloudconsumer.common.utils.JSON;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import java.util.UUID;
/**
* @Author: charlie
* @CreateTime: 2022/4/9
* @Description : kafka消息生产端,为实践业务提供向kafka block发现消息的API
*/
@Component
@Slf4j
public class QueueProducer {
@Autowired
private KafkaTemplatekafkaTemplate;
public void sendQueue(String topic,Object msgContent) {
String obj2String =JSON.toJSONString(msgContent);
log.info("kafka service 准备发送消息为:{}",obj2String);
//发送消息
ListenableFuturefuture =kafkaTemplate.send(topic,UUID.randomUUID().toString(),obj2String);
future.addCallback(new ListenableFutureCallback() {
//消息发送成功
@Override
public void onSuccess(SendResult stringObjectSendResult) {
log.info("[kafka service-生产成功]topic:{},结果{}",topic, stringObjectSendResult.toString());
}
//消息发送失败
@Override
public void onFailure(Throwable throwable) {
//发送失败的处理,本处只是记录了错误日志,可结合实际业务做处理
log.info("[kafka service-生产失败]topic:{},失败原因{}",topic, throwable.getMessage());
}
});
}
}
package com.charlie.cloudconsumer.service.impl.kafka;
import org.apache.commons.lang3.ObjectUtils;
import org.springframework.stereotype.Component;
/**
* @Author: charlie
* @CreateTime: 2022/4/9
* @Description : 消费端实际的业务处理对象
*/
@Component //添加此注解的原因是因为消费端在项目启动时就会初始化,消费端需要用到此类,故也让此类在项目启动时进行注册
public class QueueDataProcess {
public boolean doExec(Object obj) {
// todu 具体的业务逻辑
if (ObjectUtils.isNotEmpty(obj)) {
return true;
}else {
return false;
}
}
}
package com.charlie.cloudconsumer.service.impl.kafka;
import com.charlie.cloudconsumer.common.utils.JSON;
import com.charlie.cloudconsumer.model.Order;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
import java.util.Optional;
/**
* @Author: charlie
* @CreateTime: 2022/4/9
* @Description : kafka消息消费端,负责消费特定topic消息
*/
@Component
@Slf4j
@SuppressWarnings("all")
public class QueueConsumer {
@Autowired
private QueueDataProcess queueDataProcess;
/**
*
*/
@KafkaListener(topics ="test", groupId ="consumer")
public void doConsumer(ConsumerRecord record,Acknowledgment ack,@Header(KafkaHeaders.RECEIVED_TOPIC)String topic) {
Optional message =Optional.ofNullable(record.value());
if (message.isPresent()) {
try {
Object msg =message.get();
log.info("[kafka-消费] doConsumer 消费了: Topic:" + topic +",Message:" +msg);
boolean res =queueDataProcess.doExec(JSON.parseObject(msg.toString(),Order.class));
if (res) {
ack.acknowledge();
}
}catch (Exception ex) {
log.error("[kafka-消费异常] doConsumer Error {} ",ExceptionUtils.getFullStackTrace(ex));
}
}
}
}
package com.charlie.cloudconsumer.controller;
import com.alibaba.fastjson.JSON;
import com.charlie.cloudconsumer.common.utils.AjaxResult;
import com.charlie.cloudconsumer.common.utils.BuildResponseUtils;
import com.charlie.cloudconsumer.model.Order;
import com.charlie.cloudconsumer.service.impl.kafka.QueueProducer;
import org.apache.commons.lang3.ObjectUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
/**
* @Author: charlie
* @CreateTime: 2022/4/9
* @Description : kafka消息发送控制器,负责接受用户的发送的队列消息
*/
@RestController
@RequestMapping(value ="/kafka",produces =MediaType.APPLICATION_JSON_VALUE)
public class KafkaController {
@Autowired
private QueueProducer queueProducer;
@RequestMapping(value = "/send",method = RequestMethod.POST)
public AjaxResult? sendMsg(@RequestBody Order order) {
AjaxResult? ajaxResult= null;
if (ObjectUtils.isNotEmpty(order)) {
queueProducer.sendQueue("test",order);
ajaxResult = BuildResponseUtils.success(0,"发送消息:"+ JSON.toJSONString(order) + "成功!");
} else {
ajaxResult = BuildResponseUtils.success(1,"发送消息:"+ JSON.toJSONString(order) + "失败!");
}
return ajaxResult;
}
}
kafka集成java的介绍就聊到这里吧,感谢你花时间阅读本站内容,更多关于kafka集成spring、kafka集成java的信息别忘了在本站进行查找喔。
发布于:2022-12-01,除非注明,否则均为
原创文章,转载请注明出处。