「javazmq」javazmq 发布订阅

博主:adminadmin 2023-03-17 20:35:06 428

本篇文章给大家谈谈javazmq,以及javazmq 发布订阅对应的知识点,希望对各位有所帮助,不要忘了收藏本站喔。

本文目录一览:

ZMQ JAVA使用经验之 ZMQ简介怎么解决

ZMQ JAVA使用经验之 ZMQ简介怎么解决:

ZMQ被称为史上最快消息队列,它处于会话层之上,应用层之下,使用后台异步线程完成消息的接受和发送,完美的封装了Socket API,大大简化了编程人员的复杂度,被称为史上最强大的消息中间件。ZMQ是用C语言编写的,30s内完成消息的传输,能够兼容多个平台,多种语言,可以使用多种方式实现N对N的Socket连接。本文仅以JAVA版本的ZMQ API为例,介绍ZMQ。

ZMQ与传统的TCP Socket相比,具有以下优点:

1) ZMQ发送和接受的是具有固定长度的二进制对象,ZMQ的消息包最大254个字节,前6个字节是协议,然后是数据包。数据包由3个部分组成,第一个字节是包的长度,第二个字节是包的一些属性,最后是包的内容。如果超过255个字节(有一个字节表示包属性),则ZMQ会自动分包传输;而对于TCP Socket,是面向字节流的连接,编程人员需要处理数据块与数据块之间的边界问题,而ZMQ能够保证每次用户发送和接受的都是一个完整的数据块;

2) 传统的TCP Socket的连接是1对1的,可以认为“1个Socket=1个连接”,每一个线程独立的维护一个Socket。但是ZMQ摒弃了这种1对1的模式,ZMQ的Socket可以很轻松的实现1对N,N对1和N对N的连接模式,一个ZMQ的Socket可以自动的维护一组连接,用户无法操作这些连接,用户只能操作套接字,而不是连接本身,所以说ZMQ的世界里,连接是私有的。这里大家关心的一点是,一个Socket是如何识别来自多个Socket的连接的,这里以请求响应模式为例介绍ZMQ是如何实现一个Socket连接N个Socket的;

3)ZMQ使用异步后台线程处理接受和发送请求,这意味着发送完消息,不可以立即释放资源,消息什么时候发送用户是无法控制的,同时,ZMQ自动重连,这意味着用户可以以任意顺序加入到网络中,服务器也可以随时加入或者退出网络;

ZMQ之所以能够在无状态的网络中实现1对N的连接,关键在于信封的机制,信封里保存了应答目标的位置。ZMQ涉及到请求-响应模式的Socket一共有4种类型:

DEALER是一种负载均衡,它会将消息分发给已连接的节点,并使用公平队列的机制处理接受到的消息。

REQ发送消息时会在消息顶部插入一个空帧,接受时会将空帧移去。其实REQ是建立在DEALER之上的,但REQ只有当消息发送并接受到回应后才能继续运行。

ROUTER在收到消息时会在顶部添加一个信封,标记消息来源。发送时会通过该信封决定哪个节点可以获取到该条消息。

REP在收到消息时会将第一个空帧之前的所有信息保存起来,将原始信息传送给应用程序。在发送消息时,REP会用刚才保存的信息包裹应答消息。REP其实是建立在ROUTER之上的,但和REQ一样,必须完成接受和发送这两个动作后才能继续。

在了解了4种类型的Socket之后,我们就不难理解ZMQ的信封机制了。ZMQ信封机制的核心是Router Socket,它的工作原理如下:

从ROUTER中读取一条消息时,ZMQ会包上一层信封,上面注明了消息的来源。向ROUTER写入一条消息时(包含信封),ZMQ会将信封拆开,并将消息递送给相应的对象。当REQ Socket向ROUTER Socket发送一条请求后,REP会从ROUTER收到一条消息,消息格式如下:

第三帧是REP从应用程序收到的数据,第二帧是空帧,是REQ在发送ROUTER数据之前添加的,用来表示结束,第一帧即信封,是ROUTER添加的,主要用来记录消息来源;整个数据包处理过程如下:

对于REQ Socket,可以在创建Socket的时候,为该Sock指定标示符,此时的Socket称为持久Socket,没有指定标示符的我们称为瞬时Socket,ROUTER会自动为瞬时Socket生成一个标示符;

这样REP返回包含信封的数据给ROUTER,ROUTER就可以根据信封上的标示符将该消息发送到对应的REQ上;

ZMQ使用注意事项:

ZMQ是在发送端缓存消息的,可以通过阈值控制消息的溢出;

ZMQ不可以线程之间共享Socket,否则会报org.zeromq.ZMQException: Operation cannot be accomplished in current state错误。

ZMQ一个进程只允许有一个Context,new Context(int arg) arg表示后台线程的数量;

ZMQ的Socket类有一个Linger参数,可以通过SetLinger设置,主要用于表示该Socket关闭以后,未发送成功的消息是否还保存,如果设置为-1表示该消息将永久保存(除非宕机,ZMQ是不持久化消息的),如果为0表示所有未发送成功的消息在Socker关闭以后都将立即清除,如果是一个正数,则表示该消息在Socket关闭后多少毫秒内被删除;这个方法非常有用,尤其在控制发送失败时,是否重发消息。

linux下java开发如何使用jzmq?

运行报错找不到java.library.path是因为没有找到java的lib库路径,建议你按照下面方法尝试下:

1:首先你要在Linux里面把jdk配置好,最好是找个1.6版本的,配置也就是下载好压缩包,解压了,然后把jdk的目录配置到Linux的PATH环境变量里,具体方法,自己查资料;

2:安装了jdk的话,那就再下载个Eclipse这个开发软件吧,作为java的IDE开发工具,挺好使唤的;

3:把你的项目都加到Eclipse里,这次应该就不会找不到java.library.path。

zeromq java 怎么使用安装

首先

1.下载最新版的zeromq

2 解压

tar -xvf zeromq-3.1.0-beta.tar.gz

3 运行configure

./configure --prefix=/data/zeromq (prefix 指定安装目录)

4. make

5. make install

6. 设置环境变量

export CPPFLAGS=-I/home/mine/0mq/include/

export LDFLAGS=-L/home/mine/0mq/lib/

7.测试代码

server.c

#######################################

#include /data/zeromq/include/zmq.h

#include stdio.h

#include unistd.h

#include string.h

#include stdlib.h

int main (void)

{

void *context = zmq_init (1);

// Socket to talk to clients

void *responder = zmq_socket (context, ZMQ_REP);

zmq_bind (responder, "tcp://192.168.0.185:5555");

printf("binding on port 5555.\nwaiting client send message...\n");

while (1) {

// Wait for next request from client

zmq_msg_t request;

zmq_msg_init (request);

char buf[32];

zmq_recv (responder,buf, request, 0);

int size = zmq_msg_size (request);

char *string = malloc (size + 1);

memset(string,0,size+1);

memcpy (string, zmq_msg_data (request), size);

printf ("Received Hello string=[%s]\n",string);

free(string);

zmq_msg_close (request);

// Do some 'work'

sleep (1);

// Send reply back to client

zmq_msg_t reply;

char res[128]={0};

snprintf(res,127,"reply:%d",random());

zmq_msg_init_size (reply, strlen(res));

memcpy (zmq_msg_data (reply), res, strlen(res));

char buf2[32];

zmq_send (responder, buf2, reply, 0);

zmq_msg_close (reply);

}

// We never get here but if we did, this would be how we end

zmq_close (responder);

zmq_term (context);

return 0;

}

########################################################

#client.c

#include zmq.h

#include string.h

#include stdio.h

#include unistd.h

int main ()

{

void *context = zmq_init (1); // Socket to talk to server

printf ("Connecting to hello world server...\n");

void *requester = zmq_socket (context, ZMQ_REQ);

zmq_connect (requester, "tcp://192.168.0.185:5555");

int request_nbr;

for (request_nbr = 0; request_nbr != 10; request_nbr++)

{

zmq_msg_t request;

zmq_msg_init_data (request, "Hello", 6, NULL, NULL);

printf ("Sending request %d...\n", request_nbr);

zmq_send (requester, request, 0,0);

printf("send over") ;

zmq_msg_close (request);

zmq_msg_t reply;

zmq_msg_init (reply);

zmq_recv (requester, reply, 0,0);

printf ("Received reply %d: [%s]\n", request_nbr, (char *) zmq_msg_data (reply));

zmq_msg_close (reply);

}

zmq_close (requester);

zmq_term (context);

return 0;

}

gcc server.c -o server -lzmq -L/data/zeromq/lib -I/data/zeromq/include

gcc client.c -o client -lzmq -L/data/zeromq/lib -I/data/zeromq/include

./server

./client

jzmq 是zeromq 的java客户端

下载地址

1. tar xzf zeromq-jzmq-semver-90-g58c6108.tar.gz

2. ./configure

这个时候需要上面环境变量的配置,

还需要

随便

vi /data/zeromq/.bashrc

export PATH=$PATH:/data/zeromq/include

export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/data/zeromq/lib:/data/zeromq/include

只要configure 执行完毕就会生成makefile文件

3.make

4.make install

测试

hwserver.java

//

// Hello World server in Java

// Binds REP socket to tcp://*:5555

// Expects "Hello" from client, replies with "World"

//

// Naveen Chawla naveen.chwl@gmail.com

//

import org.zeromq.ZMQ;

public class hwserver {

public static void main(String[] args) {

// Prepare our context and socket

ZMQ.Context context = ZMQ.context(1);

ZMQ.Socket socket = context.socket(ZMQ.REP);

socket.bind ("tcp://*:5555");

while (true) {

byte[] request;

// Wait for next request from client

// We will wait for a 0-terminated string (C string) from the client,

// so that this server also works with The Guide's C and C++ "Hello World" clients

request = socket.recv (0);

// In order to display the 0-terminated string as a String,

// we omit the last byte from request

System.out.println ("Received request: [" +

new String(request,0,request.length-1) // Creates a String from request, minus the last byte

+ "]");

// Do some 'work'

try {

Thread.sleep (1000);

}

catch(InterruptedException e){

e.printStackTrace();

}

// Send reply back to client

// We will send a 0-terminated string (C string) back to the client,

// so that this server also works with The Guide's C and C++ "Hello World" clients

String replyString = "World" + " ";

byte[] reply = replyString.getBytes();

reply[reply.length-1]=0; //Sets the last byte of the reply to 0

socket.send(reply, 0);

}

}

}

hwclient.java

//

// Hello World client in Java

// Connects REQ socket to tcp://localhost:5555

// Sends "Hello" to server, expects "World" back

//

// Naveen Chawla naveen.chwl@gmail.com

//

import org.zeromq.ZMQ;

public class hwclient{

public static void main(String[] args){

// Prepare our context and socket

ZMQ.Context context = ZMQ.context(1);

ZMQ.Socket socket = context.socket(ZMQ.REQ);

System.out.println("Connecting to hello world server...");

socket.connect ("tcp://localhost:5555");

// Do 10 requests, waiting each time for a response

for(int request_nbr = 0; request_nbr != 10; request_nbr++) {

// Create a "Hello" message.

// Ensure that the last byte of our "Hello" message is 0 because

// our "Hello World" server is expecting a 0-terminated string:

String requestString = "Hello" + " ";

byte[] request = requestString.getBytes();

request[request.length-1]=0; //Sets the last byte to 0

// Send the message

System.out.println("Sending request " + request_nbr + "...");

socket.send(request, 0);

// Get the reply.

byte[] reply = socket.recv(0);

// When displaying reply as a String, omit the last byte because

// our "Hello World" server has sent us a 0-terminated string:

System.out.println("Received reply " + request_nbr + ": [" + new String(reply,0,reply.length-1) + "]");

}

}

}

javac -classpath /data/jzmq/share/java/zmq.jar -d . h*.java

java -Djava.library.path=/data/jzmq/lib -cp /data/jzmq/share/java/zmq.jar:. hwserver

java -Djava.library.path=/data/jzmq/lib -cp /data/jzmq/share/java/zmq.jar:. hwclient

显示内容:

server端

Received request: [Hello]

Received request: [Hello]

Received request: [Hello]

Received request: [Hello]

Received request: [Hello]

Received request: [Hello]

client端

Connecting to hello world server...

Sending request 0...

Received reply 0: [World]

Sending request 1...

Received reply 1: [World]

Sending request 2...

Received reply 2: [World]

Sending request 3...

Received reply 3: [World]

Sending request 4...

Received reply 4: [World]

Sending request 5...

关于javazmq和javazmq 发布订阅的介绍到此就结束了,不知道你从中找到你需要的信息了吗 ?如果你还想了解更多这方面的信息,记得收藏关注本站。