🚀1. Java NIO 介绍
🎁Java NIO(New IO) 又被称为 Java Non-Blocking IO,是在 Java 1.4 开始引入的一个新的 IO API. NIO 支持面向缓冲区的、基于通道的 IO 操作,以更高效的方式进行文件的读写操作。传统 IO 的读写操作只能阻塞执行,线程在读写期间不能干其他事情。例如,调用 socket.read() 时,如果服务器一直没有数据传输过来,线程就一直阻塞,而 NIO 可以配置 socket 为非阻塞模式。
NIO 的非阻塞模式详解
NIO 非阻塞读,使一个线程从某个通道发送请求或读取数据,但是它仅能得到目前可用的数据,如果目前没有数据可用时,就什么都不会获取,而不是保持线程阻塞,所以直至数据变为可读取之前,该线程可以继续做其他的事情。
NIO 非阻塞写,一个线程请求写入一些数据到某通道,但不需要等待它完全写入,这个线程同时可以去做其他事情。
NIO 和 BIO 的比较
BIO 以流的方式处理数据,而 NIO 以块的方式处理数据,IO 块比 IO 流效率更高。
BIO 是阻塞的,NIO 是非阻塞的。
BIO 基于字节流和字符流进行操作,NIO 基于通道(Channel)和缓存区(Buffer)进行操作,数据总是从通道读取到缓存区中,或者从缓存区写入到通道中。
NIO | BIO |
面向缓存区(Buffer) | 面向流(Stream) |
非阻塞IO(Non-Blocking IO) | 阻塞IO(Blocking IO) |
选择器(Selector) |
NIO 有三大核心组件
通道(Channel):Java NIO 的通道类似流,但又有些不同,既可以从通道中读取数据,又可以写数据到通道中,但流的读写通常都是单向的。通道可以非阻塞读取和写入数据,通道支持读取或写入缓存区,也支持异步地读写。
缓存区(Buffer):缓存区本质上是一块可以写入数据并从中读取数据的内存,这块内存被包装成 NIO Buffer 对象,并提供了一组方法用来方便的访问这块内存。
选择器(Selector):可以监听一个或多个 NIO 通道,并确定哪些通道已经准备好进行读取或写入,单个线程可以管理多个通道,从而管理多个网络连接提高效率。
🚀2. 缓存区(Buffer)
缓存区是一个用于特定基本类型的容器,由 java.nio 包定义,所有缓存区都是 Buffer 抽象类的子类。主要用于与 NIO 通道进行交互,数据从通道读入缓存区或者从缓存区写入通道中。
缓存区就像一个数组,可以保存多个相同类型的数据,根据数据类型划分,有以下 Buffer 常用子类
- ByteBuffer
- CharBuffer
- ShortBuffer
- IntBuffer
- LongBuffer
- FloatBuffer
- DoubleBuffer
虽然上述 Buffer 类各自管理的数据类型不同,但是都是采用类似的方式管理数据,通过下面的方法获取一个 Buffer 对象
static xxxBuffer allocate(int capacity):创建一个容量为capacity的xxxBuffer对象
缓存区的基本属性
容量(capacity):作为一个内存块,Buffer 具有固定的大小,也成为“容量”,缓冲区容量不能为负,并且创建后不能更改。
限制(limit):表示缓存区中可以操作数据的大小(limit 后数据不能进行读写),缓存区的限制不能为负,并且不能大于其容量。写入模式下,限制等于 buffer 的容量,读取模式下,limit 等于写入的数据量。
位置(position):下一个要读取或写入的数据的索引,缓存区的位置不能为负,并且不能大于其限制。
标记(mark)与重置(reset):标记是一个索引,通过 Buffer 中的 mark() 方法指定 Buffer 中一个特定的位置,之后通过调用 reset() 方法恢复到这个位置。标记、位置、限制、容量遵循不变式0<=mark<=position<=limit<=capacity
缓存区常见方法
Buffer clear() //清空缓存区并返回对缓存区的引用 Buffer flip() //为将缓存区的界限设置为当前位置,并将当前位置重置为0 int capacity() //返回Buffer的capacity大小 boolean hasRemaining()//判断缓存区中是否还有元素 int limit() //返回缓存区的界限(limit)的位置 Buffer limit(int n)//将设置缓存区界限为n,并返回一个具有新limit的缓存区对象 Buffer mark() //对缓存区设置标记 int position() //返回缓存区的当前位置position Buffer position(int n)//将设置缓存区的当前位置为n,并返回修改后的缓存区对象 int remaining() //返回position和limit之间的元素个数 Buffer reset() //将位置position转到以前设置的mark所在的位置 Buffer rewind() //将位置设为为0.取消设置的mark
缓存区的数据操作
Buffer 所有子类提供了两个用于数据操作的方法:get() put() 方法获取 Buffer 中的数据 get(): 读取单个字节 get(byte[] dst): 批量读取多个字节到 dst 中 get(int index): 读取指定索引位置的字节(不会移动position) 放入数据到 Buffer 中 put(byte b): 将单个字节写入缓存区的当前位置 put(byte[] src): 将src中的字节写入缓存区的当前位置 put(int index, byte b): 将指定字节写入缓存区的索引位置(不会移动position)
使用缓存区读写数据一般遵循以下四个步骤
- 写入数据到缓存区
- 调用 buffer.flip() 方法,转换为读取模式
- 从缓存区中读取数据
- 调用 buffer.clear() 方法或 buffer.compact() 方法清除缓存区
案例演示
public class BufferTest { @Test public void test01() { //1.分配一个缓存区,容量设置为10 ByteBuffer buffer = ByteBuffer.allocate(10); System.out.println(buffer.position()); //0 System.out.println(buffer.limit()); //10 System.out.println(buffer.capacity()); //10 System.out.println("----------------------"); //put往缓存区中添加数据 String name = "java nio"; buffer.put(name.getBytes()); System.out.println(buffer.position()); //8 System.out.println(buffer.limit()); //10 System.out.println(buffer.capacity()); //10 System.out.println("----------------------"); //3.flip()为将缓存区的界限设置为当前位置,并将当前位置设置为0 可读模式 buffer.flip(); System.out.println(buffer.position()); //0 System.out.println(buffer.limit()); //8 System.out.println(buffer.capacity()); //10 System.out.println("----------------------"); //4.get数据的获取 char ch = (char) buffer.get(); System.out.println(ch); //j System.out.println(buffer.position()); //1 System.out.println(buffer.limit()); //8 System.out.println(buffer.capacity()); //10 } @Test public void test02() { //分配一个缓存区,容量设置为10 ByteBuffer buffer = ByteBuffer.allocate(10); System.out.println(buffer.position()); //0 System.out.println(buffer.limit()); //10 System.out.println(buffer.capacity()); //10 System.out.println("----------------------"); //put往缓存区中添加数据 String name = "java nio"; buffer.put(name.getBytes()); System.out.println(buffer.position()); //8 System.out.println(buffer.limit()); //10 System.out.println(buffer.capacity()); //10 System.out.println("----------------------"); //clear清除缓存区中的数据 buffer.clear(); System.out.println(buffer.position()); //0 System.out.println(buffer.limit()); //10 System.out.println(buffer.capacity()); //10 System.out.println((char)buffer.get()); //j, 数据并没有被恢复,只是恢复了position的位置 System.out.println("----------------------"); //定义一个缓存区 ByteBuffer buf = ByteBuffer.allocate(10); String n = "javanio"; buf.put(n.getBytes()); buf.flip(); //读取数据 byte[] b = new byte[2]; buf.get(b); String rs = new String(b); System.out.println(rs); //ja System.out.println(buffer.position()); //1 System.out.println(buffer.limit()); //10 System.out.println(buffer.capacity()); //10 System.out.println("----------------------"); buf.mark(); //标记此刻这个位置:2 byte[] b2 = new byte[3]; buf.get(b2); System.out.println(new String(b2)); //van System.out.println(buffer.position()); //1 System.out.println(buffer.limit()); //10 System.out.println(buffer.capacity()); //10 System.out.println("----------------------"); buf.reset(); //回到标记位置 if (buf.hasRemaining()) { System.out.println(buf.remaining()); //5 } } }
直接内存与非直接内存
byte buffer 可以是两种类型,一种是基于直接内存(也就是非堆内存),另一种是非直接内存(也就是堆内存)。对于直接内存来说,JVM 将会在 IO 操作上具有更高的性能,因为它是直接作用于本地系统的 IO 操作;而非直接内存,如果要进行 IO 操作,会先从本进程内存复制到直接内存,再利用本地 IO 处理。
从数据流的角度,非直接内存是下面这样的作用链
本地IO---->直接内存---->非直接内存---->直接内存---->本地IO
而直接内存的作用链为
本地IO---->直接内存---->本地IO
在做 IO 处理时,例如通过网络发送大量数据,直接内存具有更高的效率,因为直接内存使用 allocateDirect 创建。虽然它比申请普通的堆内存需要耗费更高的性能,但是这部分的数据是在 JVM 之外的,它不会占用应用的内存。
因此,如果有很大的数据需要缓存,并且数据生命周期很长,那么使用直接内存比较合适。如果不能带来很明显的性能提升,还是推荐直接使用堆内存,字节缓冲区是直接缓冲区还是非字节缓冲区,可以通过调用 isDirect() 方法确定。
@Test public void test03() { //创建一个非直接内存的缓存区 ByteBuffer buffer = ByteBuffer.allocate(1024); //buffer.isDirect()用于判断是否为直接内存 System.out.println(buffer.isDirect()); //false System.out.println("----------------------"); //创建一个直接内存的缓存区 ByteBuffer buffer2 = ByteBuffer.allocateDirect(1024); System.out.println(buffer2.isDirect()); //true }
直接内存使用场景
- 数据量很大并且这些数据的生命周期很长
- 频繁的 IO 操作,例如网络并发场景
🚀3. 通道(Channel)
通道(Channel)由 java.nio.channels 包定义,表示 IO 源与目标打开的连接,它类似于传统的“流”,只不过通道(Channel)本身不能直接访问数据,只能与 Buffer 进行交互。
NIO 的通道类似于流,但有些区别如下
通道可以同时进行读写,而流只能读或者只能写
通道可以实现异步读写数据,流只能同步读或同步写
通道可以从缓存读数据,也可以写数据到缓存
通道(Channel)在 NIO 中是一个接口
public interface Channel extends Closeable()
常用的 Channel 实现类
FileChannel:用于读取、写入、映射和操作文件的通道
DatagramChannel:通过 UDP 读写网络中的数据通道
SocketChannel:通过 TCP 读写网络中的数据通道
ServerSocketChannel:可以监听新建立的 TCP 连接,对每一个新建立的连接都会创建一个 SocketChannel
对于 FileChannel 类,获取通道的一种方式是对支持通道的对象调用 getChannel() 方法,支持通道的类如下
FileInputStream
FileOutputStream
RandomAccessFile
DatagramSocket
Socket
ServerSocket
获取通道的其他方式是使用 Files 类的静态方法 newByteChannel() 获取字节通道或通过通道的静态方法 open() 打开并返回指定通道。
FileChannel 的常用方法
int read(ByteBuffer dst) 从Channel当中读取数据至ByteBuffer long read(ByteBuffer[] dsts) 将Channel当中的数据“分散”至ByteBuffer[] int write(ByteBuffer src) 将ByteBuffer当中的数据写入到Channel long write(ByteBuffer[] srcs) 将ByteBuffer[]当中的数据“聚集”到Channel long position() 返回此通道的文件位置 FileChannel position(long p) 设置此通道的文件位置 long size() 返回此通道的文件的当前大小 FileChannel truncate(long s) 将此通道的文件截取为给定大小 void force(boolean meteData) 强制将所有对此通道的文件更新写入到存储设备中
案例1-本地文件写数据
public class ChannelTest { @Test public void write() { try { //1.字节输出流通向目标文件 FileOutputStream fos = new FileOutputStream("data01.txt"); //2.得到字节输出流对应的通道Channel FileChannel channel = fos.getChannel(); //3.分配缓存区 ByteBuffer buffer = ByteBuffer.allocate(1024); buffer.put("hello, friends".getBytes()); //4.把缓存区切换为写模式 buffer.flip(); channel.write(buffer); channel.close(); System.out.println("写数据到文件中!"); } catch (Exception e) { e.printStackTrace(); } } }
运行结果截图
案例2-本地文件读数据
@Test public void read() throws Exception { //1.定义一个文件字节输入流与源文件接通 FileInputStream is = new FileInputStream("data01_txt"); //2.需要得到文件字节输入流的文件通道 FileChannel channel = is.getChannel(); //3.定义一个缓存区 ByteBuffer buffer = ByteBuffer.allocate(1024); //4.读取数据到缓存区 channel.read(buffer); buffer.flip(); //归位 //5.读取出缓存区中的数据并输出即可 String rs = new String(buffer.array(), 0, buffer.remaining()); System.out.println(rs); }
运行结果如下
hello, friends
案例3-使用 Buffer 完成文件复制
@Test public void copy() throws Exception { //源文件 File srcFile = new File("C:\\Users\\Desktop\\1.jpg"); File destFile = new File("C:\\Users\\Desktop\\1_copy.jpg"); //得到一个字节输出流、字节输入流 FileInputStream fis = new FileInputStream(srcFile); FileOutputStream fos = new FileOutputStream(destFile); //得到文件通道 FileChannel fisChannel = fis.getChannel(); FileChannel fosChannel = fos.getChannel(); //分配缓存区 ByteBuffer buffer = ByteBuffer.allocate(1024); while (true) { //必须先清空缓存区然后再写入数据到缓存区 buffer.clear(); //开始读取一次数据 int flag = fisChannel.read(buffer); if (flag == -1) { break; } //已经读取了数据,把缓存区的模式切换为可读模式 buffer.flip(); fosChannel.write(buffer); } fisChannel.close(); fosChannel.close(); System.out.println("复制完成"); }
案例4-分散(Scatter)和聚集(Gatter)
- 分散读取(Scatter):把 Channel 通道的数据读取到多个缓存区中
- 聚集写入(Gathering):是指将多个 Buffer 中的数据聚集到 Channel
@Test public void test() throws Exception { //1.字节输入管道 FileInputStream is = new FileInputStream("data01.txt"); FileChannel isChannel = is.getChannel(); //2.字节输出管道 FileOutputStream os = new FileOutputStream("data02.txt"); FileChannel osChannel = os.getChannel(); //3.定义多个缓存区做数据分散 ByteBuffer buffer1 = ByteBuffer.allocate(4); ByteBuffer buffer2 = ByteBuffer.allocate(1024); ByteBuffer[] buffers = {buffer1, buffer2}; //4.从通道中读取数据分散到多个缓存区 isChannel.read(buffers); //5.从每个缓存区中查询是否有数据读取到 for (ByteBuffer buffer : buffers) { buffer.flip(); //切换到读数据模式 System.out.println(new String(buffer.array(), 0, buffer.remaining())); } //6.聚集写入到通道 osChannel.write(buffers); isChannel.close(); osChannel.close(); System.out.println("文件复制"); }
运行结果截图如下
案例5-从目标通道中去复制原通道数据
@Test public void test02() throws Exception { //1.字节输入通道 FileInputStream is = new FileInputStream("data01.txt"); FileChannel isChannel = is.getChannel(); //2.字节输出管道 FileOutputStream os = new FileOutputStream("data03.txt"); FileChannel osChannel = os.getChannel(); //目标通道 //3.复制数据 osChannel.transferFrom(isChannel, isChannel.position(), isChannel.size()); isChannel.close(); osChannel.close(); System.out.println("复制完成"); }
运行结果截图如下
案例6-把原通道数据复制到目标通道
@Test public void test03() throws Exception{ //1.字节输入通道 FileInputStream is = new FileInputStream("data01.txt"); FileChannel isChannel = is.getChannel(); //2.字节输出管道 FileOutputStream os = new FileOutputStream("data04.txt"); FileChannel osChannel = os.getChannel(); //3.复制数据 isChannel.transferTo(isChannel.position(), isChannel.size(), osChannel); isChannel.close(); osChannel.close(); System.out.println("复制完成"); }
运行结果截图如下
🚀4. 选择器(Selector)
选择器(Selector) 是 SelectableChannel 对象的多路复用器,它可以同时监控多个SelectableChannel 的 IO 状况,利用选择器(Selector) 可以使一个单独的线程管理多个通道(Channel).
Selector 是非阻塞 IO 的核心
Java 的 NIO 使用非阻塞的 IO 方式,可以用一个线程处理多个客户端的连接,就会使用到选择器。
选择器能够检测到多个注册的通道,通道上若有事件发生便获取事件然后针对每个事件进行相应的处理,这样便可以只用一个单线程去管理多个通道,即管理多个连接和请求。
只有在连接的通道真正有读写事件发生时,才会进行读写,而不用为每个连接都创建一个线程,不用去维护多个线程避免了多线程之间的上下文切换带来的开销
创建 Selector:通过 Selector.open() 方法创建一个 Selector
Selector selector = Selector.open()
向选择器注册通道:SelectableChannel.register(Selector sel, int ops)
//1.获取通道 ServerSocketChannel ssChannel = ServerSocketChannel.open(); //2.切换非阻塞模式 ssChannel.configureBlocking(false); //3.绑定连接 ssChannel.bind(new InetSocketAddress(9898)); //4.获取选择器 Selector selector = Selector.open(); //5.将通道注册到选择器上,并指定"监听接收事件" ssChannel.register(selector, SelectionKey.OP_ACCEPT);
当调用 register(Selector sel, mt ops) 向通道注册选择器时,通过第二个参数 ops 指定选择器对通道的监听事件,可以监听的事件类型有
读:SelectionKey.OP_READ(1)
写:SelectionKey.OP_WRITE(4)
连接:SelectionKey.OP_CONNECT(8)
接收:SelectionKey.OP_ACCEPT(16)
若注册时不止监听一个事件,则可以使用“位或”操作符连接
int interestSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE
🚀5. NIO 非阻塞式网络通信原理分析
Java NIO 的组件-----选择器(Selector)能够实现一个 IO 线程可以并发处理 N 个客户端连接和读写操作,从根本上解决了传统同步阻塞 IO 的一个连接一个线程的模型在性能上的缺陷问题,提高了架构的性能、弹性伸缩能力和可靠性。
服务端流程
1、当客户端连接服务端时,服务端会通过 ServerSocketChannel 得到 SocketChannel
ServerSocketChannel ssChannel = ServerSocketChannel.open();
2、切换阻塞模式
ss.Channel.configureBlocking(false);
3、绑定连接
ssChannel.bind(new InetSocketAddress(9999));
4、获取选择器
Selector selector = Selector.open();
5、将通道注册到选择器上,并且指定“监听接收事件”
ssChannel.register(seletor, SelectionKey.OP_ACCEPT);
6、轮询式的获取选择器上已经“准备就绪”的事件
//轮询式的获取选择器上已经"准备就绪"的事件 while (selector.select() > 0) { System.out.println("轮询"); //获取当前选择器中所有注册的"选择键(已就绪的监听事件)" Iterator<SelectionKey> it = selector.selectedKeys().iterator(); while (it.hasNext()) { //获取准备“就绪”的事件 SelectionKey sk = it.next(); //判断具体是什么事件准备就绪 if (sk.isAcceptable()) { //若“接收就绪”,获取客户端连接 SocketChannel sChannel = ssChannel.accept(); //切换非阻塞模式 sChannel.configureBlocking(false); //将该通道注册到选择器上 sChannel.register(selector, SelectionKey.OP_READ); } else if (sk.isReadable()) { //获取当前选择器上"读就绪"状态的通道 SocketChannel sChannel = (SocketChannel) sk.channel(); //读取数据 ByteBuffer buf = ByteBuffer.allocate(1024); int len = 0; while ((len = sChannel.read(buf)) > 0) { buf.flip(); System.out.println(new String(buf.array(), 0, len)); System.out.println(); buf.clear(); } } } //取消选择键SelectionKey it.remove();
客户端流程
1、获取通道
SocketChannel sChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 9999));
2、切换非阻塞模式
sChannel.configureBlocking(false);
3、分配指定大小的缓存区
ByteBuffer buf = ByteBuffer.allocate(1024);
4、发送数据给服务端
Scanner scan = new Scanner(System.in); while (scan.hasNext()) { String str = scan.nextLine(); buf.put((new SimpleDateFormat("yyyy/MM/dd HH:mm:ss").format(System.currentTimeMillis()) + "\n" + str).getBytes()); buf.flip(); sChannel.write(buf); buf.clear(); } //关闭通道 sChannel.close();
🚀6. NIO 非阻塞式网络通信案例
服务端接收客户端的连接请求,并接收多个客户端发送过来的事件。
客户端代码如下
public class Client { public static void main(String[] args) throws IOException { //1.获取通道 SocketChannel sChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 9999)); //2.切换为非阻塞模式 sChannel.configureBlocking(false); //3.分配指定大小缓存区 ByteBuffer buffer = ByteBuffer.allocate(1024); //4.发送数据给服务端 Scanner sc = new Scanner(System.in); while (true) { System.out.print("请输入:"); String msg = sc.nextLine(); buffer.put(("华仔仔:"+msg).getBytes()); buffer.flip(); sChannel.write(buffer); buffer.clear(); } } }
服务端代码如下
public class Server { public static void main(String[] args) throws IOException { System.out.println("---------服务端启动-----------"); //1.获取通道 ServerSocketChannel ssChannel = ServerSocketChannel.open(); //2.切换为非阻塞模式 ssChannel.configureBlocking(false); //3.绑定连接的端口 ssChannel.bind(new InetSocketAddress(9999)); //4.获取选择器 Selector selector = Selector.open(); //5.将通道都注册到选择器上去,并且开始指定监听接收事件 ssChannel.register(selector, SelectionKey.OP_ACCEPT); //6.使用Selector选择器轮询已经就绪好的事件 while (selector.select() > 0) { System.out.println("开始一轮事件处理..."); //7.获取选择器中的所有注册的通道中已经就绪好的事件 Iterator<SelectionKey> it = selector.selectedKeys().iterator(); //8.开始遍历这些准备好的事件 while (it.hasNext()) { //提取当前这个事件 SelectionKey sk = it.next(); //9.判断这个事件具体是什么事件 if (sk.isAcceptable()) { //10.直接获取当前接入的客户端通道 SocketChannel sChannel = ssChannel.accept(); //11.将客户端通道也设置为非阻塞式的 sChannel.configureBlocking(false); //12.将客户端通道也注册到选择器Selector上 sChannel.register(selector, SelectionKey.OP_READ); } else if (sk.isReadable()) { //13.获取当前选择器上的"读就绪事件" SocketChannel sChannel = (SocketChannel) sk.channel(); //14.开始读取数据 ByteBuffer buffer = ByteBuffer.allocate(1024); int len = 0; while ((len = sChannel.read(buffer)) > 0) { buffer.flip(); System.out.println(new String(buffer.array(), 0, len)); buffer.clear(); //清除之前的数据 } } //处理完毕当前事件后,需要移除当前事件,否则会重复处理 it.remove(); } } } }
运行结果如下
---------服务端启动----------- 开始一轮事件处理... 开始一轮事件处理... 开始一轮事件处理... 开始一轮事件处理... 华仔仔:你好,我是client1 开始一轮事件处理... 华仔仔:你好,我是client2 开始一轮事件处理... 华仔仔:你好,我是client3 --------client1----------- 请输入:你好,我是client1 --------client2----------- 请输入:你好,我是client2 --------client3----------- 请输入:你好,我是client3
🚀7. NIO 网络编程实现群聊系统应用
需求说明
编写一个 NIO 群聊系统,实现客户端与客户端的通信需求(非阻塞)
服务端:可以检测用户上线、离线,并实现消息转发功能
客户端:通过通道(Channel) 可以无阻塞发送信息给其他所有客户端用户,同时可以接受其他客户端用户通过服务端转发的消息
服务端代码实现
public class Server { private Selector selector; private ServerSocketChannel ssChannel; private static final int PORT = 9999; //初始化工作 public Server() { try { //1.创建选择器 selector = Selector.open(); //2.获取通道 ssChannel = ServerSocketChannel.open(); //3.切换为非阻塞模式 ssChannel.configureBlocking(false); //4.绑定连接的端口 ssChannel.bind(new InetSocketAddress(PORT)); //5.将通道都注册到选择器上去,并且开始指定监听接收事件 ssChannel.register(selector, SelectionKey.OP_ACCEPT); } catch (IOException e) { e.printStackTrace(); } } //监听 public void listen() { try { while (selector.select() > 0) { //获取选择器中所有注册通道的就绪事件 Iterator<SelectionKey> it = selector.selectedKeys().iterator(); //开始遍历这个事件 while (it.hasNext()) { //提取这个事件 SelectionKey sk = it.next(); //判断这个事件 if (sk.isAcceptable()) { //客户端接入请求 //获取当前客户端通道 SocketChannel schannel = ssChannel.accept(); //注册成非阻塞模式 schannel.configureBlocking(false); //注册给选择器,监听读数据的事件 schannel.register(selector, SelectionKey.OP_READ); } else if (sk.isReadable()) { //处理这个客户端的消息接收它,然后实现转发逻辑 readClientData(sk); } it.remove(); //处理完毕之后,需要移除当前事件 } } } catch (Exception e) { e.printStackTrace(); } } //接收当前客户端消息,转发给其他全部客户端通道 private void readClientData(SelectionKey sk) { SocketChannel sChannel = null; try { //直接得到当前客户端通道 sChannel = (SocketChannel) sk.channel(); //创建缓存区对象,开始接收客户端通道的数据 ByteBuffer buffer = ByteBuffer.allocate(1024); int count = sChannel.read(buffer); if (count > 0) { buffer.flip(); //提取读取到的信息 String msg = new String(buffer.array(), 0, buffer.remaining()); System.out.println("接收到了客户端消息:"+msg); //把这个消息推送给全部客户端接收 sendMsgToAllClient(msg, sChannel); } } catch (Exception e) { try { System.out.println("有人离线了:"+sChannel.getRemoteAddress()); //当前客户端离线 sk.cancel(); //取消注册 sChannel.close(); } catch (IOException e1) { e1.printStackTrace(); } } } //把当前客户端的消息推送给当前全部在线注册的channel private void sendMsgToAllClient(String msg, SocketChannel sChannel) throws IOException { System.out.println("服务端开始转发这个消息,当前处理的线程"+ Thread.currentThread().getName()); for (SelectionKey key : selector.keys()) { Channel channel = key.channel(); if (channel instanceof SocketChannel && channel != sChannel) { ByteBuffer buffer = ByteBuffer.wrap(msg.getBytes()); ((SocketChannel) channel).write(buffer); } } } public static void main(String[] args) { //创建服务端对象 Server server = new Server(); //开始监听客户端的各种消息事件:连接、群聊消息、离线消息 server.listen(); } }
客户端代码实现
public class Client { private Selector selector; private static int PORT = 9999; private SocketChannel socketChannel; //初始化客户端信息 public Client() { try { //创建选择器 selector = Selector.open(); //连接服务器 socketChannel= SocketChannel.open(new InetSocketAddress("127.0.0.1", PORT)); //设置非阻塞模式 socketChannel.configureBlocking(false); socketChannel.register(selector, SelectionKey.OP_READ); System.out.println("当前客户端准备完成"); } catch (Exception e) { e.printStackTrace(); } } private void sendToServer(String s) { try { socketChannel.write(ByteBuffer.wrap(("华仔说:"+s).getBytes())); } catch (Exception e) { e.printStackTrace(); } } private void readInfo() throws IOException { while (selector.select() > 0) { Iterator<SelectionKey> it = selector.selectedKeys().iterator(); while (it.hasNext()) { SelectionKey key = it.next(); if (key.isReadable()) { SocketChannel sc = (SocketChannel) key.channel(); ByteBuffer buffer = ByteBuffer.allocate(1024); sc.read(buffer); System.out.println(new String(buffer.array()).trim()); System.out.println("-dsd---------------------"); } it.remove(); } } } public static void main(String[] args) { Client client = new Client(); //定义一个线程,专门负责监听服务端发送过来的读消息事件 new Thread(() -> { try { client.readInfo(); } catch (Exception e) { e.printStackTrace(); } }).start(); //发消息 Scanner sc = new Scanner(System.in); while (sc.hasNextLine()) { System.out.println("-----------------------"); String s = sc.nextLine(); client.sendToServer(s); } } }