epoll封装reactor原理剖析与代码实现(1)

简介: epoll封装reactor原理剖析与代码实现(1)

前言

 本文将由浅入深的介绍reactor,深入浅出的封装epoll,一步步变成reactor模型,并在文末介绍reactor的四种模型。


 本专栏知识点是通过零声教育的线上课学习,进行梳理总结写下文章,对c/c++linux课程感兴趣的读者,可以点击链接 C/C++后台高级服务器课程介绍 详细查看课程的服务。


reactor是什么?

 reactor是一种高并发服务器模型,是一种框架,一个概念,所以reactor没有一个固定的代码,可以有很多变种,后续会介绍到。


 组成:⾮阻塞的IO(如果是阻塞IO,发送缓冲区满了怎么办,就阻塞了) + io多路复⽤;特征:基于事件循环,以事件驱动或者事件回调的⽅式来实现业务逻辑。


 reactor中的IO使用的是select,poll,epoll这些IO多路复用,使用IO多路复用系统不必创建维护大量线程,只使用一个线程、一个选择器就可同时处理成千上万连接,大大减少了系统开销。


 reactor中文译为反应堆,将epoll中的IO变成事件驱动,比如读事件,写事件。来了个读事件,立马进行反应,执行提前注册好的事件回调函数。


 回想一下普通函数调用的机制:程序调用某函数,函数执行,程序等待,函数将结果和控制权返回给程序,程序继续处理。reactor反应堆,是一种事件驱动机制,和普通函数调用的不同之处在于:应用程序不是主动的调用某个 API 完成处理,而是恰恰相反,reactor逆置了事件处理流程,应用程序需要提供相应的接口并注册到 reactor上,如果相应的事件发生,reactor将主动调用应用程序注册的接口,这些接口又称为“回调函数”。


 说白了,reactor就是对epoll进行封装,进行网络IO与业务的解耦,将epoll管理IO变成管理事件,整个程序由事件进行驱动执行。就像下图一样,有就绪事件返回,reactor:由事件驱动执行对应的回调函数;epoll:需要自己判断。

reactor模型三个重要组件与流程分析

 reactor是处理并发 I/O 比较常见的一种模式,用于同步 I/O,中心思想是将所有要处理的 I/O 事件注册到一个中心 I/O 多路复用器(epoll)上,同时主线程/进程阻塞在多路复用器上;一旦有 I/O 事件到来或是准备就绪(文件描述符或 socket 可读、写),多路复用器返回并将事先注册的相应 I/O 事件分发到对应的处理器中。

组件

reactor模型有三个重要的组件

  • 多路复用器:由操作系统提供,在 linux 上一般是 select, poll, epoll 等系统调用。

  • 事件分发器:将多路复用器中返回的就绪事件分到对应的处理函数中。

  • 事件处理器:负责处理特定事件的处理函数。

流程

具体流程:

  1. 注册相应的事件处理器(刚开始listenfd注册都就绪事件)
  2. 多路复用器等待事件
  3. 事件到来,激活事件分发器,分发器调用事件到对应的处理器
  4. 事件处理器处理事件,然后注册新的事件(比如读事件,完成读操作后,根据业务处理数据,注册写事件,写事件根据业务响应请求;比如listen读事件,肯定要给新的连接注册读事件)

将epoll封装成reactor事件驱动

封装每一个连接sockfd变成ntyevent

 我们知道一个连接对应一个文件描述符fd,对于这个连接(fd)来说,它有自己的事件(读,写)。我们将fd都设置成非阻塞的,所以这里我们需要添加两个buffer,至于大小就是看业务需求了。

struct ntyevent {
    int fd;//socket fd
    int events;//事件
    char sbuffer[BUFFER_LENGTH];//写缓冲buffer
    int slength;
    char rbuffer[BUFFER_LENGTH];//读缓冲buffer
    int rlength;
//    typedef int (*NtyCallBack)(int, int, void *);
    NtyCallBack callback;//回调函数
    void *arg;
    int status;//1MOD 0 null
};

封装epfd和ntyevent变成ntyreactor

 我们知道socket fd已经被封装成了ntyevent,那么有多少个ntyevent呢?这里demo初始化reactor的时候其实是将*events指向了一个1024的ntyevent数组(按照道理来说客户端连接可以一直连,不止1024个客户端,后续文章有解决方案,这里从简)。epfd肯定要封装进行,不用多说。

struct ntyreactor {
    int epfd;
    struct ntyevent *events;
    //struct ntyevent events[1024];
};

封装读、写、接收连接等事件对应的操作变成callback

 前面已经说了,把事件写成回调函数,这里的参数fd肯定要知道自己的哪个连接,events是什么事件的意思,arg传的是ntyreactor (考虑到后续多线程多进程,如果将ntyreactor设为全局感觉不太好 )

typedef int (*NtyCallBack)(int, int, void *);
int recv_cb(int fd, int events, void *arg);
int send_cb(int fd, int events, void *arg);
int accept_cb(int fd, int events, void *arg);

给每个客户端的ntyevent设置属性

  具两个例子,我们知道第一个socket一定是listenfd,用来监听用的,那么首先肯定是设置ntyevent的各项属性。

 本来是读事件,读完后要改成写事件,那么必然要把原来的读回调函数设置成写事件回调。

void nty_event_set(struct ntyevent *ev, int fd, NtyCallBack callback, void *arg) {
    ev->fd = fd;
    ev->callback = callback;
    ev->events = 0;
    ev->arg = arg;
}

将ntyevent加入到epoll中由内核监听

int nty_event_add(int epfd, int events, struct ntyevent *ntyev) {
    struct epoll_event ev = {0, {0}};
    ev.data.ptr = ntyev;
    ev.events = ntyev->events = events;
    int op;
    if (ntyev->status == 1) {
        op = EPOLL_CTL_MOD;
    }
    else {
        op = EPOLL_CTL_ADD;
        ntyev->status = 1;
    }
    if (epoll_ctl(epfd, op, ntyev->fd, &ev) < 0) {
        printf("event add failed [fd=%d], events[%d],err:%s,err:%d\n", ntyev->fd, events, strerror(errno), errno);
        return -1;
    }
    return 0;
}

将ntyevent从epoll中去除

int nty_event_del(int epfd, struct ntyevent *ev) {
    struct epoll_event ep_ev = {0, {0}};
    if (ev->status != 1) {
        return -1;
    }
    ep_ev.data.ptr = ev;
    ev->status = 0;
    epoll_ctl(epfd, EPOLL_CTL_DEL, ev->fd, &ep_ev);
    //epoll_ctl(epfd, EPOLL_CTL_DEL, ev->fd, NULL);
    return 0;
}

读事件回调函数

这里就是被触发的回调函数,具体代码要与业务结合,这里的参考意义不大(这里就是读一次,改成写事件)

int recv_cb(int fd, int events, void *arg) {
    struct ntyreactor *reactor = (struct ntyreactor *) arg;
    struct ntyevent *ntyev = &reactor->events[fd];
    int len = recv(fd, ntyev->buffer, BUFFER_LENGTH, 0);
    nty_event_del(reactor->epfd, ntyev);
    if (len > 0) {
        ntyev->length = len;
        ntyev->buffer[len] = '\0';
        printf("C[%d]:%s\n", fd, ntyev->buffer);
        nty_event_set(ntyev, fd, send_cb, reactor);
        nty_event_add(reactor->epfd, EPOLLOUT, ntyev);
    }
    else if (len == 0) {
        close(ntyev->fd);
        printf("[fd=%d] pos[%ld], closed\n", fd, ntyev - reactor->events);
    }
    else {
        close(ntyev->fd);
        printf("recv[fd=%d] error[%d]:%s\n", fd, errno, strerror(errno));
    }
    return len;
}

写事件回调函数

这里就是被触发的回调函数,具体代码要与业务结合,这里的参考意义不大(将读事件读的数据写回,再改成读事件,相当于echo)

int send_cb(int fd, int events, void *arg) {
    struct ntyreactor *reactor = (struct ntyreactor *) arg;
    struct ntyevent *ntyev = &reactor->events[fd];
    int len = send(fd, ntyev->buffer, ntyev->length, 0);
    if (len > 0) {
        printf("send[fd=%d], [%d]%s\n", fd, len, ntyev->buffer);
        nty_event_del(reactor->epfd, ntyev);
        nty_event_set(ntyev, fd, recv_cb, reactor);
        nty_event_add(reactor->epfd, EPOLLIN, ntyev);
    }
    else {
        close(ntyev->fd);
        nty_event_del(reactor->epfd, ntyev);
        printf("send[fd=%d] error %s\n", fd, strerror(errno));
    }
    return len;
}

接受新连接事件回调函数

  本质上就是accept,然后将其加入到epoll监听

int accept_cb(int fd, int events, void *arg) {
    struct ntyreactor *reactor = (struct ntyreactor *) arg;
    if (reactor == NULL) return -1;
    struct sockaddr_in client_addr;
    socklen_t len = sizeof(client_addr);
    int clientfd;
    if ((clientfd = accept(fd, (struct sockaddr *) &client_addr, &len)) == -1) {
        printf("accept: %s\n", strerror(errno));
        return -1;
    }
    printf("client fd = %d\n", clientfd);
    if ((fcntl(clientfd, F_SETFL, O_NONBLOCK)) < 0) {
        printf("%s: fcntl nonblocking failed, %d\n", __func__, MAX_EPOLL_EVENTS);
        return -1;
    }
    nty_event_set(&reactor->events[clientfd], clientfd, recv_cb, reactor);
    nty_event_add(reactor->epfd, EPOLLIN, &reactor->events[clientfd]);
    printf("new connect [%s:%d][time:%ld], pos[%d]\n", inet_ntoa(client_addr.sin_addr), ntohs(client_addr.sin_port),
           reactor->events[clientfd].last_active, clientfd);
    return 0;
}

reactor运行

  就是将原来的epoll_wait从main函数中封装到ntyreactor_run函数中

int ntyreactor_run(struct ntyreactor *reactor) {
    if (reactor == NULL) return -1;
    if (reactor->epfd < 0) return -1;
    if (reactor->events == NULL) return -1;
    struct epoll_event events[MAX_EPOLL_EVENTS];
    int checkpos = 0, i;
    while (1) {
        int nready = epoll_wait(reactor->epfd, events, MAX_EPOLL_EVENTS, 1000);
        if (nready < 0) {
            printf("epoll_wait error, exit\n");
            continue;
        }
        for (i = 0; i < nready; i++) {
            struct ntyevent *ev = (struct ntyevent *) events[i].data.ptr;
            ev->callback(ev->fd, events[i].events, ev->arg);
        }
    }
}

reactor简单版代码与测试

  后续会出一篇测试百万连接数量的文章

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <unistd.h>
#include <errno.h>
#include <time.h>
#define BUFFER_LENGTH           4096
#define MAX_EPOLL_EVENTS        1024
#define SERVER_PORT             8082
typedef int (*NtyCallBack)(int, int, void *);
struct ntyevent {
    int fd;
    int events;
    void *arg;
    NtyCallBack callback;
    int status;//1MOD 0 null
    char buffer[BUFFER_LENGTH];
    int length;
    long last_active;
};
struct ntyreactor {
    int epfd;
    struct ntyevent *events;
};
int recv_cb(int fd, int events, void *arg);
int send_cb(int fd, int events, void *arg);
int accept_cb(int fd, int events, void *arg);
void nty_event_set(struct ntyevent *ev, int fd, NtyCallBack callback, void *arg) {
    ev->fd = fd;
    ev->callback = callback;
    ev->events = 0;
    ev->arg = arg;
    ev->last_active = time(NULL);
}
int nty_event_add(int epfd, int events, struct ntyevent *ntyev) {
    struct epoll_event ev = {0, {0}};
    ev.data.ptr = ntyev;
    ev.events = ntyev->events = events;
    int op;
    if (ntyev->status == 1) {
        op = EPOLL_CTL_MOD;
    }
    else {
        op = EPOLL_CTL_ADD;
        ntyev->status = 1;
    }
    if (epoll_ctl(epfd, op, ntyev->fd, &ev) < 0) {
        printf("event add failed [fd=%d], events[%d],err:%s,err:%d\n", ntyev->fd, events, strerror(errno), errno);
        return -1;
    }
    return 0;
}
int nty_event_del(int epfd, struct ntyevent *ev) {
    struct epoll_event ep_ev = {0, {0}};
    if (ev->status != 1) {
        return -1;
    }
    ep_ev.data.ptr = ev;
    ev->status = 0;
    epoll_ctl(epfd, EPOLL_CTL_DEL, ev->fd, &ep_ev);
    //epoll_ctl(epfd, EPOLL_CTL_DEL, ev->fd, NULL);
    return 0;
}
int recv_cb(int fd, int events, void *arg) {
    struct ntyreactor *reactor = (struct ntyreactor *) arg;
    struct ntyevent *ntyev = &reactor->events[fd];
    int len = recv(fd, ntyev->buffer, BUFFER_LENGTH, 0);
    nty_event_del(reactor->epfd, ntyev);
    if (len > 0) {
        ntyev->length = len;
        ntyev->buffer[len] = '\0';
        printf("C[%d]:%s\n", fd, ntyev->buffer);
        nty_event_set(ntyev, fd, send_cb, reactor);
        nty_event_add(reactor->epfd, EPOLLOUT, ntyev);
    }
    else if (len == 0) {
        close(ntyev->fd);
        printf("[fd=%d] pos[%ld], closed\n", fd, ntyev - reactor->events);
    }
    else {
        close(ntyev->fd);
        printf("recv[fd=%d] error[%d]:%s\n", fd, errno, strerror(errno));
    }
    return len;
}
int send_cb(int fd, int events, void *arg) {
    struct ntyreactor *reactor = (struct ntyreactor *) arg;
    struct ntyevent *ntyev = &reactor->events[fd];
    int len = send(fd, ntyev->buffer, ntyev->length, 0);
    if (len > 0) {
        printf("send[fd=%d], [%d]%s\n", fd, len, ntyev->buffer);
        nty_event_del(reactor->epfd, ntyev);
        nty_event_set(ntyev, fd, recv_cb, reactor);
        nty_event_add(reactor->epfd, EPOLLIN, ntyev);
    }
    else {
        close(ntyev->fd);
        nty_event_del(reactor->epfd, ntyev);
        printf("send[fd=%d] error %s\n", fd, strerror(errno));
    }
    return len;
}
int accept_cb(int fd, int events, void *arg) {
    struct ntyreactor *reactor = (struct ntyreactor *) arg;
    if (reactor == NULL) return -1;
    struct sockaddr_in client_addr;
    socklen_t len = sizeof(client_addr);
    int clientfd;
    if ((clientfd = accept(fd, (struct sockaddr *) &client_addr, &len)) == -1) {
        printf("accept: %s\n", strerror(errno));
        return -1;
    }
    printf("client fd = %d\n", clientfd);
    if ((fcntl(clientfd, F_SETFL, O_NONBLOCK)) < 0) {
        printf("%s: fcntl nonblocking failed, %d\n", __func__, MAX_EPOLL_EVENTS);
        return -1;
    }
    nty_event_set(&reactor->events[clientfd], clientfd, recv_cb, reactor);
    nty_event_add(reactor->epfd, EPOLLIN, &reactor->events[clientfd]);
    printf("new connect [%s:%d][time:%ld], pos[%d]\n", inet_ntoa(client_addr.sin_addr), ntohs(client_addr.sin_port),
           reactor->events[clientfd].last_active, clientfd);
    return 0;
}
int init_sock(short port) {
    int fd = socket(AF_INET, SOCK_STREAM, 0);
    struct sockaddr_in server_addr;
    memset(&server_addr, 0, sizeof(server_addr));
    server_addr.sin_family = AF_INET;
    server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
    server_addr.sin_port = htons(port);
    bind(fd, (struct sockaddr *) &server_addr, sizeof(server_addr));
    if (listen(fd, 20) < 0) {
        printf("listen failed : %s\n", strerror(errno));
    }
    return fd;
}
int ntyreactor_init(struct ntyreactor *reactor) {
    if (reactor == NULL) return -1;
    memset(reactor, 0, sizeof(struct ntyreactor));
    reactor->epfd = epoll_create(1);
    if (reactor->epfd <= 0) {
        printf("create epfd in %s err %s\n", __func__, strerror(errno));
        return -2;
    }
    reactor->events = (struct ntyevent *) malloc((MAX_EPOLL_EVENTS) * sizeof(struct ntyevent));
    memset(reactor->events, 0, (MAX_EPOLL_EVENTS) * sizeof(struct ntyevent));
    if (reactor->events == NULL) {
        printf("create epll events in %s err %s\n", __func__, strerror(errno));
        close(reactor->epfd);
        return -3;
    }
    return 0;
}
int ntyreactor_destory(struct ntyreactor *reactor) {
    close(reactor->epfd);
    free(reactor->events);
}
int ntyreactor_addlistener(struct ntyreactor *reactor, int sockfd, NtyCallBack acceptor) {
    if (reactor == NULL) return -1;
    if (reactor->events == NULL) return -1;
    nty_event_set(&reactor->events[sockfd], sockfd, acceptor, reactor);
    nty_event_add(reactor->epfd, EPOLLIN, &reactor->events[sockfd]);
    return 0;
}
_Noreturn int ntyreactor_run(struct ntyreactor *reactor) {
    if (reactor == NULL) return -1;
    if (reactor->epfd < 0) return -1;
    if (reactor->events == NULL) return -1;
    struct epoll_event events[MAX_EPOLL_EVENTS];
    int checkpos = 0, i;
    while (1) {
        //心跳包 60s 超时则断开连接
        long now = time(NULL);
        for (i = 0; i < 100; i++, checkpos++) {
            if (checkpos == MAX_EPOLL_EVENTS) {
                checkpos = 0;
            }
            if (reactor->events[checkpos].status != 1 || checkpos == 3) {
                continue;
            }
            long duration = now - reactor->events[checkpos].last_active;
            if (duration >= 60) {
                close(reactor->events[checkpos].fd);
                printf("[fd=%d] timeout\n", reactor->events[checkpos].fd);
                nty_event_del(reactor->epfd, &reactor->events[checkpos]);
            }
        }
        int nready = epoll_wait(reactor->epfd, events, MAX_EPOLL_EVENTS, 1000);
        if (nready < 0) {
            printf("epoll_wait error, exit\n");
            continue;
        }
        for (i = 0; i < nready; i++) {
            struct ntyevent *ev = (struct ntyevent *) events[i].data.ptr;
            ev->callback(ev->fd, events[i].events, ev->arg);
        }
    }
}
int main(int argc, char *argv[]) {
    int sockfd = init_sock(SERVER_PORT);
    struct ntyreactor *reactor = (struct ntyreactor *) malloc(sizeof(struct ntyreactor));
    if (ntyreactor_init(reactor) != 0) {
        return -1;
    }
    ntyreactor_addlistener(reactor, sockfd, accept_cb);
    ntyreactor_run(reactor);
    ntyreactor_destory(reactor);
    close(sockfd);
    return 0;
}


目录
相关文章
|
7月前
|
API C++
socket编程之常用api介绍与socket、select、poll、epoll高并发服务器模型代码实现(1)
前言   本文旨在学习socket网络编程这一块的内容,epoll是重中之重,后续文章写reactor模型是建立在epoll之上的。
82 0
|
4月前
|
存储 网络协议 安全
Epoll的实现原理
Epoll的实现原理
|
6月前
|
应用服务中间件 Go nginx
Swoole 源码分析之 epoll 多路复用模块
IO多路复用技术通过使用少量的线程或进程同时监视多个IO事件,能够更高效地处理大量的IO操作,从而提高系统的性能和资源利用率。
60 0
Swoole 源码分析之 epoll 多路复用模块
|
7月前
|
监控 安全 Linux
socket编程之常用api介绍与socket、select、poll、epoll高并发服务器模型代码实现(3)
高并发服务器模型-poll poll介绍   poll跟select类似, 监控多路IO, 但poll不能跨平台。其实poll就是把select三个文件描述符集合变成一个集合了。
78 0
|
6月前
|
存储 安全 网络协议
epoll的底层实现原理
epoll的底层实现原理
55 0
|
7月前
|
监控 安全 Linux
reactor的原理与实现
前情回顾 网络IO,会涉及到两个系统对象:   一个是用户空间调用的进程或线程   一个是内核空间的内核系统 如果发生IO操作read时,会奖励两个阶段:
72 1
|
7月前
|
监控 Java 应用服务中间件
epoll封装reactor原理剖析与代码实现(2)
epoll封装reactor原理剖析与代码实现(2)
96 0
|
7月前
|
监控 Linux API
epoll-reactor模型原理及代码解析
epoll-reactor模型原理及代码解析
146 0
|
7月前
|
存储 安全 网络协议
epoll的实现原理
epoll的实现原理
82 0