Netty网络框架(一)

简介: Netty网络框架

Netty网络框架

(一) 基础篇

1、I/O基础

输入流:InputStream和Reader
输出流:OutputStream和Writer

               字节流                字符流

计算机最小的二进制单位   bit 比特    代表0和1
字节  1 byte = 8bit  计算机处理的最小单位
字符  1 char = 2byte = 16bit   人处理的最小单位

所以,字节流处理文件、图片、视频等二进制数据,而字符流处理文本数据。

2、Socket

原意是“插座”,在计算机领域中,翻译为“套接字”。
本质上,是计算机之间进行通信的一种方式。

Linux,“一切皆文件”,给每个文件映射一个ID,叫做"文件描述符"。
当处理网络连接时,也会看成一个文件,read/write变成和远程计算机的交互。

OSI七层模型 = Open System Interconnection 开放式系统互联
从下到上分别为:物理层、数据链路层、网络层、传输层、会话层、表示层和应用层。

实际应用的是优化后的TCP/IP模型(四层)
网络接口层/链路层、网络层、传输层、应用层

应用层协议:HTTP、FTP、SMTP(邮件协议)
传输层协议:TCP、UDP

Socket其实是应用层与传输层之间的抽象层,是一组接口。
在设计模式中,是门面模式。

3、NIO

BIO - BlockingIO  同步阻塞
NIO - New IO /  Non-Blocking IO  同步非阻塞
AIO - Asynchronous IO  异步非阻塞

同步和异步,关注的是消息通知的机制
阻塞和非阻塞,关注的是等待消息过程中的状态

多路复用的模型

三大元素:Channel 、Buffer、Selector

1)  Channel

FileChannel   文件管道的数据
Pipe.SinkChannel
Pipe.SourceChannel  线程间通信的管道
ServerSocketChannel
SocketChannel      用于TCP网络通信的管道
DatagramChannel   用于UDP网络通信的管道

2) Buffer

capacity  总体容量大小
limit   存储容量的大小,是可读写和不可读写的界线
position  已读容量的大小,已读和未读区域的界线

【使用原理】
a)  初始化,给定总容量,position=0, limit=capacity
b)  当使用put方法存入数据是,通过position来记录存储的容量变化,position不断后移,直到存储结束(写完成)
c)写完成需要调用flip方法刷新,limit=position,position=0
保障limit记录的是可读写区域的大小,position已读部分重置为空
d)  读数据直到读完成,需要调用clear方法,position=0, limit=capacity

3)  Selector

三个元素: Selector选择器、SelectableChannel可选择的通道、SelectionKey选择键

本质上,Selector是监听器,监听的是通道是否有我们关心的操作产生,操作对应的是事件(连接、接收、读/写),使用SelectionKey代表具体的事件,在确保通道是可选择的情况下,将通道注册进选择器中,此时Selector维护的是,通道和事件之间的关联关系。

Selector,管理被注册的通道集合,以及他们的状态
SelectableChannel,是一个抽象类,提供了通道可被选择需要实现的api。
FileChannel就不是可选择的,Socket相关的通道都是可选择的
一个通道可以被注册到多个选择器上吗?   可以的
多个通道可以注册到一个选择器上,但一个通道只能在一个选择器中注册一次

SelectionKey,封装了要监听的事件,连接、接收、读、写。
一方面,Selector关心通道要处理哪些事件
另一方面,当事件触发时,通道要处理哪些事件

【使用方式】

a、首先通过open方法,获取通道,将通道设置为非阻塞的
b、通过open方法,获取选择器,将通道注册进选择器中,伴随设置通道要处理的事件(OP_ACCEPT)
c、轮询选择器,当前是否有要处理的操作 select() > 0?
如果有,要获取,待处理操作的集合Set , 进行遍历
遍历到SelectionKey时,判断对应哪种操作,不同的操作设置不同的处理方式
如OP_ACCEPT,接收客户端通道并进行注册,监听后续处理的事件,如OP_WRITE
如OP_WRITE,通过key的方法获取通道本身,读取数据并继续监听事件,如OP_READ

4、零拷贝

需求:将磁盘中的文件读取出来,通过socket发送出去

传统的拷贝方式(4次)
Socket网络缓冲区,也属于操作系统的内核缓冲区。

在操作系统中进行的拷贝(如第二次和第三次),叫做CPU拷贝。
连接磁盘或网卡等硬件的拷贝(如第一次和第四次),叫做DMA拷贝。

零拷贝的概念:减少CPU拷贝的次数。

零拷贝是基于操作系统层面的优化方式(以下基于Linux系统)

1) mmap = memory mapping 内存映射

2)sendfile (linux2.1内核支持)

  1. sendfile with scatter/gather  copy(批量sendfile)
    从单个文件的处理,上升到多个物理地址的处理,提高处理速度

4)splice (拼接,在linux2.6内核支持)

   在操作系统内核缓冲区和Socket网络缓冲区之间建立管道,来减少拷贝次数。

线程模型

1) 单线程Reactor模型

顾名思义 就是使用一个线程来处理问题 线程中

  • selector
  • 事件处理 : 连接事件
  • 处理事件:handler

单线程服务器

public class ReactorServer {
    private Selector selector;
    private ServerSocketChannel serverSocketChannel;
    public ReactorServer() {
        try {
            // 初始化监听器 与 channel 通道
            selector = Selector.open();
            serverSocketChannel = ServerSocketChannel.open();
            // 配置为非阻塞的
            serverSocketChannel.configureBlocking(false);
            // 配置通道连接地址 开放 9090 端口
            SocketAddress address = new InetSocketAddress(9090);
            serverSocketChannel.socket().bind(address);
            //将channel 注册到 selector监听通道事件  达到多路复用
            //首个注册事件一般都是 accept 连接事件
            SelectionKey key = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
            //    创建处理连接事件的 acceptor
            // 同时 创建处理器 接受后序的IO读写事件,不断的遍历 是否有事情发生
            Acceptor acceptor = new Acceptor(selector, serverSocketChannel);
            //附加一个对象 用来处理事件
            key.attach(acceptor);
            while (true) {
                //返回事件的个数 处理事件
                int num = selector.select();
                if (num == 0) {
                    continue;
                }
                //没有跳过就代表有事件需要处理,拿到事件集合
                Set<SelectionKey> SKeyset = selector.selectedKeys();
                Iterator<SelectionKey> iterator = SKeyset.iterator();
                while (iterator.hasNext()) {
                    SelectionKey selectionKey = iterator.next();
                    //拿到事件的第一事情 移出事件 避免重复处理
                    iterator.remove();
                    //根据事件类型 分发 给监听器处理
                    //需要处理事情的时候 取出存储的对象
                    //如有接收的时Accpet 事件 获取的就是Acceptor 事件
                    //如果接受的时读写事件 获取的就是 Handler 事件
                    Runnable runnable = (Runnable) key.attachment();
                    runnable.run();
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    public static void main(String[] args) {
    }
}

Accpetor 连接事件

public class Acceptor implements Runnable {
    private Selector selector;
    private ServerSocketChannel serverSocketChannel;
    public Acceptor(Selector selector, ServerSocketChannel serverSocketChannel) {
        this.selector = selector;
        this.serverSocketChannel = serverSocketChannel;
    }
    @Override
    public void run() {
        try {
            //接受客户端传入的连接时 Socket Channel
            SocketChannel socketChannel = serverSocketChannel.accept();
            //设置异步
            socketChannel.configureBlocking(false);
            SelectionKey key = socketChannel.register(selector, SelectionKey.OP_READ);
            // 创造处理器 处理连接
            //单线程
            //Handler handler = new Handler(key);
            //多线程
            MultHandler handler = new MultHandler(key);
            handler.run();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Handler 单线程处理

public class Handler implements Runnable {
    private SelectionKey key;
    private State state;
    public Handler(SelectionKey key) {
        this.key = key;
        this.state = State.READ;
    }
    @Override
    public void run() {
        //处理 读写操作,判断读写
        switch (state) {
            case READ:
                read();
                break;
            case WRITE:
                write();
                break;
            default:
                break;
        }
    }
    /*轮流处理末尾添加事件达到循环处理*/
    //处理 读方法
    private void read() {
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        //    通过通道获取KEY
        SocketChannel channel = (SocketChannel) key.channel();
        try {
            //将传入的数据写入到buffer中
            int num = channel.read(buffer);
            //    转化成String
            String msg = new String(buffer.array());
            //    增加业务处理
            //    继续处理注册写事件
            key.interestOps(SelectionKey.OP_WRITE);
            this.state = State.WRITE;
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    //处理 写方法
    private void write() {
        ByteBuffer buffer = ByteBuffer.wrap("hello".getBytes());
        try {
            //    通过通道获取KEY
            SocketChannel channel = (SocketChannel) key.channel();
            channel.write(buffer);
            //    继续处理注册写事件
            key.interestOps(SelectionKey.OP_READ);
            this.state = State.READ;
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    //记录状态 非读即写
    private enum State {
        //读写事件
        READ, WRITE
    }
}
2)多线程Reactor模型

提高handler的处理效率,首先handler不再负责具体的业务逻辑,当读取出数据后,分发给子线程处理,子线程处理完成后再将结果返回给handler,handler再将结果返回给客户端。

多线程处理 (handler使用线程池)

public class MultHandler implements Runnable {
    private SelectionKey key;
    private State state;
    private ExecutorService pool;
    public MultHandler(SelectionKey key) {
        this.key = key;
        this.state = State.READ;
    }
    @Override
    public void run() {
        //处理 读写操作,判断读写
        switch (state) {
            case READ:
                //将最耗时的操作 放入线程池执行
                pool.execute(new Runnable() {
                    @Override
                    public void run() {
                        read();
                    }
                });
                break;
            case WRITE:
                write();
                break;
            default:
                break;
        }
    }
    /*轮流处理末尾添加事件达到循环处理*/
    //处理 读方法
    private void read() {
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        //    通过通道获取KEY
        SocketChannel channel = (SocketChannel) key.channel();
        try {
            //将传入的数据写入到buffer中
            int num = channel.read(buffer);
            //    转化成String
            String msg = new String(buffer.array());
            //    增加业务处理
            //    继续处理注册写事件
            key.interestOps(SelectionKey.OP_WRITE);
            this.state = State.WRITE;
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    //处理 写方法
    private void write() {
        ByteBuffer buffer = ByteBuffer.wrap("hello".getBytes());
        try {
            //    通过通道获取KEY
            SocketChannel channel = (SocketChannel) key.channel();
            channel.write(buffer);
            //    继续处理注册写事件
            key.interestOps(SelectionKey.OP_READ);
            this.state = State.READ;
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    //记录状态 非读即写
    private enum State {
        //读写事件
        READ, WRITE
    }
}
3)主从Reactor模型

mainReactor用来接收连接事件,然后分发给acceptor,acceptor在处理过程中,直接将后续的读写事件,注册到slaveReactor之中,以此来达到分流。

主从监听器

//主从模型
public class MultReactorServer {
    private Selector mainselector;
    private Selector slaveselector;
    private ServerSocketChannel serverSocketChannel;
    public MultReactorServer() {
        try {
            // 主 reactor 处理连接事件
            mainselector = Selector.open();
            //从reactor 处理读写事件
            slaveselector = Selector.open();
            // 配置为非阻塞的
            serverSocketChannel.configureBlocking(false);
            // 配置通道连接地址 开放 9090 端口
            SocketAddress address = new InetSocketAddress(9090);
            serverSocketChannel.socket().bind(address);
            //将channel 注册到 selector监听通道事件  达到多路复用
            //首个注册事件一般都是 accept 连接事件 (参数变化)
            SelectionKey key = serverSocketChannel.register(mainselector, SelectionKey.OP_ACCEPT);
            //    创建处理连接事件的 acceptor
            // 同时 创建处理器 接受后序的IO读写事件,不断的遍历 是否有事情发生 (参数变化)
            Acceptor acceptor = new Acceptor(slaveselector, serverSocketChannel);
            //附加一个对象 用来处理事件
            key.attach(acceptor);
            //主从监听逻辑分离
            new HandlerLoop(slaveselector).run();
            while (true) {
                //返回事件的个数 处理事件
                int num = mainselector.select();
                if (num == 0) {
                    continue;
                }
                //没有跳过就代表有事件需要处理,拿到事件集合
                Set<SelectionKey> SKeyset = mainselector.selectedKeys();
                Iterator<SelectionKey> iterator = SKeyset.iterator();
                while (iterator.hasNext()) {
                    SelectionKey selectionKey = iterator.next();
                    //拿到事件的第一事情 移出事件 避免重复处理
                    iterator.remove();
                    //根据事件类型 分发 给监听器处理
                    //需要处理事情的时候 取出存储的对象
                    //只处理主Reactor 只处理连接事件
                    Runnable runnable = (Runnable) key.attachment();
                    runnable.run();
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

主从事件处理读写分离

//用于处理从reactor事件监听
public class HandlerLoop implements Runnable {
    private Selector selector;
    public HandlerLoop(Selector selector) {
        this.selector = selector;
    }
    @Override
    public void run() {
        try {
            while (true) {
                //返回事件的个数 处理事件
                int num = selector.select();
                if (num == 0) {
                    continue;
                }
                //没有跳过就代表有事件需要处理,拿到事件集合
                Set<SelectionKey> SKeyset = selector.selectedKeys();
                Iterator<SelectionKey> iterator = SKeyset.iterator();
                while (iterator.hasNext()) {
                    SelectionKey selectionKey = iterator.next();
                    //拿到事件的第一事情 移出事件 避免重复处理
                    iterator.remove();
                    //根据事件类型 分发 给监听器处理
                    //需要处理事情的时候 取出存储的对象
                    //只处理从reactor 所以 接受的一定是读写事件
                    Runnable runnable = (Runnable) selectionKey.attachment();
                    runnable.run();
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}


Netty网络框架(二)https://developer.aliyun.com/article/1469520

目录
相关文章
|
29天前
|
监控 安全
从 Racket 语言出发,创新员工网络监控软件的框架
在数字化企业环境中,员工网络监控软件对于保障信息安全和提升效率至关重要。Racket 语言凭借其独特特性和强大功能,为开发创新的监控软件提供了新可能。通过捕获和分析网络数据包、记录员工网络活动日志,甚至构建复杂的监控框架,Racket 能够满足企业的定制化需求,为企业信息安全和管理提供强有力支持。未来,基于 Racket 的创新解决方案将不断涌现。
36 6
|
4天前
|
数据采集 存储 JSON
Python网络爬虫:Scrapy框架的实战应用与技巧分享
【10月更文挑战第27天】本文介绍了Python网络爬虫Scrapy框架的实战应用与技巧。首先讲解了如何创建Scrapy项目、定义爬虫、处理JSON响应、设置User-Agent和代理,以及存储爬取的数据。通过具体示例,帮助读者掌握Scrapy的核心功能和使用方法,提升数据采集效率。
28 6
|
12天前
|
机器学习/深度学习 人工智能
类人神经网络再进一步!DeepMind最新50页论文提出AligNet框架:用层次化视觉概念对齐人类
【10月更文挑战第18天】这篇论文提出了一种名为AligNet的框架,旨在通过将人类知识注入神经网络来解决其与人类认知的不匹配问题。AligNet通过训练教师模型模仿人类判断,并将人类化的结构和知识转移至预训练的视觉模型中,从而提高模型在多种任务上的泛化能力和稳健性。实验结果表明,人类对齐的模型在相似性任务和出分布情况下表现更佳。
25 3
|
1月前
|
安全 网络安全 区块链
网络安全与信息安全:构建数字世界的防线在当今数字化时代,网络安全已成为维护个人隐私、企业机密和国家安全的重要屏障。随着网络攻击手段的不断升级,从社交工程到先进的持续性威胁(APT),我们必须采取更加严密的防护措施。本文将深入探讨网络安全漏洞的形成原因、加密技术的应用以及提高公众安全意识的重要性,旨在为读者提供一个全面的网络安全知识框架。
在这个数字信息日益膨胀的时代,网络安全问题成为了每一个网民不可忽视的重大议题。从个人信息泄露到企业数据被盗,再到国家安全受到威胁,网络安全漏洞如同隐藏在暗处的“黑洞”,时刻准备吞噬掉我们的信息安全。而加密技术作为守护网络安全的重要工具之一,其重要性不言而喻。同时,提高公众的安全意识,也是防范网络风险的关键所在。本文将从网络安全漏洞的定义及成因出发,解析当前主流的加密技术,并强调提升安全意识的必要性,为读者提供一份详尽的网络安全指南。
|
2月前
|
存储 SQL 安全
网络安全与信息安全:守护数字世界的坚盾在这个高度数字化的时代,网络安全和信息安全已经成为个人、企业乃至国家安全的重要组成部分。本文将深入探讨网络安全漏洞、加密技术以及安全意识的重要性,旨在为读者提供一个全面的网络安全知识框架。
随着互联网技术的飞速发展,网络安全问题日益凸显。从个人信息泄露到企业数据被盗,再到国家安全受到威胁,网络安全事件层出不穷。本文将从网络安全漏洞的定义与分类入手,探讨常见的网络攻击手段;随后深入解析加密技术的原理及其在保护信息安全中的作用;最后强调提升公众与企业的安全意识的重要性,并提出具体的建议。通过综合运用这些知识点,我们可以更好地构建起一道道坚固的防线,守护我们的数字世界。
|
5天前
|
数据采集 前端开发 中间件
Python网络爬虫:Scrapy框架的实战应用与技巧分享
【10月更文挑战第26天】Python是一种强大的编程语言,在数据抓取和网络爬虫领域应用广泛。Scrapy作为高效灵活的爬虫框架,为开发者提供了强大的工具集。本文通过实战案例,详细解析Scrapy框架的应用与技巧,并附上示例代码。文章介绍了Scrapy的基本概念、创建项目、编写简单爬虫、高级特性和技巧等内容。
21 4
|
5天前
|
网络协议 物联网 API
Python网络编程:Twisted框架的异步IO处理与实战
【10月更文挑战第26天】Python 是一门功能强大且易于学习的编程语言,Twisted 框架以其事件驱动和异步IO处理能力,在网络编程领域独树一帜。本文深入探讨 Twisted 的异步IO机制,并通过实战示例展示其强大功能。示例包括创建简单HTTP服务器,展示如何高效处理大量并发连接。
20 1
|
1月前
|
NoSQL Java Redis
redis的基本命令,并用netty操作redis(不使用springboot或者spring框架)就单纯的用netty搞。
这篇文章介绍了Redis的基本命令,并展示了如何使用Netty框架直接与Redis服务器进行通信,包括设置Netty客户端、编写处理程序以及初始化Channel的完整示例代码。
37 1
redis的基本命令,并用netty操作redis(不使用springboot或者spring框架)就单纯的用netty搞。
|
23天前
|
机器学习/深度学习 数据采集 算法
目标分类笔记(一): 利用包含多个网络多种训练策略的框架来完成多目标分类任务(从数据准备到训练测试部署的完整流程)
这篇博客文章介绍了如何使用包含多个网络和多种训练策略的框架来完成多目标分类任务,涵盖了从数据准备到训练、测试和部署的完整流程,并提供了相关代码和配置文件。
40 0
目标分类笔记(一): 利用包含多个网络多种训练策略的框架来完成多目标分类任务(从数据准备到训练测试部署的完整流程)
|
4天前
|
网络协议 调度 开发者
Python网络编程:Twisted框架的异步IO处理与实战
【10月更文挑战第27天】本文介绍了Python网络编程中的Twisted框架,重点讲解了其异步IO处理机制。通过反应器模式,Twisted能够在单线程中高效处理多个网络连接。文章提供了两个实战示例:一个简单的Echo服务器和一个HTTP服务器,展示了Twisted的强大功能和灵活性。
13 0