NIO 基础
什么是 NIO
- Java NIO 全称 Java non-blocking IO,指的是 JDK 提供的新 API。从 JDK 1.4 开始,Java 提供了一系列改进的输入/输出的新特性,被统称为 NIO,即 New IO,是
同步非阻塞
的。 - NIO 相关类都放在 java.nio 包下,并对原 java.io 包中很多类进行了改写。
- NIO 有三大核心部分:
Channel(管道)
、Buffer(缓冲区)
、Selector(选择器)
。 - NIO 是面向
缓冲区
编程的。数据读取到了一个它稍微处理的缓冲区,需要时可在缓冲区中前后移动,这就增加了处理过程中的灵活性,使用它可以提供非阻塞的高伸缩性网络。 - Java NIO 的非阻塞模式,使一个线程从某通道发送请求读取数据,但是它仅能得到目前可用数据,如果目前没有可用数据时,则说明不会获取,而不是保持线程阻塞,所以直到数据变为可以读取之前,该线程可以做其他事情。非阻塞写入同理。
三大核心组件
Channel 的基本介绍
NIO 的通道类似于流,但有如下区别:
- 通道是双向的可以进行读写,而流是单向的只能读,或者写
- 通道可以实现异步读写数据
- 通道可以从缓冲区读取数据,也可以写入数据到缓冲区
四种通道:
- FileChannel :从文件中读写数据
- DatagramChannel:通过 UDP 协议,读写网络中的数据
- SocketChannel:能通过 TCP 协议来读写网络中数据,常用于客户端
- ServerSocketChannel:监听 TCP 连接,对每个新的连接会创建一个 SocketChannel
Buffer(缓冲区)基本介绍
NIO 中的 Buffer 用于 NIO 通道(Channel)进行交互。
缓冲区本质上是一个可以读写数据的内存块,可以理解为是一个容器对象(含数组)
,该对象提供了一组方法
,可以更轻松地使用内存块,缓冲区对象内置了一些机制,能够跟踪和记录缓冲区的状态变化情况。
当向 Buffer 写入数据时,Buffer 会记录下写了多少数据,一旦要读取数据,需要通过flip()
方法将 Buffer 从写模式切换到读模式。在读模式下,可以读取之前写入到 Buffer 的所有数据。
当读完了所有数据,就需要清空缓存区,让它可以再次被写入。有两种方式能清空缓冲区,调用clear()
或者compact()
方法。
clear()方法会清空整个缓冲区。compact()方法只会清除已经读过的数据。任何未读的数据都被移到缓冲区的起始处,新写入的数据将放到缓冲区未读数据的后面。
Channel 提供从文件、网络读取数据的渠道,但是读取或者都必须经过 Buffer。在 Buffer 子类中维护着一个对应类型的数组,用来存放数据。
Selector 的基本介绍
- Java 的 NIO 使用了非阻塞的 I/O 方式。可以用一个线程处理若干个客户端连接,就会使用到 Selector(选择器)
- Selector 能够检测到多个注册通道上是否有事件发生(多个 Channel 以事件的形式注册到同一个 selector),如果有事件发生,便获取事件然后针对每个事件进行相应的处理
- 只有在连接真正有读写事件发生时,才会进行读写,减少了系统开销,并且不必为每个连接都创建一个线程,不用维护多个线程
- 避免了多线程之间上下文切换导致的开销
Selector 的特点
Netty 的 I/O 线程 NioEventLoop 聚合了 Selector(选择器 / 多路复用器),可以并发处理成百上千个客户端连接。
当线程从某客户端 Socket 通道进行读写时,若没有数据可用,该线程可以进行其他任务。
线程通常将非阻塞 I/O 的空闲时间用于其他通道上执行 I/O 操作,所以单独的线程可以管理多个输入输出通道。
由于读写操作都是非阻塞的,就可以充分提高 I/O 线程的运行效率,避免由于频繁 I/O 阻塞导致的线程挂起。
一个 I/O 线程可以并发处理 N 个客户端连接和读写操作,这从根本上解决了传统同步阻塞 I/O 一连接一线程模型,架构性能、弹性伸缩能力和可靠性都得到极大地提升。
ByteBuffer 的基本使用
核心依赖
<dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.36.Final</version> </dependency>
/** * @author 神秘杰克 * 公众号: Java菜鸟程序员 * @date 2021/12/28 * @Description ByteBuffer基本使用,读取文件内容并打印 */ public class ByteBufferTest { public static void main(String[] args) { //获取channel try (FileChannel channel = new FileInputStream("data.txt").getChannel()) { //创建ByteBuffer final ByteBuffer buffer = ByteBuffer.allocate(1024); //读取文件内容,并存入buffer channel.read(buffer); //切换为读模式 buffer.flip(); while (buffer.hasRemaining()) { System.out.print((char) buffer.get()); } //清空缓冲区,并重置为写模式 buffer.clear(); } catch (IOException e) { e.printStackTrace(); } } }
输出结果:
1234567890abc
ByteBuffer 的结构
Buffer 中定义了四个属性来提供所其包含的数据元素。
// Invariants: mark <= position <= limit <= capacity private int mark = -1; private int position = 0; private int limit; private int capacity;
- capacity:缓冲区的容量。通过构造函数赋予,一旦设置,无法更改
- limit:缓冲区的界限。位于 limit 后的数据不可读写。缓冲区的限制不能为负,并且不能大于其容量
- position:下一个读写位置的索引(类似 PC)。缓冲区的位置不能为负,并且不能大于 limit
- mark:记录当前 position 的值。position 被改变后,可以通过调用 reset() 方法恢复到 mark 的位置
在一开始的情况下,position 指向第一位写入位置,limit 和 capacity 则等于缓冲区的容量。
在写模式下,position 是写入位置,limit 等于容量,下图表示写入 4 个元素后的状态。
当调用flip()
方法切换为读模式后,position 切换为读取位置,limit 切换为读取限制。
当读取到 limit 位置后,则不可以继续读取。
当调用clear()
方法后,则回归最原始状态。
当调用 compact()方法时,需要注意:此方法为 ByteBuffer 的方法,而不是 Buffer 的方法。
- compact 会把未读完的数据向前压缩,然后切换到写模式
- 数据前移后,原位置的值并未清零,写时会覆盖之前的值
ByteBuffer 的常见方法
分配空间:allocate()
//java.nio.HeapByteBuffer java堆内存,读写效率较低,受到gc影响 System.out.println(ByteBuffer.allocate(1024).getClass()); //java.nio.DirectByteBuffer 直接内存,读写效率较高(少一次拷贝),不会受gc影响,分配内存效率较低,使用不当则可能会发生内存泄漏 System.out.println(ByteBuffer.allocateDirect(1024).getClass());
flip()
- flip()方法会切换对缓冲区的操作模式,由写->读 / 读->写
put()
- put()方法可以将一个数据放入到缓冲区中。
- 进行该操作后,postition 的值会+1,指向下一个可以放入的位置。
get()
- get()方法会读取缓冲区中的一个值
- 进行该操作后,position 会+1,如果超过了 limit 则会抛出异常
注意:get(i)方法不会改变 position 的值。
rewind()
- 该方法只能在读模式下使用
- rewind()方法后,会恢复 position、limit 和 capacity 的值,变为进行 get()前的值
clear()
- clear()方法会将缓冲区中的各个属性恢复为最初的状态,position = 0, capacity = limit
- 此时缓冲区的数据依然存在,处于“被遗忘”状态,下次进行写操作时会覆盖这些数据
mark()和 reset()
- mark()方法会将 postion 的值保存到 mark 属性中
- reset()方法会将 position 的值改为 mark 中保存的值
字符串和 ByteBuffer 相互转换
引入工具类:
import io.netty.util.internal.MathUtil; import io.netty.util.internal.StringUtil; import java.nio.ByteBuffer; /** * @author 神秘杰克 * 公众号: Java菜鸟程序员 * @date 2021/12/28 * @Description 工具类 */ public class ByteBufferUtil { private static final char[] BYTE2CHAR = new char[256]; private static final char[] HEXDUMP_TABLE = new char[256 * 4]; private static final String[] HEXPADDING = new String[16]; private static final String[] HEXDUMP_ROWPREFIXES = new String[65536 >>> 4]; private static final String[] BYTE2HEX = new String[256]; private static final String[] BYTEPADDING = new String[16]; static { final char[] DIGITS = "0123456789abcdef".toCharArray(); for (int i = 0; i < 256; i++) { HEXDUMP_TABLE[i << 1] = DIGITS[i >>> 4 & 0x0F]; HEXDUMP_TABLE[(i << 1) + 1] = DIGITS[i & 0x0F]; } int i; // Generate the lookup table for hex dump paddings for (i = 0; i < HEXPADDING.length; i++) { int padding = HEXPADDING.length - i; StringBuilder buf = new StringBuilder(padding * 3); for (int j = 0; j < padding; j++) { buf.append(" "); } HEXPADDING[i] = buf.toString(); } // Generate the lookup table for the start-offset header in each row (up to 64KiB). for (i = 0; i < HEXDUMP_ROWPREFIXES.length; i++) { StringBuilder buf = new StringBuilder(12); buf.append(StringUtil.NEWLINE); buf.append(Long.toHexString(i << 4 & 0xFFFFFFFFL | 0x100000000L)); buf.setCharAt(buf.length() - 9, '|'); buf.append('|'); HEXDUMP_ROWPREFIXES[i] = buf.toString(); } // Generate the lookup table for byte-to-hex-dump conversion for (i = 0; i < BYTE2HEX.length; i++) { BYTE2HEX[i] = ' ' + StringUtil.byteToHexStringPadded(i); } // Generate the lookup table for byte dump paddings for (i = 0; i < BYTEPADDING.length; i++) { int padding = BYTEPADDING.length - i; StringBuilder buf = new StringBuilder(padding); for (int j = 0; j < padding; j++) { buf.append(' '); } BYTEPADDING[i] = buf.toString(); } // Generate the lookup table for byte-to-char conversion for (i = 0; i < BYTE2CHAR.length; i++) { if (i <= 0x1f || i >= 0x7f) { BYTE2CHAR[i] = '.'; } else { BYTE2CHAR[i] = (char) i; } } } /** * 打印所有内容 * * @param buffer */ public static void debugAll(ByteBuffer buffer) { int oldlimit = buffer.limit(); buffer.limit(buffer.capacity()); StringBuilder origin = new StringBuilder(256); appendPrettyHexDump(origin, buffer, 0, buffer.capacity()); System.out.println("+--------+-------------------- all ------------------------+----------------+"); System.out.printf("position: [%d], limit: [%d]\n", buffer.position(), oldlimit); System.out.println(origin); buffer.limit(oldlimit); } /** * 打印可读取内容 * * @param buffer */ public static void debugRead(ByteBuffer buffer) { StringBuilder builder = new StringBuilder(256); appendPrettyHexDump(builder, buffer, buffer.position(), buffer.limit() - buffer.position()); System.out.println("+--------+-------------------- read -----------------------+----------------+"); System.out.printf("position: [%d], limit: [%d]\n", buffer.position(), buffer.limit()); System.out.println(builder); } private static void appendPrettyHexDump(StringBuilder dump, ByteBuffer buf, int offset, int length) { if (MathUtil.isOutOfBounds(offset, length, buf.capacity())) { throw new IndexOutOfBoundsException( "expected: " + "0 <= offset(" + offset + ") <= offset + length(" + length + ") <= " + "buf.capacity(" + buf.capacity() + ')'); } if (length == 0) { return; } dump.append( " +-------------------------------------------------+" + StringUtil.NEWLINE + " | 0 1 2 3 4 5 6 7 8 9 a b c d e f |" + StringUtil.NEWLINE + "+--------+-------------------------------------------------+----------------+"); final int startIndex = offset; final int fullRows = length >>> 4; final int remainder = length & 0xF; // Dump the rows which have 16 bytes. for (int row = 0; row < fullRows; row++) { int rowStartIndex = (row << 4) + startIndex; // Per-row prefix. appendHexDumpRowPrefix(dump, row, rowStartIndex); // Hex dump int rowEndIndex = rowStartIndex + 16; for (int j = rowStartIndex; j < rowEndIndex; j++) { dump.append(BYTE2HEX[getUnsignedByte(buf, j)]); } dump.append(" |"); // ASCII dump for (int j = rowStartIndex; j < rowEndIndex; j++) { dump.append(BYTE2CHAR[getUnsignedByte(buf, j)]); } dump.append('|'); } // Dump the last row which has less than 16 bytes. if (remainder != 0) { int rowStartIndex = (fullRows << 4) + startIndex; appendHexDumpRowPrefix(dump, fullRows, rowStartIndex); // Hex dump int rowEndIndex = rowStartIndex + remainder; for (int j = rowStartIndex; j < rowEndIndex; j++) { dump.append(BYTE2HEX[getUnsignedByte(buf, j)]); } dump.append(HEXPADDING[remainder]); dump.append(" |"); // Ascii dump for (int j = rowStartIndex; j < rowEndIndex; j++) { dump.append(BYTE2CHAR[getUnsignedByte(buf, j)]); } dump.append(BYTEPADDING[remainder]); dump.append('|'); } dump.append(StringUtil.NEWLINE + "+--------+-------------------------------------------------+----------------+"); } private static void appendHexDumpRowPrefix(StringBuilder dump, int row, int rowStartIndex) { if (row < HEXDUMP_ROWPREFIXES.length) { dump.append(HEXDUMP_ROWPREFIXES[row]); } else { dump.append(StringUtil.NEWLINE); dump.append(Long.toHexString(rowStartIndex & 0xFFFFFFFFL | 0x100000000L)); dump.setCharAt(dump.length() - 9, '|'); dump.append('|'); } } public static short getUnsignedByte(ByteBuffer buffer, int index) { return (short) (buffer.get(index) & 0xFF); } }
测试类:
/** * @author 神秘杰克 * 公众号: Java菜鸟程序员 * @date 2021/12/28 * @Description 字符串和ByteBuffer相互转换 */ public class TranslateTest { public static void main(String[] args) { String str1 = "hello"; String str2; String str3; // 通过StandardCharsets的encode方法获得ByteBuffer // 此时获得的ByteBuffer为读模式,无需通过flip切换模式 ByteBuffer buffer = StandardCharsets.UTF_8.encode(str1); //也可以使用wrap方法实现,无需通过flip切换模式 ByteBuffer wrap = ByteBuffer.wrap(str1.getBytes()); ByteBufferUtil.debugAll(wrap); ByteBufferUtil.debugAll(buffer); // 将缓冲区中的数据转化为字符串 // 通过StandardCharsets解码,获得CharBuffer,再通过toString获得字符串 str2 = StandardCharsets.UTF_8.decode(buffer).toString(); System.out.println(str2); str3 = StandardCharsets.UTF_8.decode(wrap).toString(); System.out.println(str3); } }
运行结果:
+--------+-------------------- all ------------------------+----------------+ position: [0], limit: [5] +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 68 65 6c 6c 6f |hello | +--------+-------------------------------------------------+----------------+ +--------+-------------------- all ------------------------+----------------+ position: [0], limit: [5] +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 68 65 6c 6c 6f |hello | +--------+-------------------------------------------------+----------------+ hello hello
粘包与半包
现象
网络上有多条数据发送给服务端,数据之间使用 \n 进行分隔。
但由于某种原因这些数据在接收时,被进行了重新组合,例如原始数据有 3 条为:
- Hello,world\n
- I’m Jack\n
- How are you?\n
变成了下面的两个 byteBuffer (粘包,半包)
- Hello,world\nI’m Jack\nHo
- w are you?\n
出现原因
粘包
发送方在发送数据时,并不是一条一条地发送数据,而是将数据整合在一起,当数据达到一定的数量后再一起发送。这就会导致多条信息被放在一个缓冲区中被一起发送出去。
半包
接收方的缓冲区的大小是有限的,当接收方的缓冲区满了以后,就需要将信息截断,等缓冲区空了以后再继续放入数据。这就会发生一段完整的数据最后被截断的现象。
解决办法
- 通过
get(index)
方法遍历 ByteBuffer,当遇到\n
后进行处理。 - 记录从 position 到 index 的数据长度,申请对应大小的缓冲区。
- 将缓冲区的数据通过
get()
获取写入到 target 缓冲区中。 - 最后,调用 compact()方法切换为写模式,因为缓冲区中可能还存在未读取的数据。
/** * @author 神秘杰克 * 公众号: Java菜鸟程序员 * @date 2021/12/29 * @Description 解决黏包和半包 */ public class ByteBufferTest { public static void main(String[] args) { ByteBuffer buffer = ByteBuffer.allocate(32); //模拟黏包和半包 buffer.put("Hello,world\nI'm Jack\nHo".getBytes(StandardCharsets.UTF_8)); split(buffer); buffer.put("w are you?\n".getBytes(StandardCharsets.UTF_8)); split(buffer); } private static void split(ByteBuffer buffer) { //切换读模式 buffer.flip(); for (int i = 0; i < buffer.limit(); i++) { //找到完整消息 if (buffer.get(i) == '\n') { int length = i + 1 - buffer.position(); final ByteBuffer target = ByteBuffer.allocate(length); //从buffer中读取,写入 target for(int j = 0; j < length; j++) { // 将buffer中的数据写入target中 target.put(buffer.get()); } // 打印查看结果 ByteBufferUtil.debugAll(target); } } //清空已读部分,并切换写模式 buffer.compact(); } }
运行结果:
+--------+-------------------- all ------------------------+----------------+ position: [12], limit: [12] +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 48 65 6c 6c 6f 2c 77 6f 72 6c 64 0a |Hello,world. | +--------+-------------------------------------------------+----------------+ +--------+-------------------- all ------------------------+----------------+ position: [9], limit: [9] +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 49 27 6d 20 4a 61 63 6b 0a |I'm Jack. | +--------+-------------------------------------------------+----------------+ +--------+-------------------- all ------------------------+----------------+ position: [13], limit: [13] +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 48 6f 77 20 61 72 65 20 79 6f 75 3f 0a |How are you?. | +--------+-------------------------------------------------+----------------+
文件编程
FileChannel
工作模式
📢:FileChannel 只能工作在 阻塞模式下!
获取
不能直接打开 FileChannel,必须通过 FileInputStream、FileOutputStream 或者 RandomAccessFile 来获取 FileChannel,它们都有 getChannel()
方法。
- 通过 FileInputStream 获取的 channel 只能读
- 通过 FileOutputStream 获取的 channel 只能写
- 通过 RandomAccessFile 是否能读写根据构造 RandomAccessFile 时的读写模式决定
读取
通过read()
方法将数据填充到 ByteBuffer 中,返回值表示读到了多少字节,-1
表示读到了文件末尾。
int readBytes = channel.read(buffer);
写入
因为 channel 是有写入上限的,所以 write() 方法并不能保证一次将 buffer 中的内容全部写入 channel。必须按照以下规则进行写入。
// 通过hasRemaining()方法查看缓冲区中是否还有数据未写入到通道中 while(buffer.hasRemaining()) { channel.write(buffer); }
关闭
Channel 必须关闭,不过调用 FileInputStream、FileOutputStream、 RandomAccessFile 的close()
方法时也会间接的调用 Channel 的 close()方法。
位置
channel 也拥有一个保存读取数据位置的属性,即 position。
long pos = channel.position();
可以通过 position(int pos)设置 channel 中 position 的值。
long newPos = 10; channel.position(newPos);
设置当前位置时,如果设置为文件的末尾:
- 这时读取会返回 -1
- 这时写入,会追加内容,但要注意如果 position 超过了文件末尾,再写入时在新内容和原末尾之间会有空洞(00)
强制写入
操作系统出于性能的考虑,会将数据缓存,不是立刻写入磁盘,而是等到缓存满了以后将所有数据一次性的写入磁盘。可以调用 force(true)
方法将文件内容和元数据(文件的权限等信息)立刻写入磁盘。
常见方法
FileChannel 主要用来对本地文件进行 IO 操作,常见的方法有:
- public int read(ByteBuffer dst) :从通道中读取数据到缓冲区中。
- public int write(ByteBuffer src):把缓冲区中的数据写入到通道中。
- public long transferFrom(ReadableByteChannel src,long position,long count):从目标通道中复制数据到当前通道。
- public long transferTo(long position,long count,WriteableByteChannel target):把数据从当前通道复制给目标通道。
使用 FileChannel 写入文本文件
/** * @author 神秘杰克 * 公众号: Java菜鸟程序员 * @date 2021/12/29 * @Description FileChannel测试写入文件 */ public class FileChannelTest { public static void main(String[] args) { try (final FileChannel channel = new FileOutputStream("data1.txt").getChannel()) { String msg = "Hello World!!!"; final ByteBuffer buffer = ByteBuffer.allocate(16); buffer.put(msg.getBytes(StandardCharsets.UTF_8)); buffer.flip(); channel.write(buffer); } catch (IOException e) { e.printStackTrace(); } } }
使用 FileChannel 读取文本文件
/** * @author 神秘杰克 * 公众号: Java菜鸟程序员 * @date 2021/12/29 * @Description FileChannel测试读取文件 */ public class FileChannelTest { public static void main(String[] args) { try (final FileChannel channel = new FileInputStream("data1.txt").getChannel()) { final ByteBuffer buffer = ByteBuffer.allocate(16); channel.read(buffer); buffer.flip(); while (buffer.hasRemaining()) { System.out.print((char) buffer.get()); } //清空缓冲区,并重置为写模式 buffer.clear(); } catch (IOException e) { e.printStackTrace(); } } }
使用 FileChannel 进行数据传输
/** * @author 神秘杰克 * 公众号: Java菜鸟程序员 * @date 2021/12/29 * @Description FileChannel测试文件传输 */ public class FileChannelTest { public static void main(String[] args){ try (final FileChannel from = new FileInputStream("data.txt").getChannel(); final FileChannel to = new FileOutputStream("data1.txt").getChannel()) { // 参数:inputChannel的起始位置,传输数据的大小,目的channel // 返回值为传输的数据的字节数 // transferTo一次只能传输2G的数据 from.transferTo(0, from.size(), to); } catch (IOException e) { e.printStackTrace(); } } }
transferTo()方法对应的还有 transferFrom()方法。
虽然 transferTo()方法传输效率较高,底层利用操作系统的零拷贝进行优化,但是 transferTo 方法一次只能传输 2G 的数据。
解决方法:可以根据 transferTo()的返回值来判断,返回值代表传输了多少,通过 from 的 size()大小来每次减去即可。
long size = from.size(); for (long left = size; left > 0; ) { left -= from.transferTo(size - left, size, to); } Ch
annel 和 Buffer 的注意事项
- ByteBuffer 支持类型化的 put 和 get,put 放入什么数据类型,get 就应该使用相应的数据类型来取出,否则可能会产生 ByteUnderflowException 异常。
- 可以将一个普通的 Buffer 转换为只读的 Buffer:asReadOnlyBuffer()方法。
- NIO 提供了 MapperByteBuffer,可以让文件直接在内存(堆外内存)中进行修改,而如何同步到文件由 NIO 来完成。
- NIO 还支持通过多个 Buffer(即 Buffer 数组)完成读写操作,即Scattering(分散)和 Gathering(聚集)。
Scattering(分散)
:在向缓冲区写入数据时,可以使用 Buffer 数组依次写入,一个 Buffer 数组写满后,继续写入下一个 Buffer 数组。Gathering(聚集)
:从缓冲区读取数据时,可以依次读取,读完一个 Buffer 再按顺序读取下一个。
网络编程
阻塞 vs 非阻塞
阻塞
- 在没有数据可读时,包括数据复制过程中,线程必须阻塞等待,不会占用 CPU,但线程相当于闲置状态
- 32 位 JVM 一个线程 320k,64 位 JVM 一个线程 1024k,为了减少线程数量,需要采用线程池技术
- 但即使使用线程池,如果有很多连接建立,但长时间 inactive,会阻塞线程池中所有线程
非阻塞
- 在某个 Channel 没有可读事件时,线程不必阻塞,它可以去处理其它有可读事件的 Channel
- 数据复制过程中,线程实际还是阻塞的(AIO 改进的地方)
- 写数据时,线程只是等待数据写入 Channel 即可,无需等待 Channel 通过网络把数据发送出去
阻塞案例代码
服务端代码:
/** * @author 神秘杰克 * 公众号: Java菜鸟程序员 * @date 2021/12/29 * @Description 使用NIO来理解阻塞模式-服务端 */ public class Server { public static void main(String[] args) { //1. 创建服务器 try (ServerSocketChannel ssc = ServerSocketChannel.open()) { final ByteBuffer buffer = ByteBuffer.allocate(16); //2. 绑定监听端口 ssc.bind(new InetSocketAddress(7777)); //3. 存放建立连接的集合 List<SocketChannel> channels = new ArrayList<>(); while (true) { System.out.println("建立连接..."); //4. accept 建立客户端连接 , 用来和客户端之间通信 final SocketChannel socketChannel = ssc.accept(); System.out.println("建立连接完成..."); channels.add(socketChannel); //5. 接收客户端发送的数据 for (SocketChannel channel : channels) { System.out.println("正在读取数据..."); channel.read(buffer); buffer.flip(); ByteBufferUtil.debugRead(buffer); buffer.clear(); System.out.println("数据读取完成..."); } } } catch (IOException e) { System.out.println("出现异常..."); } } }
客户端代码:
/** * @author 神秘杰克 * 公众号: Java菜鸟程序员 * @date 2021/12/29 * @Description 使用NIO来理解阻塞模式-客户端 */ public class Client { public static void main(String[] args) { try (SocketChannel socketChannel = SocketChannel.open()) { // 建立连接 socketChannel.connect(new InetSocketAddress("localhost", 7777)); final ByteBuffer buffer = ByteBuffer.allocate(10); buffer.put("hello".getBytes(StandardCharsets.UTF_8)); buffer.flip(); socketChannel.write(buffer); } catch (IOException e) { e.printStackTrace(); } } }
运行结果:
- 在刚开始服务器运行后:服务器端因 accept 阻塞。
- 在客户端和服务器建立连接后,客户端发送消息前:服务器端因通道为空被阻塞。
- 客户端发送数据后,服务器处理通道中的数据。之后再次进入循环时,再次被 accept 阻塞。
- 之前的客户端再次发送消息,服务器端因为被 accept 阻塞,就无法处理之前客户端再次发送到通道中的信息了。
非阻塞
- 通过 ServerSocketChannel 的
configureBlocking(false)
方法将获得连接设置为非阻塞的。此时若没有连接,accept 会返回 null - 通过 SocketChannel 的
configureBlocking(false)
方法将从通道中读取数据设置为非阻塞的。若此时通道中没有数据可读,read 会返回-1
/** * @author 神秘杰克 * 公众号: Java菜鸟程序员 * @date 2021/12/29 * @Description 使用NIO来理解阻塞模式-服务端 */ public class Server { public static void main(String[] args) { //1. 创建服务器 try (ServerSocketChannel ssc = ServerSocketChannel.open()) { final ByteBuffer buffer = ByteBuffer.allocate(16); //2. 绑定监听端口 ssc.bind(new InetSocketAddress(7777)); //3. 存放建立连接的集合 List<SocketChannel> channels = new ArrayList<>(); //设置非阻塞!! ssc.configureBlocking(false); while (true) { System.out.println("建立连接..."); //4. accept 建立客户端连接 , 用来和客户端之间通信 final SocketChannel socketChannel = ssc.accept(); //设置非阻塞!! socketChannel.configureBlocking(false); System.out.println("建立连接完成..."); channels.add(socketChannel); //5. 接收客户端发送的数据 for (SocketChannel channel : channels) { System.out.println("正在读取数据..."); channel.read(buffer); buffer.flip(); ByteBufferUtil.debugRead(buffer); buffer.clear(); System.out.println("数据读取完成..."); } } } catch (IOException e) { System.out.println("出现异常..."); } } }
因为设置为了非阻塞,会一直执行
while(true)
中的代码,CPU 一直处于忙碌状态,会使得性能变低,所以实际情况中不使用这种方法处理请求。
Selector
基本介绍
- Java 的 NIO 使用了非阻塞的 I/O 方式。可以用一个线程处理若干个客户端连接,就会使用到 Selector(选择器)。
- Selector 能够检测到多个注册通道上是否有事件发生(多个 Channel 以事件的形式注册到同一个 selector),如果有事件发生,便获取事件然后针对每个事件进行相应的处理。
- 只有在连接真正有读写事件发生时,才会进行读写,减少了系统开销,并且不必为每个连接都创建一个线程,不用维护多个线程。
- 避免了多线程之间上下文切换导致的开销。
特点
单线程可以配合 Selector 完成对多个 Channel 可读写事件的监控,这称为多路复用。
- 多路复用仅针对网络 IO,普通文件 IO 无法利用多路复用
- 如果不用 Selector 的非阻塞模式,线程大部分时间都在做无用功,而 Selector 能够保证
- 有可连接事件时才去连接
- 有可读事件才去读取
- 有可写事件才去写入
限于网络传输能力,Channel 未必随时可写,一旦 Channel 可写,会触发 Selector 的可写事件进行写入。
Selector 相关方法说明
selector.select()
://若未监听到注册管道中有事件,则持续阻塞selector.select(1000)
://阻塞 1000 毫秒,1000 毫秒后返回selector.wakeup()
://唤醒 selectorselector.selectNow()
: //不阻塞,立即返回
NIO 非阻塞网络编程过程分析
- 当客户端连接时,会通过 SeverSocketChannel 得到对应的 SocketChannel。
- Selector 进行监听,调用
select()
方法,返回注册该 Selector 的所有通道中有事件发生的通道个数。 - 将 SocketChannel 注册到 Selector 上,public final SelectionKey register(Selector sel, int ops),一个 Selector 上可以注册多个 SocketChannel。
- 注册后返回一个 SelectionKey,会和该 Selector 关联(以集合的形式)。
- 进一步得到各个 SelectionKey,有事件发生。
- 再通过 SelectionKey 反向获取 SocketChannel,使用 channnel()方法。
- 可以通过得到的 channel,完成业务处理。
SelectionKey 中定义了四个操作标志位:OP_READ
表示通道中发生读事件;OP_WRITE
—表示通道中发生写事件;OP_CONNECT
—表示建立连接;OP_ACCEPT
—请求新连接。
SelectionKey 的相关方法
方法 | 描述 |
public abstract Selector selector(); | 得到与之关联的 Selector 对象 |
public abstract SelectableChannel channel(); | 得到与之关联的通道 |
public final Object attachment() | 得到与之关联的共享数据 |
public abstract SelectionKey interestOps(int ops); | 设置或改变监听的事件类型 |
public final boolean isReadable(); | 通道是否可读 |
public final boolean isWritable(); | 通道是否可写 |
public final boolean isAcceptable(); | 是否可以建立连接 ACCEPT |
Selector 基本使用及 Accpet 事件
接下来我们使用 Selector 实现多路复用,对服务端代码进行改进。
/** * @author 神秘杰克 * 公众号: Java菜鸟程序员 * @date 2021/12/29 * @Description Selector基本使用-服务端 */ public class Server { public static void main(String[] args) { try (ServerSocketChannel ssc = ServerSocketChannel.open(); final Selector selector = Selector.open()) {//创建selector 管理多个channel ssc.bind(new InetSocketAddress(7777)); ssc.configureBlocking(false); // 将通道注册到选择器中,并设置感兴趣的事件 ssc.register(selector, SelectionKey.OP_ACCEPT); ByteBuffer buffer = ByteBuffer.allocate(16); while (true) { // 如果事件就绪,线程会被阻塞,反之不会被阻塞。从而避免了CPU空转 // 返回值为就绪的事件个数 int ready = selector.select(); System.out.println("selector就绪总数: " + ready); // 获取所有事件 Iterator<SelectionKey> iterator = selector.selectedKeys().iterator(); while (iterator.hasNext()) { final SelectionKey key = iterator.next(); //判断key的事件类型 if (key.isAcceptable()) { ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel(); final SocketChannel socketChannel = serverSocketChannel.accept(); System.out.println("获取到客户端连接..."); } // 处理完毕后移除 iterator.remove(); } } } catch (IOException e) { System.out.println("出现异常..."); } } }
事件发生后,要么处理,要么使用 key.cancel()方法取消,不能什么都不做,否则下次该事件仍会触发,这是因为 nio 底层使用的是水平触发。
当选择器中的通道对应的事件发生后,SelectionKey 会被放到另一个集合中,但是selecionKey 不会自动移除,所以需要我们在处理完一个事件后,通过迭代器手动移除其中的 selecionKey。否则会导致已被处理过的事件再次被处理,就会引发错误。
Read 事件
/** * @author 神秘杰克 * 公众号: Java菜鸟程序员 * @date 2021/12/29 * @Description Read事件-服务端 */ public class Server { public static void main(String[] args) { try (ServerSocketChannel ssc = ServerSocketChannel.open(); final Selector selector = Selector.open()) {//创建selector 管理多个channel ssc.bind(new InetSocketAddress(7777)); ssc.configureBlocking(false); // 将通道注册到选择器中,并设置感兴趣的事件 ssc.register(selector, SelectionKey.OP_ACCEPT); ByteBuffer buffer = ByteBuffer.allocate(16); while (true) { // 如果事件就绪,线程会被阻塞,反之不会被阻塞。从而避免了CPU空转 // 返回值为就绪的事件个数 int ready = selector.select(); System.out.println("selector就绪总数: " + ready); // 获取所有事件 Iterator<SelectionKey> iterator = selector.selectedKeys().iterator(); while (iterator.hasNext()) { final SelectionKey key = iterator.next(); //判断key的事件类型 if (key.isAcceptable()) { ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel(); final SocketChannel socketChannel = serverSocketChannel.accept(); System.out.println("获取到客户端连接..."); // 设置为非阻塞模式,同时将连接的通道也注册到选择其中 socketChannel.configureBlocking(false); socketChannel.register(selector, SelectionKey.OP_READ); } else if (key.isReadable()) { //读事件 SocketChannel channel = (SocketChannel) key.channel(); channel.read(buffer); buffer.flip(); ByteBufferUtil.debugRead(buffer); buffer.clear(); } // 处理完毕后移除 iterator.remove(); } } } catch (IOException e) { System.out.println("出现异常..."); } } }
断开处理
当客户端与服务器之间的连接断开时,会给服务器端发送一个读事件,对异常断开
和正常断开
需要不同的方式进行处理:
- 正常断开
- 正常断开时,服务器端的 channel.read(buffer)方法的返回值为-1,所以当结束到返回值为-1 时,需要调用 key 的 cancel()方法取消此事件,并在取消后移除该事件
- 异常断开
- 异常断开时,会抛出 IOException 异常, 在 try-catch 的catch 块中捕获异常并调用 key 的 cancel()方法即可
消息边界
⚠️ 不处理消息边界存在的问题
将缓冲区的大小设置为 4 个字节,发送 2 个汉字(你好),通过 decode 解码并打印时,会出现乱码
ByteBuffer buffer = ByteBuffer.allocate(4); // 解码并打印 System.out.println(StandardCharsets.UTF_8.decode(buffer)); 你� ��
这是因为 UTF-8 字符集下,1 个汉字占用 3 个字节,此时缓冲区大小为 4 个字节,一次读时间无法处理完通道中的所有数据,所以一共会触发两次读事件。这就导致 你好
的 好
字被拆分为了前半部分和后半部分发送,解码时就会出现问题。
💡 处理消息边界
传输的文本可能有以下三种情况:
- 文本大于缓冲区大小,此时需要将缓冲区进行扩容
- 发生半包现象
- 发生粘包现象
解决方案:
- 固定消息长度,数据包大小一样,服务器按预定长度读取,当发送的数据较少时,需要将数据进行填充,直到长度与消息规定长度一致。缺点是浪费带宽
- 另一种思路是按分隔符拆分,缺点是效率低,需要一个一个字符地去匹配分隔符
- TLV 格式,即 Type 类型、Length 长度、Value 数据(也就是在消息开头用一些空间存放后面数据的长度),如 HTTP 请求头中的 Content-Type 与Content-Length。类型和长度已知的情况下,就可以方便获取消息大小,分配合适的 buffer,缺点是 buffer 需要提前分配,如果内容过大,则影响 server 吞吐量
下面演示第二种解决方案,按分隔符拆分:
我们需要在 Accept 事件发生后,将通道注册到 Selector 中时,对每个通道添加一个 ByteBuffer 附件,让每个通道发生读事件时都使用自己的通道,避免与其他通道发生冲突而导致问题。
ByteBuffer buffer = ByteBuffer.allocate(16); // 添加通道对应的Buffer附件 socketChannel.register(selector, SelectionKey.OP_READ, buffer);
当 Channel 中的数据大于缓冲区时,需要对缓冲区进行扩容操作。此代码中的扩容的判定方法:Channel 调用 compact 方法后,的 position 与 limit 相等,说明缓冲区中的数据并未被读取(容量太小),此时创建新的缓冲区,其大小扩大为两倍。同时还要将旧缓冲区中的数据拷贝到新的缓冲区中,同时调用 SelectionKey 的 attach 方法将新的缓冲区作为新的附件放入 SelectionKey 中。
// 如果缓冲区太小,就进行扩容 if (buffer.position() == buffer.limit()) { ByteBuffer newBuffer = ByteBuffer.allocate(buffer.capacity()*2); // 将旧buffer中的内容放入新的buffer中 buffer.flip(); newBuffer.put(buffer); // 将新buffer放到key中作为附件 key.attach(newBuffer); }
改进后的代码如下:
/** * @author 神秘杰克 * 公众号: Java菜鸟程序员 * @date 2021/12/29 * @Description Read事件完整版-服务端 */ public class Server { public static void main(String[] args) { try (ServerSocketChannel ssc = ServerSocketChannel.open(); final Selector selector = Selector.open()) {//创建selector 管理多个channel ssc.bind(new InetSocketAddress(7777)); ssc.configureBlocking(false); // 将通道注册到选择器中,并设置感兴趣的事件 ssc.register(selector, SelectionKey.OP_ACCEPT); while (true) { // 如果事件就绪,线程会被阻塞,反之不会被阻塞。从而避免了CPU空转 // 返回值为就绪的事件个数 int ready = selector.select(); System.out.println("selector就绪总数: " + ready); // 获取所有事件 Iterator<SelectionKey> iterator = selector.selectedKeys().iterator(); while (iterator.hasNext()) { final SelectionKey key = iterator.next(); //判断key的事件类型 if (key.isAcceptable()) { ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel(); final SocketChannel socketChannel = serverSocketChannel.accept(); System.out.println("获取到客户端连接..."); socketChannel.configureBlocking(false); ByteBuffer byteBuffer = ByteBuffer.allocate(16); //注册到Selector并且设置读事件,设置附件bytebuffer socketChannel.register(selector, SelectionKey.OP_READ, byteBuffer); } else if (key.isReadable()) { //读事件 try { SocketChannel channel = (SocketChannel) key.channel(); // 通过key获得附件 ByteBuffer buffer = (ByteBuffer) key.attachment(); int read = channel.read(buffer); if (read == -1) { key.cancel(); channel.close(); } else { // 通过分隔符来分隔buffer中的数据 split(buffer); // 如果缓冲区太小,就进行扩容 if (buffer.position() == buffer.limit()) { ByteBuffer newBuffer = ByteBuffer.allocate(buffer.capacity() * 2); // 将旧buffer中的内容放入新的buffer中 buffer.flip(); newBuffer.put(buffer); // 将新buffer放到key中作为附件 key.attach(newBuffer); } } } catch (IOException e) { //异常断开,取消事件 key.cancel(); } } // 处理完毕后移除 iterator.remove(); } } } catch (IOException e) { System.out.println("出现异常..."); } } private static void split(ByteBuffer buffer) { buffer.flip(); for (int i = 0; i < buffer.limit(); i++) { //找到一条完成数据 if (buffer.get(i) == '\n') { // 缓冲区长度 int length = i + 1 - buffer.position(); ByteBuffer target = ByteBuffer.allocate(length); // 将前面的内容写入target缓冲区 for (int j = 0; j < length; j++) { // 将buffer中的数据写入target中 target.put(buffer.get()); } ByteBufferUtil.debugAll(target); } } // 切换为写模式,但是缓冲区可能未读完,这里需要使用compact buffer.compact(); } }
/** * @author 神秘杰克 * 公众号: Java菜鸟程序员 * @date 2021/12/29 * @Description Read事件完整版-客户端 */ public class Client { public static void main(String[] args) { try (SocketChannel socketChannel = SocketChannel.open()) { // 建立连接 socketChannel.connect(new InetSocketAddress("localhost", 7777)); final ByteBuffer buffer = ByteBuffer.allocate(32); buffer.put("01234567890abcdef3333\n".getBytes(StandardCharsets.UTF_8)); buffer.flip(); socketChannel.write(buffer); } catch (IOException e) { e.printStackTrace(); } } }
ByteBuffer 的大小分配
- 每个 channel 都需要记录可能被切分的消息,因为 ByteBuffer 不能被多个 channel 共同使用,因此需要为每个 channel 维护一个独立的 ByteBuffer
- ByteBuffer 不能太大,比如一个 ByteBuffer 1Mb 的话,要支持百万连接就要 1Tb 内存,因此需要设计大小可变的 ByteBuffer
- 分配思路:
- 一种思路是首先分配一个较小的 buffer,例如 4k,如果发现数据不够,再分配 8k 的 buffer,将 4k buffer 内容拷贝至 8k buffer,优点是消息连续容易处理,缺点是数据拷贝耗费性能
- 另一种思路是用多个数组组成 buffer,一个数组不够,把多出来的内容写入新的数组,与前面的区别是消息存储不连续解析复杂,优点是避免了拷贝引起的性能损耗
Write 事件
服务器通过 Buffer 通道中写入数据时,可能因为通道容量小于 Buffer 中的数据大小,导致无法一次性将 Buffer 中的数据全部写入到 Channel 中,这时便需要分多次写入,具体步骤如下:
- 执行一次写操作,向将 buffer 中的内容写入到 SocketChannel 中,然后判断 Buffer 中是否还有数据
- 若 Buffer 中还有数据,则需要将 SockerChannel 注册到 Seletor 中,并关注写事件,同时将未写完的 Buffer 作为附件一起放入到 SelectionKey 中。
/** * @author 神秘杰克 * 公众号: Java菜鸟程序员 * @date 2021/12/29 * @Description Write事件-服务端 */ public class Server { public static void main(String[] args) { try (ServerSocketChannel ssc = ServerSocketChannel.open(); final Selector selector = Selector.open()) { ssc.bind(new InetSocketAddress(7777)); ssc.configureBlocking(false); ssc.register(selector, SelectionKey.OP_ACCEPT); while (true) { int ready = selector.select(); Iterator<SelectionKey> iterator = selector.selectedKeys().iterator(); while (iterator.hasNext()) { final SelectionKey key = iterator.next(); //判断key的事件类型 if (key.isAcceptable()) { final SocketChannel socketChannel = ssc.accept(); socketChannel.configureBlocking(false); StringBuilder sb = new StringBuilder(); for (int i = 0; i < 3000000; i++) { sb.append("a"); } final ByteBuffer buffer = Charset.defaultCharset().encode(sb.toString()); final int write = socketChannel.write(buffer); System.out.println("accept事件器写入.."+write); // 判断是否还有剩余内容 if (buffer.hasRemaining()) { // 注册到Selector中,关注可写事件,并将buffer添加到key的附件中 socketChannel.register(selector, SelectionKey.OP_WRITE, buffer); } }else if (key.isWritable()) { SocketChannel socket = (SocketChannel) key.channel(); // 获得事件 ByteBuffer buffer = (ByteBuffer) key.attachment(); int write = socket.write(buffer); System.out.println("write事件器写入.."+write); // 如果已经完成了写操作,需要移除key中的附件,同时不再对写事件感兴趣 if (!buffer.hasRemaining()) { key.attach(null); key.interestOps(0); } } // 处理完毕后移除 iterator.remove(); } } } catch (IOException e) { System.out.println("出现异常..."); } } }
/** * @author 神秘杰克 * 公众号: Java菜鸟程序员 * @date 2021/12/29 * @Description Write事件-客户端 */ public class Client { public static void main(String[] args) { try (SocketChannel socketChannel = SocketChannel.open()) { // 建立连接 socketChannel.connect(new InetSocketAddress("localhost", 7777)); int count = 0; while (true) { final ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024); count += socketChannel.read(buffer); System.out.println("客户端接受了.."+count); buffer.clear(); } } catch (IOException e) { e.printStackTrace(); } } }
运行结果: