# 需求
- 基于NIO实现
- 支持同时多个客户端接入
- 支持客户端发送文本消息到服务器
- 支持客户端自定义群聊名称
- 接收到客户端发送的消息之后,服务器需要将消息转发给目前在线的所有其他客户端
- 支持客户端退出群聊
- 服务端停止服务后,客户端自动断开连接
# 技术介绍
Non-blockingI/O
编程模型Channel
通道
ServerSocketChannel
服务端通道SocketChannel
客户端通道
ByteBuffer
NIO中使用的读写缓冲区Selector
多路复用器
- 将
channel
注册在多路复用器上,并监听相应的事件
- 多线程
- 线程池
# 代码
温馨提示:注意看代码注释哟~ 跟上节奏,很简单😼
- 服务器
/** * 基于NIO实现的聊天室服务端 * * @author futao * @date 2020/7/8 */ @Slf4j public class NioChatServer { /** * 用于处理通道上的事件的线程池(可选的) */ private static final ExecutorService THREAD_POOL = Executors.newFixedThreadPool(10); /** * 启动聊天室 */ public void start() { try { //服务端Socket通道 ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); //将通道设置成非阻塞 serverSocketChannel.configureBlocking(false); //绑定主机与监听端口 serverSocketChannel.bind(new InetSocketAddress("localhost", Constants.SERVER_PORT)); //多路复用器 Selector selector = Selector.open(); //将服务端通道注册到多路复用器上,并设置监听事件接入事件 serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); log.debug("{} 基于NIO的聊天室在[{}]端口启动成功 {}", StringUtils.repeat("=", 30), Constants.SERVER_PORT, StringUtils.repeat("=", 30)); while (true) { // 触发了事件的通道数量,该方法会阻塞 int eventCountTriggered = selector.select(); if (eventCountTriggered <= 0) { continue; } // 获取到所有触发的事件 Set<SelectionKey> selectionKeys = selector.selectedKeys(); // 遍历事件进行处理 for (SelectionKey selectionKey : selectionKeys) { // 处理事件 selectionKeyHandler(selectionKey, selector); } // 清除事件记录 selectionKeys.clear(); } } catch (IOException e) { e.printStackTrace(); } } /** * 事件处理器 * * @param selectionKey 触发的事件信息 * @param selector 多路复用器 */ private void selectionKeyHandler(SelectionKey selectionKey, Selector selector) { if (selectionKey.isAcceptable()) { //如果触发的是SocketChannel接入事件 try { // ServerSocketChannel上触发的客户端SocketChannel接入 SocketChannel socketChannel = ((ServerSocketChannel) selectionKey.channel()).accept(); log.debug("客户端[{}]成功接入聊天服务器", socketChannel.socket().getPort()); // 将客户端SocketChannel通道设置成非阻塞 socketChannel.configureBlocking(false); // 将客户端通道注册到多路复用器,并监听这个通道上发生的可读事件 socketChannel.register(selector, SelectionKey.OP_READ); } catch (IOException e) { e.printStackTrace(); } } else if (selectionKey.isReadable()) { // 触发的是可读事件 // 获取到可读事件的客户端通道 SocketChannel socketChannel = (SocketChannel) selectionKey.channel(); //创建缓冲区 ByteBuffer byteBuffer = ByteBuffer.allocate(1024 * 4); try { // 读取通道上的数据写入缓冲区(返回0或者-1说明读到了末尾) while (socketChannel.read(byteBuffer) > 0) { } //切换为读模式 byteBuffer.flip(); // 接收到的消息 String message = String.valueOf(Constants.CHARSET.decode(byteBuffer)); log.info("接收到来自客户端[{}]的数据:[{}]", socketChannel.socket().getPort(), message); // 是否退出 quit(message, selector, selectionKey); // 消息转发 forwardMessage(message, selector, selectionKey); // 清除缓冲区的数据 byteBuffer.clear(); } catch (IOException e) { e.printStackTrace(); } } } /** * 客户端退出 * * @param message 消息 * @param selector 多路复用器 * @param selectionKey 触发的selectionKey */ public void quit(String message, Selector selector, SelectionKey selectionKey) { if (StringUtils.isBlank(message) || Constants.KEY_WORD_QUIT.equals(message)) { int port = ((SocketChannel) selectionKey.channel()).socket().getPort(); // 客户端下线 selectionKey.cancel(); log.debug("客户端[{}]下线", port); // 因为发生了监听事件和channel的变更,所以需要通知selector重新整理selector所监听的事件 selector.wakeup(); } } /** * 转发消息 * * @param message 需要转发的消息 * @param selector 多路复用器 * @param curSelectionKey 当前触发的selectionKey */ public void forwardMessage(String message, Selector selector, SelectionKey curSelectionKey) { // 创建缓冲区 ByteBuffer byteBuffer = ByteBuffer.allocate(1024 * 4); // 数据写入缓冲区 byteBuffer.put(message.getBytes(Constants.CHARSET)); // 切换为读模式 byteBuffer.flip(); // 在首尾进行标记,因为需要给每个客户端发送同样的数据,需要重复读取 byteBuffer.mark(); // 当前注册在多路复用器上的SelectionKey集合 Set<SelectionKey> keys = selector.keys(); for (SelectionKey key : keys) { // 消息不能转发给自己 and 只转发给客户端SocketChannel if (curSelectionKey.equals(key) || !(key.channel() instanceof SocketChannel)) { continue; } // 客户端SocketChannel SocketChannel socketChannel = (SocketChannel) key.channel(); // 如果缓冲区中还有数据就一直写 while (byteBuffer.hasRemaining()) { try { // 数据写入通道 socketChannel.write(byteBuffer); } catch (IOException e) { e.printStackTrace(); } } // 重置到上次mark的地方,即首位 byteBuffer.reset(); } // 清除缓冲区的数据 byteBuffer.clear(); } public static void main(String[] args) { new NioChatServer().start(); } }
- 客户端
/** * 基于NIO实现的群聊客户端 * * @author futao * @date 2020/7/8 */ @Getter @Setter @Slf4j public class NioChatClient { /** * 用于处理用户输入数据的单个线程线程池,使用线程池是为了便于关闭 */ private static final ExecutorService USER_INPUT_HANDLER = Executors.newSingleThreadExecutor(); /** * 用户名 */ private String userName; /** * 启动客户端 */ public void start() { try { // 创建客户端通道 SocketChannel socketChannel = SocketChannel.open(); // 将通道设置为非阻塞 socketChannel.configureBlocking(false); // 创建多路复用器 Selector selector = Selector.open(); // 将客户端通道注册到多路复用器,并监听可读事件 socketChannel.register(selector, SelectionKey.OP_CONNECT); // 尝试连接到聊天服务器 socketChannel.connect(new InetSocketAddress("localhost", Constants.SERVER_PORT)); while (true) { // 阻塞等待通道上的事件触发。返回触发的通道的数量 int eventCountTriggered = selector.select(); if (eventCountTriggered <= 0) { continue; } // 获取到所有触发的事件 Set<SelectionKey> selectionKeys = selector.selectedKeys(); // 遍历事件进行处理 for (SelectionKey selectionKey : selectionKeys) { // 处理事件 selectionKeyHandler(selectionKey, selector); } // 清除事件记录 selectionKeys.clear(); } } catch (IOException e) { e.printStackTrace(); } catch (ClosedSelectorException e) { log.debug("成功退出聊天室..."); } } /** * 处理器 * * @param selectionKey 触发的selectionKey * @param selector 多路复用器 */ private void selectionKeyHandler(SelectionKey selectionKey, Selector selector) { if (selectionKey.isConnectable()) { // 触发的是成功接入服务器的事件 SocketChannel socketChannel = (SocketChannel) selectionKey.channel(); try { // 判断此通道上的连接操作是否正在进行中 if (socketChannel.isConnectionPending()) { // 完成连接套接字通道的过程 socketChannel.finishConnect(); log.debug("成功接入聊天服务器"); // 将通道设置成非阻塞 socketChannel.configureBlocking(false); // 将通道注册到多路复用器,并监听可读事件 socketChannel.register(selector, SelectionKey.OP_READ); // 创建缓冲区,用于处理将用户输入的数据写入通道 ByteBuffer byteBuffer = ByteBuffer.allocate(4 * 1024); // 在新线程中处理用户输入 USER_INPUT_HANDLER.execute(() -> { while (!Thread.currentThread().isInterrupted()) { //先清空缓冲区中的数据 byteBuffer.clear(); // 获取用户输入的文本 String message = new Scanner(System.in).nextLine(); // 将数据写入缓冲区 byteBuffer.put(String.format("【%s】: %s", userName, message).getBytes(Constants.CHARSET)); // 将缓冲区设置为读模式 byteBuffer.flip(); try { // 当缓冲区中还有数据 while (byteBuffer.hasRemaining()) { // 将数据写入通道 socketChannel.write(byteBuffer); } } catch (IOException e) { e.printStackTrace(); } // 判断是否退出群聊 if (quit(message, selector, selectionKey)) { // 跳出循环,结束线程 break; } } try { // 关闭多路复用器 selector.close(); } catch (IOException e) { e.printStackTrace(); } // 关闭线程池 USER_INPUT_HANDLER.shutdown(); }); } } catch (IOException e) { e.printStackTrace(); } } else if (selectionKey.isReadable()) { // 触发的是可读事件 // 获取到可读事件的通道 SocketChannel socketChannel = (SocketChannel) selectionKey.channel(); //创建缓冲区 ByteBuffer byteBuffer = ByteBuffer.allocate(1024 * 4); try { // 将通道上的数据写入缓冲区(返回0或者-1说明读到了末尾) while (socketChannel.read(byteBuffer) > 0) { } // 切换成读模式 byteBuffer.flip(); String message = String.valueOf(Constants.CHARSET.decode(byteBuffer)); byteBuffer.clear(); log.info("接收到数据:[{}]", message); if (StringUtils.isBlank(message)) { log.debug("服务器拉胯,下车..."); selector.close(); USER_INPUT_HANDLER.shutdownNow(); } } catch (IOException e) { e.printStackTrace(); } } } /** * 退出群聊 * * @param message 消息 * @param selector 多路复用器 * @param selectionKey 触发的selectionKey * @return 是否退出 */ public boolean quit(String message, Selector selector, SelectionKey selectionKey) { if (Constants.KEY_WORD_QUIT.equals(message)) { selectionKey.cancel(); selector.wakeup(); return true; } return false; } public static void main(String[] args) { NioChatClient nioChatClient = new NioChatClient(); nioChatClient.setUserName("小9"); nioChatClient.start(); } }
# 测试
- 接入
客户端发送消息
消息转发
客户端下线
服务器宕机
# 源代码
* https://github.com/FutaoSmile/learn-IO/tree/master/practice/src/main/java/com/futao/practice/chatroom/nio