2.9 epoll的实现原理

简介: 2.9 epoll的实现原理

前文已经基于dpdk实现了用户态协议栈,但是有个缺陷就是不能连接多服务端。这也就引出了本文的目的——如何实现自定的 epoll

为什么不用系统自带的epoll

用户态协议栈是指运行在用户态的协议栈,与传统的内核态协议栈相比,它有许多优点,如灵活性、可扩展性、高性能等。因为可以避免内核态和用户态之间频繁的上下文切换,从而提高网络应用的处理性能。

在用户态协议栈中,一般需要自定义 I/O 多路复用机制,以便实现事件驱动的网络编程模型。传统的 Linux 系统提供了 epoll 这种高效的 I/O 多路复用机制,但是无法直接在用户态协议栈中使用。主要原因是,Linux 中的 epoll 实现涉及到了内核态和用户态之间的交互,如果要在用户态协议栈中使用 Linux 的 epoll,就必须频繁地进行系统调用和内存拷贝,这样会导致性能严重下降,并且违背了使用用户态协议栈的初衷。


因此在实现 epoll之前,需要先学习内核 epoll的原理。可以从四个方面入手:

1) epoll的数据结构

2) epoll的线程安全

3) epoll的内核回调

4) epoll的LT和ET


一、epoll的数据结构

1、epoll的数据结构选择

epoll需要为以下两类fd选择合适的数据结构

1)总集:所有fd的集合

2)就绪:就绪fd的集合


对于总集,首先需要明确几点:

1)每一个fd底层对应了一个tcb,包含了该连接的所有信息。也就是key—value的存储模式

2)对于io主要操作有:查找、删除、修改、插入。对于epoll总集,强查找的频率、数量会远高于其他操作。

3)io数量不确定,有时候多,有时候少。但不管什么时候,性能都不能太差。


因此,基于上面,总集可选择的数据结构有

1)hash —— O(1)

\quad 优点是查询速度快。缺点是创建时候需要指定hash的大小,但是不能太小,因为很可能会面临百万的fd。但是大多时候仅需要管理一小部分的fd,那就浪费了很大的空间。


2)rbtree —— O(lg N)

\quad 查找速度快,且不需分配初始大小。

3)链表 —— O(N)

\quad 随着链表越长,查找性能越来越差。

4)跳表

\quad 随着fd越来越多,分级实现越来越复杂。


而对于就绪fd,不需要查找,仅需挨个取出处理,因此选择双向链表即可。


2、epoll数据结构图

epoll的总集eventpoll和就绪epitem的数据结构如下,List 用来存储准备就绪的 IO,Rbtree 用来存储所有 io 的数据。

epitem:存储每个 io 对应的事件,每个注册到 eventpoll的 fd 对应1个 epitem

struct epitem {
  RB_ENTRY(epitem) rbn;
  LIST_ENTRY(epitem) rdlink;
  int rdy; //是否在就绪队列中
  int sockfd;   
  struct epoll_event event; //注册事件的类型
};

eventpoll:管理epoll 对象

struct eventpoll {
  int fd;
  ep_rb_tree rbr;
  int rbcnt;
  LIST_HEAD( ,epitem) rdlist;
  int rdnum;
  int waiting;
  pthread_mutex_t mtx; //rbtree update
  pthread_spinlock_t lock; //rdlist update
  pthread_cond_t cond; //block for event
  pthread_mutex_t cdmtx; //mutex for cond
};


二、epoll的线程安全

epoll为用户态提供三个接口

1)epoll_create:创建fd — 创建红黑树的根节点

2)epoll_ctl:控制fd — EPOLL_CTL_ADD、EPOLL_CTL_MOD、EPOLL_CTL_DEL分别对应增加、修改、删除结点

3)epoll_wait:把就绪队列的结点,通过指针赋值的方式,复制到用户态的events


epoll的线程安全,无非就是保证两个数据结构的能够多线程并行操作

1)多个线程可以同时操作红黑树,即调用epoll_ctl ——> 对红黑树加锁(互斥锁),锁根节点

2)多个线程可以同时复制,即调用epoll_wait ——> 对就绪队列加锁(自旋锁)


三、epoll的内核回调

epoll 回调函数

调用回调函数,在总集红黑树查找epitem

1)若存在,将新的要关注的事件掩码添加到已有的事件掩码

2)若不存在,加入到就绪队列,然后唤醒epoll_wait。epoll_wait运行后,将就绪队列里的epitem cpoy到用户态的events中,并删除就绪队列里相应的epitem


epoll_callback 是生产者,放入结点,唤醒 epoll_wait;epoll_wait 是消费者,消费结点。

int epoll_event_callback(struct eventpoll *ep, int sockid, uint32_t event) {
  struct epitem tmp;
  tmp.sockfd = sockid;
  //在红黑树查找tmp
  struct epitem *epi = RB_FIND(_epoll_rb_socket, &ep->rbr, &tmp);
  if (!epi) {
    printf("rbtree not exist\n");
    return -1;
  }
  //已在就绪队列中,将新的事件掩码添加到已有的事件掩码
  if (epi->rdy) {
    epi->event.events |= event;
    return 1;
  } 
  printf("epoll_event_callback --> %d\n", epi->sockfd);
  //不在就绪队列,则将结点加入到就绪队列
  pthread_spin_lock(&ep->lock);
  epi->rdy = 1;
  LIST_INSERT_HEAD(&ep->rdlist, epi, rdlink);
  ep->rdnum ++;
  pthread_spin_unlock(&ep->lock);
  pthread_mutex_lock(&ep->cdmtx);
  //唤醒epoll_wait
  pthread_cond_signal(&ep->cond);
  pthread_mutex_unlock(&ep->cdmtx);
  return 0;
}


epoll回调时机

一个有四个调用时机

  1. 三次握手完成之后
    tcp 三次握手,对端反馈 ack 后,socket 进入 rcvd 状态。需要将监听 socket 的
    event 置为 EPOLLIN,此时标识可以进入到 accept 读取 socket 数据。
  2. 接收数据回复ACK之后
    在 established 状态,收到数据以后,需要将 socket 的 event 置为 EPOLLIN 状态。
  3. 发送数据收到ACK之后
    检测 socket 的 send 状态,如果对端 cwnd>0 ,可以发送的数据。故需要将 socket
    置为 EPOLLOUT。
  4. 接收FIN回复ACK之后
    在 established 状态,收到 fin 时,此时 socket 进入到 close_wait。需要 socket 的 event置为 EPOLLIN。读取断开信息。



四、epoll的用户态接口

epoll_create

创建 eventpoll 结构体

int nepoll_create(int size) {
  if (size <= 0) return -1;
  //从位图中获取一个可用文件描述符fd
  int epfd = get_fd_frombitmap(); 
  struct eventpoll *ep = (struct eventpoll*)rte_malloc("eventpoll", sizeof(struct eventpoll), 0);
  if (!ep) { //若创建失败,从位图删除
    set_fd_frombitmap(epfd);
    return -1;
  }
  //获得指向全局结构体 ng_tcp_table 类型的指针
  struct ng_tcp_table *table = tcpInstance();
  table->ep = ep;
  //初始化红黑树和就绪队列
  ep->fd = epfd;
  ep->rbcnt = 0;
  RB_INIT(&ep->rbr);
  LIST_INIT(&ep->rdlist);
  if (pthread_mutex_init(&ep->mtx, NULL)) {
    free(ep);
    set_fd_frombitmap(epfd);
    return -2;
  }
  if (pthread_mutex_init(&ep->cdmtx, NULL)) {
    pthread_mutex_destroy(&ep->mtx);
    free(ep);
    set_fd_frombitmap(epfd);
    return -2;
  }
  if (pthread_cond_init(&ep->cond, NULL)) {
    pthread_mutex_destroy(&ep->cdmtx);
    pthread_mutex_destroy(&ep->mtx);
    free(ep);
    set_fd_frombitmap(epfd);
    return -2;
  }
  if (pthread_spin_init(&ep->lock, PTHREAD_PROCESS_SHARED)) {
    pthread_cond_destroy(&ep->cond);
    pthread_mutex_destroy(&ep->cdmtx);
    pthread_mutex_destroy(&ep->mtx);
    free(ep);
    set_fd_frombitmap(epfd);
    return -2;
  }
  return epfd;
}

epoll_ctl

对红黑树进行增添,修改、删除

int nepoll_ctl(int epfd, int op, int sockid, struct epoll_event *event) {
  // 通过epfd获取所关联的内核对象,并强转为 eventpoll 结构体类型
  struct eventpoll *ep = (struct eventpoll*)get_hostinfo_fromfd(epfd);
  if (!ep || (!event && op != EPOLL_CTL_DEL)) {
    errno = -EINVAL;
    return -1;
  }
  // 增加--EPOLL_CTL_ADD
  if (op == EPOLL_CTL_ADD) {
    pthread_mutex_lock(&ep->mtx);
    struct epitem tmp;
    tmp.sockfd = sockid;
    //在红黑树查找要增加的结点
    struct epitem *epi = RB_FIND(_epoll_rb_socket, &ep->rbr, &tmp);
    if (epi) { //若存在,则返回
      pthread_mutex_unlock(&ep->mtx);
      return -1;
    }
    //若不存在,则创建新的结点epitem,并添加相应的sockfd、event,最后插入到红黑树中
    epi = (struct epitem*)rte_malloc("epitem", sizeof(struct epitem), 0);
    if (!epi) {
      pthread_mutex_unlock(&ep->mtx);
      rte_errno = -ENOMEM;
      return -1;
    }
    epi->sockfd = sockid;
    memcpy(&epi->event, event, sizeof(struct epoll_event));
    epi = RB_INSERT(_epoll_rb_socket, &ep->rbr, epi);
    ep->rbcnt ++;
    pthread_mutex_unlock(&ep->mtx);
  } else if (op == EPOLL_CTL_DEL) { //删除--EPOLL_CTL_DEL
    pthread_mutex_lock(&ep->mtx);
    struct epitem tmp;
    tmp.sockfd = sockid;
    //在红黑树查找要删除的结点
    struct epitem *epi = RB_FIND(_epoll_rb_socket, &ep->rbr, &tmp);
    if (!epi) {//若不存在,则返回
      pthread_mutex_unlock(&ep->mtx);
      return -1;
    }
    //若存在,则从红黑树删除
    epi = RB_REMOVE(_epoll_rb_socket, &ep->rbr, epi);
    if (!epi) {
      pthread_mutex_unlock(&ep->mtx);
      return -1;
    }
    ep->rbcnt --;
    rte_free(epi);
    pthread_mutex_unlock(&ep->mtx);
  } else if (op == EPOLL_CTL_MOD) { //修改--EPOLL_CTL_MOD
    struct epitem tmp;
    tmp.sockfd = sockid;
    struct epitem *epi = RB_FIND(_epoll_rb_socket, &ep->rbr, &tmp);
    if (epi) {
      epi->event.events = event->events;
      epi->event.events |= EPOLLERR | EPOLLHUP;
    } else {
      rte_errno = -ENOENT;
      return -1;
    }
  } 
  return 0;
}

epoll_wait

监控就绪队列,等待 fd 就绪。若有读写事件发生,从内核拷贝数据到用户空间,加入到事件列表中,并通知等待这些事件的进程或线程;若没有则阻塞等待。

int nepoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout) {
  // 通过epfd获取所关联的内核对象,并强转为 eventpoll 结构体类型
  struct eventpoll *ep = (struct eventpoll*)get_hostinfo_fromfd(epfd);;
  if (!ep || !events || maxevents <= 0) {
    rte_errno = -EINVAL;
    return -1;
  }
  //
  if (pthread_mutex_lock(&ep->cdmtx)) {
    if (rte_errno == EDEADLK) {
      printf("epoll lock blocked\n");
    }
  }
  // 1.若就绪队列为空,且timeout != 0 等待
  while (ep->rdnum == 0 && timeout != 0) {
    //设置状态为等待
    ep->waiting = 1;
    if (timeout > 0) { //timeout > 0,等待timeout
      struct timespec deadline;
      //计算定时器的超时时间
      clock_gettime(CLOCK_REALTIME, &deadline);
      if (timeout >= 1000) {
        int sec;
        sec = timeout / 1000;
        deadline.tv_sec += sec;
        timeout -= sec * 1000;
      }
      deadline.tv_nsec += timeout * 1000000;
      if (deadline.tv_nsec >= 1000000000) {
        deadline.tv_sec++;
        deadline.tv_nsec -= 1000000000;
      }
      //指定的时间deadline内等待条件变量cond的状态改变。
      int ret = pthread_cond_timedwait(&ep->cond, &ep->cdmtx, &deadline);
      if (ret && ret != ETIMEDOUT) {
        printf("pthread_cond_timewait\n");
        pthread_mutex_unlock(&ep->cdmtx);
        return -1;
      }
      timeout = 0;
    } else if (timeout < 0) { //timeout < 0,一直等待
      int ret = pthread_cond_wait(&ep->cond, &ep->cdmtx);
      if (ret) {
        printf("pthread_cond_wait\n");
        pthread_mutex_unlock(&ep->cdmtx);
        return -1;
      }
    }
    ep->waiting = 0;  //更新状态为不等待
  }
  pthread_mutex_unlock(&ep->cdmtx);
  //对就绪队列加锁(自旋锁)
  pthread_spin_lock(&ep->lock);
  int cnt = 0;
  int num = (ep->rdnum > maxevents ? maxevents : ep->rdnum);
  int i = 0;
  //2.从就绪队列中拷贝事件到用户态数组
  while (num != 0 && !LIST_EMPTY(&ep->rdlist)) { //EPOLLET
    //从就绪队列取出第一个结点
    struct epitem *epi = LIST_FIRST(&ep->rdlist);
    LIST_REMOVE(epi, rdlink);
    epi->rdy = 0;
    //将事件拷贝到用户态
    memcpy(&events[i++], &epi->event, sizeof(struct epoll_event));
    num --;
    cnt ++;
    ep->rdnum --;
  }
  pthread_spin_unlock(&ep->lock);
  return cnt;
}

五、epoll的LT和ET

  • ET边沿触发,只触发一次
  • LT水平触发,如果有事件(比如数据recv_buffer没读完)就一直触发
相关实践学习
通过Ingress进行灰度发布
本场景您将运行一个简单的应用,部署一个新的应用用于新的发布,并通过Ingress能力实现灰度发布。
容器应用与集群管理
欢迎来到《容器应用与集群管理》课程,本课程是“云原生容器Clouder认证“系列中的第二阶段。课程将向您介绍与容器集群相关的概念和技术,这些概念和技术可以帮助您了解阿里云容器服务ACK/ACK Serverless的使用。同时,本课程也会向您介绍可以采取的工具、方法和可操作步骤,以帮助您了解如何基于容器服务ACK Serverless构建和管理企业级应用。 学习完本课程后,您将能够: 掌握容器集群、容器编排的基本概念 掌握Kubernetes的基础概念及核心思想 掌握阿里云容器服务ACK/ACK Serverless概念及使用方法 基于容器服务ACK Serverless搭建和管理企业级网站应用
目录
相关文章
|
4月前
|
存储 网络协议 安全
Epoll的实现原理
Epoll的实现原理
|
6月前
|
应用服务中间件 Go nginx
Swoole 源码分析之 epoll 多路复用模块
IO多路复用技术通过使用少量的线程或进程同时监视多个IO事件,能够更高效地处理大量的IO操作,从而提高系统的性能和资源利用率。
60 0
Swoole 源码分析之 epoll 多路复用模块
|
6月前
|
存储 安全 网络协议
epoll的底层实现原理
epoll的底层实现原理
55 0
|
6月前
epoll(2) 使用及源码分析的引子
epoll(2) 使用及源码分析的引子
|
7月前
|
监控 Linux
epoll 的用法
【4月更文挑战第16天】epoll 通过改进的接口设计,避免了用户态 - 内核态频繁的数据拷贝,大大提高了系统性能。在使用 epoll 的时候,我们一定要理解条件触发和边缘触发两种模式。
|
7月前
|
存储 安全 网络协议
epoll的实现原理
epoll的实现原理
82 0
|
7月前
|
监控 Java 应用服务中间件
epoll封装reactor原理剖析与代码实现(2)
epoll封装reactor原理剖析与代码实现(2)
96 0
|
7月前
|
Linux API C++
epoll封装reactor原理剖析与代码实现(1)
epoll封装reactor原理剖析与代码实现(1)
213 0
|
7月前
|
监控 Linux API
epoll-reactor模型原理及代码解析
epoll-reactor模型原理及代码解析
147 0
|
存储 监控 NoSQL
epoll底层原理
epoll是Linux内核为处理大批量文件描述符而作了改进的poll,是Linux下多路复用IO接口select/poll的增强版本。
438 0
epoll底层原理