当前位置: 首页 > news >正文

java zmq订阅_java zmq消息队列

ZMQ(以下ZeroMQ简称ZMQ)是一个消息队列 ,可以在进程内、进程间、TCP、多播中,以消息为单位传输数据,而不是socket的字节流;同时它像框架一样的一个socket library,他使得Socket编程更加简单、简洁和性能更高.

消息系统的核心作用就是三点:解耦,异步和并行

集群对外提供服务的过程中要保证信息从一个节点,将信息无误的分发到各个服务器节点上,并保证信息正确性和一致性。

1.串行方式的最大问题是,随着后端流程越来越多,每步流程都需要额外的耗费很多时间,从而会导致用户更长的等待延迟。

本人测试使用的是Win7 64位的电脑,jzmq.dll,libzmq.dll放在自己JDK的bin目录下,zmq.jar放在工程lib下;

通信模式:1.发送端

/**

* @throws InterruptedException

* @描述:1.非阻塞发送

* @date:2016年11月1日

*/

public static void  zmq_pub_NOBLOCK() throws InterruptedException{

//开启的io线程个数,默认1

Context context = ZMQ.context(1);

//选择发布模式

Socket publisher = context.socket(ZMQ.PUB);

//绑定IP端口

publisher.bind("tcp://*:5561");

//zmq发送速度太快,在订阅者尚未与发布者建立联系时,已经开始了数据发布

Thread.sleep(1000); //需要休息一下

int update_nbr;

for (update_nbr = 20; update_nbr < 40; update_nbr++) {

String a="{\"magicNum\":\"CHINSOFT\",\"varName\":\"ZJ_YD_1\",\"varType\":\"5\",\"varValue\":"+update_nbr+",\"varQuality\":\"1111\",\"varTime\":"+System.currentTimeMillis()/1000+"}";

publisher.send(a.getBytes(), ZMQ.NOBLOCK);  //发送,非阻塞模式(ZMQ.SNDMORE:指出当前正在发送的消息是个多帧消息,并且接下来还会发送更多的消息;)

System.out.println(update_nbr);

Thread.sleep(1000);

}

//关闭本进程的socket id,但链接还是开着的,用这个socket id的其它进程还能用这个链接,能读或写这个socket id

publisher.close();

//函数会进入阻塞状态,直到满足下列条件,所有基于context创建的scoekt都已经被zmq_close()函数关闭

context.term();

}

//订阅客户端

public static void main(String[] args) {

//开启的io线程个数,默认1

Context context = ZMQ.context(1);

// PUB/SUB模式: 发布/订阅模式

Socket subscriber = context.socket(ZMQ.SUB);

subscriber.connect("tcp://localhost:5561");

//设置订阅条件"setsockopt"

subscriber.subscribe("".getBytes());

int update_nbr = 0;

while (true) {

byte[] stringValue = subscriber.recv(0);

String string = new String(stringValue);

update_nbr++;

//{"magicNum":"CHINSOFT","varName":"ZJ_YD_1","varType":"5","varValue":20,"varQuality":"1111","varTime":1477963955}

System.out.println("Received " + update_nbr + " updates. :"+ string);

}

}

/**

* @描述:消息队列 -多针发送,

* @date:2016年11月1日

* @throws InterruptedException

*/

public static void zmq_pub_multiSend() throws InterruptedException{

Context context = ZMQ.context(1);

Socket publisher = context.socket(ZMQ.PUB);

publisher.bind("tcp://*:5561");

int i = 0;

while (i<10) {

i++;

Thread.currentThread().sleep(1000);

publisher.send("A".getBytes(), ZMQ.SNDMORE);

publisher.send("This is A".getBytes(), 0);

publisher.send("B".getBytes(), ZMQ.SNDMORE);

publisher.send("This is B".getBytes(), 0);

}

}

public static void sub_1(){

Context context = ZMQ.context(1);

Socket subscribe = context.socket(ZMQ.SUB);

subscribe.connect("tcp://127.0.0.1:5557");

subscribe.subscribe("B".getBytes());

while (true) {

System.out.println(new String(subscribe.recv(0)));

System.out.println(new String(subscribe.recv(0)));

}

}

public static void sub_2(){

Context context = ZMQ.context(1);

Socket subscribe = context.socket(ZMQ.SUB);

subscribe.connect("tcp://127.0.0.1:5561");

//subscribe.subscribe("topic".getBytes());

subscribe.subscribe("A".getBytes());

while (true) {

System.out.println(new String(subscribe.recv(0)));

System.out.println(new String(subscribe.recv(0)));

}

}

public static void main(String[] args) {

for (int j = 0; j < 100; j++) {

new Thread(new Runnable(){

public void run() {

// TODO Auto-generated method stub

ZMQ.Context context = ZMQ.context(1);  //创建1个I/O线程的上下文

ZMQ.Socket subscriber = context.socket(ZMQ.SUB);     //创建一个sub类型,也就是subscriber类型的socket

subscriber.connect("tcp://127.0.0.1:5561");    //与在5555端口监听的publisher建立连接

subscriber.subscribe("fjs".getBytes());     //订阅fjs这个channel

for (int i = 0; i < 100; i++) {

byte[] message = subscriber.recv();  //接收publisher发送过来的消息

System.out.println("receive : " + new String(message));

}

subscriber.close();

context.term();

}

}).start();

}

相关文章:

  • java按键数据库添加_详解Java MyBatis 插入数据库返回主键
  • java ee jdbc_JavaEE JDBC 补充注意点
  • java 返回前台excel_Java后台读取excel表格返回至Web前端
  • eclipse for java web_【Javaweb】Eclipse for JavaEE新建的Web工程自动生成web.xml
  • gopython 获取python 全局线程锁失败_python线程互斥锁递归锁死锁
  • java collections 复制_Java公开课|Java Collections类查复制操作是你学习Java的超车途径,还不来看看就晚了...
  • java 线程的移动问题_Spring Boot中的多线程问题和ThreadLocal
  • Java 经常用到access_用Java连接到Microsoft Access 2007数据库的正确方法是什么?
  • java1.8 interface_JDK1.8新特性——FunctionInterface
  • php file_get_contents 中文,php file_get_contents函数怎么用
  • php 平均下载速度,php限制下载速度的实现方法
  • docker lamp php7,环境准备:docker-compose安装 LAMP、LNMP、php扩展
  • java system.in 怎么写,java 里System.in 输入流如何使用
  • php 两数最大相同子串,用javascript求两个字符串最大的相同的子串(代码实例)...
  • JAVA ulimit,Linux:使用ulimit设置文件最大打开数
  • 【划重点】MySQL技术内幕:InnoDB存储引擎
  • Android Studio:GIT提交项目到远程仓库
  • css系列之关于字体的事
  • Docker 笔记(1):介绍、镜像、容器及其基本操作
  • flutter的key在widget list的作用以及必要性
  • Java知识点总结(JDBC-连接步骤及CRUD)
  • NLPIR语义挖掘平台推动行业大数据应用服务
  • Spring Cloud(3) - 服务治理: Spring Cloud Eureka
  • Wamp集成环境 添加PHP的新版本
  • 安卓应用性能调试和优化经验分享
  • 百度贴吧爬虫node+vue baidu_tieba_crawler
  • 大数据与云计算学习:数据分析(二)
  • 分享一份非常强势的Android面试题
  • 高性能JavaScript阅读简记(三)
  • 基于Vue2全家桶的移动端AppDEMO实现
  • 设计模式 开闭原则
  • 它承受着该等级不该有的简单, leetcode 564 寻找最近的回文数
  • 微信小程序实战练习(仿五洲到家微信版)
  • 一个普通的 5 年iOS开发者的自我总结,以及5年开发经历和感想!
  • ionic入门之数据绑定显示-1
  • 阿里云ACE认证学习知识点梳理
  • 阿里云重庆大学大数据训练营落地分享
  • ​MPV,汽车产品里一个特殊品类的进化过程
  • !!Dom4j 学习笔记
  • #鸿蒙生态创新中心#揭幕仪式在深圳湾科技生态园举行
  • $NOIp2018$劝退记
  • ( 用例图)定义了系统的功能需求,它是从系统的外部看系统功能,并不描述系统内部对功能的具体实现
  • (145)光线追踪距离场柔和阴影
  • (2009.11版)《网络管理员考试 考前冲刺预测卷及考点解析》复习重点
  • (3)nginx 配置(nginx.conf)
  • (arch)linux 转换文件编码格式
  • (ZT) 理解系统底层的概念是多么重要(by趋势科技邹飞)
  • (ZT)北大教授朱青生给学生的一封信:大学,更是一个科学的保证
  • (附源码)spring boot车辆管理系统 毕业设计 031034
  • (附源码)计算机毕业设计ssm高校《大学语文》课程作业在线管理系统
  • (转)关于pipe()的详细解析
  • **CI中自动类加载的用法总结
  • .NET CORE使用Redis分布式锁续命(续期)问题
  • .net refrector
  • .NET 程序如何获取图片的宽高(框架自带多种方法的不同性能)