【NIO】NIO版本的鸿儒聊天室

简介: 【NIO】NIO版本的鸿儒聊天室

# 需求


  • 基于NIO实现
  • 支持同时多个客户端接入
  • 支持客户端发送文本消息到服务器
  • 支持客户端自定义群聊名称
  • 接收到客户端发送的消息之后,服务器需要将消息转发给目前在线的所有其他客户端
  • 支持客户端退出群聊
  • 服务端停止服务后,客户端自动断开连接


# 技术介绍



  • Non-blockingI/O 编程模型
  • Channel 通道
  • ServerSocketChannel 服务端通道
  • SocketChannel  客户端通道
  • ByteBuffer NIO中使用的读写缓冲区
  • Selector 多路复用器
  • channel注册在多路复用器上,并监听相应的事件
  • 多线程
  • 线程池


# 代码



温馨提示:注意看代码注释哟~  跟上节奏,很简单😼

  • 服务器

/**
 * 基于NIO实现的聊天室服务端
 *
 * @author futao
 * @date 2020/7/8
 */
@Slf4j
public class NioChatServer {
    /**
     * 用于处理通道上的事件的线程池(可选的)
     */
    private static final ExecutorService THREAD_POOL = Executors.newFixedThreadPool(10);
    /**
     * 启动聊天室
     */
    public void start() {
        try {
            //服务端Socket通道
            ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
            //将通道设置成非阻塞
            serverSocketChannel.configureBlocking(false);
            //绑定主机与监听端口
            serverSocketChannel.bind(new InetSocketAddress("localhost", Constants.SERVER_PORT));
            //多路复用器
            Selector selector = Selector.open();
            //将服务端通道注册到多路复用器上,并设置监听事件接入事件
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
            log.debug("{} 基于NIO的聊天室在[{}]端口启动成功 {}", StringUtils.repeat("=", 30), Constants.SERVER_PORT, StringUtils.repeat("=", 30));
            while (true) {
                // 触发了事件的通道数量,该方法会阻塞
                int eventCountTriggered = selector.select();
                if (eventCountTriggered <= 0) {
                    continue;
                }
                // 获取到所有触发的事件
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                // 遍历事件进行处理
                for (SelectionKey selectionKey : selectionKeys) {
                    // 处理事件
                    selectionKeyHandler(selectionKey, selector);
                }
                // 清除事件记录
                selectionKeys.clear();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    /**
     * 事件处理器
     *
     * @param selectionKey 触发的事件信息
     * @param selector     多路复用器
     */
    private void selectionKeyHandler(SelectionKey selectionKey, Selector selector) {
        if (selectionKey.isAcceptable()) {
            //如果触发的是SocketChannel接入事件
            try {
                // ServerSocketChannel上触发的客户端SocketChannel接入
                SocketChannel socketChannel = ((ServerSocketChannel) selectionKey.channel()).accept();
                log.debug("客户端[{}]成功接入聊天服务器", socketChannel.socket().getPort());
                // 将客户端SocketChannel通道设置成非阻塞
                socketChannel.configureBlocking(false);
                // 将客户端通道注册到多路复用器,并监听这个通道上发生的可读事件
                socketChannel.register(selector, SelectionKey.OP_READ);
            } catch (IOException e) {
                e.printStackTrace();
            }
        } else if (selectionKey.isReadable()) {
            // 触发的是可读事件
            // 获取到可读事件的客户端通道
            SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
            //创建缓冲区
            ByteBuffer byteBuffer = ByteBuffer.allocate(1024 * 4);
            try {
                // 读取通道上的数据写入缓冲区(返回0或者-1说明读到了末尾)
                while (socketChannel.read(byteBuffer) > 0) {
                }
                //切换为读模式
                byteBuffer.flip();
                // 接收到的消息
                String message = String.valueOf(Constants.CHARSET.decode(byteBuffer));
                log.info("接收到来自客户端[{}]的数据:[{}]", socketChannel.socket().getPort(), message);
                // 是否退出
                quit(message, selector, selectionKey);
                // 消息转发
                forwardMessage(message, selector, selectionKey);
                // 清除缓冲区的数据
                byteBuffer.clear();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    /**
     * 客户端退出
     *
     * @param message      消息
     * @param selector     多路复用器
     * @param selectionKey 触发的selectionKey
     */
    public void quit(String message, Selector selector, SelectionKey selectionKey) {
        if (StringUtils.isBlank(message) || Constants.KEY_WORD_QUIT.equals(message)) {
            int port = ((SocketChannel) selectionKey.channel()).socket().getPort();
            // 客户端下线
            selectionKey.cancel();
            log.debug("客户端[{}]下线", port);
            // 因为发生了监听事件和channel的变更,所以需要通知selector重新整理selector所监听的事件
            selector.wakeup();
        }
    }
    /**
     * 转发消息
     *
     * @param message         需要转发的消息
     * @param selector        多路复用器
     * @param curSelectionKey 当前触发的selectionKey
     */
    public void forwardMessage(String message, Selector selector, SelectionKey curSelectionKey) {
        // 创建缓冲区
        ByteBuffer byteBuffer = ByteBuffer.allocate(1024 * 4);
        // 数据写入缓冲区
        byteBuffer.put(message.getBytes(Constants.CHARSET));
        // 切换为读模式
        byteBuffer.flip();
        // 在首尾进行标记,因为需要给每个客户端发送同样的数据,需要重复读取
        byteBuffer.mark();
        // 当前注册在多路复用器上的SelectionKey集合
        Set<SelectionKey> keys = selector.keys();
        for (SelectionKey key : keys) {
            // 消息不能转发给自己 and 只转发给客户端SocketChannel
            if (curSelectionKey.equals(key) || !(key.channel() instanceof SocketChannel)) {
                continue;
            }
            // 客户端SocketChannel
            SocketChannel socketChannel = (SocketChannel) key.channel();
            // 如果缓冲区中还有数据就一直写
            while (byteBuffer.hasRemaining()) {
                try {
                    // 数据写入通道
                    socketChannel.write(byteBuffer);
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            // 重置到上次mark的地方,即首位
            byteBuffer.reset();
        }
        // 清除缓冲区的数据
        byteBuffer.clear();
    }
    public static void main(String[] args) {
        new NioChatServer().start();
    }
}
  • 客户端

/**
 * 基于NIO实现的群聊客户端
 *
 * @author futao
 * @date 2020/7/8
 */
@Getter
@Setter
@Slf4j
public class NioChatClient {
    /**
     * 用于处理用户输入数据的单个线程线程池,使用线程池是为了便于关闭
     */
    private static final ExecutorService USER_INPUT_HANDLER = Executors.newSingleThreadExecutor();
    /**
     * 用户名
     */
    private String userName;
    /**
     * 启动客户端
     */
    public void start() {
        try {
            // 创建客户端通道
            SocketChannel socketChannel = SocketChannel.open();
            // 将通道设置为非阻塞
            socketChannel.configureBlocking(false);
            // 创建多路复用器
            Selector selector = Selector.open();
            // 将客户端通道注册到多路复用器,并监听可读事件
            socketChannel.register(selector, SelectionKey.OP_CONNECT);
            // 尝试连接到聊天服务器
            socketChannel.connect(new InetSocketAddress("localhost", Constants.SERVER_PORT));
            while (true) {
                // 阻塞等待通道上的事件触发。返回触发的通道的数量
                int eventCountTriggered = selector.select();
                if (eventCountTriggered <= 0) {
                    continue;
                }
                // 获取到所有触发的事件
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                // 遍历事件进行处理
                for (SelectionKey selectionKey : selectionKeys) {
                    // 处理事件
                    selectionKeyHandler(selectionKey, selector);
                }
                // 清除事件记录
                selectionKeys.clear();
            }
        } catch (IOException e) {
            e.printStackTrace();
        } catch (ClosedSelectorException e) {
            log.debug("成功退出聊天室...");
        }
    }
    /**
     * 处理器
     *
     * @param selectionKey 触发的selectionKey
     * @param selector     多路复用器
     */
    private void selectionKeyHandler(SelectionKey selectionKey, Selector selector) {
        if (selectionKey.isConnectable()) {
            // 触发的是成功接入服务器的事件
            SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
            try {
                // 判断此通道上的连接操作是否正在进行中
                if (socketChannel.isConnectionPending()) {
                    // 完成连接套接字通道的过程
                    socketChannel.finishConnect();
                    log.debug("成功接入聊天服务器");
                    // 将通道设置成非阻塞
                    socketChannel.configureBlocking(false);
                    // 将通道注册到多路复用器,并监听可读事件
                    socketChannel.register(selector, SelectionKey.OP_READ);
                    // 创建缓冲区,用于处理将用户输入的数据写入通道
                    ByteBuffer byteBuffer = ByteBuffer.allocate(4 * 1024);
                    // 在新线程中处理用户输入
                    USER_INPUT_HANDLER.execute(() -> {
                        while (!Thread.currentThread().isInterrupted()) {
                            //先清空缓冲区中的数据
                            byteBuffer.clear();
                            // 获取用户输入的文本
                            String message = new Scanner(System.in).nextLine();
                            // 将数据写入缓冲区
                            byteBuffer.put(String.format("【%s】: %s", userName, message).getBytes(Constants.CHARSET));
                            // 将缓冲区设置为读模式
                            byteBuffer.flip();
                            try {
                                // 当缓冲区中还有数据
                                while (byteBuffer.hasRemaining()) {
                                    // 将数据写入通道
                                    socketChannel.write(byteBuffer);
                                }
                            } catch (IOException e) {
                                e.printStackTrace();
                            }
                            // 判断是否退出群聊
                            if (quit(message, selector, selectionKey)) {
                                // 跳出循环,结束线程
                                break;
                            }
                        }
                        try {
                            // 关闭多路复用器
                            selector.close();
                        } catch (IOException e) {
                            e.printStackTrace();
                        }
                        // 关闭线程池
                        USER_INPUT_HANDLER.shutdown();
                    });
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        } else if (selectionKey.isReadable()) {
            // 触发的是可读事件
            // 获取到可读事件的通道
            SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
            //创建缓冲区
            ByteBuffer byteBuffer = ByteBuffer.allocate(1024 * 4);
            try {
                // 将通道上的数据写入缓冲区(返回0或者-1说明读到了末尾)
                while (socketChannel.read(byteBuffer) > 0) {
                }
                // 切换成读模式
                byteBuffer.flip();
                String message = String.valueOf(Constants.CHARSET.decode(byteBuffer));
                byteBuffer.clear();
                log.info("接收到数据:[{}]", message);
                if (StringUtils.isBlank(message)) {
                    log.debug("服务器拉胯,下车...");
                    selector.close();
                    USER_INPUT_HANDLER.shutdownNow();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    /**
     * 退出群聊
     *
     * @param message      消息
     * @param selector     多路复用器
     * @param selectionKey 触发的selectionKey
     * @return 是否退出
     */
    public boolean quit(String message, Selector selector, SelectionKey selectionKey) {
        if (Constants.KEY_WORD_QUIT.equals(message)) {
            selectionKey.cancel();
            selector.wakeup();
            return true;
        }
        return false;
    }
    public static void main(String[] args) {
        NioChatClient nioChatClient = new NioChatClient();
        nioChatClient.setUserName("小9");
        nioChatClient.start();
    }
}


# 测试



  • 接入


image.png


客户端发送消息


image.png

消息转发

image.png

image.png

客户端下线

image.png


服务器宕机


image.png

# 源代码


* https://github.com/FutaoSmile/learn-IO/tree/master/practice/src/main/java/com/futao/practice/chatroom/nio

# 系列文章


相关文章
|
缓存 Java C语言
聊聊java中NIO的2.0版本AIO
在2011年7月28日,jdk1.7被正式发布。他的一个最大的亮点就是将原来的NIO类库生成到了NIO2.0,也被叫做AIO。这篇文章将通过案例对AIO进行一个讲解。
317 0
聊聊java中NIO的2.0版本AIO
|
缓存 网络协议 安全
【Netty】Netty 简介 ( 原生 NIO 弊端 | Netty 框架 | Netty 版本 | 线程模型 | 线程 阻塞 IO 模型 | Reactor 模式引入 )
【Netty】Netty 简介 ( 原生 NIO 弊端 | Netty 框架 | Netty 版本 | 线程模型 | 线程 阻塞 IO 模型 | Reactor 模式引入 )
231 0
【Netty】Netty 简介 ( 原生 NIO 弊端 | Netty 框架 | Netty 版本 | 线程模型 | 线程 阻塞 IO 模型 | Reactor 模式引入 )
【Netty】NIO 网络编程 聊天室案例(三)
【Netty】NIO 网络编程 聊天室案例(三)
158 0
【Netty】NIO 网络编程 聊天室案例(三)
|
Java
Java NIO 中的 Path 、Files 和 AsychronousFileChannel (附多人聊天室内代码)(下)
Java NIO 中的 Path 、Files 和 AsychronousFileChannel (附多人聊天室内代码)
133 0
|
Java
Java NIO 中的 Path 、Files 和 AsychronousFileChannel (附多人聊天室内代码)(中)
Java NIO 中的 Path 、Files 和 AsychronousFileChannel (附多人聊天室内代码)
139 0
|
Java 编译器 Linux
Java NIO 中的 Path 、Files 和 AsychronousFileChannel (附多人聊天室内代码)(上)
Java NIO 中的 Path 、Files 和 AsychronousFileChannel (附多人聊天室内代码)
210 0
【Netty】NIO 网络编程 聊天室案例(二)
【Netty】NIO 网络编程 聊天室案例(二)
156 0
【Netty】NIO 网络编程 聊天室案例(一)
【Netty】NIO 网络编程 聊天室案例(一)
163 0