当前位置: 首页 > news >正文

简易RPC框架-私有协议栈

HTTP协议

客户机与服务端之间的数据交互需要遵守一定的约定,比如协议版本,数据类型,是否有缓存,是否有压缩等,只有在这些约定的基础上才能相互之间愉快的工作。

Netty通信过程中的编解码

这时说的是基于TCP/IP的Netty之间的通信。TCP/IP协议下客户端与服务端之间要进行数据交互,一般需要将数据转换成二进制格式,直接传java bean是不能支持的。在RPC模式下客户端在向服务端发起请求前需要将数据做编码,服务端在接收客户端发的数据后需要做解码之后才能正常工作。

  • 解码流程

  • 编码流程

Netty 私有协议栈

为了更好的控制RPC客户端与服务端之间的通信,也可以编写私有的协议栈来支撑。

定义消息体

类似HTTP协议,包含头信息以及内容信息。


public class RpcMessage implements Serializable {

    private RpcMessageHeader messageHeader;

    private Object messageBody;

}

头信息,包含内容体长度,消息类型等信息。可以根据消息类型来做不同的业务,比如区分是心跳信息还是业务或者是监控之类的信息。


public class RpcMessageHeader implements Serializable {
    private int length;

    private int type;
   
}

定义解码器

因为TCP/IP协议容易出现粘包拆包现象,这里为了简单直接选择继承组件提供的LengthFieldBasedFrameDecoder,只需要重写下面的方法即可:


 public Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
        ByteBuf frame=(ByteBuf)super.decode(ctx,in);
        if(null==frame){
            return null;
        }

        RpcMessage message=new RpcMessage();
        RpcMessageHeader messageHeader=new RpcMessageHeader();
        messageHeader.setLength(frame.readInt());
        message.setMessageHeader(messageHeader);

        byte[] data = new byte[message.getMessageHeader().getLength()];
        frame.readBytes(data);

        Object obj = ProtoStuffSerializeUtil.deserialize(data, genericClass);
        message.setMessageBody(obj);
        return message;
    }

定义编码器

编码器继承MessageToByteEncoder,将对象转换成字节的编码器


public class RpcEncoder extends MessageToByteEncoder<RpcMessage>

重点是下面的编码函数,在ByteBuf中输出数据长度以及数据体,如有其它需要可以补充其它的字段,比如消息类型。


 public void encode(ChannelHandlerContext ctx, RpcMessage in, ByteBuf out) throws Exception {
        if(null==in){
            throw new RpcException("RpcMessage is null");
        }
        if (genericClass.isInstance(in.getMessageBody())) {
            byte[] data = ProtoStuffSerializeUtil.serialize(in.getMessageBody());
            out.writeInt(data.length);
            out.writeBytes(data);
        }
    }

ServerHandle

  • 修改服务端执行器消息实体类型为新定义的RpcMessage

public class RpcServerInvoker extends AbstractInvoker<RpcMessage> 
  • 修改服务端回调

从服务端方法获取到返回的结果后,重新封装成消息对象(RpcMessage)发送给客户端。


protected void channelRead0(ChannelHandlerContext channelHandlerContext, RpcMessage message) {

        this.executor.execute(new Runnable() {
            @Override
            public void run() {
                RpcInvoker rpcInvoker=RpcServerInvoker.this.buildInvokerChain(RpcServerInvoker.this);
                RpcResponse response=(RpcResponse) rpcInvoker.invoke(RpcServerInvoker.this.buildRpcInvocation((RpcRequest) message.getMessageBody()));
                RpcMessage responseMessage=new RpcMessage();
                byte[] data = ProtoStuffSerializeUtil.serialize(response);
                RpcMessageHeader messageHeader=new RpcMessageHeader();
                messageHeader.setLength(data.length);
                responseMessage.setMessageHeader(messageHeader);
                responseMessage.setMessageBody(response);
                channelHandlerContext.writeAndFlush(responseMessage);
            }
        });

    }

ClientHandle

  • 修改客户端执行器消息实体类型为新定义的RpcMessage

public class RpcClientInvoker extends AbstractInvoker<RpcMessage>
  • 修改客户端回调方法

接收的返回结果修改为RpcMessage,从body属性中获取原来的RpcResponse对象


public void channelRead0(ChannelHandlerContext ctx, RpcMessage message) {
        RpcResponse response=(RpcResponse) message.getMessageBody();
        String requestId = response.getRequestId();
        ResponseFuture responseFuture = pendingRPC.get(requestId);
        if (responseFuture != null) {
            pendingRPC.remove(requestId);
            responseFuture.done(response);
        }
    }
  • 修改发送请求的消息对象,组装成RpcMessage发送

public ResponseFuture invoke(RpcInvocation invocation) {
        RpcRequest request=this.getRpcRequest();
        ResponseFuture responseFuture = new ResponseFuture(request);
        pendingRPC.put(request.getRequestId(), responseFuture);
        RpcMessage message=new RpcMessage();
        byte[] data = ProtoStuffSerializeUtil.serialize(request);
        RpcMessageHeader messageHeader=new RpcMessageHeader();
        messageHeader.setLength(data.length);
        message.setMessageHeader(messageHeader);
        message.setMessageBody(request);
        channel.writeAndFlush(message);
        return responseFuture;
    }

本文源码

https://github.com/jiangmin168168/jim-framework

文中代码是依赖上述项目的,如果有不明白的可下载源码

引用

  • 文中插图来自来网络
  • 文中的思路参考了Netty权威指南

转载于:https://www.cnblogs.com/ASPNET2008/p/7588822.html

相关文章:

  • apt软件管理
  • SPSS超详细操作:分层回归(hierarchical multiple regression)
  • position: absolute;绝对定位水平居中问题
  • Java 深复制和浅复制
  • 【highlight.js】页面代码高亮插件
  • mxnet的训练过程——从python到C++
  • Nengo 神经网络
  • Linux正则和grep命令
  • Azure 中 Linux 虚拟机的大小
  • 【bzoj1758】[Wc2010]重建计划
  • ros 如何使用 openni2_launch
  • Git应用实践(二)
  • 软件项目管理第2次作业:豆瓣测评
  • 【个人训练】(POJ1276)Cash Machine
  • [转] logback 常用配置详解(序)logback 简介
  • [微信小程序] 使用ES6特性Class后出现编译异常
  • 〔开发系列〕一次关于小程序开发的深度总结
  • angular学习第一篇-----环境搭建
  • AWS实战 - 利用IAM对S3做访问控制
  • Java到底能干嘛?
  • Map集合、散列表、红黑树介绍
  • PhantomJS 安装
  • RxJS 实现摩斯密码(Morse) 【内附脑图】
  • SpringCloud集成分布式事务LCN (一)
  • Vue.js-Day01
  • 高度不固定时垂直居中
  • 前端设计模式
  • 深入浅出Node.js
  • 使用API自动生成工具优化前端工作流
  • 智能网联汽车信息安全
  • Android开发者必备:推荐一款助力开发的开源APP
  • 阿里云ACE认证之理解CDN技术
  • ​一些不规范的GTID使用场景
  • #define
  • #我与Java虚拟机的故事#连载09:面试大厂逃不过的JVM
  • (心得)获取一个数二进制序列中所有的偶数位和奇数位, 分别输出二进制序列。
  • (一)搭建springboot+vue前后端分离项目--前端vue搭建
  • (一)使用IDEA创建Maven项目和Maven使用入门(配图详解)
  • (轉貼) 資訊相關科系畢業的學生,未來會是什麼樣子?(Misc)
  • (最简单,详细,直接上手)uniapp/vue中英文多语言切换
  • .CSS-hover 的解释
  • .NET 8 编写 LiteDB vs SQLite 数据库 CRUD 接口性能测试(准备篇)
  • .NET CF命令行调试器MDbg入门(一)
  • .net framework 4.0中如何 输出 form 的name属性。
  • .NET Micro Framework 4.2 beta 源码探析
  • .Net程序帮助文档制作
  • .NET企业级应用架构设计系列之结尾篇
  • .Net中的集合
  • @JSONField或@JsonProperty注解使用
  • @select 怎么写存储过程_你知道select语句和update语句分别是怎么执行的吗?
  • [ vulhub漏洞复现篇 ] ThinkPHP 5.0.23-Rce
  • [Android学习笔记]ScrollView的使用
  • [Apio2012]dispatching 左偏树
  • [C#C++]类CLASS
  • [C/C++随笔] char与unsigned char区别