NettyのEventLoopChannel
Netty的重要组件:EventLoop、Channel、Future & Promise、Handler & Pipeline、ByteBuf
本篇主要介绍Netty的EventLoop和Channel组件。
1、Netty入门案例
服务器端的创建,主要分为以下步骤:
- 创建serverBootstrap对象。
- 配置服务器的线程模型。可以指定两个线程模型,parentGroup是专门负责接收连接的线程模型,childGroup是处理读写事件的工作线程模型。
- 设置 Channel 类型,这里使用NioServerSocketChannel,是基于NIO实现的,还有其他的实现如下:
- 配置 Channel 初始化器,使用ChannelInitializer初始化NioSocketChannel,在这里我们配置了处理字符串编码以及打印字符串。
- 绑定端口。
ServerBootstrap serverBootstrap = new ServerBootstrap();//接受连接的线程 or 工作线程serverBootstrap.group(new NioEventLoopGroup())//服务器ServerSocketChannel的实现 是NIO 还是 BIO.channel(NioServerSocketChannel.class).childHandler(//初始化与客户端进行数据读写的通道,并且添加别的handler 在连接建立后回调new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel channel) throws Exception {//处理编码(解码 )channel.pipeline().addLast(new StringDecoder());//自己的业务逻辑,比如打印字符串channel.pipeline().addLast(new ChannelInboundHandlerAdapter() {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println(msg);}});}}).bind(8080);
客户端的创建步骤与服务器端创建的步骤类似,但是在connect方法后需要加上.sync(),以及调用writeAndFlush()方法进行收发数据。(为什么会这样后面会说明)
new Bootstrap().group(new NioEventLoopGroup())//客户端SocketChannel的实现.channel(NioSocketChannel.class)//初始化与服务器进行数据读写的通道,并且添加别的handler 在连接建立后回调.handler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {//字符输出编码ch.pipeline().addLast(new StringEncoder());}}).connect(new InetSocketAddress("localhost",8080)).sync()//阻塞方法 连接建立成功后才会执行.channel().writeAndFlush("netty");//收发数据,会调用ch.pipeline().addLast(new StringEncoder());所添加的处理器
启动程序看一下效果(先启动服务器,再启动客户端)
2、EventLoop
下面我们开始介绍Netty中的第一个组件,配置服务器的线程模型时设置的EventLoop。
EventLoop类关系图:
我们首先看下EventLoop的NIO实现:NioEventLoopGroup的构造方法:
可以看到它有多个重载的构造方法:
在构造方法中可以指定线程的数量,如果使用的是无参构造方法,那么默认传递的线程数是0,但是会把0传递给其他的构造方法:
最后调用父类的构造:
在父类的构造中,会判断传入的线程参数是否为0。
目前很显然条件成立,就会获取成员变量DEFAULT_EVENT_LOOP_THREADS 并且再次作为参数调用父类的构造。成员变量DEFAULT_EVENT_LOOP_THREADS 会在静态代码块中被赋值。
如果能获取到"io.netty.eventLoopThreads" key对应的value,就以该值为准,否则线程数是当前cpu核心数*2 。
//NioEventLoopGroup构造方法指定线程数 如果不指定为CPU可运行核心数 * 2
NioEventLoopGroup loopGroup = new NioEventLoopGroup(2);
一个 EventLoop由一个单独的线程驱动,它不断轮询 I/O 事件并执行相应的任务:
log.debug("{}",loopGroup.next());
log.debug("{}",loopGroup.next());
log.debug("{}",loopGroup.next());
并且EventLoop的内部维护了一个任务队列,可以提交任务到这个队列中,由 EventLoop的线程顺序执行。
向EventLoop提交任务:
//执行普通任务
loopGroup.next().submit(() -> log.debug("run..."));
//执行定时任务
loopGroup.next().scheduleAtFixedRate((Runnable) () -> log.debug("run with schedule..."),0,1, TimeUnit.SECONDS);
下面通过一个案例来加深对EventLoop的理解:
改造最初的入门案例中的代码,主要体现在.group方法,这次传递了两个参数,将负责连接的EventLoop和负责读写的EventLoop分离开,并且给负责读写的EventLoop设置了两个线程。
//boss只负责接受连接 1个线程 worker负责读写 2个线程
.group(new NioEventLoopGroup(),new NioEventLoopGroup(2))
服务端的代码和之前的相同,我们先启动一个服务端,再启动三个客户端:
三个连接(channel)由两个EventLoop轮询处理,并且每个连接(channel)和EventLoop是绑定的,一个EventLoop可以负责多条消息的处理。
如图所示:
同时EventLoop也有不同的实现:
案例进一步细化,我们可以再设置一个EventLoop 专门处理其他任务:
EventLoopGroup eventLoop = new DefaultEventLoopGroup();
改造.childHandler:
.childHandler(new ChannelInitializer<NioSocketChannel>() {/*** 工序有多道,合在一起就是 pipeline,* pipeline 负责发布事件(读、读取完成...)传播给每个 handler,* handler 对自己感兴趣的事件进行处理(重写了相应事件处理方法)* @param ch* @throws Exception*/@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {ch.pipeline().addLast("handle1",new ChannelInboundHandlerAdapter(){@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf byteBuf = ((ByteBuf) msg);log.debug(byteBuf.toString(Charset.defaultCharset()));//责任链模式,将消息传递个下一个处理器ctx.fireChannelRead(msg);}}).addLast(eventLoop,"handle2",new ChannelInboundHandlerAdapter(){@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf byteBuf = ((ByteBuf) msg);log.debug(byteBuf.toString(Charset.defaultCharset()));}});}})
启动一个客户端:
相当于链式调用:
小结:
- 在创建EventLoopGroup实例时,可以指定线程数,如果没有指定,默认使用cpu核心数*2。
- 调用ServerBootstrap的.group时,可以将EventLoop细化成为专门处理连接以及负责读写的。
- 可以利用pipeline()的.addLast,链式组合多个EventLoopGroup,实现不同的功能。
- 每一个EventLoop可以轮询处理多个Channel事件,但是会和Channel绑定,线程间相互独立。
3、Channel
Channel在Netty中代表一个可以执行I/O操作(如读、写、连接、绑定等)的对象。它可以是一个套接字连接、文件、管道等。
Netty支持多种类型的Channel,包括NioSocketChannel(用于客户端连接)、NioServerSocketChannel(用于服务端绑定)以及其他专门用途的Channel如EmbeddedChannel等。
Channel的I/O操作都是异步的,会返回一个ChannelFuture对象,用于表示操作的结果:
ChannelFuture channelFuture = new Bootstrap()// 。。。。。。.connect(new InetSocketAddress("localhost", 8080));
并且每个Channel都有一个与之关联的ChannelPipeline。Pipeline中包含多个ChannelHandler,每个Handler负责特定的处理逻辑。
为什么在案例代码中,客户端写出数据之前必须要调用.sync()方法?
因为Channel的I/O操作都是异步的,是主线程调用了.connect() 方法,但是建立连接是在NioSocketChannel所在线程。.sync()方法的作用就是让主线程在此处阻塞,等到NioSocketChannel所在线程建立完成连接,主线程才会继续向下执行。(如果不使用.sync()方法,主线程会在连接没有建立完成的时候继续执行后续代码,服务端无法正常接受消息。)
与此类似的还有.close()方法,如果我们想在断开连接后执行一段自己的逻辑:
客户端代码:
//... new BootStrap...//接受控制台输入,q则断开连接new Thread(()->{Scanner scanner = new Scanner(System.in);while (true){String str = scanner.nextLine();if ("q".equals(str)){//关闭channel连接channel.close();break;}channel.writeAndFlush(str);}},"input").start();
可以将执行断开后逻辑的代码放在input 线程的channel.close();后,或者主线程中吗?
答案是否定的:
- 如果放在主线程中,那么input 线程和主线程是并行执行的,无法控制先后顺序。
- 如果放在input 线程的channel.close(); 后,也是不行的。因为channel.close(); 方法也是异步调用,由NioSocketChannel所在线程负责关闭连接:
解决该问题有两个方案:
方案一的思路和解决.connect() 方法异步调用的类似,都是使用.sync()方法阻塞主线程,等待input 线程中的channel.close(); 执行完成后再由主线程处理连接关闭后释放资源
ChannelFuture closeFuture = channel.closeFuture();
closeFuture.sync();
log.debug("连接关闭后释放资源");
第二种方案是通过调用ChannelFuture的.addListener 方法,添加一个监听器,由nioEventLoopGroup所在线程关闭连接后,处理连接关闭后释放资源(释放资源和处理后续都是nioEventLoopGroup同一线程。):
closeFuture.addListener((ChannelFutureListener) future -> log.debug("连接关闭后释放资源"));