当前位置: 首页 > news >正文

netty 学习 (3)发送对象

Netty中,通讯的双方建立连接后,会把数据按照ByteBuf的方式进行传输,例如http协议中,就是通过HttpRequestDecoder对ByteBuf数据流进行处理,转换成http的对象。基于这个思路,我自定义一种通讯协议:Server和客户端直接传输java对象。 实现的原理是通过Encoder把java对象转换成ByteBuf流进行传输,通过Decoder把ByteBuf转换成java对象进行处理。 参看:http://blog.csdn.net/u013252773/article/details/21608951

Netty中,通讯的双方建立连接后,会把数据按照ByteBuf的方式进行传输,例如http协议中,就是通过HttpRequestDecoder对ByteBuf数据流进行处理,转换成http的对象。基于这个思路,我自定义一种通讯协议:Server和客户端直接传输java对象。

实现的原理是通过Encoder把java对象转换成ByteBuf流进行传输,通过Decoder把ByteBuf转换成java对象进行处理,处理逻辑如下图所示:

使用的jar包:

使用的log4j.xml文件:

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

<?xmlversion="1.0"?>

<!DOCTYPElog4j:configuration SYSTEM "log4j.dtd">

<log4j:configurationxmlns:log4j="http://jakarta.apache.org/log4j/">

    <appendername="CONSOLE"class="org.apache.log4j.ConsoleAppender">

        <layoutclass="org.apache.log4j.PatternLayout">

            <paramname="ConversionPattern"value="[%-5p] [%d] [%t] [%c] %m%n"/>

        </layout>

    </appender>

     

    <appendername="FILE"class="org.apache.log4j.DailyRollingFileAppender">

        <paramname="File"value="./log/netty.log"/>

        <layoutclass="org.apache.log4j.PatternLayout">

            <paramname="ConversionPattern"value="[%-5p] [%d] [%t] [%c] %m%n"/>

        </layout>

    </appender>

     

    <appendername="FILE_ERR"class="org.apache.log4j.DailyRollingFileAppender">

        <paramname="File"value="./log/netty_err.log"/>

        <paramname="Threshold"value="ERROR"/>

        <layoutclass="org.apache.log4j.PatternLayout">

            <paramname="ConversionPattern"value="[%-5p] [%d] [%t] [%c] %m%n"/>

        </layout>

    </appender>      

     

    <loggername="io.netty"additivity="false">

        <levelvalue="INFO,DEBUG"/>

        <appender-refref="FILE"/>

        <appender-refref="FILE_ERR"/>

        <appender-refref="CONSOLE"/>

    </logger>

    <loggername="com.yao"additivity="false">

        <levelvalue="INFO,DEBUG"/>

        <appender-refref="FILE"/>

        <appender-refref="FILE_ERR"/>

        <appender-refref="CONSOLE"/>

    </logger>

     

    <root>

         

      <levelvalue="debug"/>

        <appender-refref="FILE"/>

        <appender-refref="CONSOLE"/>

        <appender-refref="FILE_ERR"/>

    </root>

</log4j:configuration>

传输的java bean为Person:

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

packagecom.yao.nettyobject;

importjava.io.Serializable;

// 必须实现Serializable接口

publicclass Person implementsSerializable{

    privatestatic final long   serialVersionUID    = 1L;

    privateString  name;

    privateString  sex;

    privateint     age;

    publicString toString() {

        return"name:" + name + " sex:" + sex + " age:" + age;

    }

    publicString getName() {

        returnname;

    }

    publicvoid setName(String name) {

        this.name = name;

    }

    publicString getSex() {

        returnsex;

    }

    publicvoid setSex(String sex) {

        this.sex = sex;

    }

    publicint getAge() {

        returnage;

    }

    publicvoid setAge(intage) {

        this.age = age;

    }

}

Server端类:Server PersonDecoder BusinessHandler

1、Server:启动netty服务

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

packagecom.yao.nettyobject;

importio.netty.bootstrap.ServerBootstrap;

importio.netty.channel.ChannelFuture;

importio.netty.channel.ChannelInitializer;

importio.netty.channel.ChannelOption;

importio.netty.channel.EventLoopGroup;

importio.netty.channel.nio.NioEventLoopGroup;

importio.netty.channel.socket.SocketChannel;

importio.netty.channel.socket.nio.NioServerSocketChannel;

publicclass Server {

    publicvoid start(intport) throwsException {

        EventLoopGroup bossGroup = newNioEventLoopGroup();

        EventLoopGroup workerGroup = newNioEventLoopGroup();

        try{

            ServerBootstrap b = newServerBootstrap();

            b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)

                    .childHandler(newChannelInitializer<SocketChannel>() {

                                @Override

                                publicvoid initChannel(SocketChannel ch) throwsException {

                                    //解码

                                    ch.pipeline().addLast(newPersonDecoder());

                                    //业务处理

                                    ch.pipeline().addLast(newBusinessHandler());

                                }

                            }).option(ChannelOption.SO_BACKLOG,128)

                    .childOption(ChannelOption.SO_KEEPALIVE,true);

            ChannelFuture f = b.bind(port).sync();

            f.channel().closeFuture().sync();

        }finally{

            workerGroup.shutdownGracefully();

            bossGroup.shutdownGracefully();

        }

    }

    publicstatic void main(String[] args) throwsException {

        Server server = newServer();

        server.start(8000);

    }

}

2、PersonDecoder:把ByteBuf流转换成Person对象,其中ByteBufToBytes是读取ButeBuf的工具类,上一篇文章中提到过,在此不在详述。ByteObjConverter是byte和obj的互相转换的工具。

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

packagecom.yao.nettyobject;

importio.netty.buffer.ByteBuf;

importio.netty.channel.ChannelHandlerContext;

importio.netty.handler.codec.ByteToMessageDecoder;

importjava.util.List;

publicclass PersonDecoder extendsByteToMessageDecoder {

    @Override

    protectedvoid decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throwsException {

        ByteBufToBytes read = newByteBufToBytes();

        Object obj = ByteObjConverter.byteToObject(read.read(in));

        out.add(obj);

    }

}

3、BusinessHandler 读取Person信息,并打印

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

packagecom.yao.nettyobject;

importio.netty.channel.ChannelHandlerContext;

importio.netty.channel.ChannelInboundHandlerAdapter;

importorg.apache.commons.logging.Log;

importorg.apache.commons.logging.LogFactory;

publicclass BusinessHandler extendsChannelInboundHandlerAdapter {

    privateLog logger  = LogFactory.getLog(BusinessHandler.class);

    @Override

    publicvoid channelRead(ChannelHandlerContext ctx, Object msg) throwsException {

        Person person = (Person) msg;

        logger.info("BusinessHandler read msg from client :" + person);

    }

    @Override

    publicvoid channelReadComplete(ChannelHandlerContext ctx) throwsException {

        ctx.flush();

    }

     

    @Override

    publicvoid exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throwsException {

         

    }

}

Client端的类:Client ClientInitHandler PersonEncoder

1、Client 建立与Server的连接

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

packagecom.yao.nettyobject;

importio.netty.bootstrap.Bootstrap;

importio.netty.channel.ChannelFuture;

importio.netty.channel.ChannelInitializer;

importio.netty.channel.ChannelOption;

importio.netty.channel.EventLoopGroup;

importio.netty.channel.nio.NioEventLoopGroup;

importio.netty.channel.socket.SocketChannel;

importio.netty.channel.socket.nio.NioSocketChannel;

publicclass Client {

    publicvoid connect(String host, intport) throwsException {

        EventLoopGroup workerGroup = newNioEventLoopGroup();

        try{

            Bootstrap b = newBootstrap();

            b.group(workerGroup);

            b.channel(NioSocketChannel.class);

            b.option(ChannelOption.SO_KEEPALIVE,true);

            b.handler(newChannelInitializer<SocketChannel>() {

                @Override

                publicvoid initChannel(SocketChannel ch) throwsException {

                    //编码

                    ch.pipeline().addLast(newPersonEncoder());

                    //

                    ch.pipeline().addLast(newClientInitHandler());

                }

            });

            ChannelFuture f = b.connect(host, port).sync();

            f.channel().closeFuture().sync();

        }finally{

            workerGroup.shutdownGracefully();

        }

    }

    publicstatic void main(String[] args) throwsException {

        Client client = newClient();

        client.connect("127.0.0.1",8000);

    }

}

2、ClientInitHandler 向Server发送Person对象

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

packagecom.yao.nettyobject;

importio.netty.channel.ChannelHandlerContext;

importio.netty.channel.ChannelInboundHandlerAdapter;

importorg.apache.commons.logging.Log;

importorg.apache.commons.logging.LogFactory;

publicclass ClientInitHandler extendsChannelInboundHandlerAdapter {

    privatestatic Log  logger  = LogFactory.getLog(ClientInitHandler.class);

    @Override

    publicvoid channelActive(ChannelHandlerContext ctx) throwsException {

        logger.info("HelloClientIntHandler.channelActive");

        Person person = newPerson();

        person.setName("yaokj");

        person.setSex("man");

        person.setAge(30);

        ctx.write(person);

        ctx.flush();

    }

}

3、PersonEncoder 把Person对象转换成ByteBuf进行传送

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

packagecom.yao.nettyobject;

importio.netty.buffer.ByteBuf;

importio.netty.channel.ChannelHandlerContext;

importio.netty.handler.codec.MessageToByteEncoder;

publicclass PersonEncoder extendsMessageToByteEncoder<Person> {

    @Override

    protectedvoid encode(ChannelHandlerContext ctx, Person msg, ByteBuf out) throwsException {

        byte[] datas = ByteObjConverter.objectToByte(msg);

        out.writeBytes(datas);

        ctx.flush();

    }

}

工具类:ByteObjConverter

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

packagecom.yao.nettyobject;

importjava.io.ByteArrayInputStream;

importjava.io.ByteArrayOutputStream;

importjava.io.IOException;

importjava.io.ObjectInputStream;

importjava.io.ObjectOutputStream;

publicclass ByteObjConverter {

    publicstatic Object byteToObject(byte[] bytes) {

        Object obj = null;

        ByteArrayInputStream bi = newByteArrayInputStream(bytes);

        ObjectInputStream oi = null;

        try{

            oi = newObjectInputStream(bi);

            obj = oi.readObject();

        }catch(Exception e) {

            e.printStackTrace();

        }finally{

            try{

                bi.close();

            }catch(IOException e) {

                e.printStackTrace();

            }

            try{

                oi.close();

            }catch(IOException e) {

                e.printStackTrace();

            }

        }

        returnobj;

    }

    publicstatic byte[] objectToByte(Object obj) {

        byte[] bytes = null;

        ByteArrayOutputStream bo = newByteArrayOutputStream();

        ObjectOutputStream oo = null;

        try{

            oo = newObjectOutputStream(bo);

            oo.writeObject(obj);

            bytes = bo.toByteArray();

        }catch(Exception e) {

            e.printStackTrace();

        }finally{

            try{

                bo.close();

            }catch(IOException e) {

                e.printStackTrace();

            }

            try{

                oo.close();

            }catch(IOException e) {

                e.printStackTrace();

            }

        }

        returnbytes;

    }

}

工具类:ByteBufToBytes

?

1

2

3

4

5

6

7

8

9

10

11

12

packagecom.yao.nettyobject;

importio.netty.buffer.ByteBuf;

publicclass ByteBufToBytes {

    publicbyte[] read(ByteBuf datas) {

        byte[] bytes = newbyte[datas.readableBytes()];

        datas.readBytes(bytes);

        returnbytes;

    }

}

通过上述代码,实现了Server端与Client端直接使用person对象进行通信的目的。基于此,可以构建更为复杂的场景:Server端同时支撑多种协议,不同的协议采用不同的Decoder进行解析,解析结果保持统一,这样业务处理类可以保持接口一致。下一节将编写这样一个案例。

本例中需要注意的事项是:

1、Person对象必须实现Serializable接口,否则不能进行序列化。

2、PersonDecoder读取ByteBuf数据的时候,并没有对多次流式数据进行处理,而是简单的一次性接收,如果数据量大的情况下,可能会出现数据不完整,这个问题会在后续的学习中解决。

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • netty 学习 (2)Handler的执行顺序
  • netty 学习 (1)
  • Java设计模式——工厂设计模式
  • Java开发中的23种设计模式详解(转)
  • CMMI学习
  • NetConf协议说明
  • HashMap 与 ConcurrentHashMap
  • java-策略模式
  • SNMP4J简介
  • SNMP 使用SNMP4J V2进行TRAP
  • JTable的清空小技巧以及JTable的详细介绍
  • JFrame简单的例子
  • org.apache.catalina.deploy.WebXml addFilter
  • Tomcat version 6.0 only supports J2EE 1.2, 1.3, 1.4, and Java EE 5 Web modules
  • myEclipse中的web项目直接引入到eclipse中运行
  • CentOS7简单部署NFS
  • Codepen 每日精选(2018-3-25)
  • DataBase in Android
  • ERLANG 网工修炼笔记 ---- UDP
  • Golang-长连接-状态推送
  • iOS小技巧之UIImagePickerController实现头像选择
  • JavaScript 一些 DOM 的知识点
  • Promise面试题2实现异步串行执行
  • PyCharm搭建GO开发环境(GO语言学习第1课)
  • Shadow DOM 内部构造及如何构建独立组件
  • Storybook 5.0正式发布:有史以来变化最大的版本\n
  • 分布式任务队列Celery
  • 给新手的新浪微博 SDK 集成教程【一】
  • 源码之下无秘密 ── 做最好的 Netty 源码分析教程
  • 在Unity中实现一个简单的消息管理器
  • 字符串匹配基础上
  • ​html.parser --- 简单的 HTML 和 XHTML 解析器​
  • ​LeetCode解法汇总2583. 二叉树中的第 K 大层和
  • ​ssh-keyscan命令--Linux命令应用大词典729个命令解读
  • ​如何使用QGIS制作三维建筑
  • (2020)Java后端开发----(面试题和笔试题)
  • (C语言)编写程序将一个4×4的数组进行顺时针旋转90度后输出。
  • (C语言)深入理解指针2之野指针与传值与传址与assert断言
  • (C语言)输入自定义个数的整数,打印出最大值和最小值
  • (八)Docker网络跨主机通讯vxlan和vlan
  • (二十九)STL map容器(映射)与STL pair容器(值对)
  • (附源码)spring boot基于Java的电影院售票与管理系统毕业设计 011449
  • (黑马C++)L06 重载与继承
  • (一)基于IDEA的JAVA基础1
  • .Net Remoting常用部署结构
  • .NET程序集编辑器/调试器 dnSpy 使用介绍
  • .NET企业级应用架构设计系列之应用服务器
  • .NET中分布式服务
  • .vimrc 配置项
  • /usr/local/nginx/logs/nginx.pid failed (2: No such file or directory)
  • @RequestMapping 的作用是什么?
  • [2018][note]用于超快偏振开关和动态光束分裂的all-optical有源THz超表——
  • [BZOJ2850]巧克力王国
  • [C++数据结构之看懂就这一篇]图(上)
  • [C语言]-基础知识点梳理-编译、链接、预处理