当前位置: 首页 > news >正文

Netty源码系列 之 EventLoop run()方法 源码

EventLoop[实现类为NioEventLoop,我们研究NioEventLoop即可]

EventLoop是一个单线程的线程池

核心作用:处理执行IO操作(accept,read,write事件),普通任务,定时任务

EventLoop封装了Selector复用器,Thread线程,以及任务队列

为什么EventLoop需要一个任务队列?

因为EventLoop是一个单线程的线程池,如果有多个任务请求过来时,可能同时处理不了这么多,所以需要暂时存储到一个任务队列中逐个慢慢处理。

我们上面说了EventLoop封装了Selector复用器,Thread线程,以及任务队列,如何证明?

先看EventLoop体系图,它肯定继承了各种类,各种父类的属性为EventLoop提供了以上属性 或者是 EventLoop自己这个类也会封装一些属性。

当然,由于EventLoop只是一个接口,所以当我们需要研究观察NioEventLoop的体系图:

如下分析:

1.

2.

NioEventLoop继承ScheduledExecutorService接口,所以该接口实现类就是NioEventLoop的父类

SingleThreadEventExecutor这个父类提供任务队列和线程这两个属性

3.AbstractScheduledEventExecutor类是NioEventLoop的父类,所以NioEventLoop继承拥有父类的所有属性

AbstractScheduledEventExecutor类为NioEventLoop提供用于存储定时任务的优先级队列这个属性

总结:

NioEventLoop实现了EventLoop接口,NioEventLoop是一个集大成者,它不仅仅是一个普通的线程对象,而是一个具有Selector,任务队列缓存功能等的优秀组件。

由此也可以看出,一个EventLoop[NioEventLoop]对应一个Selector复用器类

所以:EventLoop接口实现类NioEventLoop拥有以下核心属性:

1.private Selector selector; //包装后的Selector对象

2.private Selector unwrappedSelector;//未包装的Selector对象

3.private final Queue<Runnable> taskQueue;//负责存储未执行完的普通任务。因为NioEventLoop是单线程,如果有多个任务,那么任务队列是必须的。

4.private volatile Thread thread;//线程对象,真正执行代码逻辑的对象

5. PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue;//用于存储定时任务的优先级队列。该队列具体底层的实现对应的数据结构HashWheelTimer

这么多属性共同铸就了NioEventLoop的功能特性

  • 问题:NioEventLoop中的Selector选择器什么时候被创建的?

在NioEventLoop的构造器中:

1.final SelectorTuple selectorTuple = openSelector();

该方法就是先获取windows操作系统(或其他操作系统)所对应的最原生的Selector对象,也就是未包装的Selector对象:unwrappedSelector。然后再通过反射技术把unwrappedSelector对应的属性等赋值给Selector,返回SelectorTuple对象,该对象包含Selector对象和unwrappedSelector对象

2.this.selector = selectorTuple.selector;

获取到包装后的Selector对象赋值给属性selector

3.this.unwrappedSelector = selectorTuple.unwrappedSelector;

获取到未包装的Selector对象给属性unwrappedSelector

进入openSelector方法:

openSelector方法:

1.把unwrappedSelector对应的属性赋值给selector

2.会把通过unwappedSelector拿到 selectedKey对应的HashSet集合,然后把该集合中的数据赋值给selector的SelectionKey[] keys;数组里。Netty底层反射完成。

补充:

Selector属性和unwrappedSelector属性二者所对应的对象有什么区别?

在Netty中,这两个属性的区别如下:

  1. private Selector selector: 这是Netty中的一个属性,表示经过包装的Selector对象。Netty在创建和管理Selector时,会对其进行包装,以提供更高级的功能和对底层Selector的优化。这个selector属性是Netty的EventLoop中使用的,用于处理网络事件。通过selector,Netty可以监听和处理多个Channel上的事件,并通过事件驱动的方式进行网络编程。
  2. private Selector unwrappedSelector: 这也是Netty中的一个属性,表示未经包装的原始底层Selector对象。这个unwrappedSelector属性是Netty的EventLoop中使用的,用于处理底层的I/O事件。Netty会使用自己的EventLoopGroup来创建和管理Selector,并将其包装成unwrappedSelector。通过unwrappedSelector,Netty可以对底层Selector进行更高级的操作和管理,例如处理空轮询、优化事件的触发和取消等。

总结起来,selector是经过Netty包装的Selector对象,用于处理网络事件,而unwrappedSelector是未经包装的底层Selector对象,用于处理底层的I/O事件。它们在Netty的EventLoop中扮演不同的角色,分别负责处理不同层次的事件。

Netty的Selector对底层的unwrappedSelector进行了封装,以提供更高级的操作和管理。其中包括处理空轮询的情况,优化事件的触发和取消等。

通过封装unwrappedSelector,Netty可以在底层unwrappedSelector的基础上实现一些额外的功能和优化,以提高网络编程的性能和可靠性,最终得出了NioEventLoop中的selector属性。这些功能和优化包括但不限于:

  1. 处理空轮询:当底层Selector在轮询时没有任何事件发生时,Netty会进行特殊处理,避免空轮询的问题,从而提高了事件的处理效率。【具体如何处理空轮询的?后续会分析到,使用的就是一个计数器计数,等到一定条件后,重构Selector】
  2. 优化事件的触发和取消:Netty可以根据具体的业务需求和网络情况,对事件的触发和取消进行优化。例如,可以通过调整事件的触发条件和取消条件,避免不必要的事件触发和处理,提高网络应用的性能和响应速度。
  • 自己写一个反射案例吧
@Slf4j
public class User {private String name;
}
package com.messi.netty_source_03.Test02;import lombok.extern.slf4j.Slf4j;import java.lang.reflect.Field;@Slf4j
public class TestUser {public static void main(String[] args) throws NoSuchFieldException, IllegalAccessException {User user = new User();Class<? extends User> userClazz = user.getClass();Field name = userClazz.getDeclaredField("name");log.debug("name {} ", name);//打破封装  让外部的其他类也可以访问User类的私有属性namename.setAccessible(true);//user.namename.set(user, "leomessi");log.debug("value is {} ",name.get(user));}
}

测试:

  • NioEventLoop中的两个属性:selector属性比unwrappedSelector属性,哪一个使用的多?

selector属性比unwrappedSelector属性用的多

这两者有啥区别:

1.unwrappedSelector是原生的操作系统分配的Selector对象,具有很多Bug,也是java原生NIO所使用的Selector复用器。所以NIO许多bug,如:jdk1.7之前的空轮询bug,莫名其妙的bug。

2.

通过封装unwrappedSelector,Netty可以在底层unwrappedSelector的基础上实现一些额外的功能和优化,以提高网络编程的性能和可靠性,最终得出了NioEventLoop中的selector属性。这些功能和优化包括但不限于:

  1. 处理空轮询:当底层Selector在轮询时没有任何事件发生时,Netty会进行特殊处理,避免空轮询的问题,从而提高了事件的处理效率。【具体如何处理空轮询的?后续会分析到,使用的就是一个计数器计数,等到一定条件后,重构Selector】
  2. 优化事件的触发和取消:Netty可以根据具体的业务需求和网络情况,对事件的触发和取消进行优化。例如,可以通过调整事件的触发条件和取消条件,避免不必要的事件触发和处理,提高网络应用的性能和响应速度。

到目前为止:我们可以发现,Selector做了什么优化?存储Selector上对应的SelectionKey时,原生JavaNIO使用的是Set集合,而Netty使用的是数组。啥是SelectionKey?在上一个小节的最后总结过。

演示如下:

NIO:

Netty:

总结:

这仅仅是Netty对于Selector的优化。后续还有FastThreadLocal,HashWheelTimer的优化。

Netty让SelectedKey存储形式从javaNIO中的Set集合类型,转变成数组类型。数组类型更加有利于提升数据的遍历查找的性能。但是你不可以说仅仅这一个优化,就让Netty成为了高性能框架。你也不能说在高并发场景下,使用数组的绝对优势。还是那句话,这一个优化只是Netty对javaNIO或java jdk原生提供的类型的优化之一,后续还有FastThreadLocal,HashWheelTimer,这两种数据结构也是Netty对jdk中原生数据结构的优化的代表作。正是因为这么多个优化的共同协作下,才让Netty成为了一个异步事件监听回调的高性能框架。

并且你不能说jdk原生的数据结构不好,而是在Netty通信所在的高并发场景下,我们需要选取适应场景所需的数据结构,如果原生数据结构不符合,那么需要做优化定制。

EventLoop[NioEventLoop]的Nio线程什么时候启动呢?&& 如何进行IO操作,普通任务处理,定时任务处理呢?&& Netty如何解决NIO-Selector空转的问题

  • 测试用例
package com.messi.netty_source_03.Test03;import io.netty.channel.EventLoop;
import io.netty.channel.nio.NioEventLoopGroup;import java.util.concurrent.TimeUnit;public class TestEventloop2 {public static void main(String[] args) {NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();//设置IoRatio//eventLoopGroup.setIoRatio(80);EventLoop eventLoop = eventLoopGroup.next();//一切准备好后,直接开始执行该任务eventLoop.execute(() -> {System.out.println("hello suns");});//定时任务。一切准备好后,也得再过200秒后才会执行该定时任务eventLoop.schedule(() -> {System.out.println("TestEventloop2.main");}, 200, TimeUnit.SECONDS);}
}

debug源码过程:

1.

2.

3.

4.

5.


 

6.进入doStartThread方法

核心:run方法【涉及IO,普通任务,定时任务时间的分配 && Netty如何解决NIO-Selector空转的问题】

接着上面进行debug,进入run方法:

1.

我们所做的逻辑就是:不要让selector.select()一直阻塞,因为我们还需要处理定时任务和普通任务。

2.

三种情况:

情况1:

IO操作全部做完后再执行普通任务。但是当普通任务执行的过程中,又有新的IO操作来了,如果任务还没有执行完,那么还是会执行未处理完的普通任务【为什么会这样?因为ranTasks=runAllTasks()没有传递参数,所以执行任务操作时,没有限制时间,会一直执行到任务处理结束】。显然,这种ioRatio==100的做法是不正确的。所以说,要设计一个比例,让普通任务不能完全阻塞影响到后续进入的IO网络通信的执行【因为网络通信是不能延迟的!就算普通任务没有执行完,网络通信IO也要先执行,所以说设置一个好的ioRatio比例值是非常重要的】

情况2:

ioRatio!=100,先执行完全部的IO操作。然后执行任务时会根据ioRatio计算出一个执行时间长度限制,即使任务到了时间限制后没有执行完毕,那么也不能再执行任务了,而是会跳转去执行IO操作,IO操作可以无限制时间的去执行,直到IO执行完毕【因为IO操作是网络通信,用户不能等】

这样就解决了情况1的问题【任务无截止时间的执行导致后续进来的IO执行的阻塞】

情况3:

此时只有任务需要执行,没有IO操作。会没有限制时间的执行所有的任务。

如何解决epoll空转问题?

在linux操作系统下的epoll模型,如果我们把java代码部署在linux操作系统上并且会调用epoll的话。

在原生jdk-NIO,从jdk1.6开始出现selector.select()空转[epoll空转]问题,号称在jdk1.7已经修复了,但是jdk1.8还是会存在空转的问题。只不过空转的几率会越来越小。修复了很多版本。但是记住:空转这个问题是一个极其偶发的事情,几率很低。

先说一下,什么叫做epoll空转?

所谓空转就是:当前没有IO事件或任务[普通任务或定时任务]时,本应该阻塞的selector.select()方法突然停止阻塞然后不断的执行while循环,所谓空轮询【空转】就是:没有意义的不断循环,因为此时压根就没有任务或IO事件需要你处理。一旦空轮询发生,也就是一直会while死循环,那么cpu占有率达到100%是迟早的事情

但是Netty解决了这个臭名昭著的问题。

如何解决的呢?就是在run方法中定义了一个计数器变量,当没有IO或任务执行时,但是该计数器变量在一定时间内增加到512后会执行重构selector的操作。重构后的selector可能就不会出现空转问题了,因为空转是一个极其偶发的事情,几率很低。但是有一点你需要知道:Netty并没有从本质上解决jdk原生NIO调用epoll时的空转问题,而是当出现空转时,我们重构selector,毕竟空转就是一个偶发几率低的事件,那么重构后很大可能就不会空转了。

直接看源码中如何解决的:

1.

2.

3.

Selector重构涉及到很多方面,不再过多描述。

EventLoop如何处理IO操作

NioEventLoop找到selector对象,selector找到对应所有的SelectionKey,SelectionKey再找到对应的attachment附件:Channel对象。拿到Channel对象可以获取到pipeline对象,进行网络IO操作。

EventLoop--->NioEventLoop--->selector--->SelectionKey[]

--->NioServerSocketChannel/NioSocketChannel[附件]--->pipeline--->网络IO操作

  • debug代码如下
package com.messi.netty_source_03.class_04;import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LoggingHandler;public class NettyServer {public static void main(String[] args) throws InterruptedException {EventLoopGroup eventLoopGroup = new NioEventLoopGroup();ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.channel(NioServerSocketChannel.class);serverBootstrap.group(eventLoopGroup);serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {ch.pipeline().addLast(new LoggingHandler());}});Channel channel = serverBootstrap.bind(8000).sync().channel();channel.closeFuture().sync();}}
package com.messi.netty_source_03.class_04;import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LoggingHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.net.InetSocketAddress;
import java.nio.charset.Charset;public class MyNettyClient {private static final Logger log = LoggerFactory.getLogger(MyNettyClient.class);public static void main(String[] args) throws InterruptedException {log.debug("myNettyClientStarter------");EventLoopGroup eventLoopGroup = new NioEventLoopGroup();Bootstrap bootstrap = new Bootstrap();bootstrap.channel(NioSocketChannel.class);Bootstrap group = bootstrap.group(eventLoopGroup);//32 ---> 1 IO操作 31线程bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {ch.pipeline().addLast(new LoggingHandler());ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {if (ctx.channel().isWritable()) {ByteBufAllocator alloc = ctx.alloc();ByteBuf buffer = alloc.buffer();buffer.writeCharSequence("xiaohei", Charset.defaultCharset());ctx.writeAndFlush(buffer);}}});}});Channel channel = bootstrap.connect(new InetSocketAddress(8000)).sync().channel();channel.closeFuture().sync();}
}
  • debug流程【主要记录IO操作的流程】

预备工作:

debug方式启动服务端

在服务端debug调试完成bind绑定操作后,【bind绑定的流程省略】。

bind流程:完成NioServerSocketChannel的注册,使得服务端完成初始化操作。给NioServerSocketChannel分配一个对应的SelectionKey。并且完成事件的注册。

服务端完成bind操作后,使用运行方式启动客户端

直接快进到IO操作的处理

1.selector.select()方法停止阻塞,向下找到IO事件处理的方法

2.

3.

如果对存储SelectionKey的结构有优化,那么使用数组存储SelectionKey,则走第一个if分支

如果对存储SelectionKey的结构无优化,还是使用Set集合存储SelectionKey,则走第二个else分支

知识点回顾:

附件是什么?

附件就是NioChannel,可以是NioServerSocketChannel,也可以是NioSocketChannel。通过这些Channel才可以拿到对应的pipeline管道,通过管道我们才可以进行相应的读写IO,连接的网络操作。正是通过这种附件的方式,NIO才和Netty完美的整合在了一起。

4.

5.

6.

7.

8.

9.

10.

11.

12.

13.

14.

15.

16.

17.

18.

为什么在读数据的时候进行循环读取?

原因:我们知道从网卡接收的数据一开始是拷贝到recv-socket接收缓冲区的,我们在应用层进行开启ByteBuf进行读入socket缓冲区的数据,如果socket缓冲区的数据大于ByteBuf的大小,那么一次性是读取不完socket缓冲区的数据的。所以要循环读取。

所以fireChannelRead(byteBuf)方法被多次调用并不就是等同于Client客户端发了多次数据:

可能就像上面分析的那样,客户端只发了一次数据,服务端recv-socket缓冲区接收这份数据,但是由于服务端应用层的ByteBuf缓冲区过小,导致服务端需要处理读入多次才能处理完毕,导致channelRead方法被调用多次。

当然,也有可能是客户端发了多次数据,服务端channelRead方法被调用多次进行接收。

19.

doReadBytes方法

20.

四个条件,任意一个条件为false,那么就返回false,如果返回false,那么循环就会退出。

下面来说一下这四个条件的情况:

条件1:一般都为true

条件2:如果ByteBuf写满了,那么说明可能socket缓冲区数据还没读完,那么继续读,为true。如果ByteBuf没写满,说明socket缓冲区数据读完了,无需再继续读,为false。

条件3:do while循环最多执行16次,如果达到16次,循环退出即可。如果16次没有完成处理,那么也会退出,但是selector.select()依然会继续监听read方法,可以下一次再继续read。

为什么要设置一个边界值16?

为什么要循环16次后如果还没有处理完,那么就退出?

因为对于IO操作来说,它是一个强阻塞且数据量较大的操作。IO操作本身占用线程资源时间就长,很有可能16次处理不完,但是对于其他的操作而言,把线程资源分配给它们,很有可能CPU一瞬间就执行完毕了这些非IO任务。所以设置了一个边界值16,防止其他非IO的task一直处于饥饿等待。

其实这就是多路复用线程资源的思想,让一个线程资源给多个客户端的多个任务去使用。并且切记这里不是阻塞执行IO操作,只是打断,给其他任务使用执行一下啦。

但是一般循环16次都可以处理完socket缓冲区的数据。一般16次循环,并且在动态调整ByteBuf的情况下,可以读取处理16GB的socket缓冲区数据。大吧。

条件4:一般都为true

相关文章:

  • Swift Combine 发布者订阅者操作者 从入门到精通二
  • 蓝桥杯嵌入式第8届真题(完成) STM32G431
  • yt-dlp快速上手
  • vim最简单命令学习
  • Python脚本之操作Elasticsearch【二】
  • C# async/await的使用
  • kubernetes部署nacos2.3.0
  • 网站服务器中毒或是被入侵该怎么办?
  • Leetcode的AC指南 —— 栈与队列 :1047.删除字符串中的所有相邻重复项
  • YUM | 包安装 | 管理
  • Oracle Vagrant Box 扩展根文件系统
  • Android Build 依赖项
  • JVM 性能调优 - Java 虚拟机内存体系(1)
  • C++俄罗斯方块 -- 菜单展示和选择 -- 方法
  • AR人脸106240点位检测解决方案
  • 【挥舞JS】JS实现继承,封装一个extends方法
  • angular组件开发
  • css选择器
  • java多线程
  • Java面向对象及其三大特征
  • Laravel 中的一个后期静态绑定
  • ViewService——一种保证客户端与服务端同步的方法
  • -- 查询加强-- 使用如何where子句进行筛选,% _ like的使用
  • 利用DataURL技术在网页上显示图片
  • 聊聊hikari连接池的leakDetectionThreshold
  • 深度学习中的信息论知识详解
  • 我建了一个叫Hello World的项目
  • 一道面试题引发的“血案”
  • 阿里云服务器如何修改远程端口?
  • 阿里云重庆大学大数据训练营落地分享
  • 进程与线程(三)——进程/线程间通信
  • 京东物流联手山西图灵打造智能供应链,让阅读更有趣 ...
  • (+4)2.2UML建模图
  • (173)FPGA约束:单周期时序分析或默认时序分析
  • (C#)Windows Shell 外壳编程系列9 - QueryInfo 扩展提示
  • (floyd+补集) poj 3275
  • (板子)A* astar算法,AcWing第k短路+八数码 带注释
  • (非本人原创)我们工作到底是为了什么?​——HP大中华区总裁孙振耀退休感言(r4笔记第60天)...
  • (六)c52学习之旅-独立按键
  • (三)c52学习之旅-点亮LED灯
  • (删)Java线程同步实现一:synchronzied和wait()/notify()
  • (原)Matlab的svmtrain和svmclassify
  • .[backups@airmail.cc].faust勒索病毒的最新威胁:如何恢复您的数据?
  • .NET Core 将实体类转换为 SQL(ORM 映射)
  • .Net中ListT 泛型转成DataTable、DataSet
  • @Transient注解
  • [100天算法】-不同路径 III(day 73)
  • [AMQP Connection 127.0.0.1:5672] An unexpected connection driver error occured
  • [C#] 基于 yield 语句的迭代器逻辑懒执行
  • [EFI]Lenovo ThinkPad X280电脑 Hackintosh 黑苹果引导文件
  • [Flutter] extends、implements、mixin和 abstract、extension的使用介绍说明
  • [html] 动态炫彩渐变背景
  • [HTML]Web前端开发技术29(HTML5、CSS3、JavaScript )JavaScript基础——喵喵画网页
  • [LeetCode刷题笔记]1 - 两数之和(哈希表)
  • [LitCTF 2023]Http pro max plus