当前位置: 首页 > news >正文

NettyのEventLoopChannel

Netty的重要组件:EventLoop、Channel、Future & Promise、Handler & Pipeline、ByteBuf

本篇主要介绍Netty的EventLoop和Channel组件。

1、Netty入门案例

        服务器端的创建,主要分为以下步骤:

  • 创建serverBootstrap对象。
  • 配置服务器的线程模型。可以指定两个线程模型,parentGroup是专门负责接收连接的线程模型,childGroup是处理读写事件的工作线程模型。

      

  • 设置 Channel 类型,这里使用NioServerSocketChannel,是基于NIO实现的,还有其他的实现如下:

  • 配置 Channel 初始化器,使用ChannelInitializer初始化NioSocketChannel,在这里我们配置了处理字符串编码以及打印字符串。
  • 绑定端口。
  ServerBootstrap serverBootstrap = new ServerBootstrap();//接受连接的线程  or 工作线程serverBootstrap.group(new NioEventLoopGroup())//服务器ServerSocketChannel的实现 是NIO 还是 BIO.channel(NioServerSocketChannel.class).childHandler(//初始化与客户端进行数据读写的通道,并且添加别的handler 在连接建立后回调new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel channel) throws Exception {//处理编码(解码 )channel.pipeline().addLast(new StringDecoder());//自己的业务逻辑,比如打印字符串channel.pipeline().addLast(new ChannelInboundHandlerAdapter() {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println(msg);}});}}).bind(8080);

        客户端的创建步骤与服务器端创建的步骤类似,但是在connect方法后需要加上.sync(),以及调用writeAndFlush()方法进行收发数据。(为什么会这样后面会说明)

 new Bootstrap().group(new NioEventLoopGroup())//客户端SocketChannel的实现.channel(NioSocketChannel.class)//初始化与服务器进行数据读写的通道,并且添加别的handler 在连接建立后回调.handler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {//字符输出编码ch.pipeline().addLast(new StringEncoder());}}).connect(new InetSocketAddress("localhost",8080)).sync()//阻塞方法 连接建立成功后才会执行.channel().writeAndFlush("netty");//收发数据,会调用ch.pipeline().addLast(new StringEncoder());所添加的处理器

        启动程序看一下效果(先启动服务器,再启动客户端)

2、EventLoop

        下面我们开始介绍Netty中的第一个组件,配置服务器的线程模型时设置的EventLoop。

        EventLoop类关系图:

        我们首先看下EventLoop的NIO实现:NioEventLoopGroup的构造方法:

        可以看到它有多个重载的构造方法:

        在构造方法中可以指定线程的数量,如果使用的是无参构造方法,那么默认传递的线程数是0,但是会把0传递给其他的构造方法:

        最后调用父类的构造:

        在父类的构造中,会判断传入的线程参数是否为0。

        目前很显然条件成立,就会获取成员变量DEFAULT_EVENT_LOOP_THREADS 并且再次作为参数调用父类的构造。成员变量DEFAULT_EVENT_LOOP_THREADS 会在静态代码块中被赋值。

        如果能获取到"io.netty.eventLoopThreads" key对应的value,就以该值为准,否则线程数是当前cpu核心数*2

//NioEventLoopGroup构造方法指定线程数 如果不指定为CPU可运行核心数 * 2
NioEventLoopGroup loopGroup = new NioEventLoopGroup(2);

         一个 EventLoop由一个单独的线程驱动,它不断轮询 I/O 事件并执行相应的任务:

log.debug("{}",loopGroup.next());
log.debug("{}",loopGroup.next());
log.debug("{}",loopGroup.next());

        并且EventLoop的内部维护了一个任务队列,可以提交任务到这个队列中,由 EventLoop的线程顺序执行。

        向EventLoop提交任务:

//执行普通任务
loopGroup.next().submit(() -> log.debug("run..."));
//执行定时任务
loopGroup.next().scheduleAtFixedRate((Runnable) () -> log.debug("run with schedule..."),0,1, TimeUnit.SECONDS);

        下面通过一个案例来加深对EventLoop的理解:

        改造最初的入门案例中的代码,主要体现在.group方法,这次传递了两个参数,将负责连接的EventLoop和负责读写的EventLoop分离开,并且给负责读写的EventLoop设置了两个线程。

 //boss只负责接受连接 1个线程 worker负责读写 2个线程
.group(new NioEventLoopGroup(),new NioEventLoopGroup(2))

        服务端的代码和之前的相同,我们先启动一个服务端,再启动三个客户端:

        三个连接(channel)由两个EventLoop轮询处理,并且每个连接(channel)和EventLoop是绑定的,一个EventLoop可以负责多条消息的处理。

        如图所示:

        同时EventLoop也有不同的实现:

        案例进一步细化,我们可以再设置一个EventLoop 专门处理其他任务:

EventLoopGroup eventLoop = new DefaultEventLoopGroup();

         改造.childHandler:

  .childHandler(new ChannelInitializer<NioSocketChannel>() {/*** 工序有多道,合在一起就是 pipeline,* pipeline 负责发布事件(读、读取完成...)传播给每个 handler,* handler 对自己感兴趣的事件进行处理(重写了相应事件处理方法)* @param ch* @throws Exception*/@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {ch.pipeline().addLast("handle1",new ChannelInboundHandlerAdapter(){@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf byteBuf = ((ByteBuf) msg);log.debug(byteBuf.toString(Charset.defaultCharset()));//责任链模式,将消息传递个下一个处理器ctx.fireChannelRead(msg);}}).addLast(eventLoop,"handle2",new ChannelInboundHandlerAdapter(){@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf byteBuf = ((ByteBuf) msg);log.debug(byteBuf.toString(Charset.defaultCharset()));}});}})

        启动一个客户端:

        相当于链式调用:

小结:

  • 在创建EventLoopGroup实例时,可以指定线程数,如果没有指定,默认使用cpu核心数*2。
  • 调用ServerBootstrap的.group时,可以将EventLoop细化成为专门处理连接以及负责读写的。
  • 可以利用pipeline()的.addLast,链式组合多个EventLoopGroup,实现不同的功能。
  • 每一个EventLoop可以轮询处理多个Channel事件,但是会和Channel绑定,线程间相互独立。

3、Channel

        Channel在Netty中代表一个可以执行I/O操作(如读、写、连接、绑定等)的对象。它可以是一个套接字连接、文件、管道等。

        Netty支持多种类型的Channel,包括NioSocketChannel(用于客户端连接)、NioServerSocketChannel(用于服务端绑定)以及其他专门用途的Channel如EmbeddedChannel等。

        Channel的I/O操作都是异步的,会返回一个ChannelFuture对象,用于表示操作的结果:

 ChannelFuture channelFuture = new Bootstrap()// 。。。。。。.connect(new InetSocketAddress("localhost", 8080));

        并且每个Channel都有一个与之关联的ChannelPipeline。Pipeline中包含多个ChannelHandler,每个Handler负责特定的处理逻辑。

为什么在案例代码中,客户端写出数据之前必须要调用.sync()方法?

        因为Channel的I/O操作都是异步的,是主线程调用了.connect() 方法,但是建立连接是在NioSocketChannel所在线程。.sync()方法的作用就是让主线程在此处阻塞,等到NioSocketChannel所在线程建立完成连接,主线程才会继续向下执行。(如果不使用.sync()方法,主线程会在连接没有建立完成的时候继续执行后续代码,服务端无法正常接受消息。)

        与此类似的还有.close()方法,如果我们想在断开连接后执行一段自己的逻辑:

        客户端代码:

        //... new BootStrap...//接受控制台输入,q则断开连接new Thread(()->{Scanner scanner = new Scanner(System.in);while (true){String str = scanner.nextLine();if ("q".equals(str)){//关闭channel连接channel.close();break;}channel.writeAndFlush(str);}},"input").start();

        可以将执行断开后逻辑的代码放在input 线程的channel.close();后,或者主线程中吗?

        答案是否定的:

  1. 如果放在主线程中,那么input 线程和主线程是并行执行的,无法控制先后顺序。
  2. 如果放在input 线程的channel.close(); 后,也是不行的。因为channel.close(); 方法也是异步调用,由NioSocketChannel所在线程负责关闭连接:

        解决该问题有两个方案:

        方案一的思路和解决.connect() 方法异步调用的类似,都是使用.sync()方法阻塞主线程,等待input 线程中的channel.close(); 执行完成后再由主线程处理连接关闭后释放资源

ChannelFuture closeFuture = channel.closeFuture();
closeFuture.sync();
log.debug("连接关闭后释放资源");

        第二种方案是通过调用ChannelFuture的.addListener 方法,添加一个监听器,由nioEventLoopGroup所在线程关闭连接后,处理连接关闭后释放资源(释放资源和处理后续都是nioEventLoopGroup同一线程。):

closeFuture.addListener((ChannelFutureListener) future -> log.debug("连接关闭后释放资源"));

相关文章:

  • 超高清图像生成新SOTA!清华唐杰教授团队提出Inf-DiT:生成4096图像比UNet节省5倍内存。
  • 银行数仓项目实战(一)--什么是数据仓库
  • 独立游戏之路:Tap篇 -- Unity 集成 TapTap 广告详细步骤
  • MySQL触发器基本结构
  • React@16.x(32)useDebugValue
  • flutter 环境搭建(windows)(先装 jdk 建议1.8起步)
  • Spring IOC的优、缺点
  • 视频合成渲染服务解决方案,数字人+PPT+视频云剪辑
  • 搭建RocketMQ主从异步集群
  • vue前段处理时间格式,设置开始时间为00:00:00,设置结束时间为23:59:59
  • Langchain-chatchat: Langchain基本概念
  • SaaS案例分享:成功构建销售渠道的实战经验
  • homework 2024.06.17 math, UI
  • 【Linux硬盘数据读取】WIN10访问linux分区解决方案:ext2fsd
  • 企业内部、与合作伙伴/客户文档协作如何高效安全地收集资料?
  • 《Java编程思想》读书笔记-对象导论
  • 【Redis学习笔记】2018-06-28 redis命令源码学习1
  • 【刷算法】求1+2+3+...+n
  • Java 实战开发之spring、logback配置及chrome开发神器(六)
  • leetcode讲解--894. All Possible Full Binary Trees
  • MYSQL如何对数据进行自动化升级--以如果某数据表存在并且某字段不存在时则执行更新操作为例...
  • MySQL数据库运维之数据恢复
  • node入门
  • 工作踩坑系列——https访问遇到“已阻止载入混合活动内容”
  • 工作中总结前端开发流程--vue项目
  • 基于webpack 的 vue 多页架构
  • 开放才能进步!Angular和Wijmo一起走过的日子
  • 使用Swoole加速Laravel(正式环境中)
  • 运行时添加log4j2的appender
  • 哈罗单车融资几十亿元,蚂蚁金服与春华资本加持 ...
  • #我与Java虚拟机的故事#连载03:面试过的百度,滴滴,快手都问了这些问题
  • #中国IT界的第一本漂流日记 传递IT正能量# 【分享得“IT漂友”勋章】
  • (2)STL算法之元素计数
  • (23)mysql中mysqldump备份数据库
  • (C)一些题4
  • (ZT)出版业改革:该死的死,该生的生
  • (第8天)保姆级 PL/SQL Developer 安装与配置
  • (二)基于wpr_simulation 的Ros机器人运动控制,gazebo仿真
  • (牛客腾讯思维编程题)编码编码分组打印下标题目分析
  • (转)程序员疫苗:代码注入
  • .【机器学习】隐马尔可夫模型(Hidden Markov Model,HMM)
  • .ai域名是什么后缀?
  • .NET : 在VS2008中计算代码度量值
  • .NET 跨平台图形库 SkiaSharp 基础应用
  • .net 无限分类
  • .Net7 环境安装配置
  • .net反编译工具
  • .NET中的十进制浮点类型,徐汇区网站设计
  • .so文件(linux系统)
  • @Autowired 与@Resource的区别
  • [ 英语 ] 马斯克抱水槽“入主”推特总部中那句 Let that sink in 到底是什么梗?
  • [Android]使用Android打包Unity工程
  • [C# 基础知识系列]专题十六:Linq介绍
  • [C#]winform基于opencvsharp结合Diffusion-Low-Light算法实现低光图像增强黑暗图片变亮变清晰
  • [C语言]——柔性数组