当前位置: 首页 > news >正文

NIO源码阅读

  自己对着源码敲一遍练习,写上注释。发现NIO编程难度好高啊。。虽然很复杂,但是NIO编程的有点还是很多:

  1、客户端发起的连接操作是异步的,可以通过在多路复用器注册OP_CONNECTION等待后续结果,不需要像BIO的客户端一样被同步阻塞。

  2、SocketChannel的读写操作都是异步的,如果没有可读写的数据它不会同步等待,直接返回,这样I/O通信模型就可以处理其他的链路,不需要同步等待这个链路可用。

  3、线程模型的优化:由于JDK的Selector在Linux等主流操作系统上通过epoll实现,没有连接句柄的限制,那么Selector线程可以同时处理成千上万个客户端连接,而且性能不会随着客户端的增加而线性下降。所以它非常适合做高性能、高负载的网络服务器。

  TimeClient:

 1 package nio;
 2 
 3 public class TimeClient {
 4     public static void main(String args[]){
 5         int port = 8080;
 6         if(args != null && args.length > 0){
 7             try{
 8                 port = Integer.valueOf(args[0]);
 9             }catch(NumberFormatException e){
10                 //采用默认值
11             }
12         }
13         new Thread(new TimeClientHandle("120.0.0.1",port),"TimeClient-001").start();
14     }
15 }

TimeClientHandler:

  1 package nio;
  2 
  3 import java.io.IOException;
  4 import java.net.InetSocketAddress;
  5 import java.nio.ByteBuffer;
  6 import java.nio.channels.SelectionKey;
  7 import java.nio.channels.Selector;
  8 import java.nio.channels.SocketChannel;
  9 import java.util.Iterator;
 10 import java.util.Set;
 11 
 12 public class TimeClientHandle implements Runnable{
 13     private String host;
 14     private int port;
 15     private Selector selector;
 16     private SocketChannel socketChannel;
 17     private volatile boolean stop;
 18     
 19     public TimeClientHandle(String host,int port){
 20         this.host = host == null ? "127.0.0.1" : host;
 21         this.port = port;
 22         try{
 23             selector = Selector.open();
 24             socketChannel = SocketChannel.open();
 25             socketChannel.configureBlocking(false);
 26         }catch(IOException e){
 27             e.printStackTrace();
 28             System.exit(1);
 29         }
 30     }
 31     
 32     
 33     public void run() {
 34         //发送请求连接
 35         try{
 36             doConnect();
 37         }catch(IOException e){
 38             e.printStackTrace();
 39             System.exit(1);
 40         }
 41         while(!stop){
 42             try{
 43                 selector.select(1000);
 44                 Set<SelectionKey> selectedKeys = selector.selectedKeys();
 45                 Iterator<SelectionKey> it = selectedKeys.iterator();
 46                 SelectionKey key = null;
 47                 //当有就绪的Channel时,执行handleInput(key)方法
 48                 while(it.hasNext()){
 49                     key = it.next();
 50                     it.remove();
 51                     try{
 52                         handleInput(key);
 53                     }catch(Exception e){
 54                         if(key != null){
 55                         key.cancel();
 56                             if(key.channel() != null){
 57                                 key.channel().close();
 58                             }
 59                         }
 60                     }
 61                 }
 62             }catch(Exception e){
 63                 e.printStackTrace();
 64                 System.exit(1);
 65             }
 66         }
 67         
 68         //多路复用器关闭后,所有注册在上面的Channel和Pipe等资源都会被自动去注册并关闭,所以不需要重复释放资源
 69         if(selector != null){
 70             try{
 71                 selector.close();
 72             }catch(IOException e){
 73                 e.printStackTrace();
 74             }
 75         }
 76 
 77     }
 78     
 79     
 80     private void handleInput(SelectionKey key) throws IOException{
 81         if(key.isValid()){
 82             SocketChannel sc = (SocketChannel)key.channel();
 83             //判断是否连接成功
 84             if(key.isConnectable()){
 85                 if(sc.finishConnect()){
 86                     sc.register(selector, SelectionKey.OP_READ);
 87                 }else{
 88                     System.exit(1);
 89                 }
 90             }
 91             
 92             if(key.isReadable()){
 93                 ByteBuffer readBuffer = ByteBuffer.allocate(1024);
 94                 int readBytes = sc.read(readBuffer);
 95                 if(readBytes > 0){
 96                     readBuffer.flip();
 97                         byte[] bytes = new byte[readBuffer.remaining()];
 98                         readBuffer.get(bytes);
 99                         String body = new String(bytes,"UTF-8");
100                         System.out.println("Now is :" + body);
101                         this.stop = true;
102                 }else if(readBytes < 0){
103                     //对端链路关闭
104                     key.cancel();
105                     sc.close();
106                 }else{
107                     ; //读到0字节,忽略
108                 }
109             }
110         }
111     }
112     
113     private void doConnect() throws IOException{
114         //如果直接连接成功,则注册到多路复用器上,发送请求信息,读应答
115         if(socketChannel.connect(new InetSocketAddress(host,port))){
116             socketChannel.register(selector, SelectionKey.OP_READ);
117             doWrite(socketChannel);
118         }else{
119             //说明服务器没有返回TCP祸首应答消息,但这并不代表连接失败,当服务器返回TCP syn-ack消息后,Selector就能够轮训这个SocketChannel处于连接就绪状态
120             socketChannel.register(selector, SelectionKey.OP_CONNECT);
121         }
122     }
123     
124     private void doWrite(SocketChannel sc) throws IOException{
125         byte[] req = "QUERY TIME ORDER".getBytes();
126         ByteBuffer writeBuffer = ByteBuffer.allocate(req.length);
127         writeBuffer.put(req);
128         writeBuffer.flip();
129         sc.write(writeBuffer);
130         if(!writeBuffer.hasRemaining()){
131             System.out.println("Send order 2 server succeed.");
132         }
133     }
134 
135 }

TimeServer:

 1 package nio;
 2 
 3 import java.io.IOException;
 4 
 5 public class TimeServer {
 6     
 7     public static void main(String[] args) throws IOException{
 8         int port = 8080;
 9         if(args != null && args.length >0){
10             try{
11                 port = Integer.valueOf(args[0]);
12             }catch(NumberFormatException e){
13                 //采用默认值
14             }
15         }
16         //多路复用类,是一个独立的线程,负责轮训多路复用器Selctor,处理多个客户端的并发接入。
17         MultiplexerTimeServer timeServer = new MultiplexerTimeServer(port);
18         new Thread(timeServer,"NIO-MultiplexerTimeServer-001").start();
19         }
20 }

MultiplexerTimeServer:

  1 package nio;
  2 
  3 import java.io.IOException;
  4 import java.net.InetSocketAddress;
  5 import java.nio.ByteBuffer;
  6 import java.nio.channels.SelectionKey;
  7 import java.nio.channels.Selector;
  8 import java.nio.channels.ServerSocketChannel;
  9 import java.nio.channels.SocketChannel;
 10 import java.util.Iterator;
 11 import java.util.Set;
 12 
 13 public class MultiplexerTimeServer implements Runnable {
 14     
 15     private Selector selector;
 16     
 17     private ServerSocketChannel servChannel;
 18     
 19     private volatile boolean stop;
 20 
 21     public MultiplexerTimeServer(int port){
 22         try{
 23             
 24             selector = Selector.open();
 25             servChannel.configureBlocking(false);
 26             //将ServerSocketChannel 设置为异步非阻塞,backlog设置为1024 
 27             servChannel.socket().bind(new InetSocketAddress(port),1024);
 28             //将ServerSocket Channel注册到Selector,监听SelectionKey.OP_ACCEPT操作位,如果初始化失败,则退出
 29             servChannel.register(selector,SelectionKey.OP_ACCEPT);
 30             System.out.println("The time server is start in port:" + port);
 31         }catch(IOException e){
 32             e.printStackTrace();
 33             System.exit(1);
 34         }
 35     }
 36     
 37     public void stop(){
 38         this.stop = true;
 39     }
 40     
 41     public void run() {
 42         while(!stop){
 43             try{
 44                 //遍历时间设置1秒,每隔一秒唤醒一次,当有处于就绪状态的Channel时,selector将返回就绪状态的Channel的SelectionKey集合
 45                 selector.select(1000);
 46                 Set<SelectionKey> selectedKeys = selector.selectedKeys();
 47                 Iterator<SelectionKey> it = selectedKeys.iterator();
 48                 SelectionKey key = null;
 49                 //通过对就绪状态的Channel集合进行迭代,可以进行网络的异步读写操作
 50                 while(it.hasNext()){
 51                     key = it.next();
 52                     it.remove();
 53                     try{
 54                         handleInput(key);
 55                     }catch(Exception e){
 56                         if(key != null){
 57                             key.cancel();
 58                             if(key.channel() != null){
 59                                 key.channel().close();
 60                             }
 61                         }
 62                     }
 63                 }
 64             }catch(Throwable t){
 65                 t.printStackTrace();
 66             }
 67         }
 68         
 69         //多路复用器关闭后,所有注册在上面的Channel和Pipe等资源都会被自动去注册并关闭,所以不需要重复释放资源
 70         if(selector != null){
 71             try{
 72                 selector.close();
 73             }catch(IOException e){
 74                 e.printStackTrace();
 75             }
 76         }
 77     }
 78     
 79     //处理新接入的请求消息
 80     private void handleInput(SelectionKey key) throws IOException{
 81         if(key.isValid()){
 82             
 83             //根据SelectionKey的操作位进行判断即可获知网络事件的类型,通过accept接收客户端的连接请求并创建SocketChannel实例,完成上述操作相当于
 84             //完成了TCP的三次握手,TCP物理链路正式建立
 85             if(key.isAcceptable()){
 86                 ServerSocketChannel ssc = (ServerSocketChannel)key.channel();
 87                 SocketChannel sc = ssc.accept();
 88                 sc.configureBlocking(false);
 89                 //Add the new connection tothe selector
 90                 sc.register(selector, SelectionKey.OP_READ);
 91             }
 92             
 93             if(key.isReadable()){
 94                 //Read the data
 95                 
 96                 SocketChannel sc = (SocketChannel)key.channel();
 97                 ByteBuffer readBuffer = ByteBuffer.allocate(1024);
 98                 int readBytes = sc.read(readBuffer);
 99                 if(readBytes > 0){
100                     //将缓冲区当前的limit设置为position,position设置为0,用于后续对缓冲区的读取操作
101                     readBuffer.flip();
102                     byte[] bytes = new byte[readBuffer.remaining()];
103                     readBuffer.get(bytes);
104                     String body = new String(bytes,"UTF-8");
105                     System.out.println("The time server receive order: + body");
106                     String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new java.util.Date(System.currentTimeMillis()).toString() : "BAD ORDER";
107                     doWrite(sc,currentTime);
108                 }else if(readBytes < 0){
109                     //对端链路关闭
110                     key.cancel();
111                     sc.close();
112                 }else{
113                     ; //读到0字节,忽略
114                 }
115             }
116         }
117     }
118     
119     private void doWrite(SocketChannel channel,String response) throws IOException{
120         if(response != null && response.trim().length() >0){
121             byte[] bytes = response.getBytes();
122             ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
123             writeBuffer.put(bytes);
124             writeBuffer.flip();
125             channel.write(writeBuffer);
126         }
127     }
128 }

 

相关文章:

  • 上传和设置Mime类型
  • SAP S/4 HANA新变化-FI数据模型
  • 线程与异常
  • 【转载】一个优秀求职者应主动问的一些问题
  • 关于Flux,Vuex,Redux的思考
  • 文字跑马灯
  • 我掌握的linux防火墙知识
  • HttpResponseMessage 调用.net web api
  • CentOS下Samba文件服务器的安装与配置
  • 跟着百度学PHP[4]OOP面对对象编程-12-抽象类
  • 工作中用到的自定义控件
  • MySQL传统主从复制(第一弹)
  • MySQL数据库中日期中包涵零值的问题
  • 添加第三方源需要执行更新的时候报错,倒入公钥
  • JavaScript总结3
  • JavaScript-如何实现克隆(clone)函数
  • 「译」Node.js Streams 基础
  • 【跃迁之路】【641天】程序员高效学习方法论探索系列(实验阶段398-2018.11.14)...
  • 30天自制操作系统-2
  • android 一些 utils
  • java2019面试题北京
  • JavaScript 无符号位移运算符 三个大于号 的使用方法
  • JavaScript对象详解
  • JavaScript设计模式之工厂模式
  • KMP算法及优化
  • Laravel Mix运行时关于es2015报错解决方案
  • laravel5.5 视图共享数据
  • Meteor的表单提交:Form
  • VirtualBox 安装过程中出现 Running VMs found 错误的解决过程
  • win10下安装mysql5.7
  • 阿里研究院入选中国企业智库系统影响力榜
  • 分享几个不错的工具
  • 关于Flux,Vuex,Redux的思考
  • 经典排序算法及其 Java 实现
  • 使用common-codec进行md5加密
  • 我是如何设计 Upload 上传组件的
  • 小程序开发之路(一)
  • 译有关态射的一切
  • #LLM入门|Prompt#1.8_聊天机器人_Chatbot
  • (C#)一个最简单的链表类
  • (HAL)STM32F103C6T8——软件模拟I2C驱动0.96寸OLED屏幕
  • (免费领源码)python+django+mysql线上兼职平台系统83320-计算机毕业设计项目选题推荐
  • (转)大道至简,职场上做人做事做管理
  • (转)微软牛津计划介绍——屌爆了的自然数据处理解决方案(人脸/语音识别,计算机视觉与语言理解)...
  • .NET 指南:抽象化实现的基类
  • .NET/MSBuild 中的发布路径在哪里呢?如何在扩展编译的时候修改发布路径中的文件呢?
  • .net6解除文件上传限制。Multipart body length limit 16384 exceeded
  • .Net8 Blazor 尝鲜
  • .netcore 6.0/7.0项目迁移至.netcore 8.0 注意事项
  • .net下简单快捷的数值高低位切换
  • .NET中使用Redis (二)
  • .ui文件相关
  • ?php echo $logosrc[0];?,如何在一行中显示logo和标题?
  • @column注解_MyBatis注解开发 -MyBatis(15)
  • @selector(..)警告提示