当前位置: 首页 > news >正文

NIO基础

文章目录

  • NIO基础
    • 1. 三大组件
      • 1.1 Channel
      • 1.2 Buffer
      • 1.3 Selector
    • 2. ByteBuffer
      • 2.1 使用方式
      • 2.2 常见方法
      • 2.3 解决黏包问题
    • 3. 网络编程
      • 3.1 阻塞
      • 3.2 非阻塞
      • 3.3 多路复用
      • 3.4 利用多线程优化

NIO基础

1. 三大组件

1.1 Channel

channel 有一点类似于 stream,它就是读写数据的双向通道,可以从 channel 将数据读入 buffer,也可以将 buffer 的数据写入 channel,而之前的 stream 要么是输入,要么是输出,channel 比 stream 更为底层

channel
buffer

常见的 Channel 有:

  • FileChannel
  • DatagramChannel
  • SocketChannel
  • ServerSocketChannel

1.2 Buffer

Buffer 是一块可以写入数据,然后可以从中读取数据的内存块。

它与数组类似,但是提供了对数据的结构化访问,并且可以跟踪已经读写了多少数据。

常见的 buffer 有:

  • ByteBuffer
    • MappedByteBuffer
    • DirectByteBuffer
    • HeapByteBuffer
  • ShortBuffer
  • IntBuffer
  • LongBuffer
  • FloatBuffer
  • DoubleBuffer
  • CharBuffer

1.3 Selector

Selector 是 NIO 中实现多路复用的关键组件,允许单个线程管理多个 Channel。

Selector 能够监控多个注册到它的 Channel 上是否有事件发生(如连接建立、数据到达等)。这样,一个单独的线程可以通过轮询 Selector 来判断一组 Channel 是否有就绪状态的事件,从而只需要少量的线程就能处理大量的客户端请求。

使用 Selector 可以显著减少系统资源的开销,特别适用于需要同时处理大量网络连接的情况。

selector
thread
channel
channel
channel

2. ByteBuffer

2.1 使用方式

  1. 向 buffer 写入数据,例如调用 channel.read(buffer)
  2. 调用 flip() 切换至读模式
  3. 从 buffer 读取数据,例如调用 buffer.get()
  4. 调用 clear() 或 compact() 切换至写模式
  5. 重复 1~4 步骤

示例:

@Slf4j
public class BufferDemo {public static void main(String[] args) {try (RandomAccessFile file =new RandomAccessFile("test.txt", "rw")){ByteBuffer buffer = ByteBuffer.allocate(10);FileChannel channel = file.getChannel();while (true){int read = channel.read(buffer);if(read == -1)break;buffer.flip();while(buffer.hasRemaining()){log.info("读取到:{}",(char)buffer.get());}buffer.clear();}} catch (Exception e) {throw new RuntimeException(e);}}
}

2.2 常见方法

分配空间:

Bytebuffer buf = ByteBuffer.allocate(16);

写入数据有两种办法:

  • 调用 channel 的 read 方法
  • 调用 buffer 自己的 put 方法
int readBytes = channel.read(buf);
buf.put((byte)127);

读取数据同样有两种办法:

  • 调用 channel 的 write 方法
  • 调用 buffer 自己的 get 方法
int writeBytes = channel.write(buf);
byte b = buf.get();

get 方法会让 position 读指针向后走,如果想重复读取数据

  • 可以调用 rewind 方法将 position 重新置为 0
  • 或者调用 get(int i) 方法获取索引 i 的内容,它不会移动读指针

mark 和 reset:

mark 是在读取时,做一个标记,即使 position 改变,只要调用 reset 就能回到 mark 的位置

字符串与 ByteBuffer 互转:

ByteBuffer buffer1 = StandardCharsets.UTF_8.encode("你好");
ByteBuffer buffer2 = Charset.forName("utf-8").encode("你好");debug(buffer1);
debug(buffer2);CharBuffer buffer3 = StandardCharsets.UTF_8.decode(buffer1);
System.out.println(buffer3.getClass());
System.out.println(buffer3.toString());

2.3 解决黏包问题

将错乱的数据恢复成原始的按 \n 分隔的数据:

public static void main(String[] args) {ByteBuffer source = ByteBuffer.allocate(32);source.put("Hello,world\nI'm zhangsan\nHo".getBytes());split(source);source.put("w are you?\nhaha!\n".getBytes());split(source);
}private static void split(ByteBuffer source) {source.flip();int oldLimit = source.limit();for (int i = 0; i < oldLimit; i++) {if (source.get(i) == '\n') {System.out.println(i);ByteBuffer target = ByteBuffer.allocate(i + 1 - source.position());// 0 ~ limitsource.limit(i + 1);target.put(source); // 从source 读,向 target 写debugAll(target);source.limit(oldLimit);}}source.compact();
}

3. 网络编程

3.1 阻塞

阻塞模式下,相关方法都会导致线程暂停

  • ServerSocketChannel.accept 会在没有连接建立时让线程暂停
  • SocketChannel.read 会在没有数据可读时让线程暂停
  • 阻塞的表现其实就是线程暂停了,暂停期间不会占用 cpu,但线程相当于闲置
// 使用 nio 来理解阻塞模式, 单线程
// 0. ByteBuffer
ByteBuffer buffer = ByteBuffer.allocate(16);
// 1. 创建了服务器
ServerSocketChannel ssc = ServerSocketChannel.open();// 2. 绑定监听端口
ssc.bind(new InetSocketAddress(8080));// 3. 连接集合
List<SocketChannel> channels = new ArrayList<>();
while (true) {// 4. accept 建立与客户端连接, SocketChannel 用来与客户端之间通信log.debug("connecting...");SocketChannel sc = ssc.accept(); // 阻塞方法,线程停止运行log.debug("connected... {}", sc);channels.add(sc);for (SocketChannel channel : channels) {// 5. 接收客户端发送的数据log.debug("before read... {}", channel);channel.read(buffer); // 阻塞方法,线程停止运行buffer.flip();debugRead(buffer);buffer.clear();log.debug("after read...{}", channel);}
}

3.2 非阻塞

  • 非阻塞模式下,相关方法都会不会让线程暂停
    • 在 ServerSocketChannel.accept 在没有连接建立时,会返回 null,继续运行
    • SocketChannel.read 在没有数据可读时,会返回 0,但线程不必阻塞,可以去执行其它 SocketChannel 的 read 或是去执行 ServerSocketChannel.accept
    • 写数据时,线程只是等待数据写入 Channel 即可,无需等 Channel 通过网络把数据发送出去
  • 但非阻塞模式下,即使没有连接建立,和可读数据,线程仍然在不断运行,白白浪费了 cpu
// 使用 nio 来理解非阻塞模式, 单线程
// 0. ByteBuffer
ByteBuffer buffer = ByteBuffer.allocate(16);
// 1. 创建了服务器
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false); // 非阻塞模式
// 2. 绑定监听端口
ssc.bind(new InetSocketAddress(8080));
// 3. 连接集合
List<SocketChannel> channels = new ArrayList<>();
while (true) {// 4. accept 建立与客户端连接, SocketChannel 用来与客户端之间通信SocketChannel sc = ssc.accept(); // 非阻塞,线程还会继续运行,如果没有连接建立,但sc是nullif (sc != null) {log.debug("connected... {}", sc);sc.configureBlocking(false); // 非阻塞模式channels.add(sc);}for (SocketChannel channel : channels) {// 5. 接收客户端发送的数据int read = channel.read(buffer);// 非阻塞,线程仍然会继续运行,如果没有读到数据,read 返回 0if (read > 0) {buffer.flip();debugRead(buffer);buffer.clear();log.debug("after read...{}", channel);}}
}

3.3 多路复用

单线程可以配合 Selector 完成对多个 Channel 可读写事件的监控,这称之为多路复用

  • 多路复用仅针对网络 IO、普通文件 IO 没法利用多路复用
  • 如果不用 Selector 的非阻塞模式,线程大部分时间都在做无用功,而 Selector 能够保证
    • 有可连接事件时才去连接
    • 有可读事件才去读取
    • 有可写事件才去写入

好处

  • 一个线程配合 selector 就可以监控多个 channel 的事件,事件发生线程才去处理。避免非阻塞模式下所做无用功
  • 让这个线程能够被充分利用
  • 节约了线程的数量
  • 减少了线程上下文切换

处理read事件服务器代码:

public static void main(String[] args) throws IOException {// 1. 创建 selector, 管理多个 channelSelector selector = Selector.open();ServerSocketChannel ssc = ServerSocketChannel.open();ssc.configureBlocking(false);// 2. 建立 selector 和 channel 的联系(注册)// SelectionKey 就是将来事件发生后,通过它可以知道事件和哪个channel的事件SelectionKey sscKey = ssc.register(selector, 0, null);// key 只关注 accept 事件sscKey.interestOps(SelectionKey.OP_ACCEPT);log.debug("sscKey:{}", sscKey);ssc.bind(new InetSocketAddress(8080));while (true) {// 3. select 方法, 没有事件发生,线程阻塞,有事件,线程才会恢复运行// select 在事件未处理时,它不会阻塞, 事件发生后要么处理,要么取消,不能置之不理selector.select();// 4. 处理事件, selectedKeys 内部包含了所有发生的事件Iterator<SelectionKey> iter = selector.selectedKeys().iterator(); // accept, readwhile (iter.hasNext()) {SelectionKey key = iter.next();// 处理key 时,要从 selectedKeys 集合中删除,否则下次处理就会有问题iter.remove();log.debug("key: {}", key);// 5. 区分事件类型if (key.isAcceptable()) { // 如果是 acceptServerSocketChannel channel = (ServerSocketChannel) key.channel();SocketChannel sc = channel.accept();sc.configureBlocking(false);ByteBuffer buffer = ByteBuffer.allocate(16); // attachment// 将一个 byteBuffer 作为附件关联到 selectionKey 上SelectionKey scKey = sc.register(selector, 0, buffer);scKey.interestOps(SelectionKey.OP_READ);log.debug("{}", sc);log.debug("scKey:{}", scKey);} else if (key.isReadable()) { // 如果是 readtry {SocketChannel channel = (SocketChannel) key.channel(); // 拿到触发事件的channel// 获取 selectionKey 上关联的附件ByteBuffer buffer = (ByteBuffer) key.attachment();int read = channel.read(buffer); // 如果是正常断开,read 的方法的返回值是 -1if(read == -1) {key.cancel();} else {split(buffer);// 需要扩容if (buffer.position() == buffer.limit()) {ByteBuffer newBuffer = ByteBuffer.allocate(buffer.capacity() * 2);buffer.flip();newBuffer.put(buffer); // 0123456789abcdef3333\nkey.attach(newBuffer);}}} catch (IOException e) {e.printStackTrace();key.cancel();  // 因为客户端断开了,因此需要将 key 取消(从 selector 的 keys 集合中真正删除 key)}}}}
}

处理write时间服务端代码:

public static void main(String[] args) throws IOException {Selector selector = Selector.open();ServerSocketChannel ssc = ServerSocketChannel.open();ssc.bind(new InetSocketAddress(8080));ssc.configureBlocking(false);ssc.register(selector,SelectionKey.OP_ACCEPT);while(true){selector.select();Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();while(iterator.hasNext()){SelectionKey key = iterator.next();try {if(key.isAcceptable()){SocketChannel sc = ssc.accept();sc.configureBlocking(false);StringBuilder sb = new StringBuilder();for(int i=0;i<10000;i++){sb.append("a");}ByteBuffer buffer = Charset.defaultCharset().encode(sb.toString());sc.write(buffer);if(buffer.hasRemaining()){sc.register(selector,SelectionKey.OP_WRITE,buffer);}}else if(key.isWritable()){ByteBuffer attachment = (ByteBuffer)key.attachment();SocketChannel sc = (SocketChannel)key.channel();sc.write(attachment);if(!attachment.hasRemaining()){key.cancel();}}} catch (IOException e) {e.printStackTrace();key.cancel();}}}}

3.4 利用多线程优化

public class ChannelDemo7 {public static void main(String[] args) throws IOException {new BossEventLoop().register();}@Slf4jstatic class BossEventLoop implements Runnable {private Selector boss;private WorkerEventLoop[] workers;private volatile boolean start = false;AtomicInteger index = new AtomicInteger();public void register() throws IOException {if (!start) {ServerSocketChannel ssc = ServerSocketChannel.open();ssc.bind(new InetSocketAddress(8080));ssc.configureBlocking(false);boss = Selector.open();SelectionKey ssckey = ssc.register(boss, 0, null);ssckey.interestOps(SelectionKey.OP_ACCEPT);workers = initEventLoops();new Thread(this, "boss").start();log.debug("boss start...");start = true;}}public WorkerEventLoop[] initEventLoops() {//        EventLoop[] eventLoops = new EventLoop[Runtime.getRuntime().availableProcessors()];WorkerEventLoop[] workerEventLoops = new WorkerEventLoop[2];for (int i = 0; i < workerEventLoops.length; i++) {workerEventLoops[i] = new WorkerEventLoop(i);}return workerEventLoops;}@Overridepublic void run() {while (true) {try {boss.select();Iterator<SelectionKey> iter = boss.selectedKeys().iterator();while (iter.hasNext()) {SelectionKey key = iter.next();iter.remove();if (key.isAcceptable()) {ServerSocketChannel c = (ServerSocketChannel) key.channel();SocketChannel sc = c.accept();sc.configureBlocking(false);log.debug("{} connected", sc.getRemoteAddress());workers[index.getAndIncrement() % workers.length].register(sc);}}} catch (IOException e) {e.printStackTrace();}}}}@Slf4jstatic class WorkerEventLoop implements Runnable {private Selector worker;private volatile boolean start = false;private int index;private final ConcurrentLinkedQueue<Runnable> tasks = new ConcurrentLinkedQueue<>();public WorkerEventLoop(int index) {this.index = index;}public void register(SocketChannel sc) throws IOException {if (!start) {worker = Selector.open();new Thread(this, "worker-" + index).start();start = true;}tasks.add(() -> {try {SelectionKey sckey = sc.register(worker, 0, null);sckey.interestOps(SelectionKey.OP_READ);worker.selectNow();} catch (IOException e) {e.printStackTrace();}});worker.wakeup();}@Overridepublic void run() {while (true) {try {worker.select();Runnable task = tasks.poll();if (task != null) {task.run();}Set<SelectionKey> keys = worker.selectedKeys();Iterator<SelectionKey> iter = keys.iterator();while (iter.hasNext()) {SelectionKey key = iter.next();if (key.isReadable()) {SocketChannel sc = (SocketChannel) key.channel();ByteBuffer buffer = ByteBuffer.allocate(128);try {int read = sc.read(buffer);if (read == -1) {key.cancel();sc.close();} else {buffer.flip();log.debug("{} message:", sc.getRemoteAddress());debugAll(buffer);}} catch (IOException e) {e.printStackTrace();key.cancel();sc.close();}}iter.remove();}} catch (IOException e) {e.printStackTrace();}}}}
}

相关文章:

  • Python机器学习:数据预处理与清洗的打开方式
  • 【羊毛资源】华为云开发者云主机免费申请使用指南
  • 线程池:线程池的实现 | 日志
  • 【机器学习(七)】分类和回归任务-K-近邻 (KNN)算法-Sentosa_DSML社区版
  • uniapp微信小程序,获取上一页面路由
  • YOLO11震撼发布!
  • 服装时尚与动漫游戏的跨界联动:创新运营与策划策略研究
  • erlang学习:Linux命令学习8
  • 完整网络模型训练(一)
  • 思科dhcp的配置
  • Elasticsearch学习笔记(3)
  • C++:一文搞懂友元类(friend class)
  • SpringMVC源码-AbstractHandlerMethodMapping处理器映射器将@Controller修饰类方法存储到处理器映射器
  • uniapp中uni.request的统一封装 (ts版)
  • 【易上手快捷开发新框架技术】用Flet从零开始分步骤循序渐进编程实现购物清单助手手机应用app示例掰烂嚼碎深度讲解源代码IDE运行和调试通过截图为证
  • create-react-app做的留言板
  • Java基本数据类型之Number
  • Java应用性能调优
  • js中的正则表达式入门
  • PHP变量
  • PHP面试之三:MySQL数据库
  • puppeteer stop redirect 的正确姿势及 net::ERR_FAILED 的解决
  • React-flux杂记
  • vue中实现单选
  • 初探 Vue 生命周期和钩子函数
  • 解决iview多表头动态更改列元素发生的错误
  • 目录与文件属性:编写ls
  • 我与Jetbrains的这些年
  • 大数据全解:定义、价值及挑战
  • ​3ds Max插件CG MAGIC图形板块为您提升线条效率!
  • ​Java并发新构件之Exchanger
  • ​queue --- 一个同步的队列类​
  • ‌[AI问答] Auto-sklearn‌ 与 scikit-learn 区别
  • (C语言版)链表(三)——实现双向链表创建、删除、插入、释放内存等简单操作...
  • (delphi11最新学习资料) Object Pascal 学习笔记---第13章第1节 (全局数据、栈和堆)
  • (Git) gitignore基础使用
  • (实测可用)(3)Git的使用——RT Thread Stdio添加的软件包,github与gitee冲突造成无法上传文件到gitee
  • (四)linux文件内容查看
  • (转)ORM
  • (转)大道至简,职场上做人做事做管理
  • (转)关于pipe()的详细解析
  • (转)甲方乙方——赵民谈找工作
  • (状压dp)uva 10817 Headmaster's Headache
  • .“空心村”成因分析及解决对策122344
  • .net 8 发布了,试下微软最近强推的MAUI
  • .net core 6 redis操作类
  • .net core MVC 通过 Filters 过滤器拦截请求及响应内容
  • .net core 外观者设计模式 实现,多种支付选择
  • .NET 的静态构造函数是否线程安全?答案是肯定的!
  • .net 流——流的类型体系简单介绍
  • .NET/C# 使窗口永不获得焦点
  • .NET/C# 使用 #if 和 Conditional 特性来按条件编译代码的不同原理和适用场景
  • .vue文件怎么使用_vue调试工具vue-devtools的安装
  • /etc/shadow字段详解
  • ::before和::after 常见的用法