「kafka读写java」kafka读写数据流程

博主:adminadmin 2022-12-24 22:39:06 81

今天给各位分享kafka读写java的知识,其中也会对kafka读写数据流程进行解释,如果能碰巧解决你现在面临的问题,别忘了关注本站,现在开始吧!

本文目录一览:

kafka如何做到磁盘读写比内存读写还快?

Kafka作为一个支持大数据量写入写出的消息队列,由于是基于Scala和Java实现的,而Scala和Java均需要在JVM上运行,所以如果是基于内存的方式,即JVM的堆来进行数据存储则需要开辟很大的堆来支持数据读写,从而会导致GC频繁影响性能。考虑到这些因素,kafka是使用磁盘存储数据的。

Kafka 中消息是以 topic 进行分类的,生产者生产消息,消费者消费消息,都是面向topic的。topic存储结构见下图:

由于生产者生产的消息会不断追加到 log 文件末尾,为防止 log 文件过大导致数据定位效率低下,Kafka 采取了 分片 和 索引 机制,将每个partition分为多个segment。每个 segment对应两个文件——“.index”文件和“.log”文件。

partition文件夹命名规则:

topic 名称+分区序号,举例有一个topic名称文“kafka”,这个topic有三个分区,则每个文件夹命名如下:

index和log文件的命名规则:

1)partition文件夹中的第一个segment从0开始,以后每个segement文件以上一个segment文件的最后一条消息的offset+1命名(当前日志中的第一条消息的offset值命名)。

2)数值最大为64位long大小。19位数字字符长度,没有数字用0填充。

举例,有以下三对文件:

以第二个文件为例看下对应的数据结构:

稀疏索引 需要注意下。

消息查找过程 :

找message-2589,即offset为2589:

1)先定位segment文件,在0000000000000002584中。

2)计算查找的offset在日志文件的相对偏移量

offset - 文件名的数量 = 2589 - 2584 = 5;

在index文件查找第一个参数的值,若找到,则获取到偏移量,通过偏移量到log文件去找对应偏移量的数据即可;

本例中没有找到,则找到当前索引中偏移量的上线最接近的值,即3,偏移量文246;然后到log文件中从偏移量为246数据开始向下寻找。

简单了解了kafka在数据存储方面的知识,线面我们具体分析下为什么kafka基于磁盘却快于内存。

在前面了解存储结构过程中,我们发现kafka记录log日志使用的结尾追加的方式,即 顺序写 。这样要比随机写块很多,这与磁盘的机械机构有关,顺序写之所以快,是因为其省去了大量磁头寻址的时间。

mmap,简单描述其就是将磁盘文件映射到内存, 用户通过修改内存就能修改磁盘文件。

即便是顺序写磁盘,磁盘的读写速度任然比内存慢慢的多得多,好在操作系统已经帮我们解决这个问题。在Linux操作系统中,Linux会将磁盘中的一些数据读取到内存当中,我们称之为内存页。当需要读写硬盘的时候,都优先在内存页中进行处理。当内存页的数据比硬盘数据多的时候,就形成了 脏页 ,当脏页达到一定数量,操作系统会进行 刷脏 ,即将内存也数据写到磁盘。

问题:不可靠,写到 mmap 中的数据并没有被真正的写到硬盘,操作系统会在程序主动调用 Flush 的时候才把数据真正的写到硬盘。

零拷贝并不是不需要拷贝,而是减少不必要的拷贝次数,通常使用在IO读写过程中。

传统io过程

如上图所示,上图共经历了四次拷贝的过程:

1)数据到到内核态的read buffer;

2)内核态的read buffer到用户态应用层的buffer;

3)用户态到内核态的socket buffer;

4)socket buffer到网卡的buffer(NIC)。

DMA

引入DMA技术,是指外部设备不通过CPU而直接与系统内存交换数据的接口技术,网卡等硬件设备支持DMA技术。

如上图所示,上图共经历了两次拷贝的过程。

sendfile

在内核版本 2.1 中,引入了 Sendfile 系统调用,以简化网络上和两个本地文件之间的数据传输。同时使用了DMA技术。

如上图所示,上图共经历了一次拷贝的过程。

sendfile( DMA 收集拷贝)

之前我们是把页缓存的数据拷贝到socket缓存中,实际上,我们仅仅需要把缓冲区描述符传到 socket 缓冲区,再把数据长度传过去,这样 DMA 控制器直接将页缓存中的数据打包发送到网络中就可以了。

如上图所示,最后一次的拷贝也被消除了,数据-read buffer-NIC。

kafka通过java和scala实现,而Java对sendfile是通过NIO 的 FileChannel (java.nio.channels.FileChannel )的 transferTo 和 transferFrom 方法实现零拷贝

注: transferTo 和 transferFrom 并不保证一定能使用零拷贝。实际上是否能使用零拷贝与操作系统相关,如果操作系统提供 sendfile 这样的零拷贝系统调用,则这两个方法会通过这样的系统调用充分利用零拷贝的优势,否则并不能通过这两个方法本身实现零拷贝。

如何写java程序代码测试kafka

我这里是使用的是,kafka自带的zookeeper。

以及关于kafka的日志文件啊,都放在默认里即/tmp下,我没修改。保存默认的

1、 [hadoop@sparksinglenode kafka_2.10-0.8.1.1]$ jps

2625 Jps

2、 [hadoop@sparksinglenode kafka_2.10-0.8.1.1]$ bin/zookeeper-server-start.sh config/zookeeper.properties

此刻,这时,会一直停在这,因为是前端运行。

另开一窗口,

3、 [hadoop@sparksinglenode kafka_2.10-0.8.1.1]$ bin/kafka-server-start.sh config/server.properties

也是前端运行。

如何通过java实现kafka集群的配额

序列化就是一种用来处理对象流的机制,所谓对象流也就是将对象的内容进行流化。可以对流化后的对象进行读写操作,也可将流化后的对象传输于网络之间。序列化是为了解决在对对象流进行读写操作时所引发的问题。

序列化的实现:将需要被序列化的类实现Serializable接口,该接口没有需要实现的方法,implements Serializable只是为了标注该对象是可被序列化的,然后使用一个输出流(如:FileOutputStream)来构造一个ObjectOutputStream(对象流)对象,接着,使用ObjectOutputStream对象的writeObject(Object obj)方法就可以将参数为obj的对象写出(即保存其状态),要恢复的话则用输入流。

kafka消费者java版本读取不到消息怎么办

3.启动服务3.1启动zookeeper启动zk有两种方式,第一种是使用kafka自己带的一个zk。bin/zookeeper-server-start.shconfig/zookeeper.properties另一种是使用其它的zookeeper,可以位于本机也可以位于其它地址。这种情况需要修改config下面的sercer.properties里面的zookeeper地址。例如zookeeper.connect=10.202.4.179:21813.2启动kafkabin/kafka-server-start.shconfig/server.properties4.创建topicbin/kafka-topics.sh--create--zookeeper10.202.4.179:2181--replication-factor1--partitions1--topictest创建一个名为test的topic,只有一个副本,一个分区。通过list命令查看刚刚创建的topicbin/kafka-topics.sh-list-zookeeper10.202.4.179:21815.启动producer并发送消息启动producerbin/kafka-console-producer.sh--broker-listlocalhost:9092--topictest启动之后就可以发送消息了比如testhelloboy按Ctrl+C退出发送消息6.启动consumerbin/kafka-console-consumer.sh--zookeeper10.202.4.179:2181--topictest--from-beginning启动consumer之后就可以在console中看到producer发送的消息了可以开启两个终端,一个发送消息,一个接受消息。如果这样都不行的话,查看zookeeper进程和kafka的topic,一步步排查原因吧。

关于kafka读写java和kafka读写数据流程的介绍到此就结束了,不知道你从中找到你需要的信息了吗 ?如果你还想了解更多这方面的信息,记得收藏关注本站。

The End

发布于:2022-12-24,除非注明,否则均为首码项目网原创文章,转载请注明出处。