教你使用io_uring来写一个并发回声服务器

简介: 教你使用io_uring来写一个并发回声服务器

io_uring的使用

什么是io_uring🍻

是内核版本5.10之后的产物,也就是你的内核版本要在5.10之后才能使用,用户空间的环形队列

看见其名字就知道,带队列,能够起到异步解耦的作用,它可以与epoll的性能相提并论,但是却与epoll的工作原理完全不同,下面就让我们来学习它

安装一个liburing

git clone https://github.com/axboe/liburing.git
./configure
make
make install

工作原理🍻

符号说明🔣

  • sqsubmit队列
  • cqcomplete队列
  • sqesq上某一节点
  • cqecq上某一节点

两个队列❓

内核中有两个环形队列,其中一个是submit queue一个是complete queue,简称他们为sqcq

  • sq:用于用户发起操作请求的队列,例如用户发起一个accept(这个异步acceptliburing实现)之后会将这个ACCEPT请求封装为一个节点,放进sq的队列中,之后通过调用一个submit函数来将sq队列中的节点放入内核中去处理
  • cq:用于内核处理完成后放入节点的位置,内核在异步处理完操作后会将节点放入cq队列中

注意:在上述描述中我用到将节点放进这个说法其实是不对的,这样说就好像有拷贝的动作,但是这整个过程中其实都没有拷贝的动作,sqcq都维护的指针,指向的是对应节点,只是什么时候他们该指向哪个节点,例如内核处理完成后cq就会指向完成节点

共享内存

io_uring底层也有共享内存的部分,sqcq中没有拷贝的动作,他们指向的都是一个内核与用户态共享的一块内存块

异步

通过队列,io_uringaccep、recv、send封装成了异步io

  • 例如accept,假设io_uring给出的接口是accept_prepare,调用他后直接返回,io_uringaccept请求放入sq中,内核取出处理完毕后的放进cq中,通过cq->res获取原本系统调用的返回值,通过一些附加信息获取原始sockfd

使用io_uring🍻

使用io_uring实现一个可供多个客户端连接的回声服务器

大概流程🎏

  • 初始化sqcq队列
  • accept操作注册进sq队列中
  • submitsq队列中的操作到内核去处理
  • cq中获取操作完成的操作们到用户态
  • 循环遍历判断状态来获取相应的返回值以及进行相应处理

**注意:**当状态通知到时 操作就已经是完成了的,我们只需要直接读结果就行,而不是像reactor那样事件通知然后执行相应的操作

使用io_uring

首先写一个没有acceptTCP服务器 hh伪代码

int sockfd = socket(AF_INET, SOCK_STREAM, 0); // io
bind -> Ip:0.0.0.0 Port:9999
listen

获取原始sockfd

什么是原始sockfd

  • 例如posixAPIaccpet,第一个参数是一个listenfd,其返回值是clientfd,其listenfd就是原始sockfd
  • 例如posixAPIrecv,第一个参数是clientfd,其返回值是实际读取到的字节,其clientfd就是原始sockfd
  • 为什么要专门说获取原始sockfd,因为如果不做任何附加信息,cq中取出来的节点其上面只有原始函数的返回值,无法获取原始sockfd,因而有一个场景,如果想要连接多个客户端,在第一次accpet状态触发后需要重新注册accept操作进sq队列,此时进凭cqe->res是无法操作的,我们需要利用cqe->user_data

epoll中用epoll_event来获取原始fd和注册的对应事件,而io_uring要获取原始fd,和设置操作状态 的话也是需要这样一个结构体,我们可以自己实现

enum {
  EVENT_ACCEPT = 0,
  EVENT_READ,
  EVENT_WRITE
};
typedef struct _conninfo {
  int connfd;
  int event;
} conninfo;
  • io_uring_sqe结构体中有一个64ull的成员user_data,所以我们设计一个conninfo的结构体来存储不同的操作状态和原始sockfd它后续是这样使用的
struct io_uring_sqe *sqe = get_sqe_from_ring();
io_uring_prep_accept(sqe, sockfd, addr, addrlen, flag);
conninfo info_accept = {
    .connfd = sockfd,
    .event = EVENT_ACCEPT,
};
// 将对应状态附给sqe
memcpy(&sqe->user_data, &info_accept, sizeof(info_accept));
  • cqe(完成队列某节点)中我们就可以通过其user_data字段得到对应的描述符,根据其状态来决定下一步操作,比如如果状态为EVENT_ACCEPT则说明有客户端连接,首先将返回的clientfd获取,然后先将listenfd的**accept操作注册进sq中(保证多个客户端可连接)然后再将clientfdrecv操作**注册进sq中(使服务器能够接收数据)
    注意:最后的处理是将他们的操作注册进sq中,在代码上的形式就是调用了io_uring中异步的acceptrecv,他们的返回值都是void,真正的返回值通过cq队列中节点的res字段获取
初始化sqcq队列🐝 <io_uring_queue_init_params>
#define ENTRIES_LENGTH    1024
struct io_uring_params params;
memset(&params, 0, sizeof(params));
struct io_uring ring;
io_uring_queue_init_params(ENTRIES_LENGTH, &ring, &params);

该函数执行后,sqcq队列被初始化

  • io_uring结构体中维护着sqcq队列
  • ENTRIES_LENGTH指定队列的长度
  • params被初始化为0值,表示属性全部使用默认
注册accept操作到sq队列🐝 io_uring_prep_accept

io_uring提供的异步accept只比accept4多了一个参数,也就是sq队列的地址

封装了一个函数

void set_accept_event(struct io_uring *ring, int sockfd, struct sockaddr *addr,
                   socklen_t *addrlen, int flags) {
  struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
    io_uring_prep_accept(sqe, sockfd, addr, addrlen, flags);
    conninfo info_accept = {
        .connfd = sockfd,
        .event = EVENT_ACCEPT,
    };
    memcpy(&sqe->user_data, &info_accept, sizeof(info_accept));
}
  • 此函数主要是执行了异步的accept,并附加了状态信息以及描述符信息
  • 获取sq位置,
  • 调用异步api
  • 加入附加信息

还有两个操作被封装成了函数,注册read操作与注册write操作到sq队列

void set_send_event(struct io_uring *ring, int sockfd, void *buf, size_t len, int flags) {
  struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
  io_uring_prep_send(sqe, sockfd, buf, len, flags);
  conninfo info_send = {
    .connfd = sockfd,
    .event = EVENT_WRITE,
  };
  memcpy(&sqe->user_data, &info_send, sizeof(info_send));
}
void set_recv_event(struct io_uring *ring, int sockfd, void *buf, size_t len, int flags) {
  struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
  io_uring_prep_recv(sqe, sockfd, buf, len, flags);
  conninfo info_recv = {
    .connfd = sockfd,
    .event = EVENT_READ,
  };
  memcpy(&sqe->user_data, &info_recv, sizeof(info_recv));
}

accept相似,只是内部调用api的不同与附加操作状态的不同

mainloop阶段➿ <while(1)>

下面的操作都包含在一个while(1)里面

提交sq上的操作到内核🚀<io_uring_submit>
io_uring_submit(&ring);
主程序等待cq中有节点⬅️<io_uring_wait_cqe>
struct io_uring_cqe *cqe_;
io_uring_wait_cqe(&ring, &cqe_);
探测到cq中有节点后取出cq中指定个数的节点⬅️<io_uring_peek_batch_cqe>
struct io_uring_cqe *cqes[10];
int cqecount = io_uring_peek_batch_cqe(&ring, cqes, 10);
  • cqecount <= 第三个参数(这里是10)
循环遍历每个节点,根据操作状态来决定下一步⬅️<for(int i = 0; i < cqecount; i++)>

取出对应操作的原始sockfd与操作状态

for外面定义struct io_uring_cqe *cqe;供获取每个操作完成的节点

cqe = cqes[i];
// 取出里面的原始sockfd与操作状态
conninfo ci;
memcpy(&ci, &cqe->user_data, sizeof(ci));
  • 状态:ci.event,原始sockfdci.connfd
根据状态延伸出不同的操作
  • ci.event == EVENT_ACCEPT
int connfd = cqe->res;
set_accept_event(&ring, ci.connfd, (struct sockaddr*)&clientaddr, &clilen, 0);
set_recv_event(&ring, connfd, buffer, 1024, 0);
  • 重新注册accept是为了多个客户端可连接
    注册read是让服务器可以接受客户端发送数据
  • ci.event == EVENT_READ
if (cqe->res == 0) {
    close(ci.connfd);
} else {
    printf("recv --> %s, %d\n", buffer, cqe->res);
    set_send_event(&ring, ci.connfd, buffer, cqe->res, 0);
}
  • 通过cqe->res获取异步read操作的返回值,这里就能看出与reactor的区别,reactorread事件触发了才开始执行read操作,这里当read状态通知时是read操作已经调用完成了,接着就直接注册send
  • ci.event == EVENT_WRITE
set_recv_event(&ring, ci.connfd, buffer, 1024, 0);
  • 能够执行到这里就说明send成功了,此时只需要再次设置recv客户端即可进行多次发送
推进I/O事件完成队列的指针✴️<io_uring_cq_advance>

for循环完成之后调用

io_uring_cq_advance(&ring, cqecount);
  • 这样下次调用io_uring_peek_batch_cqe获取cqe就不会出错了

完整代码

uring_io/uring_server.c at main · luopanforever/uring_io · GitHub

相关文章
|
3月前
|
开发框架 并行计算 算法
揭秘Python并发神器:IO密集型与CPU密集型任务的异步革命,你竟还傻傻分不清?
揭秘Python并发神器:IO密集型与CPU密集型任务的异步革命,你竟还傻傻分不清?
46 4
|
2月前
|
监控 并行计算 数据处理
构建高效Python应用:并发与异步编程的实战秘籍,IO与CPU密集型任务一网打尽!
在Python编程的征途中,面对日益增长的性能需求,如何构建高效的应用成为了每位开发者必须面对的课题。并发与异步编程作为提升程序性能的两大法宝,在处理IO密集型与CPU密集型任务时展现出了巨大的潜力。今天,我们将深入探讨这些技术的最佳实践,助你打造高效Python应用。
39 0
|
4月前
|
开发框架 缓存 .NET
并发请求太多,服务器崩溃了?试试使用 ASP.NET Core Web API 操作筛选器对请求进行限流
并发请求太多,服务器崩溃了?试试使用 ASP.NET Core Web API 操作筛选器对请求进行限流
217 0
|
1月前
|
存储 关系型数据库 MySQL
查询服务器CPU、内存、磁盘、网络IO、队列、数据库占用空间等等信息
查询服务器CPU、内存、磁盘、网络IO、队列、数据库占用空间等等信息
255 2
|
27天前
|
存储 弹性计算 固态存储
阿里云服务器ESSD Entry系统盘测评IOPS、IO读写和时延性能参数
ESSD Entry云盘是阿里云推出的新一代云盘,具备高IOPS、低延迟和企业级数据保护能力。适用于开发与测试场景,支持按量付费和包年包月计费模式。99元和199元的ECS经济型e实例和通用算力型u1实例均采用ESSD Entry系统盘,性价比高。详细性能参数和价格请参考阿里云官方页面。
58 0
|
6月前
|
Java
Java Socket编程与多线程:提升客户端-服务器通信的并发性能
【6月更文挑战第21天】Java网络编程中,Socket结合多线程提升并发性能,服务器对每个客户端连接启动新线程处理,如示例所示,实现每个客户端的独立操作。多线程利用多核处理器能力,避免串行等待,提升响应速度。防止死锁需减少共享资源,统一锁定顺序,使用超时和重试策略。使用synchronized、ReentrantLock等维持数据一致性。多线程带来性能提升的同时,也伴随复杂性和挑战。
105 0
|
2月前
|
开发框架 并行计算 .NET
脑洞大开!Python并发与异步编程的哲学思考:IO密集型与CPU密集型任务的智慧选择!
脑洞大开!Python并发与异步编程的哲学思考:IO密集型与CPU密集型任务的智慧选择!
29 1
|
3月前
|
存储 关系型数据库 MySQL
查询服务器CPU、内存、磁盘、网络IO、队列、数据库占用空间等等信息
查询服务器CPU、内存、磁盘、网络IO、队列、数据库占用空间等等信息
166 5
|
3月前
|
算法 Java 程序员
解锁Python高效之道:并发与异步在IO与CPU密集型任务中的精准打击策略!
在数据驱动时代,高效处理大规模数据和高并发请求至关重要。Python凭借其优雅的语法和强大的库支持,成为开发者首选。本文将介绍Python中的并发与异步编程,涵盖并发与异步的基本概念、IO密集型任务的并发策略、CPU密集型任务的并发策略以及异步IO的应用。通过具体示例,展示如何使用`concurrent.futures`、`asyncio`和`multiprocessing`等库提升程序性能,帮助开发者构建高效、可扩展的应用程序。
119 0
|
3月前
|
网络协议 数据处理 C语言
利用C语言基于poll实现TCP回声服务器的多路复用模型
此代码仅为示例,展示了如何基于 `poll`实现多路复用的TCP回声服务器的基本框架。在实际应用中,你可能需要对其进行扩展或修改,以满足具体的需求。
88 0