【ZMQ】ZMQ/ZeroMQ简介、三种消息模式demo程序
ZeroMQ(也称为ÖMQ、0MQ或zmq)看起来像是一个可嵌入的网络库,但它的作用类似于一个并发框架。它为您提供了在进程内、进程间、TCP和多播等各种传输中传递原子消息的套接字。您可以使用扇出、发布订阅、任务分发和请求回复等模式将套接字N到N连接起来。它的速度足以成为集群产品的结构。它的异步I/O模型为您提供了可扩展的多核应用程序,构建为异步消息处理任务。它有许多语言API,并在大多数操作系统上
ZMQ/ZeroMQ简介、三种消息模式demo程序
一、什么是ZMQ
ZeroMQ(也称为ÖMQ、0MQ或zmq)看起来像是一个可嵌入的网络库,但它的作用类似于一个并发框架。它为您提供了在进程内、进程间、TCP和多播等各种传输中传递原子消息的套接字。您可以使用扇出、发布订阅、任务分发和请求回复等模式将套接字N到N连接起来。它的速度足以成为集群产品的结构。它的异步I/O模型为您提供了可扩展的多核应用程序,构建为异步消息处理任务。它有许多语言API,并在大多数操作系统上运行。
ZeroMQ(也拼写为ÖMQ、0MQ或ZMQ)是一个高性能异步消息传递库,旨在用于分布式或并发应用程序。它提供了一个消息队列,但与面向消息的中间件不同,ZeroMQ系统可以在没有专用消息代理的情况下运行。
ZeroMQ支持多种传输(TCP、进程内、进程间、多播、WebSocket等)上的通用消息传递模式(发布/订阅、请求/回复、客户端/服务器等),使进程间消息传递与线程间消息传递一样简单。这使您的代码保持清晰、模块化和极易扩展。
ZeroMQ是由大量贡献者开发的。有许多流行编程语言的第三方绑定,以及C#和Java的本机端口。
二、ZMQ的特点
1、组件来去自如,ZQM会负责自动重连,服务端和客户端可以随意的退出网络。tcp的话,必须现有服务端启动,在启动客户端,否则会报错。
2、ZMQ会在必要的情况下将消息放入队列中保存,一旦建立了连接就开始发送。
3、ZMQ有阈值机制,当队列满的时候,可以自动阻塞发送者,或者丢弃部分消息。
4、ZMQ可以使用不同的通信协议进行连接,TCP,进程间,线程间。
5、ZMQ提供了多种模式进行消息路由,如请求-应答模式(REQ/RES),发布-订阅模式(P/S)和推拉模式(P/P)等,这些模式可以用来搭建网络拓扑结构。
6、ZMQ会在后台线程异步的处理I/O操作,他使用一种不会死锁的数据结构来存储消息。同时ZeroMQ不在乎目的是否存在。
7、TCP的通信拓扑是一对一的,而ZMQ可以是一对一、一对多、多对一或者多对多。
8、ZeroMQ传输的是消息,TCP传输字节。
三、Demo程序代码
3.1 发布-订阅模式(P/S)demo
发布端:
package com.example.demozmq.zmq;
import java.nio.charset.StandardCharsets;
import java.util.Random;
import org.zeromq.ZMQ;
import org.zeromq.ZMQ.Context;
import org.zeromq.ZMQ.Socket;
public class ZmqPublisher {
public static void main(String[] args) throws InterruptedException {
Context context = ZMQ.context(1);
Socket socket = context.socket(ZMQ.PUB);
// 绑定端口
socket.bind("tcp://*:5556");
Random random = new Random(1000);
while (true) {
// 随机生成一个整数
int value = random.nextInt();
// 将整数作为消息发布到通道上
byte[] topic = "value".getBytes(StandardCharsets.UTF_8);
byte[] data = Integer.toString(value).getBytes(StandardCharsets.UTF_8);
socket.sendMore(topic);
socket.send(data);
System.out.println("发送整数:| " + value);
Thread.sleep(3000);
}
}
}
订阅端:
package com.example.demozmq.zmq;
import java.nio.charset.StandardCharsets;
import org.zeromq.ZMQ;
public class ZmqSubscriber {
public static void main(String[] args) {
ZMQ.Context context = ZMQ.context(1);
ZMQ.Socket socket = context.socket(ZMQ.SUB);
// 连接服务端
socket.connect("tcp://localhost:5556");
// 订阅主题的value
socket.subscribe("value".getBytes(StandardCharsets.UTF_8));
while (true) {
// 从通道上接收消息
byte[] topic = socket.recv();
byte[] data = socket.recv();
int value = Integer.parseInt(new String(data));
System.out.println("订阅的主题:" + new String(topic));
System.out.println(System.currentTimeMillis() + "接收到的整数:" + value);
}
}
}
3.2 请求-应答模式(REQ/RES)demo
请求端:
package com.example.demozmq.zmq;
import java.nio.charset.StandardCharsets;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import org.zeromq.ZMQ;
@SpringBootTest
public class ZmqClient {
@Test
public void testSendMessage2Server() {
ZMQ.Context context = ZMQ.context(1);
ZMQ.Socket socket = context.socket(ZMQ.REQ);
// 连接服务器
socket.connect("tcp://localhost:5555");
// 发送消息
String text = "你好,我是客户端。";
byte[] bytes = text.getBytes(StandardCharsets.UTF_8);
socket.send(bytes);
// 等待回复
byte[] reply = socket.recv(0);
String response = new String(reply);
System.out.println("接收到服务器的消息是:" + response);
}
public static void main(String[] args) throws InterruptedException {
ZMQ.Context context = ZMQ.context(1);
ZMQ.Socket socket = context.socket(ZMQ.REQ);
// 连接服务器
socket.connect("tcp://localhost:5555");
for (int i = 0; i < 10000; i++) {
// 发送消息
String text = "你好,我是客户端。" + i;
byte[] bytes = text.getBytes(StandardCharsets.UTF_8);
socket.send(bytes);
// 等待回复
byte[] reply = socket.recv(0);
String response = new String(reply);
System.out.println("接收到服务器的消息是:" + response);
Thread.sleep(5000);
}
}
}
响应端:
package com.example.demozmq.zmq;
import java.nio.charset.Charset;
import org.zeromq.ZMQ;
public class ZmqServer {
public static void main(String[] args) {
try {
ZMQ.Context context = ZMQ.context(1);
ZMQ.Socket socket = context.socket(ZMQ.REP);
// 绑定端口
socket.bind("tcp://*:5555");
while (true) {
// 等待接收消息
byte[] request = socket.recv();
String reqTest = new String(request);
System.out.println("接收到的消息:" + reqTest);
// 返回消息给客户端
byte[] reply = (System.currentTimeMillis() + "你好啊,我是服务端。").getBytes(Charset.defaultCharset());
socket.send(reply, 0);
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
3.3 推拉模式(P/P)demo
推拉模式,PUSH发送。PULL方接收。PUSH可以和多个PULL建立连接,PUSH发送的数据被顺序发送给PULL方。如果是多个PULL,假如第一条消息发送给PULL1,那么第二条消息就会发送给PULL2,第三条又会发给PULL1,一直循环。发送消息的时候也是按照这个顺序发送,保证数据能够准确到达目的地。
推送消息端:
package com.example.demozmq.zmq;
import java.nio.charset.StandardCharsets;
import org.zeromq.ZMQ;
public class ZmqPush {
public static void main(String[] args) throws InterruptedException {
ZMQ.Context context = ZMQ.context(1);
ZMQ.Socket socket = context.socket(ZMQ.PUSH);
socket.connect("ipc://fjs");
for (int i = 1; i <= 10000; i++) {
socket.send(("hello【" + i + "】").getBytes(StandardCharsets.UTF_8));
System.out.println("已发送" + i + "次");
Thread.sleep(3000);
}
socket.close();
context.term();
}
}
拉取消息端:
package com.example.demozmq.zmq;
import org.zeromq.ZMQ;
public class ZmqPullServer {
public static void main(String[] args) {
ZMQ.Context context = ZMQ.context(1);
ZMQ.Socket socket = context.socket(ZMQ.PULL);
socket.bind("ipc://fjs");
while (true) {
byte[] data = socket.recv();
System.out.println("拉取的数据:" + new String(data));
}
}
}
以上代码可以直接复制使用,如有错误请多多指教!
本文完结!
开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!
更多推荐
所有评论(0)