Netty网络框架
(一) 基础篇
1、I/O基础
输入流:InputStream和Reader
输出流:OutputStream和Writer
字节流 字符流
计算机最小的二进制单位 bit 比特 代表0和1
字节 1 byte = 8bit 计算机处理的最小单位
字符 1 char = 2byte = 16bit 人处理的最小单位
所以,字节流处理文件、图片、视频等二进制数据,而字符流处理文本数据。
2、Socket
原意是“插座”,在计算机领域中,翻译为“套接字”。
本质上,是计算机之间进行通信的一种方式。
Linux,“一切皆文件”,给每个文件映射一个ID,叫做"文件描述符"。
当处理网络连接时,也会看成一个文件,read/write变成和远程计算机的交互。
OSI七层模型 = Open System Interconnection 开放式系统互联
从下到上分别为:物理层、数据链路层、网络层、传输层、会话层、表示层和应用层。
实际应用的是优化后的TCP/IP模型(四层)
网络接口层/链路层、网络层、传输层、应用层
应用层协议:HTTP、FTP、SMTP(邮件协议)
传输层协议:TCP、UDP
Socket其实是应用层与传输层之间的抽象层,是一组接口。
在设计模式中,是门面模式。
3、NIO
BIO - BlockingIO 同步阻塞
NIO - New IO / Non-Blocking IO 同步非阻塞
AIO - Asynchronous IO 异步非阻塞
同步和异步,关注的是消息通知的机制
阻塞和非阻塞,关注的是等待消息过程中的状态
多路复用的模型
三大元素:Channel 、Buffer、Selector
1) Channel
FileChannel 文件管道的数据
Pipe.SinkChannel
Pipe.SourceChannel 线程间通信的管道
ServerSocketChannel
SocketChannel 用于TCP网络通信的管道
DatagramChannel 用于UDP网络通信的管道
2) Buffer
capacity 总体容量大小
limit 存储容量的大小,是可读写和不可读写的界线
position 已读容量的大小,已读和未读区域的界线
【使用原理】
a) 初始化,给定总容量,position=0, limit=capacity
b) 当使用put方法存入数据是,通过position来记录存储的容量变化,position不断后移,直到存储结束(写完成)
c)写完成需要调用flip方法刷新,limit=position,position=0
保障limit记录的是可读写区域的大小,position已读部分重置为空
d) 读数据直到读完成,需要调用clear方法,position=0, limit=capacity
3) Selector
三个元素: Selector选择器、SelectableChannel可选择的通道、SelectionKey选择键
本质上,Selector是监听器,监听的是通道是否有我们关心的操作产生,操作对应的是事件(连接、接收、读/写),使用SelectionKey代表具体的事件,在确保通道是可选择的情况下,将通道注册进选择器中,此时Selector维护的是,通道和事件之间的关联关系。
Selector,管理被注册的通道集合,以及他们的状态
SelectableChannel,是一个抽象类,提供了通道可被选择需要实现的api。
FileChannel就不是可选择的,Socket相关的通道都是可选择的
一个通道可以被注册到多个选择器上吗? 可以的
多个通道可以注册到一个选择器上,但一个通道只能在一个选择器中注册一次
SelectionKey,封装了要监听的事件,连接、接收、读、写。
一方面,Selector关心通道要处理哪些事件
另一方面,当事件触发时,通道要处理哪些事件
【使用方式】
a、首先通过open方法,获取通道,将通道设置为非阻塞的
b、通过open方法,获取选择器,将通道注册进选择器中,伴随设置通道要处理的事件(OP_ACCEPT)
c、轮询选择器,当前是否有要处理的操作 select() > 0?
如果有,要获取,待处理操作的集合Set , 进行遍历
遍历到SelectionKey时,判断对应哪种操作,不同的操作设置不同的处理方式
如OP_ACCEPT,接收客户端通道并进行注册,监听后续处理的事件,如OP_WRITE
如OP_WRITE,通过key的方法获取通道本身,读取数据并继续监听事件,如OP_READ
4、零拷贝
需求:将磁盘中的文件读取出来,通过socket发送出去
传统的拷贝方式(4次)
Socket网络缓冲区,也属于操作系统的内核缓冲区。
在操作系统中进行的拷贝(如第二次和第三次),叫做CPU拷贝。
连接磁盘或网卡等硬件的拷贝(如第一次和第四次),叫做DMA拷贝。
零拷贝的概念:减少CPU拷贝的次数。
零拷贝是基于操作系统层面的优化方式(以下基于Linux系统)
1) mmap = memory mapping 内存映射
2)sendfile (linux2.1内核支持)
- sendfile with scatter/gather copy(批量sendfile)
从单个文件的处理,上升到多个物理地址的处理,提高处理速度
4)splice (拼接,在linux2.6内核支持)
在操作系统内核缓冲区和Socket网络缓冲区之间建立管道,来减少拷贝次数。
线程模型
1) 单线程Reactor模型
顾名思义 就是使用一个线程来处理问题 线程中
- selector
- 事件处理 : 连接事件
- 处理事件:handler
单线程服务器
public class ReactorServer { private Selector selector; private ServerSocketChannel serverSocketChannel; public ReactorServer() { try { // 初始化监听器 与 channel 通道 selector = Selector.open(); serverSocketChannel = ServerSocketChannel.open(); // 配置为非阻塞的 serverSocketChannel.configureBlocking(false); // 配置通道连接地址 开放 9090 端口 SocketAddress address = new InetSocketAddress(9090); serverSocketChannel.socket().bind(address); //将channel 注册到 selector监听通道事件 达到多路复用 //首个注册事件一般都是 accept 连接事件 SelectionKey key = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); // 创建处理连接事件的 acceptor // 同时 创建处理器 接受后序的IO读写事件,不断的遍历 是否有事情发生 Acceptor acceptor = new Acceptor(selector, serverSocketChannel); //附加一个对象 用来处理事件 key.attach(acceptor); while (true) { //返回事件的个数 处理事件 int num = selector.select(); if (num == 0) { continue; } //没有跳过就代表有事件需要处理,拿到事件集合 Set<SelectionKey> SKeyset = selector.selectedKeys(); Iterator<SelectionKey> iterator = SKeyset.iterator(); while (iterator.hasNext()) { SelectionKey selectionKey = iterator.next(); //拿到事件的第一事情 移出事件 避免重复处理 iterator.remove(); //根据事件类型 分发 给监听器处理 //需要处理事情的时候 取出存储的对象 //如有接收的时Accpet 事件 获取的就是Acceptor 事件 //如果接受的时读写事件 获取的就是 Handler 事件 Runnable runnable = (Runnable) key.attachment(); runnable.run(); } } } catch (Exception e) { e.printStackTrace(); } } public static void main(String[] args) { } }
Accpetor 连接事件
public class Acceptor implements Runnable { private Selector selector; private ServerSocketChannel serverSocketChannel; public Acceptor(Selector selector, ServerSocketChannel serverSocketChannel) { this.selector = selector; this.serverSocketChannel = serverSocketChannel; } @Override public void run() { try { //接受客户端传入的连接时 Socket Channel SocketChannel socketChannel = serverSocketChannel.accept(); //设置异步 socketChannel.configureBlocking(false); SelectionKey key = socketChannel.register(selector, SelectionKey.OP_READ); // 创造处理器 处理连接 //单线程 //Handler handler = new Handler(key); //多线程 MultHandler handler = new MultHandler(key); handler.run(); } catch (Exception e) { e.printStackTrace(); } } }
Handler 单线程处理
public class Handler implements Runnable { private SelectionKey key; private State state; public Handler(SelectionKey key) { this.key = key; this.state = State.READ; } @Override public void run() { //处理 读写操作,判断读写 switch (state) { case READ: read(); break; case WRITE: write(); break; default: break; } } /*轮流处理末尾添加事件达到循环处理*/ //处理 读方法 private void read() { ByteBuffer buffer = ByteBuffer.allocate(1024); // 通过通道获取KEY SocketChannel channel = (SocketChannel) key.channel(); try { //将传入的数据写入到buffer中 int num = channel.read(buffer); // 转化成String String msg = new String(buffer.array()); // 增加业务处理 // 继续处理注册写事件 key.interestOps(SelectionKey.OP_WRITE); this.state = State.WRITE; } catch (Exception e) { e.printStackTrace(); } } //处理 写方法 private void write() { ByteBuffer buffer = ByteBuffer.wrap("hello".getBytes()); try { // 通过通道获取KEY SocketChannel channel = (SocketChannel) key.channel(); channel.write(buffer); // 继续处理注册写事件 key.interestOps(SelectionKey.OP_READ); this.state = State.READ; } catch (Exception e) { e.printStackTrace(); } } //记录状态 非读即写 private enum State { //读写事件 READ, WRITE } }
2)多线程Reactor模型
提高handler的处理效率,首先handler不再负责具体的业务逻辑,当读取出数据后,分发给子线程处理,子线程处理完成后再将结果返回给handler,handler再将结果返回给客户端。
多线程处理 (handler使用线程池)
public class MultHandler implements Runnable { private SelectionKey key; private State state; private ExecutorService pool; public MultHandler(SelectionKey key) { this.key = key; this.state = State.READ; } @Override public void run() { //处理 读写操作,判断读写 switch (state) { case READ: //将最耗时的操作 放入线程池执行 pool.execute(new Runnable() { @Override public void run() { read(); } }); break; case WRITE: write(); break; default: break; } } /*轮流处理末尾添加事件达到循环处理*/ //处理 读方法 private void read() { ByteBuffer buffer = ByteBuffer.allocate(1024); // 通过通道获取KEY SocketChannel channel = (SocketChannel) key.channel(); try { //将传入的数据写入到buffer中 int num = channel.read(buffer); // 转化成String String msg = new String(buffer.array()); // 增加业务处理 // 继续处理注册写事件 key.interestOps(SelectionKey.OP_WRITE); this.state = State.WRITE; } catch (Exception e) { e.printStackTrace(); } } //处理 写方法 private void write() { ByteBuffer buffer = ByteBuffer.wrap("hello".getBytes()); try { // 通过通道获取KEY SocketChannel channel = (SocketChannel) key.channel(); channel.write(buffer); // 继续处理注册写事件 key.interestOps(SelectionKey.OP_READ); this.state = State.READ; } catch (Exception e) { e.printStackTrace(); } } //记录状态 非读即写 private enum State { //读写事件 READ, WRITE } }
3)主从Reactor模型
mainReactor用来接收连接事件,然后分发给acceptor,acceptor在处理过程中,直接将后续的读写事件,注册到slaveReactor之中,以此来达到分流。
主从监听器
//主从模型 public class MultReactorServer { private Selector mainselector; private Selector slaveselector; private ServerSocketChannel serverSocketChannel; public MultReactorServer() { try { // 主 reactor 处理连接事件 mainselector = Selector.open(); //从reactor 处理读写事件 slaveselector = Selector.open(); // 配置为非阻塞的 serverSocketChannel.configureBlocking(false); // 配置通道连接地址 开放 9090 端口 SocketAddress address = new InetSocketAddress(9090); serverSocketChannel.socket().bind(address); //将channel 注册到 selector监听通道事件 达到多路复用 //首个注册事件一般都是 accept 连接事件 (参数变化) SelectionKey key = serverSocketChannel.register(mainselector, SelectionKey.OP_ACCEPT); // 创建处理连接事件的 acceptor // 同时 创建处理器 接受后序的IO读写事件,不断的遍历 是否有事情发生 (参数变化) Acceptor acceptor = new Acceptor(slaveselector, serverSocketChannel); //附加一个对象 用来处理事件 key.attach(acceptor); //主从监听逻辑分离 new HandlerLoop(slaveselector).run(); while (true) { //返回事件的个数 处理事件 int num = mainselector.select(); if (num == 0) { continue; } //没有跳过就代表有事件需要处理,拿到事件集合 Set<SelectionKey> SKeyset = mainselector.selectedKeys(); Iterator<SelectionKey> iterator = SKeyset.iterator(); while (iterator.hasNext()) { SelectionKey selectionKey = iterator.next(); //拿到事件的第一事情 移出事件 避免重复处理 iterator.remove(); //根据事件类型 分发 给监听器处理 //需要处理事情的时候 取出存储的对象 //只处理主Reactor 只处理连接事件 Runnable runnable = (Runnable) key.attachment(); runnable.run(); } } } catch (Exception e) { e.printStackTrace(); } } }
主从事件处理读写分离
//用于处理从reactor事件监听 public class HandlerLoop implements Runnable { private Selector selector; public HandlerLoop(Selector selector) { this.selector = selector; } @Override public void run() { try { while (true) { //返回事件的个数 处理事件 int num = selector.select(); if (num == 0) { continue; } //没有跳过就代表有事件需要处理,拿到事件集合 Set<SelectionKey> SKeyset = selector.selectedKeys(); Iterator<SelectionKey> iterator = SKeyset.iterator(); while (iterator.hasNext()) { SelectionKey selectionKey = iterator.next(); //拿到事件的第一事情 移出事件 避免重复处理 iterator.remove(); //根据事件类型 分发 给监听器处理 //需要处理事情的时候 取出存储的对象 //只处理从reactor 所以 接受的一定是读写事件 Runnable runnable = (Runnable) selectionKey.attachment(); runnable.run(); } } } catch (Exception e) { e.printStackTrace(); } } }
Netty网络框架(二)https://developer.aliyun.com/article/1469520