当前位置: 首页 > news >正文

mina编解码(摘录)

一、Mina对编解码的支持

 

我们知道网络通讯过程实际是对二进制数据进行处理的过程,二进制数据是计算机认识的数据。对于接收到的二进制数据我们需要将其转换成我们所熟悉的数据格式,此过程称为解码(decode);对于所要发送的数据,我们需要转换为计算机所能处理的二进制数据,此过程称为编码(encode)。



       Mina对数据的编解码提供了良好的支持,它提供了过滤器ProtocolCodecFilter支持编码和解码过程,可以查看包org.apache.mina.filter.codec下的代码。

看下此过滤器的调用,代码很简单:

Java代码  
  1. // 加入编解码过滤器  
  2. chain.addLast("codec", new ProtocolCodecFilter(new TextLineCodecFactory()));  

 实现原理

ProtocolCodecFilter包含一个编码器和解码器工厂:

Java代码  
  1. //编码器和解码器工厂  
  2. private final ProtocolCodecFactory factory;  

 此工厂可以通过构造方法传入,具体构造方法可以具体看源码,比较简单,此处不做详细介绍。

主要看下解码和编码过程,解码应该是消息接收到,我们程序对消息进行处理时进行的,此时我们想到ProtocolCodecFilter应该覆盖messageReceived方法。编码应该是发送消息时,需要将我们的业务数据结构转换为二进制数据,此时我们想到ProtocolCodecFilter应该覆盖filterWrite方法。

解码过程

前面已经说了,解码过程就是将二进制数据转换为我们可以识别的数据结构,所以messageReceived方法一开始就有个判断:

Java代码  
  1. //对于解码,消息类型必须是IoBuffer类型的,如果不是,转向下个filter  
  2. if (!(message instanceof IoBuffer)) {  
  3.     nextFilter.messageReceived(session, message);  
  4.     return;  
  5. }  

 解码的核心操作:

Java代码  
  1. //处理消息,如果buffer中还有数据,就处理数据  
  2. while (in.hasRemaining()) {  
  3.     int oldPos = in.position();  
  4.         try {  
  5.         synchronized (decoderOut) {  
  6.             //进行解码操作。  
  7.             decoder.decode(session, in, decoderOut);  
  8.         }  
  9. …………  
  10. }  

 我们需要实现解码器ProtocolDecoder接口,主要实现解码方法decode。可以参考TextLineDecoder类的实现,下面代码是本人实际项目中的实现:

Java代码  
  1. public void decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {  
  2.     final int packHeadLength = 2;  
  3.     // 先获取上次的处理上下文,其中可能有未处理完的数据  
  4.     Context ctx = getContext(session);  
  5.     // 先把当前buffer中的数据追加到Context的buffer当中  
  6.     ctx.append(in);  
  7.     // 把position指向0位置,把limit指向原来的position位置  
  8.     IoBuffer buf = ctx.getBuffer();  
  9.     buf.flip();  
  10.     // 当前剩余长度大于2  
  11.     while (buf.remaining() >= packHeadLength) {  
  12.         buf.mark();  
  13.         if (ByteConvertUtil.toHex(buf.get()).equalsIgnoreCase("eb")) {  
  14.             if (ByteConvertUtil.toHex(buf.get()).equalsIgnoreCase("93")) {  
  15.                 buf.reset();  
  16.                 if(buf.remaining()<11){  
  17.                     break;  
  18.                 }  
  19.                 byte[] dataArray = new byte[11];  
  20.                 buf.get(dataArray, 0, 11);  
  21.   
  22.                 if (SensorData.checkData(dataArray)) {  
  23.                     SensorData data = new SensorData(dataArray);  
  24.                     out.write(data);  
  25.                     // 回应客户端  
  26.                     byte[] b = new byte[2];  
  27.                     b[0] = ByteConvertUtil.uniteBytes("eb");  
  28.                     b[1] = ByteConvertUtil.uniteBytes("93");  
  29.                     session.write(IoBuffer.wrap(b));  
  30.                 }  
  31.             } else {  
  32.                 continue;  
  33.             }  
  34.         } else {  
  35.             continue;  
  36.         }  
  37.     }  
  38.     //断包处理,将剩余数据放入CONTEXT中  
  39.     if (buf.hasRemaining()) {  
  40.         IoBuffer temp = IoBuffer.allocate(maxPackLength).setAutoExpand(true);  
  41.         temp.put(buf);  
  42.         temp.flip();  
  43.         buf.clear();  
  44.         buf.put(temp);  
  45.     } else {  
  46.         buf.clear();  
  47.     }  
  48. }  

 顺便说下,我们最好要把我们的的数据包的格式提前定义好,了解了数据包的格式我们才能更好的进行数据的编解码。定义好数据包格式一方面方便编解码,另一方面可以解决下面要说的粘包和断包的问题。

数据包的定义有很多种方式,这里说下我所用过的两种方式:

1.固定消息长度,消息头+消息体+校验码。此方式相对简单,表示的内容也比较少。

2.不定消息长度,消息头+消息长度+消息体。此方式可以无限消息长度,比较灵活。

解码出一个消息体后,需要将数据通过ProtocolDecoderOutput的write方法写入到队列(queue)里面去:

Java代码  
  1. public void write(Object message) {  
  2.     if (message == null) {  
  3.         throw new IllegalArgumentException("message");  
  4.     }  
  5.     //将消息写入队列  
  6.     messageQueue.add(message);  
  7. }  

 真正执行消息向下传递是通过flush方法:

Java代码  
  1. public void flush(NextFilter nextFilter, IoSession session) {  
  2.     Queue<Object> messageQueue = getMessageQueue();  
  3.     // 取出队列里面的消息向下传递  
  4.     while (!messageQueue.isEmpty()) {  
  5.         nextFilter.messageReceived(session, messageQueue.poll());  
  6.     }  
  7. }  

 编码过程

看了上面的解码过程,编码过程就不难理解了,编码过程只不过是解码过程的逆向过程,同样在filterWrite方法里有消息类型的判断:

Java代码  
  1. //消息如果已经是IoBuffer,就不需要再进行编码  
  2. if ((message instanceof IoBuffer) || (message instanceof FileRegion)) {  
  3.     nextFilter.filterWrite(session, writeRequest);  
  4.     return;  
  5. }  

 编码:

Java代码  
  1. // 进行数据编码  
  2. encoder.encode(session, message, encoderOut);  

 此处编码实现可以参考TextLineEncoder的编码实现,比较简单,此处就不多做解释了。

同样编码也是通过write到一个队列中,然后通过flush写入到后面的过滤器中的。

 

二、Mina对粘包和断包的处理

 

上面说了mina对编解码的支持,在解码过程中,不得不面对的一个问题就是TCP的粘包和断包,先说下什么是粘包和断包。

TCP通讯是面向数据流的通讯,我们将数据流理解为一支竹竿,数据包就相当于竹竿中的每一节,那么我们的解码过程就相当于对竹竿进行分解的过程。竹竿就是多个数据包的“粘包”,断包就是指竹节中间断开,我们需要将它拼接成为一个完整的竹节,如果不能拼接起来就要废弃这部分。

粘包:

 

 

断包:

 

 

 

 

对粘包的处理相对比较简单,只需要依据数据包的格式进行数据流的分割即可;对于断包的处理我们需要将断包的数据保存起来,等待接收下次的数据进行拼接。

通常情况下我们要考虑粘包和断包同时出现的情况下的解码代码编写。有两种实现方式:

1.继承CumulativeProtocolDecoder类,实现doDecode方法。

2.实现ProtocolDecoder接口,自己解决粘包和断包的问题。

先看下CumulativeProtocolDecoder的实现。

它有一个成员变量BUFFER:

Java代码  
  1. //存放断包数据  
  2. private final AttributeKey BUFFER = new AttributeKey(getClass(), "buffer");  

 doDecode方法一方面判断数据包是否符合解码要求(数据包可能过短,数据包格式不合要求都可能不能通过解码要求),不符合刚返回false;另一方面对于符合解码要求的数据进行数据解码,并返回true。可以参考ImageRequestDecoder类的实现。

 

看下它的decode方法实现:

Java代码  
  1. public void decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {  
  2.     if (!session.getTransportMetadata().hasFragmentation()) {  
  3.         while (in.hasRemaining()) {  
  4.             // 判断是否符合解码要求,不符合则中断并返回  
  5.             if (!doDecode(session, in, out)) {  
  6.                 break;  
  7.             }  
  8.         }  
  9.         return;  
  10.     }  
  11.   
  12.     boolean usingSessionBuffer = true;  
  13.     // 取得上次断包数据  
  14.     IoBuffer buf = (IoBuffer) session.getAttribute(BUFFER);  
  15.     // If we have a session buffer, append data to that; otherwise  
  16.     // use the buffer read from the network directly.  
  17.     if (buf != null) { // 如果有断包数据  
  18.         boolean appended = false;  
  19.         // Make sure that the buffer is auto-expanded.  
  20.         if (buf.isAutoExpand()) {  
  21.             try {  
  22.                 // 将断包数据和当前传入的数据进行拼接  
  23.                 buf.put(in);  
  24.                 appended = true;  
  25.             } catch (IllegalStateException e) {  
  26.                 // A user called derivation method (e.g. slice()),  
  27.                 // which disables auto-expansion of the parent buffer.  
  28.             } catch (IndexOutOfBoundsException e) {  
  29.                 // A user disabled auto-expansion.  
  30.             }  
  31.         }  
  32.   
  33.         if (appended) {  
  34.             buf.flip();// 如果是拼接的数据,将buf置为读模式  
  35.         } else {  
  36.             // Reallocate the buffer if append operation failed due to  
  37.             // derivation or disabled auto-expansion.  
  38.             //如果buf不是可自动扩展的buffer,刚通过数据拷贝的方式将断包数据和当前数据进行拼接  
  39.             buf.flip();  
  40.             IoBuffer newBuf = IoBuffer.allocate(buf.remaining() + in.remaining()).setAutoExpand(true);  
  41.             newBuf.order(buf.order());  
  42.             newBuf.put(buf);  
  43.             newBuf.put(in);  
  44.             newBuf.flip();  
  45.             buf = newBuf;  
  46.   
  47.             // Update the session attribute.  
  48.             session.setAttribute(BUFFER, buf);  
  49.         }  
  50.     } else {  
  51.         buf = in;  
  52.         usingSessionBuffer = false;  
  53.     }  
  54.   
  55.     for (;;) {  
  56.         int oldPos = buf.position();  
  57.         boolean decoded = doDecode(session, buf, out);// 进行数据的解码操作  
  58.         if (decoded) {  
  59.             // 如果符合解码要求并进行了解码操作,  
  60.             // 则当前position和解码前的position不可能一样  
  61.             if (buf.position() == oldPos) {  
  62.                 throw new IllegalStateException("doDecode() can't return true when buffer is not consumed.");  
  63.             }  
  64.             // 如果已经没有数据,则退出循环  
  65.             if (!buf.hasRemaining()) {  
  66.                 break;  
  67.             }  
  68.         } else {// 如果不符合解码要求,则退出循环  
  69.             break;  
  70.         }  
  71.     }  
  72.     // if there is any data left that cannot be decoded, we store  
  73.     // it in a buffer in the session and next time this decoder is  
  74.     // invoked the session buffer gets appended to  
  75.     if (buf.hasRemaining()) {  
  76.         if (usingSessionBuffer && buf.isAutoExpand()) {  
  77.             buf.compact();  
  78.         } else {  
  79.             //如果还有没处理完的数据(一般为断包),刚将此数据存入session中,以便和下次数据进行拼接。
  80.             storeRemainingInSession(buf, session);  
  81.         }  
  82.     } else {  
  83.         if (usingSessionBuffer) {  
  84.             removeSessionBuffer(session);  
  85.         }  
  86.     }  
  87. }  

 上面的处理过程可以这样理解:

 

1.取得断包数据,如果有断包数据,就和当前数据拼接。

2.进行数据解码操作。

3.将可以进行解码操作的数据解码完成后,如果还有数据,则将剩余数据存入session中,等待下次数据到来,从步骤1开始再次执行。

通过继承ProtocolDecoder,实现decode方法,自己处理粘包和断包的方式其实和CumulativeProtocolDecoder类的实现原理是类似的,此处实现可以参考类TextLineDecoder,内部类Context保存了上下文信息,同样是保存在了sesion中的,具体实现方式大家可以仔细阅读代码。

 

 

三、总结

 

 

基于TCP的通讯协议才有可能产生粘包和断包的情况,粘包和断包的产生有多种原因,处理好粘包和断包的问题是网络编程必然面对的情况,对于这块的处理,大家如果有什么好的想法可以一起讨论。

每天进步一点点,不做无为的码农。。。。。

转载于:https://www.cnblogs.com/burgeen/p/3618052.html

相关文章:

  • 团队作业7——第二次项目冲刺-Beta版本项目计划
  • mysql登陆密码忘记,解决办法
  • VMPlayer Ubuntu 16.04 Copy and Paste with Host 主机与宿机之间的复制粘贴
  • MyISAM 与 InnoDB 的区别
  • 牛逼的 弹出层 layer !!!
  • mysql中OPTIMIZE TABLE的作用
  • Speed up your eclipse as a super fast IDE
  • grep 正则表达式 vim及相关知识
  • 自动部署ubuntu系统时ks.cfg和ks.seed有什么不同
  • Android学习系列(19)--App离线下载
  • oracle12c管理作业资源的一种方式
  • uva 11121(-2进制)
  • IDEA编码编译不通过
  • ❲很有料❳系统负载能力浅析
  • mysql更新一个表中的某个字段值等于另一个表的某个字段值
  • [译]前端离线指南(上)
  • 【从零开始安装kubernetes-1.7.3】2.flannel、docker以及Harbor的配置以及作用
  • extjs4学习之配置
  • IIS 10 PHP CGI 设置 PHP_INI_SCAN_DIR
  • Java比较器对数组,集合排序
  • JAVA并发编程--1.基础概念
  • orm2 中文文档 3.1 模型属性
  • puppeteer stop redirect 的正确姿势及 net::ERR_FAILED 的解决
  • SAP云平台运行环境Cloud Foundry和Neo的区别
  • Vue2.x学习三:事件处理生命周期钩子
  • 从零搭建Koa2 Server
  • 从零到一:用Phaser.js写意地开发小游戏(Chapter 3 - 加载游戏资源)
  • 翻译--Thinking in React
  • 工作中总结前端开发流程--vue项目
  • 基于webpack 的 vue 多页架构
  • 看完九篇字体系列的文章,你还觉得我是在说字体?
  • 深入 Nginx 之配置篇
  • 使用 5W1H 写出高可读的 Git Commit Message
  • 推荐一个React的管理后台框架
  • 移动端 h5开发相关内容总结(三)
  • 掌握面试——弹出框的实现(一道题中包含布局/js设计模式)
  • 做一名精致的JavaScripter 01:JavaScript简介
  • AI算硅基生命吗,为什么?
  • 仓管云——企业云erp功能有哪些?
  • #HarmonyOS:软件安装window和mac预览Hello World
  • #Java第九次作业--输入输出流和文件操作
  • #我与Java虚拟机的故事#连载15:完整阅读的第一本技术书籍
  • (4)STL算法之比较
  • (windows2012共享文件夹和防火墙设置
  • (实战篇)如何缓存数据
  • (心得)获取一个数二进制序列中所有的偶数位和奇数位, 分别输出二进制序列。
  • (原创)boost.property_tree解析xml的帮助类以及中文解析问题的解决
  • (转)人的集合论——移山之道
  • ***利用Ms05002溢出找“肉鸡
  • .htaccess 强制https 单独排除某个目录
  • .net framework 4.0中如何 输出 form 的name属性。
  • .NET/C# 检测电脑上安装的 .NET Framework 的版本
  • .NET/C# 使用反射注册事件
  • .net程序集学习心得
  • .NET简谈互操作(五:基础知识之Dynamic平台调用)