当前位置: 首页 > news >正文

JavaNIO——单线程(笔记)

文章目录

  • 一、 三大组件
    • 1.1 Channel & Buffer
    • 1.2 Selector
  • 二、 ByteBuffer字节缓存
    • 2.1 结构
    • 2.2 堆内存与直接内存
    • 2.3 读与写
    • 2.4 Scattering Reads与Gathering Writes
    • 2.5 简单处理黏包与半包
  • 三、FileChannel文件编程
    • 3.1 读取
    • 3.2 写入
    • 3.3 关闭
    • 3.4 位置
    • 3.5 大小
    • 3.6 强制写入
    • 3.7 两个Channel传输数据
    • 3.8 Path
      • 3.8.1 操作
      • 3.8.2 遍历目录文件
      • 3.8.3 遍历文件下的Jar包
      • 3.8.4 批量删除
      • 3.8.5 批量复制
  • 四、ServerSocketChannel网络编程
    • 4.1 模拟阻塞模式
    • 4.2 非阻塞模式
    • 4.3 nio-selector处理accept
    • 4.4 nio-selector处理read
      • 4.4.1 用完key为什么要移除
      • 4.4.2 处理异常断开与正常断开
      • 4.4.3 消息边界问题与附件
        • 4.4.3.1 ByteBuffer 大小的分配
      • 4.4.4 写入内容与写入内容过多
    • 4.5 方法
  • 参考文献

non-blocking io 非阻塞IO

一、 三大组件

1.1 Channel & Buffer

Channel:是指数据传输的双向通道。
Buffer:数据暂存区域,暂存Channel中的数据。是应用程序、文件、网络之间的桥梁。

Channel有一点类似于Stream,是读写数据的双向通道。可以从Channel将数据读入Buffer,也可以Buffer数据写入Channel。Stream要么写入要么输出,Channel比Stream更底层。

常见的Channel有:

  • FileChannel:文件传输通道
  • DatagramChannel:UDP网络编程时的通道
  • SocketChannel:TCP数据传输通道,客户端与服务器都可以用
  • ServerSocketChannel:专用于服务器TCP传输通道

常见Buffer有:

  • ByteBuffer:
    • MappedByteBuffer
    • DirectByteBuffer
    • HeapByteBuffer
  • ShortBuffer
  • IntBuffer
  • LongBuffer
  • FloatBuffer
  • DoubleBuffer
  • CharBuffer

Buffer是一个抽象类,所有上面所有的Buffer都是它的子类

public abstract class Buffer{
	//mark <= position <= limit <= capacity
	private int mark = -1;
	private int position = 0;
	private int limit;
	private int capacity;
	//直接缓冲区实现子类的数据内存地址
	long address;
}

1.2 Selector

选择器。我们由如下场景来理解。

对于服务器,一个客户端连接,一个socket。我们该如何管理socket?

  1. 多线程版本

每一个socket由线程来管理。

thread1
socket1
thread2
socket2
thread3
socket3

一旦连接多起来,线程也会很多,因为线程本身占用资源较多,将导致服务器被线程暂用过多,很快就会内存溢出。

因此就会有如下问题:

  • 内存占用高
  • 线程上下文切换成本高
  • 只适合连接数少的场景
  1. 线程池版本

有线程池限制线程数目,避免上述问题

结束连接断开
接收新的socket
treadPool
thread1
thread2
thread3
thread4
socket1
socket3
socket2
socket4

缺点

  • 阻塞模式下,线程仅能处理一个socket连接。如果socket什么都没做,该线程只能等待,不能处理其他请求。只有socket断开后,才可以处理新的socket。
  • 仅适合短连接场景
  1. selector版
thread
selector
channel1
channel2
channel3

selector是一个用于检测所有需求的工具。
多路复用。配合线程管理多个channel,获取channel上发生的事件,这些channel工作在非阻塞模式下,不会让线程一直在一个线程上等待、适合连接数多,流量低的场景(low traffic)

调用selector的select()会阻塞直到channel发生了读写就绪事件,这些事件发生,select方法就会返回这些时间交给thread来处理。

二、 ByteBuffer字节缓存

创建一个项目,并创建如下测试文件
在这里插入图片描述
接着在单元测试里面创建一个测试文件
在这里插入图片描述

package com.yjx23332.netty.test;

import java.io.FileInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

public class TestByteBuffer {
    public static void main(String[] args){
        //FileChannel
        //输入输出流读取 或者 RandomAccessFile 读取
        //相对路径,从更目录开始
        try(FileChannel fileChannel = new FileInputStream("data.txt").getChannel()){
            //准备缓冲区
            ByteBuffer byteBuffer = ByteBuffer.allocate(30);//单位:字节
            //read:从Channel中读出数据,写入byteBuffer
            fileChannel.read(byteBuffer);
            //打印Buffer
            byteBuffer.flip();//切换至读模式
            while(byteBuffer.hasRemaining()){//是否还有剩余维度数据
                byte b = byteBuffer.get();//1次1个字节
                System.out.print((char)b);
            }
        }catch(IOException ioException){

        }
    }
}

在这里插入图片描述
如果内容大于缓冲区,读取不完,则用

package com.yjx23332.netty.test;

import java.io.FileInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

public class TestByteBuffer {
    public static void main(String[] args){
        //FileChannel
        //输入输出流读取 或者 RandomAccessFile 读取
        try(FileChannel fileChannel = new FileInputStream("data.txt").getChannel()){
            //准备缓冲区
            ByteBuffer byteBuffer = ByteBuffer.allocate(10);//单位:
            while(true){
                //从Channel中读出数据,向buffer中写入,返回字节数,-1:为空
                int len = fileChannel.read(byteBuffer);
                if(len == -1){//内容为空
                    break;
                }
                //打印Buffer
                byteBuffer.flip();//切换至读模式
                while(byteBuffer.hasRemaining()){//是否还有剩余维度数据
                    byte b = byteBuffer.get();//1次1个字节
                    System.out.print((char)b);
                }
                //切换为写模式
                //byteBuffer.compact(); 也可以进行切换为写模式
                byteBuffer.clear();
            }
        }catch(IOException ioException){

        }
    }
}

2.1 结构

  • capacity:容量
  • position:读写指针
  • limit:读写限制
    在这里插入图片描述
    写模式,写入a,b,c,d,e
    在这里插入图片描述

flip动作,变为读模式
在这里插入图片描述

读取3字节
在这里插入图片描述

clear操作,切换为写模式并清空
在这里插入图片描述
compact操作
在这里插入图片描述
我们导入如下依赖,和编写如下工具类

		<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <artifactId>spring-boot-starter-parent</artifactId>
        <groupId>org.springframework.boot</groupId>
        <version>2.6.5</version>
    </parent>
    <groupId>org.example</groupId>
    <artifactId>untitled</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
    </properties>
    <dependencies>
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
        </dependency>
    </dependencies>
</project>
package com.yjx23332.netty.test;

import io.netty.util.internal.MathUtil;
import io.netty.util.internal.StringUtil;

import java.nio.ByteBuffer;

import static io.netty.util.internal.MathUtil.isOutOfBounds;
import static io.netty.util.internal.StringUtil.NEWLINE;

public class ByteBufferUtil {
   private static final char[] BYTE2CHAR = new char[256];
   private static final char[] HEXDUMP_TABLE = new char[256 * 4];
   private static final String[] HEXPADDING = new String[16];
   private static final String[] HEXDUMP_ROWPREFIXES = new String[65536 >>> 4];
   private static final String[] BYTE2HEX = new String[256];
   private static final String[] BYTEPADDING = new String[16];

   static {
       final char[] DIGITS = "0123456789abcdef".toCharArray();
       for (int i = 0; i < 256; i++) {
           HEXDUMP_TABLE[i << 1] = DIGITS[i >>> 4 & 0x0F];
           HEXDUMP_TABLE[(i << 1) + 1] = DIGITS[i & 0x0F];
       }

       int i;

       // Generate the lookup table for hex dump paddings
       for (i = 0; i < HEXPADDING.length; i++) {
           int padding = HEXPADDING.length - i;
           StringBuilder buf = new StringBuilder(padding * 3);
           for (int j = 0; j < padding; j++) {
               buf.append("   ");
           }
           HEXPADDING[i] = buf.toString();
       }

       // Generate the lookup table for the start-offset header in each row (up to 64KiB).
       for (i = 0; i < HEXDUMP_ROWPREFIXES.length; i++) {
           StringBuilder buf = new StringBuilder(12);
           buf.append(StringUtil.NEWLINE);
           buf.append(Long.toHexString(i << 4 & 0xFFFFFFFFL | 0x100000000L));
           buf.setCharAt(buf.length() - 9, '|');
           buf.append('|');
           HEXDUMP_ROWPREFIXES[i] = buf.toString();
       }

       // Generate the lookup table for byte-to-hex-dump conversion
       for (i = 0; i < BYTE2HEX.length; i++) {
           BYTE2HEX[i] = ' ' + StringUtil.byteToHexStringPadded(i);
       }

       // Generate the lookup table for byte dump paddings
       for (i = 0; i < BYTEPADDING.length; i++) {
           int padding = BYTEPADDING.length - i;
           StringBuilder buf = new StringBuilder(padding);
           for (int j = 0; j < padding; j++) {
               buf.append(' ');
           }
           BYTEPADDING[i] = buf.toString();
       }

       // Generate the lookup table for byte-to-char conversion
       for (i = 0; i < BYTE2CHAR.length; i++) {
           if (i <= 0x1f || i >= 0x7f) {
               BYTE2CHAR[i] = '.';
           } else {
               BYTE2CHAR[i] = (char) i;
           }
       }
   }

   /**
    * 打印所有内容
    * @param buffer
    */
   public static void debugAll(ByteBuffer buffer) {
       int oldlimit = buffer.limit();
       buffer.limit(buffer.capacity());
       StringBuilder origin = new StringBuilder(256);
       appendPrettyHexDump(origin, buffer, 0, buffer.capacity());
       System.out.println("+--------+-------------------- all ------------------------+----------------+");
       System.out.printf("position: [%d], limit: [%d]\n", buffer.position(), oldlimit);
       System.out.println(origin);
       buffer.limit(oldlimit);
   }

   /**
    * 打印可读取内容
    * @param buffer
    */
   public static void debugRead(ByteBuffer buffer) {
       StringBuilder builder = new StringBuilder(256);
       appendPrettyHexDump(builder, buffer, buffer.position(), buffer.limit() - buffer.position());
       System.out.println("+--------+-------------------- read -----------------------+----------------+");
       System.out.printf("position: [%d], limit: [%d]\n", buffer.position(), buffer.limit());
       System.out.println(builder);
   }

   private static void appendPrettyHexDump(StringBuilder dump, ByteBuffer buf, int offset, int length) {
       if (MathUtil.isOutOfBounds(offset, length, buf.capacity())) {
           throw new IndexOutOfBoundsException(
                   "expected: " + "0 <= offset(" + offset + ") <= offset + length(" + length
                           + ") <= " + "buf.capacity(" + buf.capacity() + ')');
       }
       if (length == 0) {
           return;
       }
       dump.append(
               "         +-------------------------------------------------+" +
                       StringUtil.NEWLINE + "         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |" +
                       StringUtil.NEWLINE + "+--------+-------------------------------------------------+----------------+");

       final int startIndex = offset;
       final int fullRows = length >>> 4;
       final int remainder = length & 0xF;

       // Dump the rows which have 16 bytes.
       for (int row = 0; row < fullRows; row++) {
           int rowStartIndex = (row << 4) + startIndex;

           // Per-row prefix.
           appendHexDumpRowPrefix(dump, row, rowStartIndex);

           // Hex dump
           int rowEndIndex = rowStartIndex + 16;
           for (int j = rowStartIndex; j < rowEndIndex; j++) {
               dump.append(BYTE2HEX[getUnsignedByte(buf, j)]);
           }
           dump.append(" |");

           // ASCII dump
           for (int j = rowStartIndex; j < rowEndIndex; j++) {
               dump.append(BYTE2CHAR[getUnsignedByte(buf, j)]);
           }
           dump.append('|');
       }

       // Dump the last row which has less than 16 bytes.
       if (remainder != 0) {
           int rowStartIndex = (fullRows << 4) + startIndex;
           appendHexDumpRowPrefix(dump, fullRows, rowStartIndex);

           // Hex dump
           int rowEndIndex = rowStartIndex + remainder;
           for (int j = rowStartIndex; j < rowEndIndex; j++) {
               dump.append(BYTE2HEX[getUnsignedByte(buf, j)]);
           }
           dump.append(HEXPADDING[remainder]);
           dump.append(" |");

           // Ascii dump
           for (int j = rowStartIndex; j < rowEndIndex; j++) {
               dump.append(BYTE2CHAR[getUnsignedByte(buf, j)]);
           }
           dump.append(BYTEPADDING[remainder]);
           dump.append('|');
       }

       dump.append(StringUtil.NEWLINE +
               "+--------+-------------------------------------------------+----------------+");
   }

   private static void appendHexDumpRowPrefix(StringBuilder dump, int row, int rowStartIndex) {
       if (row < HEXDUMP_ROWPREFIXES.length) {
           dump.append(HEXDUMP_ROWPREFIXES[row]);
       } else {
           dump.append(StringUtil.NEWLINE);
           dump.append(Long.toHexString(rowStartIndex & 0xFFFFFFFFL | 0x100000000L));
           dump.setCharAt(dump.length() - 9, '|');
           dump.append('|');
       }
   }

   public static short getUnsignedByte(ByteBuffer buffer, int index) {
       return (short) (buffer.get(index) & 0xFF);
   }
}

接下来试试使用它

package com.yjx23332.netty.test;


import java.nio.ByteBuffer;

import static com.yjx23332.netty.test.ByteBufferUtil.debugAll;

public class TestByteBuffer {
    public static void main(String[] args){
        ByteBuffer byteBuffer = ByteBuffer.allocate(10);
        byteBuffer.put((byte) 0x61);
        debugAll(byteBuffer);
    }

}

在这里插入图片描述

我们试验一下我们上面的理论

package com.yjx23332.netty.test;


import java.nio.ByteBuffer;

import static com.yjx23332.netty.test.ByteBufferUtil.debugAll;

public class TestByteBuffer {
    public static void main(String[] args){
        ByteBuffer byteBuffer = ByteBuffer.allocate(10);
        byteBuffer.put(new byte[]{ 0x61,0x63,0x64,'a','b','c'});
        debugAll(byteBuffer);
        //获取position的结果,即index=6的值,也就是空
        //get()会让position移动
        System.out.println(byteBuffer.get());
        //获取index=1的值
        //该方式不会移动position
        System.out.println(byteBuffer.get(0));
        byteBuffer.flip();
        System.out.println(byteBuffer.get());
        System.out.println(byteBuffer.get());
        byteBuffer.compact();
        debugAll(byteBuffer);
    }

}

2.2 堆内存与直接内存

package com.yjx23332.netty.test;


import java.nio.ByteBuffer;


public class TestByteBuffer {
    public static void main(String[] args){
        /**
         * HeapByteBuffer
         * 使用的是Java堆内存,效率较低
         * 会受到GC回收的影响
         * */
        System.out.println(ByteBuffer.allocate(16).getClass());
        /**
         * DirectByteBuffer
         * 使用的是直接内存,效率较高,因为少一次数据的拷贝
         * 不会收到GC回收影响
         * 分配内存效率较低,因为不受GC管理,因此要自己管理,避免内存占用
         * */
        System.out.println(ByteBuffer.allocateDirect(16).getClass());
    }

}


2.3 读与写

package com.yjx23332.netty.test;


import java.nio.ByteBuffer;

import static com.yjx23332.netty.test.ByteBufferUtil.debugAll;

public class TestByteBuffer {
    public static void main(String[] args){
        ByteBuffer byteBuffer = ByteBuffer.allocate(10);
        byteBuffer.put(new byte[]{ 'a','b','c','d','e','f','g'});
        byteBuffer.flip();
        /**
         * 读4个字节
         * get(new byte[])会让position移动
         */
        System.out.println(byteBuffer.get(new byte[4]));
        debugAll(byteBuffer);
		        
		//该方式不会移动position
		System.out.println((char) byteBuffer.get(4));
		debugAll(byteBuffer);

        /**
         * 将 position重新设为0
         * */
        byteBuffer.rewind();
        //get()会让position移动
        System.out.println((char)byteBuffer.get());

        /**
         * mark & reset
         * 增强rewind
         * mark 做一个标记,记录position的位置
         * reset 将position重置到mark的位置
         * */
        System.out.println((char) byteBuffer.get());
        System.out.println((char) byteBuffer.get());
        byteBuffer.mark();
        System.out.println((char) byteBuffer.get());
        System.out.println((char) byteBuffer.get());
        byteBuffer.reset();
        System.out.println((char) byteBuffer.get());
        System.out.println((char) byteBuffer.get());

    }
}


package com.yjx23332.netty.test;


import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

import static com.yjx23332.netty.test.ByteBufferUtil.debugAll;

public class TestByteBuffer {
    public static void main(String[] args){
        // 1. 字符串转为 ByteBuffer
        ByteBuffer byteBuffer = ByteBuffer.allocate(16);
        //默认使用操作系统编码
        byteBuffer.put("hello".getBytes());
        //指定编码集
        //byteBuffer.put("hello".getBytes(StandardCharsets.UTF_8));
        //使用
        debugAll(byteBuffer);

        //2. Charset
        //转换后,自动切换到读模式
        //使用操作系统默认字符集 Charset.defaultCharset()
        //指定编码集
        ByteBuffer byteBuffer1 = StandardCharsets.UTF_8.encode("hello");
        debugAll(byteBuffer1);

        //3.wrap,自动切换到读模式
        ByteBuffer byteBuffer2 = ByteBuffer.wrap("hello".getBytes());
        debugAll(byteBuffer2);

        //byteBuffer是写模式,此时会有问题
        System.out.println(StandardCharsets.UTF_8.decode(byteBuffer).toString());
        System.out.println(StandardCharsets.UTF_8.decode(byteBuffer1).toString());
        System.out.println(StandardCharsets.UTF_8.decode(byteBuffer2).toString());
    }

}

2.4 Scattering Reads与Gathering Writes

Scattering Reads分散读取:把一个文件分别读取到多个ByteBuffer之中(读取的个数已知)。
准备如下文件
在这里插入图片描述

package com.yjx23332.netty.test;

import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

import static com.yjx23332.netty.test.ByteBufferUtil.debugAll;

public class TestScatteringReads {
    public static void main(String[] args){
        try(FileChannel channel = new RandomAccessFile("words.txt","r").getChannel()){
            ByteBuffer b1 = ByteBuffer.allocate(3);
            ByteBuffer b2 = ByteBuffer.allocate(3);
            ByteBuffer b3 = ByteBuffer.allocate(5);
            channel.read(new ByteBuffer[]{b1,b2,b3});
            b1.flip();
            b2.flip();
            b3.flip();
            debugAll(b1);
            debugAll(b2);
            debugAll(b3);
        }catch (IOException ioException){

        }
    }
}

Gathering Writes集中写入

package com.yjx23332.netty.test;


import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;


public class TestGatheringWrites {
    public static void main(String[] args){
        ByteBuffer byteBuffer1 = StandardCharsets.UTF_8.encode("hello");
        ByteBuffer byteBuffer2 = StandardCharsets.UTF_8.encode("world");
        ByteBuffer byteBuffer3 = StandardCharsets.UTF_8.encode("你好世界");

        try(FileChannel fileChannel = new RandomAccessFile("words2.txt","rw").getChannel()){
            fileChannel.write(new ByteBuffer[]{byteBuffer1,byteBuffer2,byteBuffer3});

        }catch (IOException ioException){

        }

    }

}

在这里插入图片描述

2.5 简单处理黏包与半包

在给服务器发送信息时
黏包:两个消息合在一起。发生原因:提高发送效率,一次性发多个消息。
半包:一个消息被截成两段。发送原因:数据过多,一个包装不下只能分开。

package com.yjx23332.netty.test;

import java.nio.ByteBuffer;

import static com.yjx23332.netty.test.ByteBufferUtil.debugAll;

public class TestByteBufferExam {

    public static void main(String[] args){
        /**
         * 模拟分两次接收到消息
         * */
        ByteBuffer source = ByteBuffer.allocate(32);
        /**
         * 黏包
         * */
        source.put("hello,world\nI'm zhangsan\nHo".getBytes());
        split(source);
        /**
         * 半包
         * */
        source.put("w are you?\n".getBytes());
        split(source);
    }

    private  static  void split(ByteBuffer source){
        source.flip();
        for(int i = 0;i < source.limit();i++){
            // 找到一条完整消息
            if(source.get(i) == '\n'){
                int length = i + 1 - source.position();
                // 把这条完整消息存入新的 ByteBuffer
                ByteBuffer target = ByteBuffer.allocate(length);
                //从source读,向target写
                for(int j = 0; j < length;j++)
                    target.put(source.get());
                debugAll(target);
            }
        }
        //没有找到说明半包
        source.compact();
    }
}

三、FileChannel文件编程

FileChannel只能工作在阻塞模式下,这里和IO是一致的。

我们不能直接获取FileChannel,必须通过FileInputStream、FileOutputStream或者RandomAccessFile来获取FileChannel,它们都有getChannel方法。

  • 通过FileInputStream获取的channel只能读
  • 通过FileOutputStream获取的channel只能写
  • 通过RandomAccessFile是否能读写根据构造RandomAccessFile时的读写模式决定.

3.1 读取

从channel读取数据填充ByteBuffer,返回值读到了多少字节,-1表示到达文件末尾
流程可以参考1.3开头

int readByte = channel.read(buffer);

3.2 写入

ByteBuffer buffer = ...;
buffer.put(...);//存入数据
buffer.flip();//切换读写模式
while(buffer.hasRemaining()){
	channel.write(buffer);
}

3.3 关闭

channel必须关闭,但调用了FileInputStream、FileOutputStream或者RandomAccessFile的close方法,也会间接的调用close方法。

3.4 位置

获取当前位置

long pos = channel.position();

设置当前位置

long newPos = ...;
channel.position(newPos);

如果设置为文件末尾,这时读取会返回-1,这是写入会追加内容
但是如果超过了末尾,在写入时,新内容和与原末尾之间会有空洞(00)

3.5 大小

size方法可获取文件大小

3.6 强制写入

操作系统出于性能的考虑,会将数据缓存,不是立即写入磁盘,可以用force(true)方法,将文件内容和元数据(文件的权限等信息)立即写入磁盘。

3.7 两个Channel传输数据

x.transferTo():x传给…数据
x.transferFrom():从…传给x数据

package com.yjx23332.netty.test;

import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.channels.FileChannel;

public class TestFileChannelTransferTo {
    public static void main(String[] args){
        try(
                FileChannel from = new FileInputStream("data.txt").getChannel();
                FileChannel to = new FileOutputStream("to.txt").getChannel();
                ){
            /**
             * 效率高,底层利用操作系统的零拷贝进行优化
             * 一次最多传输2G
             * */
            from.transferTo(0,from.size(),to);
        }
        catch (IOException ioException){
            ioException.printStackTrace();
        }
    }
}

在这里插入图片描述
由于一次最多传输2G,于是我们需要进行改进

package com.yjx23332.netty.test;

import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.channels.FileChannel;

public class TestFileChannelTransferTo {
    public static void main(String[] args){
        try(
                FileChannel from = new FileInputStream("data.txt").getChannel();
                FileChannel to = new FileOutputStream("to.txt").getChannel();
                ){
            /**
             * 效率高,底层利用操作系统的零拷贝进行优化
             * 一次最多传输2G
             * */
            long size = from.size();
            for(long left = size; left > 0;){
                /**
                 * 返回实际传输字符
                 * @param 位置,个数,目标
                 */
                System.out.println("position:" + (size - left) + ",left:" + left);
                left = left - from.transferTo(size - left,left,to);
            }
        }
        catch (IOException ioException){
            ioException.printStackTrace();
        }
    }
}

3.8 Path

jdk7引入了Path和Paths类

  • Path用来表示文件路径
  • Paths是工具类,用来获取Path实例

3.8.1 操作

Path source = Paths.get("1.txt");		//相对路径 使用user.dir环境变量来定位 1.txt
Path source = Paths.get("d:\\1.txt");	//绝对路径 代表了 d:\1.txt
Path source = Paths.get("d:/1.txt");	//绝对路径 代表了 d:\1.txt
Path source = Paths.get("d:\\data","projects");		//代表了 d:\data\projects
  • . :当前路径
  • …: 上级路径

检查文件是否存在

Files.exists(path)

创建一级目录
已经存在则报异常,且只能创建一级目录

Files.createDirectory(path);

创建多级目录

Files.createDirectories(path);

拷贝文件
如果文件存在,则会报异常FileAlreadyExistsException
用的操作系统的实现

Files.copy(sourcePath,targetPath);

如果希望用source 覆盖掉target,则要用StandardCopyOption.REPLACE_EXISTING
StandardCopyOption可以放多个,用‘,’隔开即可

Files.copy(source,target, StandardCopyOption.REPLACE_EXISTING);

移动文件
StandardCopyOption.ATOMIC_MOVE保证移动的原子性

 Files.move(source,target, StandardCopyOption.ATOMIC_MOVE);

删除目录、文件
如果文件不存在,则会抛异常NoSuchFileException
如果目录中有内容,则会抛异常 DirectoryNotEmptyException

Files.delete(target);

3.8.2 遍历目录文件

package com.yjx23332.netty.test;

import java.io.IOException;
import java.nio.file.*;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.concurrent.atomic.AtomicInteger;

public class TestFilesWalkFileTree {
    public  static void main(String[] args) throws IOException {
        AtomicInteger dirCount = new AtomicInteger();
        AtomicInteger fileCount = new AtomicInteger();
       	// 访问者模式
        Files.walkFileTree(Paths.get("路径"),new SimpleFileVisitor<Path>(){
            @Override
            public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException {
                System.out.println("====>"+dir);
                dirCount.incrementAndGet();
                return super.preVisitDirectory(dir, attrs);
            }

            @Override
            public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
                System.out.println("【"+file+"】");
                fileCount.incrementAndGet();
                return super.visitFile(file, attrs);
            }
        });
        System.out.println("dir count:" + dirCount);
        System.out.println("file count:" + fileCount);
    }
}

最后发现多了一个文件夹,是因为windows找到的文件夹不包含最外层的文件夹
在这里插入图片描述

3.8.3 遍历文件下的Jar包

package com.yjx23332.netty.test;

import java.io.IOException;
import java.nio.file.*;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.concurrent.atomic.AtomicInteger;

public class TestFilesWalkFileTree {
    public  static void main(String[] args) throws IOException {
        AtomicInteger jarCount = new AtomicInteger();
        Files.walkFileTree(Paths.get("C:\\Program Files\\Java"),new SimpleFileVisitor<Path>(){
            @Override
            public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
                if(file.toString().endsWith(".jar")){
                    System.out.println(file);
                    jarCount.incrementAndGet();
                }
                return super.visitFile(file, attrs);
            }
        });
        System.out.println("jar count:" + jarCount);
    }
}

在这里插入图片描述

3.8.4 批量删除

注意该方式删除会直接删除,不会进入回收站,谨慎使用。


package com.yjx23332.netty.test;

import java.io.IOException;
import java.nio.file.*;
import java.nio.file.attribute.BasicFileAttributes;

public class TestFilesWalkFileTree {
    public  static void main(String[] args) throws IOException {
        Files.walkFileTree(Paths.get("路径"),new SimpleFileVisitor<Path>(){
            @Override
            public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException {
                System.out.println("=====> 进入 dir:" + dir);
                return super.preVisitDirectory(dir, attrs);
            }

            @Override
            public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
                Files.delete(file);
                System.out.println("=====> 删除 file:" + file);
                return super.visitFile(file, attrs);
            }

            @Override
            public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
                Files.delete(dir);
                System.out.println("=====> 删除 dir:" + dir);
                return super.postVisitDirectory(dir, exc);
            }
        });
    }
}

3.8.5 批量复制

package com.yjx23332.netty.test;

import java.io.IOException;
import java.nio.file.*;



public class TestFilesWalkFileTree {
    public  static void main(String[] args) throws IOException {
        String source = "D:\\test1";
        String target = "D:\\test2";
        Files.walk(Paths.get(source)).forEach(path -> {
            try {
                String targetName = path.toString().replace(source, target);
                //如果是目录
                if (Files.isDirectory(path)) {
                    Files.createDirectory(Paths.get(targetName));
                } else {//如果是文件
                    Files.copy(path, Paths.get(targetName));
                }
            }catch (IOException ioException){
                ioException.printStackTrace();
            }
        });
    }
}

四、ServerSocketChannel网络编程

为方便打印,我们引入

  		<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>

4.1 模拟阻塞模式

我们故意在以下代码中使用多次阻塞模式
同时以debug模式,运行下列两个代码文件
客户端在此处打上断点
在这里插入图片描述

package com.yjx23332.netty.test;

import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.List;

import static com.yjx23332.netty.test.ByteBufferUtil.debugAll;

@Slf4j
public class Server {
    public static void main(String[] args) throws IOException {
        //阻塞模式
        //0. 声明字节缓存
        ByteBuffer buffer = ByteBuffer.allocate(16);
        //1.创建服务器
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();

        //2.绑定监听端口
        serverSocketChannel.bind(new InetSocketAddress(8080));

        //3.连接集合
        List<SocketChannel> channels = new ArrayList<>();
        while(true){
            //4. accept建立与客户端连接,SocketChannel用来与客户端通信
            log.debug("connecting...");
            /**
             * 在没有连接建立时,线程停止运行,阻塞在这里
             * 每一次建立连接之后,才会继续执行
             * */
            SocketChannel socketChannel = serverSocketChannel.accept();
            log.debug("connected...{}",socketChannel);
            channels.add(socketChannel);
            for(SocketChannel channel:channels){
                //5. 接收客户端发送的数据
                log.debug("before read..{}",channel);
                /**
                 * 也是阻塞方法,线程也会在这里等待读入数据
                 * 客户端没有发送数据,则会在这里停止
                 */
                channel.read(buffer);
                //6. 调试buffer
                buffer.flip();
                debugAll(buffer);
                buffer.clear();
                log.debug("after read...{}",channel);
            }
        }

    }
}

package com.yjx23332.netty.test;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;

public class Client {
    public static void main(String[] args) throws IOException {
        SocketChannel socketChannel = SocketChannel.open();
        socketChannel.connect(new InetSocketAddress("localhost",8080));
        System.out.println("waiting");
    }
}

在客户端中,对其通道我们手动填写数据,看服务器反应。
在这里插入图片描述
发送如下数据
在这里插入图片描述
服务器部分
在这里插入图片描述

如果我们再次发送,将无法继续执行。因为在accept处阻塞了。正常情况,我们用一个线程来管理一个accept,这就和我们之前的线程和线程池的架构。

4.2 非阻塞模式

修改服务端

package com.yjx23332.netty.test;

import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.List;


import static com.yjx23332.netty.test.ByteBufferUtil.debugAll;

@Slf4j
public class Server {
    public static void main(String[] args) throws IOException {
        //非阻塞模式
        //0. 声明字节缓存
        ByteBuffer buffer = ByteBuffer.allocate(16);
        //1.创建服务器
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        //2. 切换为非阻塞模式
        serverSocketChannel.configureBlocking(false);

        //3.绑定监听端口
        serverSocketChannel.bind(new InetSocketAddress(8080));

        //4.连接集合
        List<SocketChannel> channels = new ArrayList<>();
        while(true){
            //5. accept建立与客户端连接,SocketChannel用来与客户端通信
            //log.debug("connecting...");
            /**
             * 在没有连接建立时,线程将不会停下来
             * 此时返回的是null
             * */
            SocketChannel socketChannel = serverSocketChannel.accept();
            if(socketChannel != null){
                log.debug("connected...{}",socketChannel);
                //6. 设置为非阻塞模式
                socketChannel.configureBlocking(false);
                channels.add(socketChannel);
            }
            for(SocketChannel channel:channels){
                //7. 接收客户端发送的数据
                //log.debug("before read..{}",channel);
                /**
                 * 客户端没有发送数据,也不会在这里停止
                 * 返回内容为0
                 */
                int read = channel.read(buffer);
                if(read == 0)
                    continue;
                //8. 调试buffer
                buffer.flip();
                debugAll(buffer);
                buffer.clear();
                log.debug("after read...{}",channel);
            }
        }

    }
}

在这里插入图片描述

4.3 nio-selector处理accept

在没有连接和输入时,CPU就一直在那里空转,浪费资源。通过Selector让CPU在没有事件要处理时,就休息。
首先我们先了解一下,事件类型

事件解释
accept有连接请求时,触发
connect客户端侧连接建立后,触发
read客户端有数据发出,服务器有数据可读时,触发
write可写事件,服务器写入时,触发
package com.yjx23332.netty.test;

import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;


import static com.yjx23332.netty.test.ByteBufferUtil.debugAll;

@Slf4j
public class Server {
    public static void main(String[] args) throws IOException {
        //1. 创建selector,管理多个Channel
        Selector selector = Selector.open();

        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.configureBlocking(false);

        //2. 建立 selector 和 channel 的联系 (注册)
        /**
         * SelectionKey 就是事件发生后,通过他可以知道事件发生了以及是哪个channel事件关注了
         * 此处就是serverSocketChannel关注了SelectionKey.OP_ACCEPT事件,注册在selector
         * @param selector,关注的事件(0就是都不关注),附件
         * */
        SelectionKey sscKey = serverSocketChannel.register(selector,SelectionKey.OP_ACCEPT,null);
        log.debug("register key:{}",sscKey);
        serverSocketChannel.bind(new InetSocketAddress(8080));
        while(true){
            //3. select 方法,事件未发生阻塞在此处。一旦事件发生,线程恢复,继续运行
            selector.select();
            //4. 处理事件,获取事件keys
            Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
            while(iter.hasNext()){
                SelectionKey key = iter.next();
                log.debug("key:{}",key);
                ServerSocketChannel SSChannel = (ServerSocketChannel) key.channel();
                SocketChannel socketChannel = SSChannel.accept();
                log.debug("连接建立:{}",socketChannel);

            }
            log.debug("connected");
        }

    }
}

可以看到,有多个连接,但他们的key是一致的。

在这里插入图片描述

  • 有未处理完的事件将不会阻塞,如果我们在
selector.select();
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
            while(iter.hasNext()){
                SelectionKey key = iter.next();
                log.debug("key:{}",key);
//                ServerSocketChannel SSChannel = (ServerSocketChannel) key.channel();
//                SocketChannel socketChannel = SSChannel.accept();
//                log.debug("连接建立:{}",socketChannel);

            }

后什么都不做,他会认为我们还没有处理完,就会继续处理,就不会阻塞
在这里插入图片描述

  • 反之,处理之后,就会阻塞

我们也可以取消事件

key.cancel();

也就是说,事件要么处理要么取消,不能置之不理。

4.4 nio-selector处理read

package com.yjx23332.netty.test;

import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;


import static com.yjx23332.netty.test.ByteBufferUtil.debugAll;

@Slf4j
public class Server {
    public static void main(String[] args) throws IOException {
        //1. 创建selector,管理多个Channel
        Selector selector = Selector.open();

        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.configureBlocking(false);

        //2. 建立 selector 和 channel 的联系 (注册)
        SelectionKey sscKey = serverSocketChannel.register(selector,SelectionKey.OP_ACCEPT,null);
        log.debug("register key:{}",sscKey);
        serverSocketChannel.bind(new InetSocketAddress(8080));
        while(true){
            //3. select 方法,事件未发生阻塞在此处。一旦事件发生,线程恢复,继续运行
            selector.select();
            //4. 处理事件,获取事件keys
            Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
            while(iter.hasNext()){
                SelectionKey key = iter.next();
                log.debug("key:{}",key);
                //5. 区分事件
                if(key.isAcceptable()){
                    ServerSocketChannel SSChannel = (ServerSocketChannel) key.channel();
                    SocketChannel socketChannel = SSChannel.accept();
                    socketChannel.configureBlocking(false);
                    log.debug("连接建立:{}",socketChannel);
                    SelectionKey scKey = socketChannel.register(selector,SelectionKey.OP_READ,null);
                    log.debug("register read key:{}",scKey);
                }
                else if(key.isReadable()){
                    SocketChannel socketChannel = (SocketChannel) key.channel();
                    ByteBuffer buffer = ByteBuffer.allocate(16);
                    socketChannel.read(buffer);
                    buffer.flip();
                    debugAll(buffer);
                }

            }
        }

    }
}

运行后传入值使用read之后会报错,这是因为我们没有移除key。可以看到,我们这里报错报的是我们accept事件发生了,但是此时我们没有连接请求,因此可以加入的为空。
在这里插入图片描述

4.4.1 用完key为什么要移除

那么为什么会触发accept事件?
注意,下图中绘制有遗漏,accept是属于sscKey,read是属于scKey。
在这里插入图片描述

package com.yjx23332.netty.test;

import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;


import static com.yjx23332.netty.test.ByteBufferUtil.debugAll;

@Slf4j
public class Server {
    public static void main(String[] args) throws IOException {
        //1. 创建selector,管理多个Channel
        Selector selector = Selector.open();

        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.configureBlocking(false);

        //2. 建立 selector 和 channel 的联系 (注册)
        SelectionKey sscKey = serverSocketChannel.register(selector,SelectionKey.OP_ACCEPT,null);
        log.debug("register key:{}",sscKey);
        serverSocketChannel.bind(new InetSocketAddress(8080));
        while(true){
            //3. select 方法,事件未发生阻塞在此处。一旦事件发生,线程恢复,继续运行
            selector.select();
            //4. 处理事件,获取事件keys
            Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
            while(iter.hasNext()){
                SelectionKey key = iter.next();
                //5. 处理 key 时,从集合中移除
                iter.remove();
                log.debug("key:{}",key);
                //6. 区分事件
                if(key.isAcceptable()){
                    ServerSocketChannel SSChannel = (ServerSocketChannel) key.channel();
                    SocketChannel socketChannel = SSChannel.accept();
                    socketChannel.configureBlocking(false);
                    log.debug("连接建立:{}",socketChannel);
                    SelectionKey scKey = socketChannel.register(selector,SelectionKey.OP_READ,null);
                    log.debug("register read key:{}",scKey);
                }
                else if(key.isReadable()){
                    SocketChannel socketChannel = (SocketChannel) key.channel();
                    ByteBuffer buffer = ByteBuffer.allocate(16);
                    socketChannel.read(buffer);
                    buffer.flip();
                    debugAll(buffer);
                }

            }
        }

    }
}

成功处理
在这里插入图片描述

4.4.2 处理异常断开与正常断开

我们发现,如果强制关闭Client,服务器会报错关闭。这显然是有问题的,因为我们不能因为一个客户端掉线就断了整个服务。

那么为什么会报错?

  • 当客户端连接异常断开时,为了让服务器知道断开了连接,会产生OP_READ事件。但此时消息无法被读取,因此报读写错误。
  • 如果只是简单的try,catch,就会认为没有处理,接着又会进入到下一次事件触发中,然后read又一次报错,一直循环下去。
  • 因此我们还要cancel掉该事件

对于正常断开,也会触发OP_READ事件,但是它的消息是正规的,不过结果是长度-1。我们通过判断是否为-1来决定是否是断开连接。


package com.yjx23332.netty.test;

import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;


import static com.yjx23332.netty.test.ByteBufferUtil.debugAll;

@Slf4j
public class Server {
    public static void main(String[] args) throws IOException {
        //1. 创建selector,管理多个Channel
        Selector selector = Selector.open();

        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.configureBlocking(false);

        //2. 建立 selector 和 channel 的联系 (注册)
        SelectionKey sscKey = serverSocketChannel.register(selector,SelectionKey.OP_ACCEPT,null);
        log.debug("register key:{}",sscKey);
        serverSocketChannel.bind(new InetSocketAddress(8080));
        while(true){
            //3. select 方法,事件未发生阻塞在此处。一旦事件发生,线程恢复,继续运行
            selector.select();
            //4. 处理事件,获取事件keys
            Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
            while(iter.hasNext()){
                SelectionKey key = iter.next();
                //5. 处理 key 时,从集合中移除
                iter.remove();
                log.debug("key:{}",key);
                //6. 区分事件
                if(key.isAcceptable()){
                    ServerSocketChannel SSChannel = (ServerSocketChannel) key.channel();
                    SocketChannel socketChannel = SSChannel.accept();
                    socketChannel.configureBlocking(false);
                    log.debug("连接建立:{}",socketChannel);
                    SelectionKey scKey = socketChannel.register(selector,SelectionKey.OP_READ,null);
                    log.debug("register read key:{}",scKey);
                }
                else if(key.isReadable()){
                    try {
                        SocketChannel socketChannel = (SocketChannel) key.channel();
                        ByteBuffer buffer = ByteBuffer.allocate(16);
                        if(socketChannel.read(buffer) == -1){
                            key.cancel();
                            continue;
                        }
                        buffer.flip();
                        debugAll(buffer);
                    }catch (IOException ioException){
                        ioException.printStackTrace();
                        key.cancel(); //注销该事件,从selector集合中删除
                    }

                }

            }
        }

    }
}


package com.yjx23332.netty.test;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;

public class Client {
    public static void main(String[] args) throws IOException {
        SocketChannel socketChannel = SocketChannel.open();
        socketChannel.connect(new InetSocketAddress("localhost",8080));
        System.out.println("waiting");
        socketChannel.close();
    }
}

4.4.3 消息边界问题与附件

消息边界,因为消息的长度不缺定,预先创建的缓存过小,导致一个消息被分为两次传输。或者缓存过大,两个消息合在一起了。(半包和黏包)

解决思路

  1. 固定消息长度,数据包大小一样,服务器按预定长度读取,缺点就是浪费带宽
  2. 按找分隔符号分割字符。我们通过分隔符好来确定是否获取完整。同时需要一个临时ByteBuffer,但如果消息比ByteBuffer长,同样要考虑扩容。需要一个字节一个字节地找,因此效率也不是很高
  3. 报消息分为两部分,前半部分存储了后续内容的长度,随后在发送内容。服务器先读一个整型(比如)那么服务器就分配该整型的大小,随后再接收内容。缺点是如果内容过大,则影响server吞吐量。

第三种方式,很类似TLV和LTV格式:type类型、Length长度、Value数据。在type类型,Length长度已知情况下,较方便的分配和获取消息。

  • Http 1.1 是TLV格式
  • Http 2.0 是LTV格式

当读不完的时候,会自动读取多次。

我们使用方法2,方法3则在Netty部分。

package com.yjx23332.netty.test;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;

public class Client {
    public static void main(String[] args) throws IOException {
        SocketChannel socketChannel = SocketChannel.open();
        socketChannel.connect(new InetSocketAddress("localhost",8080));
        socketChannel.write(Charset.defaultCharset().encode("hello\nworld!\nThis\nis\na\nnew\nday\n"));
        socketChannel.close();
    }
}
package com.yjx23332.netty.test;

import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;


import static com.yjx23332.netty.test.ByteBufferUtil.debugAll;

@Slf4j
public class Server {
    public static void main(String[] args) throws IOException {
        //1. 创建selector,管理多个Channel
        Selector selector = Selector.open();

        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.configureBlocking(false);

        //2. 建立 selector 和 channel 的联系 (注册)
        SelectionKey sscKey = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT, null);
        log.debug("register key:{}", sscKey);
        serverSocketChannel.bind(new InetSocketAddress(8080));
        while (true) {
            //3. select 方法,事件未发生阻塞在此处。一旦事件发生,线程恢复,继续运行
            selector.select();
            //4. 处理事件,获取事件keys
            Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
            while (iter.hasNext()) {
                SelectionKey key = iter.next();
                //5. 处理 key 时,从集合中移除
                iter.remove();
                log.debug("key:{}", key);
                //6. 区分事件
                if (key.isAcceptable()) {
                    ServerSocketChannel SSChannel = (ServerSocketChannel) key.channel();
                    SocketChannel socketChannel = SSChannel.accept();
                    socketChannel.configureBlocking(false);
                    log.debug("连接建立:{}", socketChannel);
                    //7. 添加附件
                    /**
                     * buffer与socketChannel关联,buffer的声明周期,将和绑定的SelectionKey相同
                     * */
                    SelectionKey scKey = socketChannel.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(4));
                    log.debug("register read key:{}", scKey);
                } else if (key.isReadable()) {
                    try {
                        SocketChannel socketChannel = (SocketChannel) key.channel();
                        //8. 获取附件
                        /**
                         * 如果容量不够,就创建一个新的去重新关联即可
                         * key.attach(buffer)
                         * */
                        ByteBuffer buffer = (ByteBuffer) key.attachment();
                        if (socketChannel.read(buffer) == -1) {
                            key.cancel();
                            continue;
                        }
                        split(buffer);
                        //9. 如果切割后,当前位置和最大位置相同,说明需要扩容
                        if(buffer.position() == buffer.limit()){
                            ByteBuffer newBuffer = ByteBuffer.allocate(buffer.capacity() << 1);
                            buffer.flip();
                            newBuffer.put(buffer);
                            key.attach(newBuffer);
                        }
                    } catch (IOException ioException) {
                        ioException.printStackTrace();
                        key.cancel(); //注销该事件,从selector集合中删除
                    }

                }

            }
        }
    }
    private  static  void split(ByteBuffer source){
        source.flip();
        for(int i = 0;i < source.limit();i++){
            // 找到一条完整消息
            if(source.get(i) == '\n'){
                int length = i + 1 - source.position();
                // 把这条完整消息存入新的 ByteBuffer
                ByteBuffer target = ByteBuffer.allocate(length);
                //从source读,向target写
                for(int j = 0; j < length;j++)
                    target.put(source.get());
                debugAll(target);
            }
        }
        //没有找到说明半包
        source.compact();
    }
}

4.4.3.1 ByteBuffer 大小的分配

每个channel都需要记录可能被切分的消息,因为Bytebuffer不能被多个channel共同使用,因此需要为Chanel维护一个独立的ByteBuffer

ByteBuffer不能太大,我们上面只考虑扩容,而Netty还做到了缩容。

  • 一种思路是先分配较小的buffer,如果过发现数据不够,在分配更大的。有点事消息连续容易处理,但是会耗费性能
  • 可以用数组组成buffer,发现不够,就把多出来的数据写入新的数组,与前面的消息不连续解析比较复杂,优点是避免了拷贝引起的性能损耗

4.4.4 写入内容与写入内容过多

当我们写入数据过多的时候,网络缓冲区会写满,于是就会不断尝试。我们可以通过SelectionKey来判断,如果有空间可以继续写的时候,再继续。而不是不断地尝试。

package com.yjx23332.netty.test;


import org.springframework.expression.spel.ast.Selection;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;

public class WriteServer {
    public static void main(String[] args) throws IOException {
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.configureBlocking(false);

        Selector selector = Selector.open();
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

        serverSocketChannel.bind(new InetSocketAddress("localhost",8080));
        while(true){
            selector.select();
            Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
            while(iter.hasNext()){
                SelectionKey key = iter.next();
                iter.remove();
                if(key.isAcceptable()){
                    SocketChannel socketChannel = serverSocketChannel.accept();
                    socketChannel.configureBlocking(false);
                    SelectionKey scKey = socketChannel.register(selector,0,null);
                    //1. 向客户端发送大量数据
                    StringBuilder stringBuilder = new StringBuilder();
                    for(int i = 0;i < 300000000;i++){
                        stringBuilder.append("a");
                    }
                    ByteBuffer buffer = Charset.defaultCharset().encode(stringBuilder.toString());
                    //2. 先写入尝试全部写入channel
                    int write = socketChannel.write(buffer);
                    //输出实际写入多少
                    System.out.println(write);
                    //3. 如果过缓冲区还有剩余内容
                    if(buffer.hasRemaining()){
                        //4. 关注可写事件
                        /**
                         * 避免新的关注把原来的关注覆盖了,我们可以用如下两种方式进行
                         * */
                        scKey.interestOps(scKey.interestOps() + SelectionKey.OP_WRITE);
                        //scKey.interestOps(scKey.interestOps() | SelectionKey.OP_WRITE);
                        //5. 将未写完的数据挂在到其中
                        scKey.attach(buffer);
                    }
                }
                else if(key.isReadable()) {
                    SocketChannel socketChannel = (SocketChannel) key.channel();
                    ByteBuffer buffer = (ByteBuffer) key.attachment();
                    int write = socketChannel.write(buffer);
                    System.out.println(write);
                    //6. 清理工作
                    if (!buffer.hasRemaining()){
                        key.attach(null);
                        //7. 不再关注可写事件
                        key.interestOps(key.interestOps() - SelectionKey.OP_WRITE);
                    }
                }
            }
        }
    }
}


package com.yjx23332.netty.test;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;


public class Client {
    public static void main(String[] args) throws IOException {
        SocketChannel socketChannel = SocketChannel.open();
        socketChannel.connect(new InetSocketAddress("localhost",8080));
        //接收数据
        int count = 0;
        ByteBuffer buffer = ByteBuffer.allocate(1024*1024);
        while(true){
            count += socketChannel.read(buffer);
            System.out.println(count);
            buffer.clear();
        }
    }
}

4.5 方法

阻塞直到绑定发生

int count = selector.select();

阻塞直到绑定事件发生 ,或者超时

int count = selector.select(long timeout);

不会则色,不管有没有事件发生,都立即返回,自己根据返回值进行判

int count = selector.selectNow();

count是事件数目。

参考文献

[1]黑马程序员Netty全套教程

相关文章:

  • SpringBoot工程中Dubbo多协议使用方式
  • 阿里首次开源 Java 10万字八股文,Github仅一天星标就超60K
  • 凯文凯利10条人生建议,送给迷茫的你!(上)
  • The DAO事件始末
  • 工业智能网关BL110应用之六: 支持PLC,Modbus,BACnet,电表等协议列表
  • Jackson公司蛋白质印迹指南丨样品制备
  • 数字IC必学之《Skill入门教程》
  • Operator 基础原理和概念
  • Math对象常用的方法
  • MySQL高级篇——锁
  • 【编程题】【Scratch二级】2022.06 画正方形
  • 【学习笔记】内存的连续分配管理方式
  • 图解 cv2.HoughLines cv2.line 参数原理
  • 使用简易网络实现二分类
  • XAML笔记
  • css的样式优先级
  • javascript面向对象之创建对象
  • Leetcode 27 Remove Element
  • LeetCode刷题——29. Divide Two Integers(Part 1靠自己)
  • node.js
  • Python - 闭包Closure
  • react 代码优化(一) ——事件处理
  • Spring Boot MyBatis配置多种数据库
  • SpringBoot几种定时任务的实现方式
  • UMLCHINA 首席专家潘加宇鼎力推荐
  • vue2.0一起在懵逼的海洋里越陷越深(四)
  • 动态规划入门(以爬楼梯为例)
  • 码农张的Bug人生 - 初来乍到
  • 漂亮刷新控件-iOS
  • ​【原创】基于SSM的酒店预约管理系统(酒店管理系统毕业设计)
  • ​flutter 代码混淆
  • (二)Eureka服务搭建,服务注册,服务发现
  • (附源码)spring boot公选课在线选课系统 毕业设计 142011
  • (附源码)计算机毕业设计ssm本地美食推荐平台
  • (五)Python 垃圾回收机制
  • (终章)[图像识别]13.OpenCV案例 自定义训练集分类器物体检测
  • .NET CORE 3.1 集成JWT鉴权和授权2
  • .NET Core使用NPOI导出复杂,美观的Excel详解
  • .NET WebClient 类下载部分文件会错误?可能是解压缩的锅
  • .NET 除了用 Task 之外,如何自己写一个可以 await 的对象?
  • .NET 使用 JustAssembly 比较两个不同版本程序集的 API 变化
  • .sh 的运行
  • @NestedConfigurationProperty 注解用法
  • @SuppressWarnings(unchecked)代码的作用
  • @Valid和@NotNull字段校验使用
  • [AR Foundation] 人脸检测的流程
  • [BZOJ1040][P2607][ZJOI2008]骑士[树形DP+基环树]
  • [C#]猫叫人醒老鼠跑 C#的委托及事件
  • [C++]C++基础知识概述
  • [cb]UIGrid+UIStretch的自适应
  • [github全教程]github版本控制最全教学------- 大厂找工作面试必备!
  • [IDF]被改错的密码
  • [LeetCode] Max Points on a Line
  • [Linux]进程信号(信号入门 | 信号产生的方式 | 信号捕捉初识)
  • [moka同学笔记]yii表单dropdownlist样式