45 张图深度解析 Netty 架构与原理(四)

本文涉及的产品
全局流量管理 GTM,标准版 1个月
云解析 DNS,旗舰版 1个月
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
简介: 作为一个学 Java 的,如果没有研究过 Netty,那么你对 Java 语言的使用和理解仅仅停留在表面水平,会点 SSH 写几个 MVC,访问数据库和缓存,这些只是初等 Java 程序员干的事。如果你要进阶,想了解 Java 服务器的深层高阶知识,Netty 绝对是一个必须要过的门槛。 接下来我们会学习一个 Netty 系列教程,Netty 系列由「架构与原理」,「源码」,「架构」三部分组成,今天我们先来看看第一部分:Netty 架构与原理初探,大纲如下:

2.3. Netty 的模样

Netty 的设计主要基于主从 Reactor 多线程模式,并做了一定的改进。本节将使用一种渐进式的描述方式展示 Netty 的模样,即先给出 Netty 的简单版本,然后逐渐丰富其细节,直至展示出 Netty 的全貌。

简单版本的 Netty 的模样如下:

116.jpg070

关于这张图,作以下几点说明:

1)BossGroup 线程维护 Selector,ServerSocketChannel 注册到这个 Selector 上,只关注连接建立请求事件(相当于主 Reactor)。

2)当接收到来自客户端的连接建立请求事件的时候,通过 ServerSocketChannel.accept 方法获得对应的 SocketChannel,并封装成 NioSocketChannel 注册到 WorkerGroup 线程中的 Selector,每个 Selector 运行在一个线程中(相当于从 Reactor)。

3)当 WorkerGroup 线程中的 Selector 监听到自己感兴趣的 IO 事件后,就调用 Handler 进行处理。

我们给这简单版的 Netty 添加一些细节:

1177.jpg

关于这张图,作以下几点说明:

1)有两组线程池:BossGroup 和 WorkerGroup,BossGroup 中的线程(可以有多个,图中只画了一个)专门负责和客户端建立连接,WorkerGroup 中的线程专门负责处理连接上的读写。

2)BossGroup 和 WorkerGroup 含有多个不断循环的执行事件处理的线程,每个线程都包含一个 Selector,用于监听注册在其上的 Channel。

3)每个 BossGroup 中的线程循环执行以下三个步骤:

3.1)轮训注册在其上的 ServerSocketChannel 的 accept 事件(OP_ACCEPT 事件)

3.2)处理 accept 事件,与客户端建立连接,生成一个 NioSocketChannel,并将其注册到 WorkerGroup 中某个线程上的 Selector 上

3.3)再去以此循环处理任务队列中的下一个事件

4)每个 WorkerGroup 中的线程循环执行以下三个步骤:

4.1)轮训注册在其上的 NioSocketChannel 的 read/write 事件(OP_READ/OP_WRITE 事件)

4.2)在对应的 NioSocketChannel 上处理 read/write 事件

4.3)再去以此循环处理任务队列中的下一个事件

我们再来看下终极版的 Netty 的模样,如下图所示(图片来源于网络):

118.jpg

关于这张图,作以下几点说明:

1)Netty 抽象出两组线程池:BossGroup 和 WorkerGroup,也可以叫做 BossNioEventLoopGroup 和 WorkerNioEventLoopGroup。每个线程池中都有 NioEventLoop 线程。BossGroup 中的线程专门负责和客户端建立连接,WorkerGroup 中的线程专门负责处理连接上的读写。BossGroup 和 WorkerGroup 的类型都是 NioEventLoopGroup。

2)NioEventLoopGroup 相当于一个事件循环组,这个组中含有多个事件循环,每个事件循环就是一个 NioEventLoop。

3)NioEventLoop 表示一个不断循环的执行事件处理的线程,每个 NioEventLoop 都包含一个 Selector,用于监听注册在其上的 Socket 网络连接(Channel)。

4)NioEventLoopGroup 可以含有多个线程,即可以含有多个 NioEventLoop。

5)每个 BossNioEventLoop 中循环执行以下三个步骤:

5.1)select:轮训注册在其上的 ServerSocketChannel 的 accept 事件(OP_ACCEPT 事件)

5.2)processSelectedKeys:处理 accept 事件,与客户端建立连接,生成一个 NioSocketChannel,并将其注册到某个 WorkerNioEventLoop 上的 Selector 上

5.3)runAllTasks:再去以此循环处理任务队列中的其他任务

6)每个 WorkerNioEventLoop 中循环执行以下三个步骤:

6.1)select:轮训注册在其上的 NioSocketChannel 的 read/write 事件(OP_READ/OP_WRITE 事件)

6.2)processSelectedKeys:在对应的 NioSocketChannel 上处理 read/write 事件

6.3)runAllTasks:再去以此循环处理任务队列中的其他任务

7)在以上两个processSelectedKeys步骤中,会使用 Pipeline(管道),Pipeline 中引用了 Channel,即通过 Pipeline 可以获取到对应的 Channel,Pipeline 中维护了很多的处理器(拦截处理器、过滤处理器、自定义处理器等)。这里暂时不详细展开讲解 Pipeline。

2.4. 基于 Netty 的 TCP Server/Client 案例

下面我们写点代码来加深理解 Netty 的模样。下面两段代码分别是基于 Netty 的 TCP Server 和 TCP Client。

服务端代码为:

/**
 * 需要的依赖:
 * <dependency>
 * <groupId>io.netty</groupId>
 * <artifactId>netty-all</artifactId>
 * <version>4.1.52.Final</version>
 * </dependency>
 */
public static void main(String[] args) throws InterruptedException {
    // 创建 BossGroup 和 WorkerGroup
    // 1. bossGroup 只处理连接请求
    // 2. 业务处理由 workerGroup 来完成
    EventLoopGroup bossGroup = new NioEventLoopGroup();
    EventLoopGroup workerGroup = new NioEventLoopGroup();
    try {
        // 创建服务器端的启动对象
        ServerBootstrap bootstrap = new ServerBootstrap();
        // 配置参数
        bootstrap
                // 设置线程组
                .group(bossGroup, workerGroup)
                // 说明服务器端通道的实现类(便于 Netty 做反射处理)
                .channel(NioServerSocketChannel.class)
                // 设置等待连接的队列的容量(当客户端连接请求速率大
             // 于 NioServerSocketChannel 接收速率的时候,会使用
                // 该队列做缓冲)
                // option()方法用于给服务端的 ServerSocketChannel
                // 添加配置
                .option(ChannelOption.SO_BACKLOG, 128)
                // 设置连接保活
                // childOption()方法用于给服务端 ServerSocketChannel
                // 接收到的 SocketChannel 添加配置
                .childOption(ChannelOption.SO_KEEPALIVE, true)
                // handler()方法用于给 BossGroup 设置业务处理器
                // childHandler()方法用于给 WorkerGroup 设置业务处理器
                .childHandler(
                        // 创建一个通道初始化对象
                        new ChannelInitializer<SocketChannel>() {
                            // 向 Pipeline 添加业务处理器
                            @Override
                            protected void initChannel(
                                    SocketChannel socketChannel
                            ) throws Exception {
                                socketChannel.pipeline().addLast(
                                        new NettyServerHandler()
                                );
                                // 可以继续调用 socketChannel.pipeline().addLast()
                                // 添加更多 Handler
                            }
                        }
                );
        System.out.println("server is ready...");
        // 绑定端口,启动服务器,生成一个 channelFuture 对象,
        // ChannelFuture 涉及到 Netty 的异步模型,后面展开讲
        ChannelFuture channelFuture = bootstrap.bind(8080).sync();
        // 对通道关闭进行监听
        channelFuture.channel().closeFuture().sync();
    } finally {
        bossGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();
    }
}
/**
 * 自定义一个 Handler,需要继承 Netty 规定好的某个 HandlerAdapter(规范)
 * InboundHandler 用于处理数据流入本端(服务端)的 IO 事件
 * InboundHandler 用于处理数据流出本端(服务端)的 IO 事件
 */
static class NettyServerHandler extends ChannelInboundHandlerAdapter {
    /**
     * 当通道有数据可读时执行
     *
     * @param ctx 上下文对象,可以从中取得相关联的 Pipeline、Channel、客户端地址等
     * @param msg 客户端发送的数据
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg)
            throws Exception {
        // 接收客户端发来的数据
        System.out.println("client address: "
                + ctx.channel().remoteAddress());
        // ByteBuf 是 Netty 提供的类,比 NIO 的 ByteBuffer 性能更高
        ByteBuf byteBuf = (ByteBuf) msg;
        System.out.println("data from client: "
                + byteBuf.toString(CharsetUtil.UTF_8));
    }
    /**
     * 数据读取完毕后执行
     *
     * @param ctx 上下文对象
     * @throws Exception
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx)
            throws Exception {
        // 发送响应给客户端
        ctx.writeAndFlush(
                // Unpooled 类是 Netty 提供的专门操作缓冲区的工具
                // 类,copiedBuffer 方法返回的 ByteBuf 对象类似于
                // NIO 中的 ByteBuffer,但性能更高
                Unpooled.copiedBuffer(
                        "hello client! i have got your data.",
                        CharsetUtil.UTF_8
                )
        );
    }
    /**
     * 发生异常时执行
     *
     * @param ctx   上下文对象
     * @param cause 异常对象
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        // 关闭与客户端的 Socket 连接
        ctx.channel().close();
    }
}

客户端端代码为:

/**
 * 需要的依赖:
 * <dependency>
 * <groupId>io.netty</groupId>
 * <artifactId>netty-all</artifactId>
 * <version>4.1.52.Final</version>
 * </dependency>
 */
public static void main(String[] args) throws InterruptedException {
    // 客户端只需要一个事件循环组,可以看做 BossGroup
    EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
    try {
        // 创建客户端的启动对象
        Bootstrap bootstrap = new Bootstrap();
        // 配置参数
        bootstrap
                // 设置线程组
                .group(eventLoopGroup)
                // 说明客户端通道的实现类(便于 Netty 做反射处理)
                .channel(NioSocketChannel.class)
                // handler()方法用于给 BossGroup 设置业务处理器
                .handler(
                        // 创建一个通道初始化对象
                        new ChannelInitializer<SocketChannel>() {
                            // 向 Pipeline 添加业务处理器
                            @Override
                            protected void initChannel(
                                    SocketChannel socketChannel
                            ) throws Exception {
                                socketChannel.pipeline().addLast(
                                        new NettyClientHandler()
                                );
                                // 可以继续调用 socketChannel.pipeline().addLast()
                                // 添加更多 Handler
                            }
                        }
                );
        System.out.println("client is ready...");
        // 启动客户端去连接服务器端,ChannelFuture 涉及到 Netty 的异步模型,后面展开讲
        ChannelFuture channelFuture = bootstrap.connect(
                "127.0.0.1",
                8080).sync();
        // 对通道关闭进行监听
        channelFuture.channel().closeFuture().sync();
    } finally {
        eventLoopGroup.shutdownGracefully();
    }
}
/**
 * 自定义一个 Handler,需要继承 Netty 规定好的某个 HandlerAdapter(规范)
 * InboundHandler 用于处理数据流入本端(客户端)的 IO 事件
 * InboundHandler 用于处理数据流出本端(客户端)的 IO 事件
 */
static class NettyClientHandler extends ChannelInboundHandlerAdapter {
    /**
     * 通道就绪时执行
     *
     * @param ctx 上下文对象
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx)
            throws Exception {
        // 向服务器发送数据
        ctx.writeAndFlush(
                // Unpooled 类是 Netty 提供的专门操作缓冲区的工具
                // 类,copiedBuffer 方法返回的 ByteBuf 对象类似于
                // NIO 中的 ByteBuffer,但性能更高
                Unpooled.copiedBuffer(
                        "hello server!",
                        CharsetUtil.UTF_8
                )
        );
    }
    /**
     * 当通道有数据可读时执行
     *
     * @param ctx 上下文对象
     * @param msg 服务器端发送的数据
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg)
            throws Exception {
        // 接收服务器端发来的数据
        System.out.println("server address: "
                + ctx.channel().remoteAddress());
        // ByteBuf 是 Netty 提供的类,比 NIO 的 ByteBuffer 性能更高
        ByteBuf byteBuf = (ByteBuf) msg;
        System.out.println("data from server: "
                + byteBuf.toString(CharsetUtil.UTF_8));
    }
    /**
     * 发生异常时执行
     *
     * @param ctx   上下文对象
     * @param cause 异常对象
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        // 关闭与服务器端的 Socket 连接
        ctx.channel().close();
    }
}

什么?你觉得使用 Netty 编程难度和工作量更大了?不会吧不会吧,你要知道,你通过这么两段简短的代码得到了一个基于主从 Reactor 多线程模式的服务器,一个高吞吐量和并发量的服务器,一个异步处理服务器……你还要怎样?

对上面的两段代码,作以下简单说明:

1)Bootstrap 和 ServerBootstrap 分别是客户端和服务器端的引导类,一个 Netty 应用程序通常由一个引导类开始,主要是用来配置整个 Netty 程序、设置业务处理类(Handler)、绑定端口、发起连接等。

2)客户端创建一个 NioSocketChannel 作为客户端通道,去连接服务器。

3)服务端首先创建一个 NioServerSocketChannel 作为服务器端通道,每当接收一个客户端连接就产生一个 NioSocketChannel 应对该客户端。

4)使用 Channel 构建网络 IO 程序的时候,不同的协议、不同的阻塞类型和 Netty 中不同的 Channel 对应,常用的 Channel 有:

  • NioSocketChannel:非阻塞的 TCP 客户端 Channel(本案例的客户端使用的 Channel)
  • NioServerSocketChannel:非阻塞的 TCP 服务器端 Channel(本案例的服务器端使用的 Channel)
  • NioDatagramChannel:非阻塞的 UDP Channel
  • NioSctpChannel:非阻塞的 SCTP 客户端 Channel
  • NioSctpServerChannel:非阻塞的 SCTP 服务器端 Channel
    ......

启动服务端和客户端代码,调试以上的服务端代码,发现:

1)默认情况下 BossGroup 和 WorkerGroup 都包含 16 个线程(NioEventLoop),这是因为我的 PC 是 8 核的 NioEventLoop 的数量=coreNum*2。这 16 个线程相当于主 Reactor。

119.jpg120.jpg121.jpg

其实创建 BossGroup 和 WorkerGroup 的时候可以指定 NioEventLoop 数量,如下:

EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup(16);

这样就能更好地分配线程资源。

2)每一个 NioEventLoop 包含如下的属性(比如自己的 Selector、任务队列、执行器等):

122.jpg

3)将代码断在服务端的 NettyServerHandler.channelRead 上:

123.jpg

可以看到 ctx 中包含的属性如下:

124.jpg

可以看到:

  • 当前 ChannelHandlerContext ctx 是位于 ChannelHandlerContext 责任链中的一环,可以看到其 next、prev 属性
  • 当前 ChannelHandlerContext ctx 包含一个 Handler
  • 当前 ChannelHandlerContext ctx 包含一个 Pipeline
  • Pipeline 本质上是一个双向循环列表,可以看到其 tail、head 属性
  • Pipeline 中包含一个 Channel,Channel 中又包含了该 Pipeline,两者互相引用
    ……

从下一节开始,我将深入剖析以上两段代码,向读者展示 Netty 的更多细节。

相关文章
|
1月前
|
存储 缓存 算法
HashMap深度解析:从原理到实战
HashMap,作为Java集合框架中的一个核心组件,以其高效的键值对存储和检索机制,在软件开发中扮演着举足轻重的角色。作为一名资深的AI工程师,深入理解HashMap的原理、历史、业务场景以及实战应用,对于提升数据处理和算法实现的效率至关重要。本文将通过手绘结构图、流程图,结合Java代码示例,全方位解析HashMap,帮助读者从理论到实践全面掌握这一关键技术。
88 13
|
8天前
|
机器学习/深度学习 自然语言处理 搜索推荐
自注意力机制全解析:从原理到计算细节,一文尽览!
自注意力机制(Self-Attention)最早可追溯至20世纪70年代的神经网络研究,但直到2017年Google Brain团队提出Transformer架构后才广泛应用于深度学习。它通过计算序列内部元素间的相关性,捕捉复杂依赖关系,并支持并行化训练,显著提升了处理长文本和序列数据的能力。相比传统的RNN、LSTM和GRU,自注意力机制在自然语言处理(NLP)、计算机视觉、语音识别及推荐系统等领域展现出卓越性能。其核心步骤包括生成查询(Q)、键(K)和值(V)向量,计算缩放点积注意力得分,应用Softmax归一化,以及加权求和生成输出。自注意力机制提高了模型的表达能力,带来了更精准的服务。
|
1月前
|
运维 监控 持续交付
微服务架构解析:跨越传统架构的技术革命
微服务架构(Microservices Architecture)是一种软件架构风格,它将一个大型的单体应用拆分为多个小而独立的服务,每个服务都可以独立开发、部署和扩展。
317 36
微服务架构解析:跨越传统架构的技术革命
|
19天前
|
存储 物联网 大数据
探索阿里云 Flink 物化表:原理、优势与应用场景全解析
阿里云Flink的物化表是流批一体化平台中的关键特性,支持低延迟实时更新、灵活查询性能、无缝流批处理和高容错性。它广泛应用于电商、物联网和金融等领域,助力企业高效处理实时数据,提升业务决策能力。实践案例表明,物化表显著提高了交易欺诈损失率的控制和信贷审批效率,推动企业在数字化转型中取得竞争优势。
74 14
|
27天前
|
网络协议 安全 网络安全
探索网络模型与协议:从OSI到HTTPs的原理解析
OSI七层网络模型和TCP/IP四层模型是理解和设计计算机网络的框架。OSI模型包括物理层、数据链路层、网络层、传输层、会话层、表示层和应用层,而TCP/IP模型则简化为链路层、网络层、传输层和 HTTPS协议基于HTTP并通过TLS/SSL加密数据,确保安全传输。其连接过程涉及TCP三次握手、SSL证书验证、对称密钥交换等步骤,以保障通信的安全性和完整性。数字信封技术使用非对称加密和数字证书确保数据的机密性和身份认证。 浏览器通过Https访问网站的过程包括输入网址、DNS解析、建立TCP连接、发送HTTPS请求、接收响应、验证证书和解析网页内容等步骤,确保用户与服务器之间的安全通信。
102 1
|
1月前
|
存储 Linux API
深入探索Android系统架构:从内核到应用层的全面解析
本文旨在为读者提供一份详尽的Android系统架构分析,从底层的Linux内核到顶层的应用程序框架。我们将探讨Android系统的模块化设计、各层之间的交互机制以及它们如何共同协作以支持丰富多样的应用生态。通过本篇文章,开发者和爱好者可以更深入理解Android平台的工作原理,从而优化开发流程和提升应用性能。
|
2月前
|
SQL 数据可视化 数据库
多维度解析低代码:从技术架构到插件生态
本文深入解析低代码平台,从技术架构到插件生态,探讨其在企业数字化转型中的作用。低代码平台通过图形化界面和模块化设计降低开发门槛,加速应用开发与部署,提高市场响应速度。文章重点分析开源低代码平台的优势,如透明架构、兼容性与扩展性、可定制化开发等,并详细介绍了核心技术架构、数据处理与功能模块、插件生态及数据可视化等方面,展示了低代码平台如何支持企业在数字化转型中实现更高灵活性和创新。
60 1
|
1月前
|
弹性计算 API 持续交付
后端服务架构的微服务化转型
本文旨在探讨后端服务从单体架构向微服务架构转型的过程,分析微服务架构的优势和面临的挑战。文章首先介绍单体架构的局限性,然后详细阐述微服务架构的核心概念及其在现代软件开发中的应用。通过对比两种架构,指出微服务化转型的必要性和实施策略。最后,讨论了微服务架构实施过程中可能遇到的问题及解决方案。
|
2月前
|
Cloud Native Devops 云计算
云计算的未来:云原生架构与微服务的革命####
【10月更文挑战第21天】 随着企业数字化转型的加速,云原生技术正迅速成为IT行业的新宠。本文深入探讨了云原生架构的核心理念、关键技术如容器化和微服务的优势,以及如何通过这些技术实现高效、灵活且可扩展的现代应用开发。我们将揭示云原生如何重塑软件开发流程,提升业务敏捷性,并探索其对企业IT架构的深远影响。 ####
64 3
|
2月前
|
Cloud Native 安全 数据安全/隐私保护
云原生架构下的微服务治理与挑战####
随着云计算技术的飞速发展,云原生架构以其高效、灵活、可扩展的特性成为现代企业IT架构的首选。本文聚焦于云原生环境下的微服务治理问题,探讨其在促进业务敏捷性的同时所面临的挑战及应对策略。通过分析微服务拆分、服务间通信、故障隔离与恢复等关键环节,本文旨在为读者提供一个关于如何在云原生环境中有效实施微服务治理的全面视角,助力企业在数字化转型的道路上稳健前行。 ####

热门文章

最新文章

推荐镜像

更多