当前位置: 首页 > news >正文

基于Netty与RabbitMQ的消息服务

Netty作为一个高性能的异步网络开发框架,可以作为各种服务的开发框架。

前段时间的一个项目涉及到硬件设备实时数据的采集,采用Netty作为采集服务的实现框架,同时使用RabbitMQ作为采集服务和各个其他模块的通信消息队列,整个服务框架图如下:

将业务代码和实际协议解析部分的代码抽离,得到以上一个简单的设计图,代码开源在GitHub上,简单介绍下NettyMQServer采集服务涉及到的几个关键技术点:

1、设备TCP消息解析:

NettyMQServer和采集设备Device之间采用TCP通信,TCP消息的解析可以使用LengthFieldBasedFrameDecoder(消息头和消息体),可以有效的解决TCP消息“粘包”问题。

消息包解析图如下:

 lengthFieldOffset   =  0
 lengthFieldLength   =  2
 lengthAdjustment    = -2 (= the length of the Length field)
 initialBytesToStrip =  0

 BEFORE DECODE (14 bytes)         AFTER DECODE (14 bytes)
 +--------+----------------+      +--------+----------------+
 | Length | Actual Content |----->| Length | Actual Content |
 | 0x000E | "HELLO, WORLD" |      | 0x000E | "HELLO, WORLD" |
 +--------+----------------+      +--------+----------------+

代码中消息长度的存储采用了4个字节,采用LengthFieldBasedFrameDecoder(65535,0,4,-4,0)解码,Netty会从接收的数据中头4个字节中得到消息的长度,进而得到一个TCP消息包。

2、给设备发消息:

首先在连接创建时,要保留TCP的连接:

复制代码
static final ChannelGroup channels = new DefaultChannelGroup(
            GlobalEventExecutor.INSTANCE);

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // A closed channel will be removed from ChannelGroup automatically
        channels.add(ctx.channel());
    }
复制代码

在每次一个Channel Active(连接创建)的时候用ChannelGroup保存这个Channel连接,当需要给某个设备发消息的时候,可以遍历该ChannelGroup,找到对应的Channel,给该Channel发送消息:

for (io.netty.channel.Channel c : EchoServerHandler.channels) {
                            ByteBuf msg = Unpooled.copiedBuffer(message.getBytes());
                            c.writeAndFlush(msg);
                        }

这里是给所有的连接的设备都发。当连接断开的时候,ChannelGroup会自动remove掉这个连接,不需要我们手动管理。

3、心跳检测

当某个设备Device由于断电或是其他原因导致设备不正常无法采集数据,Netty服务端需要知道该设备是否在正常工作,可以使用Netty的IdleStateHandler,示例代码如下:

复制代码
// 3 minutes for read idle
ch.pipeline().addLast(new IdleStateHandler(3*60,0,0));
ch.pipeline().addLast(new HeartBeatHandler());

/**
 * Handler implementation for heart beating.
 */
public class HeartBeatHandler extends ChannelInboundHandlerAdapter{

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt)
            throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent) evt;
            if (event.state() == IdleState.READER_IDLE) {
                // Read timeout
                System.out.println("READER_IDLE: read timeout from "+ctx.channel().remoteAddress());
                //ctx.disconnect(); //Channel disconnect
            }
        }
    }
}
复制代码

上面设置3分钟没有读到数据,则触发一个READER_IDLE事件。

4、RabbitMQ消息接收与发送

NettyMQServer消息发送采用了Spring AMQP,只需要在配置文件中简单配置一下,就可以方便使用。

NettyMQServer消息接收同样可以采用Spring AMQP,但由于对Spring相关的配置不是很熟悉,为了更灵活的使用MQ,这里使用了RabbitMQ Client Java API来实现:

复制代码
                    Connection connection = connnectionFactory.newConnection();
                    Channel channel = connection.createChannel();
                    channel.exchangeDeclare(exchangeName, "direct", true, false, null);
                    channel.queueDeclare(queueName, true, false, false, null);
                    channel.queueBind(queueName, exchangeName, routeKey);

                    // process the message one by one
                    channel.basicQos(1);

                    QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
                    // auto-ack is false
                    channel.basicConsume(queueName, false, queueingConsumer);
                    while (true) {
                        QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
                        String message = new String(delivery.getBody());

                        log.debug("Mq Receiver get message");
                        // Send the message to all connected clients
                        // If you want to send to a specified client, just add
                        // your own logic and ack manually
                        // Be aware that ChannelGroup is thread safe
                        log.info(String.format("Conneted client number: %d",EchoServerHandler.channels.size()));
                        for (io.netty.channel.Channel c : EchoServerHandler.channels) {
                            ByteBuf msg = Unpooled.copiedBuffer(message.getBytes());
                            c.writeAndFlush(msg);
                        }
                        // manually ack to MQ server the message is consumed.
                        channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                    }
复制代码

以上代码从一个Queue中读取数据,为了有效处理数据,防止异常数据丢失,使用了手动Ack。

RabbitMQ的使用方式:http://www.cnblogs.com/luxiaoxun/p/3918054.html

 

代码托管在GitHub上:https://github.com/luxiaoxun/NettyMqServer

 

参考:

http://netty.io/

http://netty.io/4.0/api/io/netty/handler/codec/LengthFieldBasedFrameDecoder.html

http://netty.io/4.0/api/io/netty/handler/timeout/IdleStateHandler.html

 

作者: 阿凡卢
出处: http://www.cnblogs.com/luxiaoxun/
本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。
 
 

转载于:https://www.cnblogs.com/qxoffice2008/p/4257428.html

相关文章:

  • 32_使用BeanUtils工具包操作JavaBean
  • 常用HTTP状态码
  • 怎样将U盘设置成只读属性
  • Sum、if、mod隔列求和
  • 有关android 应用的plugin框架调研
  • 数据结构之查找(php代码实现)
  • redis常用命令
  • (太强大了) - Linux 性能监控、测试、优化工具
  • Centos 源代码以及编译过程
  • PHP_NoteBook
  • 利用mongo profile 分析mongo慢查询
  • GlusterFS源代码解析 —— GlusterFS 简单介绍
  • rem是什么?
  • SYSOPER和SYSDBA特权的区别与联系
  • 混沌的艺术--- YChaos通过数学公式生成混沌图像
  • 【干货分享】SpringCloud微服务架构分布式组件如何共享session对象
  • 0基础学习移动端适配
  • CentOS 7 修改主机名
  • CODING 缺陷管理功能正式开始公测
  • IDEA 插件开发入门教程
  • input的行数自动增减
  • JAVA 学习IO流
  • JavaScript设计模式系列一:工厂模式
  • Python3爬取英雄联盟英雄皮肤大图
  • Spark VS Hadoop:两大大数据分析系统深度解读
  • tweak 支持第三方库
  • vue总结
  • 阿里云容器服务区块链解决方案全新升级 支持Hyperledger Fabric v1.1
  • 不上全站https的网站你们就等着被恶心死吧
  • 测试开发系类之接口自动化测试
  • 分布式事物理论与实践
  • 服务器之间,相同帐号,实现免密钥登录
  • 构造函数(constructor)与原型链(prototype)关系
  • 两列自适应布局方案整理
  • 入门级的git使用指北
  • 使用 QuickBI 搭建酷炫可视化分析
  • 用 vue 组件自定义 v-model, 实现一个 Tab 组件。
  • PostgreSQL之连接数修改
  • ​LeetCode解法汇总2583. 二叉树中的第 K 大层和
  • ​软考-高级-系统架构设计师教程(清华第2版)【第9章 软件可靠性基础知识(P320~344)-思维导图】​
  • #预处理和函数的对比以及条件编译
  • $ is not function   和JQUERY 命名 冲突的解说 Jquer问题 (
  • (1)(1.13) SiK无线电高级配置(五)
  • (10)ATF MMU转换表
  • (cljs/run-at (JSVM. :browser) 搭建刚好可用的开发环境!)
  • (附源码)springboot太原学院贫困生申请管理系统 毕业设计 101517
  • (深度全面解析)ChatGPT的重大更新给创业者带来了哪些红利机会
  • (十六)一篇文章学会Java的常用API
  • (转)C#开发微信门户及应用(1)--开始使用微信接口
  • (转)IOS中获取各种文件的目录路径的方法
  • (转)利用ant在Mac 下自动化打包签名Android程序
  • ../depcomp: line 571: exec: g++: not found
  • .bat批处理(十一):替换字符串中包含百分号%的子串
  • .NET 2.0中新增的一些TryGet,TryParse等方法
  • .net core MVC 通过 Filters 过滤器拦截请求及响应内容