Reactor模式(二)

简介: Reactor模式


2.4 回调函数

Accepter:当连接事件到来时调用该回调函数获取底层建立好的连接

Recver:当读事件就绪时调用该回调函数读取客户端发来的数据并处理

Sender:当写事件就绪时调用该回调函数向客户端发送响应数据

Excepter:当异常事件就绪时调用该函数进行一系列资源的释放

为某个文件描述符创建Connection结构时,可以调用Connection类提供的SetCallBack函数,将这些回调函数添加到Connection结构中


监听套接字对应的Connection结构中的_recvCb为Accepter,因为监听套接字的读事件就绪就意味着连接事件就绪了,而监听套接字一般只关心读事件,因此监听套接字对应的_sendCb和_exceptCb可以设置为nullptr

当Dispatcher监测到监听套接字的读事件就绪时,会调用监听套接字对应的Connection结构中的_recvCb回调,此时就会调用Accepter回调获取底层建立好的连接

对于与客户端建立连接的套接字,其对应的Connection结构中的_recvCb、_sendCb和_exceptCb分别为Recver、Sender和Excepter

当Dispatcher监测到这些套接字的事件就绪时,就会调用其对应的Connection结构中对应的回调函数,即Recver、Sender和Excepter

2.4.1 Accepter

Accepter回调用于处理连接事件,其工作流程如下:


调用封装的Accept函数获取底层建立好的连接

使用AddConnection函数将获取到的套接字封装为Connection并添加至服务器的管理中

此时套接字及其对应需要关心的事件就已注册到Dispatcher中

下一次Dispatcher在进行事件派发时就会关注该套接字对应的事件,当事件就绪时就会执行该套接字对应的Connection结构中对应的回调方法

void Accepter(Connection* con) 
{
    while(true)
    {
        string clientIp;
        uint16_t clientPort;
        int acceptErrno = 0;
        int socket = Socket::Accept(con->_socketFd, &clientIp, &clientPort, &acceptErrno);
        if(socket < 0) 
        {
            if(acceptErrno == EAGAIN || acceptErrno == EWOULDBLOCK) break;//底层已无链接
            else if(acceptErrno == EINTR) continue;//信号中断
            else {//读取失败
                LogMessage(WARNING, "Accept error, %d : %s", acceptErrno, strerror(acceptErrno));
                break;
            }
        }
        AddConnection(socket, bind(&TcpServer::Recver, this, std::placeholders::_1), \
        bind(&TcpServer::Sender, this, std::placeholders::_1), bind(&TcpServer::Excepter, this, std::placeholders::_1));
        LogMessage(DEBUG, "Accept client [%s : %d] success, socket: %d", clientIp.c_str(), clientPort, socket);
    }
}


本博客实现的ET模式下的epoll服务器,因此在获取底层连接时需要循环调用accept函数进行读取,并且监听套接字必须设置为非阻塞


因为ET模式下只有当底层建立的连接从无到有或是从有到多时才会通知上层,若没有一次性将底层建立好的连接全部获取,并且此后再也没有建立好的连接,那么底层没有读取完的连接就相当于丢失了

循环调用accept函数也意味着,当底层连接全部被获取后再调用accept函数,此时就会因为底层已经没有连接了而被阻塞住,因此需要将监听套接字设置为非阻塞,这样当底层没有连接时accept就不会被阻塞。accept获取到的新的套接字也需设置为非阻塞,为了避免将来循环调用recv、send等函数时被阻塞

设置非阻塞的操作都在AddConnection函数中的SetNonBlock函数完成

设置非阻塞


设置文件描述符为非阻塞时,需先调用fcntl函数获取该文件描述符对应的文件状态标记,然后在该文件状态标记的基础上添加非阻塞标记O_NONBLOCK,最后调用fcntl函数对该文件描述符的状态标记进行设置即可

static bool SetNonBlock(int socket) {
    int fl = fcntl(socket, F_GETFL);
    if(fl < 0) return false;
    fcntl(socket, F_SETFL, fl | O_NONBLOCK);
    return true;
} 

监听套接字设置为非阻塞后,当底层连接不就绪时,accept函数会以出错的形式返回,因此当调用accept函数的返回值小于0时,需继续判断错误码


若错误码为EAGAIN或EWOULDBLOCK,说明本次出错返回是因为底层已经没有可获取的连接了,此时底层连接全部获取完毕,这时可以返回0,表示本次Accepter调用成功。

若错误码为EINTR,说明本次调用accept函数获取底层连接时被信号中断了,这时还应该继续调用accept函数进行获取。

除此之外,才说明accept函数是真正调用失败了,此时可以返回-1,表示本次accepter调用失败

accept、recv和send等IO系统调用为什么会被信号中断?


IO系统调用函数出错返回并且将错误码设置为EINTR,表明本次在进行数据读取或数据写入之前被信号中断了,即IO系统调用在陷入内核,但并没有返回用户态的时候内核去处理其他信号


在内核态返回用户态之前会检查信号的pending位图,即未决信号集,若pending位图中有未处理的信号,那么内核就会对该信号进行处理

IO系统调用函数在进行IO操作前就被信号中断了,这是一个特例,因为IO过程分为"等"和"拷贝"两个步骤,一般"等"的过程比较漫长,而在这个过程中执行流其实是处于闲置状态的,因此在"等"的过程中若有信号产生,内核就会立即进行信号的处理

写事件按需打开


Accepter获取上来的套接字在添加到Dispatcher中时,只添加了EOPLLIN和EPOLLET事件,即只让epoll关心该套接字的读事件


之所以没有添加写事件,是因为并没有要发送的数据,因此没有必要让epoll关心写事件。一般读事件是会被设置的,而写事件则是按需打开的,只当有数据要发送时才会将写事件打开,并且在数据全部写入完毕后又会立即将写事件关闭


2.4.2 Recver

recver回调用于处理读事件,其工作流程如下:


循环调用recv函数读取数据,并将读取到的数据添加到该套接字对应Connection结构的_inBuffer中

对_inBuffer中的数据进行切割,将完整的报文切割出来,剩余的留在inbuffer中

调用业务处理函数

void Recver(Connection* con) 
{
    bool error = false;
    while(true)
    {
        char buffer[1024];
        ssize_t num = recv(con->_socketFd, buffer, sizeof(buffer) - 1, 0);
        if(num < 0) 
        {
            if(errno == EAGAIN || errno == EWOULDBLOCK) break;
            else if(errno == EINTR) continue;
            else {
                LogMessage(WARNING, "recv error %d : %s", errno, strerror(errno));
                error = true;
                con->_exceptCb(con);
                break;
            }
        }
        else if(num == 0) {
            LogMessage(DEBUG, "client[%d] quit, serve close %d", con->_socketFd, con->_socketFd);
            error = true;
            con->_exceptCb(con);
            break;
        }
        else {
            buffer[num] = '\0';
            con->_inBuffer += buffer;//放入链接的输入缓冲区中
        }
    }
    LogMessage(DEBUG, "socket: %d , con->_inBuffer: %s", con->_socketFd, (con->_inBuffer).c_str());
    if(!error)//无错
    {
        vector<string> messages;
        SpliteMessage(con->_inBuffer, &messages);
        for(auto& msg : messages) _cb(con, msg);
    }
}

当recv函数的返回值小于0时需要进一步判断错误码,若错误码为EAGAIN或EWOULDBLOCK则说明底层数据读取完毕了,若错误码为EINTR则说明读取过程被信号中断了,此时还需继续调用recv函数进行读取,否则就是读取出错了

当读取出错时直接调用该套接字对应的_exceptCb回调,在_exceptCb回调中将该套接字进行关闭

报文切割


报文切割本质就是为了防止粘包问题,而粘包问题还涉及到协议定制


需要根据协议知道如何将各个报文进行分离,如UDP分离报文采用的就是定长报头+自描述字段

本博客目的是演示整个数据处理的过程,为了简单起见就不进行过于复杂的协议定制了,就以"X"作为各个报文之间的分隔符,每个报文的最后都会以一个"X"作为报文结束的标志

因此现在要做的就是以"X"作为分隔符对_inBuffer中的字符串进行切割

SpliteMessage函数要做的就是对_inBuffer中的字符串进行切割,将切割出来的一个个报文放到vector中,对于最后无法切出完整报文的数据就留在_inBuffer中

void SpliteMessage(string &buffer, vector<string> *out)
{
    while (true)
    {
        size_t pos = buffer.find(SEP);
        if (pos == string::npos)
            break;
        string message = buffer.substr(0, pos);
        buffer.erase(0, pos + SEP_LENGTH);
        out->push_back(message);
    }
}

业务处理函数


对切割出来的完整报文进行反序列化

业务处理

业务处理后形成响应报文

将响应报头添加到对应Conection结构的_outBuffer中,并打开写事件

下一次Dispatcher在进行事件派发时就会关注该套接字的写事件,当写事件就绪时就会执行该套接字对应的Connection结构中写回调方法,进而将_outBuffer中的响应数据发送给客户端

void NetCal(Connection* con, string& request) 
{
    LogMessage(DEBUG, "NetCal been call, Get request: %s", request.c_str());
    //反序列化
    Request req;
    if(!req.Deserialized(request)) return;
    //业务处理
    Response resp = calculator(req);
    //构建应答
    string sendstr = resp.Serialize();
    sendstr = Encode(sendstr);
    //递交
    con->_outBuffer += sendstr;
    //"提醒"TCP服务器处理
    con->_svrPtr->EnableReadWrite(con, true, true);
}


协议定制

string Encode(string &s) {
    return s + SEP;
}
class Request
{
public:
    string Serialize()
    {
        std::string str;
        str = std::to_string(x_);
        str += SPACE;
        str += op_; // TODO
        str += SPACE;
        str += std::to_string(y_);
        return str;
    }
    bool Deserialized(const std::string &str)
    {
        std::size_t left = str.find(SPACE);
        if (left == std::string::npos)
            return false;
        std::size_t right = str.rfind(SPACE);
        if (right == std::string::npos)
            return false;
        x_ = atoi(str.substr(0, left).c_str());
        y_ = atoi(str.substr(right + SPACE_LEN).c_str());
        if (left + SPACE_LEN > str.size())
            return false;
        else
            op_ = str[left + SPACE_LEN];
        return true;
    }
public:
    Request() {}
    Request(int x, int y, char op) : x_(x), y_(y), op_(op) {}
    ~Request() {}
public:
    int x_;
    int y_;
    char op_; // '+' '-' '*' '/' '%'
};
class Response
{
public:
    string Serialize()
    {
        string s;
        s = std::to_string(code_);
        s += SPACE;
        s += std::to_string(result_);
        return s;
    }
    bool Deserialized(const string &s)
    {
        size_t pos = s.find(SPACE);
        if (pos == string::npos)
            return false;
        code_ = atoi(s.substr(0, pos).c_str());
        result_ = atoi(s.substr(pos + SPACE_LEN).c_str());
        return true;
    }
public:
    Response() {}
    Response(int result, int code) : result_(result), code_(code) {}
    ~Response() {}
public:
    int result_; // 计算结果
    int code_;   // 计算结果的状态码
};


2.4.3 Sender

循环调用send函数发送数据,并将发送出去的数据从该套接字对应Connection结构的_outBuffer中删除。

若循环调用send函数后该套接字对应的_outBuffer中的数据被全部发送,此时就需要将该套接字对应的写事件关闭,因为已没有要发送的数据了,若_outBuffer中的数据还有剩余,那么该套接字对应的写事件就应继续打开

void Sender(Connection* con)
{
    while(true)
    {
        ssize_t size = send(con->_socketFd, con->_outBuffer.c_str(), con->_outBuffer.size(), 0);
        if(size > 0) {
            con->_outBuffer.erase(0,size);
            if(con->_outBuffer.empty()) break;
        }
        else {
            if(errno == EAGAIN || errno == EWOULDBLOCK) break;
            else if(errno == EINTR) continue;
            else {
                LogMessage(WARNING, "send error %d : %s", errno, strerror(errno));
                con->_exceptCb(con);
                break;
            }
        }
    }
    if(con->_outBuffer.empty()) EnableReadWrite(con, true, false);
    else EnableReadWrite(con, true, true);
}

send函数的返回值小于0时需进一步判断错误码,若错误码为EAGAIN或EWOULDBLOCK则说明底层TCP发送缓冲区已被写满了,这时将已经发送的数据从_outBuffer中移除

若错误码为EINTR则说明发送过程被信号中断了,此时还需要继续调用send函数进行发送,否则就是发送出错了

当发送出错时直接调用该套接字对应的_exceptCb回调,在_exceptCb回调中将该套接字进行关闭

若最终_outBuffer中的数据全部发送成功,则_outBuffer被清空,可以关闭对写事件的关心

2.4.4 Excepter

对于异常事件就绪的套接字不做过多处理,调用close函数将该套接字关闭即可

但在关闭该套接字前,需先将该套接字从epoll模型中删除,并取消该套接字与其对应的Connection结构的映射关系

释放Connection对象

void Excepter(Connection* con)
{
    if(!(_connections.find(con->_socketFd) != _connections.end())) return;
    else //还存在
    {
        //从epoll中移除
        bool ret = _epoll.DelFromEpoll(con->_socketFd);
        assert(ret);
        //从映射表中移除
        _connections.erase(con->_socketFd);
        //关闭
        close(con->_socketFd);
        //释放链接对象
        delete con;
    }
    LogMessage(DEBUG, "Excepter 回收完毕");
}

2.5 Socket套接字

封装有关网络通信的接口

//网络套接字封装
#pragma once
#include <iostream>
#include <string>
#include <cstring>
#include <cerrno>
#include <cassert>
#include <unistd.h>
#include <memory>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <fcntl.h>
#include "Log.hpp"
class Socket
{
    const static int gbacklog = 20;
public://服务端客户端通用
    static int SocketCreate() {
        int SocketFd = socket(AF_INET, SOCK_STREAM, 0);
        if(SocketFd < 0) {
            LogMessage(FATAL, "socket create fail, %d:%s", errno, strerror(errno));
            exit(1);
        }
        LogMessage(NORMAL, "socket create success, SocketFd:%d", SocketFd);
        return SocketFd;
    }
    static bool SetNonBlock(int socket) {
        int fl = fcntl(socket, F_GETFL);
        if(fl < 0) return false;
        fcntl(socket, F_SETFL, fl | O_NONBLOCK);
        return true;
    } 
public://服务端专用
    static void Bind(int listenSocketFd, uint16_t serverPort, std::string serverIp = "0.0.0.0") {
        struct sockaddr_in local;
        memset(&local, '\0', sizeof local);
        local.sin_family = AF_INET;
        local.sin_port = htons(serverPort);
        inet_pton(AF_INET, serverIp.c_str(), &local.sin_addr);
        if(bind(listenSocketFd, (struct sockaddr*)&local, sizeof local) < 0) {
            LogMessage(FATAL, "bind fail, %d:%s", errno, strerror(errno));
            exit(2);
        }
        LogMessage(NORMAL, "bind success, serverPort:%d", serverPort);
    }
    static void Listen(int listenSocketFd) {
        if(listen(listenSocketFd, gbacklog) < 0) {
            LogMessage(FATAL, "listen fail, %d:%s", errno, strerror(errno));
            exit(3);
        }
        LogMessage(NORMAL, "listen success");
    }
    static int Accept(int listenSocketFd, std::string* clientIp, uint16_t* clientPort, int* acceptErrno) {
        struct sockaddr_in client;
        socklen_t length = sizeof client;
        int serviceSocketFd = accept(listenSocketFd, (struct sockaddr*)&client, &length);
        if(serviceSocketFd < 0) {
            *acceptErrno = errno;
            return -1;
        }
        if(clientIp != nullptr) *clientIp = inet_ntoa(client.sin_addr);
        if(clientPort != nullptr) *clientPort = ntohs(client.sin_port);
        return serviceSocketFd;
    }
public://客户端专用
    bool Connect(int clientSocketFd, std::string& serverIp, uint16_t& serverPort) {
        struct sockaddr_in server;
        server.sin_family = AF_INET;
        server.sin_addr.s_addr = inet_addr(serverIp.c_str());
        server.sin_port = htons(serverPort);
        if(connect(clientSocketFd, (struct sockaddr*)&server, sizeof server) == 0) return true;
        else return false;
    }
public:
    Socket() {}
    ~Socket() {}
};

2.6 服务器测试

启动服务器后就可以发现监听套接字为3号文件描述符


dc312c44a42a4eefbfa5fb4b3c44946e.png


当客户端连接服务器后,在服务器端会显示客户端使用的是5号文件描述符,因为4号文件描述符已被epoll模型使用了


8bdf76c24dc44ca283dfa113fdaf1e73.png


此时客户端可以向服务器发送一些简单计算任务,计算任务间用"X"隔开,服务器收到计算请求处理后会将计算结果发送给客户端,计算结果之间也是用"X"隔开的。若发送的不是完整报文,则会保存在socket对应的Connection结构中的_inBuffer中


2aca2b6524dd454fadb2f80e2113c6e6.png


由于使用了多路转接技术,虽然epoll服务器是一个单进程的服务器,但却可同时为多个客户端提供服务



85093b0182584cc0b5a8befd1e5eded6.png

当客户端退出后服务器端也会将对应的文件描述符从epoll模型中删除


a420a6ebf68f48ea83e29fe1d447d54e.png


三、总结

基于多路转接方案,当事件就绪的时候,采用回调的方式,进行业务处理的模式就被称为反应堆模式(Reactor)。上述代码中的TcpServer就是一个反应堆,其中一个个Connection对象就称为事件。每一个事件中都有:


文件描述符

独立的缓冲区

回调方法

回指向反应堆的指针

反应堆中有一个事件派发函数,当epoll中的某个事件就绪,事件派发函数回调用此事件的回调函数


特性


单进程:既负责事件派发又负责IO

半异步半同步:异步,事件到来是随机的。同步:当前线程参与IO


目录
相关文章
|
7月前
|
消息中间件 Kubernetes NoSQL
Reactor 和 Proactor 区别
Reactor 和 Proactor 区别
|
2月前
|
Java
Reactor模式
通过一个具体的Java代码示例展示了如何在NIO框架下实现Reactor模式,用于处理网络IO事件,包括事件的接收、分发和处理。
40 4
Reactor模式
|
2月前
|
NoSQL Java Redis
Reactor实战,创建一个简单的单线程Reactor(理解了就相当于理解了多线程的Reactor)
本文通过一个简单的单线程Reactor模式的Java代码示例,展示了如何使用NIO创建一个服务端,处理客户端的连接和数据读写,帮助理解Reactor模式的核心原理。
36 0
Reactor实战,创建一个简单的单线程Reactor(理解了就相当于理解了多线程的Reactor)
|
7月前
|
设计模式
深入浅出Reactor和Proactor模式
深入浅出Reactor和Proactor模式
|
7月前
|
监控 安全 Linux
reactor的原理与实现
前情回顾 网络IO,会涉及到两个系统对象:   一个是用户空间调用的进程或线程   一个是内核空间的内核系统 如果发生IO操作read时,会奖励两个阶段:
72 1
|
7月前
|
缓存
2.1.2事件驱动reactor的原理与实现
2.1.2事件驱动reactor的原理与实现
|
7月前
|
Java 调度
【Netty 网络通信】Reactor模式
【1月更文挑战第9天】【Netty 网络通信】Reactor模式
|
7月前
|
监控 Java 应用服务中间件
Reactor反应器模式
在Java的OIO编程中,最初和最原始的网络服务器程序使用一个while循环,不断地监听端口是否有新的连接,如果有就调用一个处理函数来处理。这种方法最大的问题就是如果前一个网络连接的处理没有结束,那么后面的连接请求没法被接收,于是后面的请求统统会被阻塞住,服务器的吞吐量就太低了。 为了解决这个严重的连接阻塞问题,出现了一个即为经典模式:Connection Per Thread。即对于每一个新的网络连接都分配一个线程,每个线程都独自处理自己负责的输入和输出,任何socket连接的输入和输出处理不会阻塞到后面新socket连接的监听和建立。早期版本的Tomcat服务器就是这样实现的。
|
设计模式 网络协议 数据处理
Reactor模式(一)
Reactor模式
124 0
|
设计模式 存储 监控