当前位置: 首页 > news >正文

BIO,NIO,AIO编程实战

写在前面

源码 。
关于IO分类以及IO模型等理论知识,可以参考io之io分类和io模型这篇文章。本文主要来实现Java中相关IO模型实现程序。

1:BIO

blocking io,是Java io中对阻塞IO模型的具体实现。

因为不管是server端还是client端,都需要发送消息给对端,所以我们先来定义一个通道的处理器类负责完成消息发送的工作:

package com.dahuyou.io.model.bio;import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;
import java.nio.charset.Charset;public class ChannelHandler {private Socket socket;private Charset charset;public ChannelHandler(Socket socket, Charset charset) {this.socket = socket;this.charset = charset;}public void writeAndFlush(Object msg) {OutputStream out = null;try {out = socket.getOutputStream();out.write((msg.toString()).getBytes(charset));out.flush();} catch (IOException e) {e.printStackTrace();}}public Socket socket() {return socket;}}

再有就是client和server又是有所不同的,哪里不同呢?一个是通道建立成功是行为,以及读取到消息时的行为可能是不同的,所以再来定义一个适配器类来适配这些不同,并且在该类中依赖ChannleHandler,从而拥有向通道中发送消息的能力,代码如下:

package com.dahuyou.io.model.bio;import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.Socket;
import java.nio.charset.Charset;public abstract class ChannelAdapter extends Thread {private Socket socket;private ChannelHandler channelHandler;private Charset charset;public ChannelAdapter(Socket socket, Charset charset) {this.socket = socket;this.charset = charset;while (!socket.isConnected()) {break;}channelHandler = new ChannelHandler(this.socket, charset);channelActive(channelHandler);}@Overridepublic void run() {try {BufferedReader input = new BufferedReader(new InputStreamReader(this.socket.getInputStream(), charset));String str = null;while ((str = input.readLine()) != null) {try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}channelRead(channelHandler, str);}} catch (IOException e) {e.printStackTrace();}}// 链接通知抽象类public abstract void channelActive(ChannelHandler ctx);// 读取消息抽象类public abstract void channelRead(ChannelHandler ctx, Object msg);}

其中channelActive通道建立方法和channelRead通道数据读取方法作为钩子方法供子类来提供各自场景下的具体实现。

接着,来定义具体的client类:

package com.dahuyou.io.model.bio.client;import com.dahuyou.io.model.bio.ChannelAdapter;
import com.dahuyou.io.model.bio.ChannelHandler;
import java.net.Socket;
import java.nio.charset.Charset;
import java.text.SimpleDateFormat;
import java.util.Date;public class BioClientHandler extends ChannelAdapter {public BioClientHandler(Socket socket, Charset charset) {super(socket, charset);}@Overridepublic void channelActive(ChannelHandler ctx) {System.out.println("链接报告LocalAddress:" + ctx.socket().getLocalAddress());ctx.writeAndFlush("hi! BioClient to msg for you \r\n");}@Overridepublic void channelRead(ChannelHandler ctx, Object msg) {System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " 接收到消息:" + msg);ctx.writeAndFlush("hi, 我是client,我已经收到你的消息Success!\r\n");}}

定义具体的server类:

package com.dahuyou.io.model.bio.server;import com.dahuyou.io.model.bio.ChannelAdapter;
import com.dahuyou.io.model.bio.ChannelHandler;import java.net.Socket;
import java.nio.charset.Charset;
import java.text.SimpleDateFormat;
import java.util.Date;public class BioServerHandler extends ChannelAdapter {public BioServerHandler(Socket socket, Charset charset) {super(socket, charset);}@Overridepublic void channelActive(ChannelHandler ctx) {System.out.println("链接报告LocalAddress:" + ctx.socket().getLocalAddress());ctx.writeAndFlush("hi! BioServer to msg for you \r\n");}@Overridepublic void channelRead(ChannelHandler ctx, Object msg) {System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " 接收到消息:" + msg);ctx.writeAndFlush("hi 我是server, 我已经收到你的消息Success!\r\n");}}

接下来就可以定义server和client的服务启动类了,定义server启动类:

package com.dahuyou.io.model.bio.server;import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.Charset;//public class BioServer extends Thread {
public class BioServer {private ServerSocket serverSocket = null;public static void main(String[] args) {BioServer bioServer = new BioServer();
//        bioServer.start();bioServer.startServer();}private void startServer() {try {serverSocket = new ServerSocket();serverSocket.bind(new InetSocketAddress(7397));System.out.println("bio server start suc!");while (true) {Socket socket = serverSocket.accept();
//                BioServerHandler handler = new BioServerHandler(socket, Charset.forName("GBK"));BioServerHandler handler = new BioServerHandler(socket, Charset.forName("UTF-8"));handler.start();}} catch (IOException e) {e.printStackTrace();}}/*@Overridepublic void run() {try {serverSocket = new ServerSocket();serverSocket.bind(new InetSocketAddress(7397));System.out.println("bio server start suc!");while (true) {Socket socket = serverSocket.accept();
//                BioServerHandler handler = new BioServerHandler(socket, Charset.forName("GBK"));BioServerHandler handler = new BioServerHandler(socket, Charset.forName("UTF-8"));handler.start();}} catch (IOException e) {e.printStackTrace();}}*/
}

定义client启动类:

package com.dahuyou.io.model.bio.client;import java.io.IOException;
import java.net.Socket;
import java.nio.charset.Charset;public class BioClient {public static void main(String[] args) {try {Socket socket = new Socket("192.168.10.91", 7397);System.out.println("bio client start suc!");BioClientHandler bioClientHandler = new BioClientHandler(socket, Charset.forName("utf-8"));bioClientHandler.start();} catch (IOException e) {e.printStackTrace();}}}

运行server:
在这里插入图片描述
运行client:
在这里插入图片描述
运行过程中:
在这里插入图片描述
附UML图:
在这里插入图片描述

2:NIO

new io,是Java io中对多路复用IO模型的具体实现。

因为不管是server端还是client端,都需要发送消息给对端,所以我们先来定义一个通道的处理器类负责完成消息发送的工作:

package com.dahuyou.io.model.nio;import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;public class ChannelHandler {private SocketChannel channel;private Charset charset;public ChannelHandler(SocketChannel channel, Charset charset) {this.channel = channel;this.charset = charset;}public void writeAndFlush(Object msg) {try {byte[] bytes = msg.toString().getBytes(charset);ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);writeBuffer.put(bytes);writeBuffer.flip();channel.write(writeBuffer);} catch (IOException e) {e.printStackTrace();}}public SocketChannel channel() {return channel;}
}

再有就是client和server又是有所不同的,哪里不同呢?一个是通道建立成功是行为,以及读取到消息时的行为可能是不同的,所以再来定义一个适配器类来适配这些不同,并且在该类中依赖ChannleHandler,从而拥有向通道中发送消息的能力,代码如下:

package com.dahuyou.io.model.nio;import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.TimeUnit;public abstract class ChannelAdapter extends Thread {private Selector selector;private ChannelHandler channelHandler;private Charset charset;public ChannelAdapter(Selector selector, Charset charset) {this.selector = selector;this.charset = charset;}@Overridepublic void run() {while (true) {try {Thread.sleep(1000);selector.select(1000);  //Selects a set of keys whose corresponding channels are ready for I/OSet<SelectionKey> selectedKeys = selector.selectedKeys();Iterator<SelectionKey> it = selectedKeys.iterator();SelectionKey key = null;while (it.hasNext()) {key = it.next();it.remove();handleInput(key);}} catch (Exception ignore) {}}}private void handleInput(SelectionKey key) throws IOException {if (!key.isValid()) return;// 客户端SocketChannelClass<?> superclass = key.channel().getClass().getSuperclass();if (superclass == SocketChannel.class){SocketChannel socketChannel = (SocketChannel) key.channel();if (key.isConnectable()) {if (socketChannel.finishConnect()) {channelHandler = new ChannelHandler(socketChannel, charset);channelActive(channelHandler);socketChannel.register(selector, SelectionKey.OP_READ);} else {System.exit(1);}}}// 服务端ServerSocketChannelif (superclass == ServerSocketChannel.class){if (key.isAcceptable()) {ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();SocketChannel socketChannel = serverSocketChannel.accept();socketChannel.configureBlocking(false);socketChannel.register(selector, SelectionKey.OP_READ);channelHandler = new ChannelHandler(socketChannel, charset);channelActive(channelHandler);}}if (key.isReadable()) {SocketChannel socketChannel = (SocketChannel) key.channel();ByteBuffer readBuffer = ByteBuffer.allocate(1024);int readBytes = socketChannel.read(readBuffer);if (readBytes > 0) {readBuffer.flip();byte[] bytes = new byte[readBuffer.remaining()];readBuffer.get(bytes);channelRead(channelHandler, new String(bytes, charset));} else if (readBytes < 0) {key.cancel();socketChannel.close();}}}// 链接通知抽象类public abstract void channelActive(ChannelHandler ctx);// 读取消息抽象类public abstract void channelRead(ChannelHandler ctx, Object msg);}

其中channelActive通道建立方法和channelRead通道数据读取方法作为钩子方法供子类来提供各自场景下的具体实现。

接着,来定义具体的client处理器类,server处理器类:

package com.dahuyou.io.model.nio.client;import com.dahuyou.io.model.nio.ChannelAdapter;
import com.dahuyou.io.model.nio.ChannelHandler;
import java.io.IOException;
import java.nio.channels.Selector;
import java.nio.charset.Charset;
import java.text.SimpleDateFormat;
import java.util.Date;public class NioClientHandler extends ChannelAdapter {public NioClientHandler(Selector selector, Charset charset) {super(selector, charset);}@Overridepublic void channelActive(ChannelHandler ctx) {try {System.out.println("链接报告LocalAddress:" + ctx.channel().getLocalAddress());ctx.writeAndFlush("hi! NioClient to msg for you \r\n");} catch (IOException e) {e.printStackTrace();}}@Overridepublic void channelRead(ChannelHandler ctx, Object msg) {System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " 接收到消息:" + msg);ctx.writeAndFlush("hi i am client, already receive your message!\r\n");}}
package com.dahuyou.io.model.nio.server;import com.dahuyou.io.model.nio.ChannelAdapter;
import com.dahuyou.io.model.nio.ChannelHandler;import java.io.IOException;
import java.nio.channels.Selector;
import java.nio.charset.Charset;
import java.text.SimpleDateFormat;
import java.util.Date;public class NioServerHandler extends ChannelAdapter {public NioServerHandler(Selector selector, Charset charset) {super(selector, charset);}@Overridepublic void channelActive(ChannelHandler ctx) {try {System.out.println("链接报告LocalAddress:" + ctx.channel().getLocalAddress());ctx.writeAndFlush("hi! NioServer to msg for you \r\n");} catch (IOException e) {e.printStackTrace();}}@Overridepublic void channelRead(ChannelHandler ctx, Object msg) {System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " 接收到消息:" + msg);ctx.writeAndFlush("hi i am nio server!\r\n");}}

server,client main类:

package com.dahuyou.io.model.nio.client;import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;public class NioClient {public static void main(String[] args) throws IOException {Selector selector = Selector.open();SocketChannel socketChannel = SocketChannel.open();socketChannel.configureBlocking(false);boolean isConnect = socketChannel.connect(new InetSocketAddress("127.0.0.1", 7397));System.out.println("nio client started suc!");if (isConnect) {socketChannel.register(selector, SelectionKey.OP_READ);} else {socketChannel.register(selector, SelectionKey.OP_CONNECT);}new NioClientHandler(selector, Charset.forName("GBK")).start();}}
package com.dahuyou.io.model.nio.server;import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.charset.Charset;public class NioServer {private Selector selector;private ServerSocketChannel socketChannel;public static void main(String[] args) throws IOException {new NioServer().bind(7397);System.out.println("nio server started suc!");}public void bind(int port) {try {selector = Selector.open();socketChannel = ServerSocketChannel.open();socketChannel.configureBlocking(false);socketChannel.socket().bind(new InetSocketAddress(port), 1024);socketChannel.register(selector, SelectionKey.OP_ACCEPT);new NioServerHandler(selector, Charset.forName("GBK")).start();} catch (IOException e) {e.printStackTrace();}}}

运行测试:
在这里插入图片描述
附UML图:
在这里插入图片描述

3:AIO

主要类:

AsynchronousServerSocketChannel:服务端的异步通道类
AsynchronousSocketChannel:客户端的异步通道类
CompletionHandler:事件回调规范接口,连接建立事件,消息可读事件等

所以,异步其实就是基于事件驱动的方式来进行编程了,到处都是回调这种,不是很符合人类的线性思维,需要习惯下。
整体代码结构和bio以及nio比较类似,不同之处在于server类,定义了初始化ChannelInitializer,来做一下初始化的工作,其中数据读取的监听类初始化如下:

public class AioServerChannelInitializer extends ChannelInitializer {@Overrideprotected void initChannel(AsynchronousSocketChannel channel) throws Exception {
//        channel.read(ByteBuffer.allocate(1024), 10, TimeUnit.SECONDS, null, new AioServerHandler(channel, Charset.forName("GBK")));ByteBuffer buffer = ByteBuffer.allocate(1024);channel.read(buffer, buffer, new AioServerHandler(channel, Charset.forName("GBK")));}}

这样当有数据可读时就会调用AioServerHandler,当然AioServerHandler也是CompletionHandler的子类了,也是一个事件回调类。
server启动类如下:

public class AioServer extends Thread {private AsynchronousServerSocketChannel serverSocketChannel;@Overridepublic void run() {try {serverSocketChannel = AsynchronousServerSocketChannel.open(AsynchronousChannelGroup.withCachedThreadPool(Executors.newCachedThreadPool(), 10));serverSocketChannel.bind(new InetSocketAddress(7397));System.out.println("dahuyou-study-netty aio server start done.");// 等待CountDownLatch latch = new CountDownLatch(1);// 通过AioServerChannelInitializer,初始化数据读取的处理器(实现了接口CompletionHandler)serverSocketChannel.accept(this, new AioServerChannelInitializer());// 防止退出latch.await();} catch (Exception e) {e.printStackTrace();}}public AsynchronousServerSocketChannel serverSocketChannel() {return serverSocketChannel;}public static void main(String[] args) {new AioServer().start();}}

client启动类如下:

package com.dahuyou.io.model.aio.client;import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.Charset;
import java.util.concurrent.Future;public class AioClient {public static void main(String[] args) throws Exception {AsynchronousSocketChannel socketChannel = AsynchronousSocketChannel.open();
//        Future<Void> future = socketChannel.connect(new InetSocketAddress("127.0.0.1", 7397));
//        socketChannel.connect(new InetSocketAddress("127.0.0.1", 7397));// aio的a体现:指定一个回调类,建立连接成功时回调socketChannel.connect(new InetSocketAddress("127.0.0.1", 7397), null, new CompletionHandler<Void, Object>() {@Overridepublic void completed(Void result, Object attachment) {System.out.println("connect to nio server suc!");ByteBuffer allocate = ByteBuffer.allocate(1024);socketChannel.read(allocate, allocate, new AioClientHandler(socketChannel, Charset.forName("GBK")));}@Overridepublic void failed(Throwable exc, Object attachment) {}});System.out.println("dahuyou-study-netty aio client start done.");
//        future.get();// 读数据// clientChannel.read(readBuffer, readBuffer, new ReadHandler(clientChannel, latch));
//        socketChannel.read(ByteBuffer.allocate(1024), null, new AioClientHandler(socketChannel, Charset.forName("GBK")));Thread.sleep(10000000);}}

运行cient输出:

dahuyou-study-netty aio client start done.
connect to nio server suc!
链接报告信息:/127.0.0.1:7397
nio client receive: congratulations, connected to aio server suc!

server对应输出:

dahuyou-study-netty aio server start done.
链接报告信息:/127.0.0.1:63537
nio server receive:nio client send back msg!

程序有点问题,就是,server只能给client发一个消息,用netassit测试也有这个问题,对这块不太熟,哪位大哥大姐有空帮给看看,发现问题还请留言告知,感谢!!!,问题对应的代码位置如下:
在这里插入图片描述

写在后面

参考文章列表

io之io分类和io模型 。

UML一一 类图关系 (泛化、实现、依赖、关联、聚合、组合) 。

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • 游戏开发设计模式之责任链模式
  • MyBatis 源码解析:配置文件结构与自定义实现详解
  • 等保测评入门
  • VScode误删文件恢复或恢复之前版本记录
  • 银河麒麟V10如何安装本地deb软件包?(以安装wps为例)
  • 数仓工具使用Docker部署DolphinScheduler 3.2.0 (分布式任务调度系统)-单机模式
  • -Wl,-rpath= 编译器链接器指定动态库路径 与 LD_LIBRARY_PATH
  • 期末九天从入门到精通操作数据库(mysql)
  • 猫头虎 分享:Python库 SymPy 的简介、安装、用法详解入门教程 ‍
  • 鹏城杯 2022 取证writeup
  • OD C卷 - 结对编程
  • 计算机毕业设计 高校学术交流平台 Java+SpringBoot+Vue 前后端分离 文档报告 代码讲解 安装调试
  • python实用教程(二):安装配置Pycharm及使用(Win10)
  • 【MySQL】 黑马 MySQL进阶 笔记
  • 解决 VMware 中 Ubuntu文件系统磁盘空间不足
  • [PHP内核探索]PHP中的哈希表
  • 【剑指offer】让抽象问题具体化
  • JS基础篇--通过JS生成由字母与数字组合的随机字符串
  • Laravel Mix运行时关于es2015报错解决方案
  • Node 版本管理
  • Python学习之路13-记分
  • React-flux杂记
  • SegmentFault 社区上线小程序开发频道,助力小程序开发者生态
  • vue 配置sass、scss全局变量
  • 等保2.0 | 几维安全发布等保检测、等保加固专版 加速企业等保合规
  • 猴子数据域名防封接口降低小说被封的风险
  • 基于Javascript, Springboot的管理系统报表查询页面代码设计
  • 坑!为什么View.startAnimation不起作用?
  • 那些被忽略的 JavaScript 数组方法细节
  • 判断客户端类型,Android,iOS,PC
  • 我感觉这是史上最牛的防sql注入方法类
  • 你学不懂C语言,是因为不懂编写C程序的7个步骤 ...
  • ​香农与信息论三大定律
  • #传输# #传输数据判断#
  • $.ajax中的eval及dataType
  • (11)MATLAB PCA+SVM 人脸识别
  • (33)STM32——485实验笔记
  • (C11) 泛型表达式
  • (Java岗)秋招打卡!一本学历拿下美团、阿里、快手、米哈游offer
  • (二)Pytorch快速搭建神经网络模型实现气温预测回归(代码+详细注解)
  • (附源码)ssm高校实验室 毕业设计 800008
  • (附源码)ssm捐赠救助系统 毕业设计 060945
  • (附源码)计算机毕业设计SSM智能化管理的仓库管理
  • (剑指Offer)面试题41:和为s的连续正数序列
  • (切换多语言)vantUI+vue-i18n进行国际化配置及新增没有的语言包
  • .bat批处理(六):替换字符串中匹配的子串
  • .bat批处理(七):PC端从手机内复制文件到本地
  • .desktop 桌面快捷_Linux桌面环境那么多,这几款优秀的任你选
  • .mat 文件的加载与创建 矩阵变图像? ∈ Matlab 使用笔记
  • .Net - 类的介绍
  • .NET : 在VS2008中计算代码度量值
  • .NET C# 使用 iText 生成PDF
  • .NET Framework 服务实现监控可观测性最佳实践
  • .NET 中选择合适的文件打开模式(CreateNew, Create, Open, OpenOrCreate, Truncate, Append)
  • .NET/C# 使用 SpanT 为字符串处理提升性能