C语言企业项目实战(三)

简介: 教程来源 https://xcfsr.cn/category/ai.html 本项目实现轻量级缓存服务器网络层与持久化模块:基于epoll封装事件循环,支持高并发I/O;提供连接管理、RESP协议解析;集成RDB快照(fork子进程异步保存)与AOF日志(追加写+定时刷盘),兼顾性能与数据安全。

第四部分:网络层实现

4.1 事件循环(epoll封装)

// src/server/event_loop.h
#ifndef CCACHE_EVENT_LOOP_H
#define CCACHE_EVENT_LOOP_H

#include <stdint.h>

/* 事件类型 */
#define AE_NONE     0
#define AE_READABLE 1
#define AE_WRITABLE 2

/* 文件事件处理函数 */
typedef void (*aeFileProc)(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask);

/* 时间事件处理函数 */
typedef int (*aeTimeProc)(struct aeEventLoop *eventLoop, long long id, void *clientData);

/* 事件循环结构 */
typedef struct aeEventLoop {
    int epfd;                           // epoll文件描述符
    int stop;                           // 是否停止循环
    struct aeFileEvent *events;         // 文件事件表
    struct aeFiredEvent *fired;         // 就绪事件表
    struct aeTimeEvent *timeEventHead;  // 时间事件链表
    long long timeEventNextId;          // 下一个时间事件ID
} aeEventLoop;

/* API */
aeEventLoop *aeCreateEventLoop(int setsize);
void aeDeleteEventLoop(aeEventLoop *eventLoop);
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask, aeFileProc proc, void *clientData);
void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask);
long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds, aeTimeProc proc, void *clientData);
int aeDeleteTimeEvent(aeEventLoop *eventLoop, long long id);
int aeProcessEvents(aeEventLoop *eventLoop);
void aeMain(aeEventLoop *eventLoop);

#endif
// src/server/event_loop.c
#include "event_loop.h"
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/epoll.h>
#include <errno.h>

#define AE_SETSIZE 10240  // 最大事件数
#define AE_EPOLL_EVENTS 64

/* 文件事件结构 */
typedef struct aeFileEvent {
    int mask;
    aeFileProc *rfileProc;
    aeFileProc *wfileProc;
    void *clientData;
} aeFileEvent;

/* 就绪事件结构 */
typedef struct aeFiredEvent {
    int fd;
    int mask;
} aeFiredEvent;

/* 时间事件结构 */
typedef struct aeTimeEvent {
    long long id;
    long long when;           // 触发时间(毫秒)
    aeTimeProc *timeProc;
    void *clientData;
    struct aeTimeEvent *next;
} aeTimeEvent;

/* 创建事件循环 */
aeEventLoop *aeCreateEventLoop(int setsize) {
    aeEventLoop *eventLoop = malloc(sizeof(aeEventLoop));
    if (!eventLoop) return NULL;

    eventLoop->epfd = epoll_create(1024);
    if (eventLoop->epfd == -1) {
        free(eventLoop);
        return NULL;
    }

    eventLoop->events = calloc(setsize, sizeof(aeFileEvent));
    eventLoop->fired = calloc(setsize, sizeof(aeFiredEvent));
    eventLoop->stop = 0;
    eventLoop->timeEventHead = NULL;
    eventLoop->timeEventNextId = 1;

    return eventLoop;
}

/* 创建文件事件(非阻塞IO) */
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask, aeFileProc *proc, void *clientData) {
    if (fd >= AE_SETSIZE) return -1;

    aeFileEvent *fe = &eventLoop->events[fd];

    // 注册到epoll
    struct epoll_event ee;
    ee.data.fd = fd;
    ee.events = 0;
    if (mask & AE_READABLE) ee.events |= EPOLLIN;
    if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;

    if (epoll_ctl(eventLoop->epfd, EPOLL_CTL_ADD, fd, &ee) == -1) {
        return -1;
    }

    fe->mask |= mask;
    if (mask & AE_READABLE) fe->rfileProc = proc;
    if (mask & AE_WRITABLE) fe->wfileProc = proc;
    fe->clientData = clientData;

    return 0;
}

/* 事件循环核心 */
int aeProcessEvents(aeEventLoop *eventLoop) {
    int numevents;
    struct epoll_event events[AE_EPOLL_EVENTS];

    // 计算最近的时间事件
    long long shortest = -1;
    aeTimeEvent *te = eventLoop->timeEventHead;
    while (te) {
        if (shortest == -1 || te->when < shortest) {
            shortest = te->when;
        }
        te = te->next;
    }

    // 计算epoll等待时间
    int timeout = -1;
    if (shortest != -1) {
        long long now = getMonotonicUs() / 1000;
        if (shortest > now) {
            timeout = shortest - now;
        } else {
            timeout = 0;
        }
    }

    // 等待事件
    numevents = epoll_wait(eventLoop->epfd, events, AE_EPOLL_EVENTS, timeout);

    // 处理文件事件
    for (int i = 0; i < numevents; i++) {
        int fd = events[i].data.fd;
        int mask = 0;
        if (events[i].events & EPOLLIN) mask |= AE_READABLE;
        if (events[i].events & EPOLLOUT) mask |= AE_WRITABLE;

        aeFileEvent *fe = &eventLoop->events[fd];
        if ((fe->mask & mask & AE_READABLE) && fe->rfileProc) {
            fe->rfileProc(eventLoop, fd, fe->clientData, mask);
        }
        if ((fe->mask & mask & AE_WRITABLE) && fe->wfileProc) {
            fe->wfileProc(eventLoop, fd, fe->clientData, mask);
        }
    }

    // 处理时间事件
    long long now = getMonotonicUs() / 1000;
    te = eventLoop->timeEventHead;
    while (te) {
        if (te->when <= now) {
            int retval = te->timeProc(eventLoop, te->id, te->clientData);
            if (retval == AE_NOMORE) {
                // 删除这个时间事件
            }
        }
        te = te->next;
    }

    return numevents;
}

/* 主循环 */
void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
        aeProcessEvents(eventLoop);
    }
}

4.2 连接管理

// src/server/connection.h
#ifndef CCACHE_CONNECTION_H
#define CCACHE_CONNECTION_H

#include "event_loop.h"
#include "../protocol/resp.h"

/* 连接状态 */
enum conn_state {
    CONN_STATE_READING,   // 读取请求
    CONN_STATE_WRITING,   // 写入响应
    CONN_STATE_CLOSE      // 关闭连接
};

/* 连接结构 */
typedef struct connection {
    int fd;                     // socket文件描述符
    struct sockaddr_in addr;    // 客户端地址
    aeEventLoop *eventLoop;     // 所属事件循环

    enum conn_state state;      // 连接状态

    // 读缓冲区
    char *readBuffer;
    int readBufferSize;
    int readBufferUsed;

    // 写缓冲区
    char *writeBuffer;
    int writeBufferSize;
    int writeBufferUsed;

    // 协议解析
    respParser *parser;         // RESP协议解析器
    respValue *currentCommand;  // 当前解析的命令

    // 统计
    time_t createTime;
    long long totalRequests;
} connection;

connection *connCreate(int fd, struct sockaddr_in *addr, aeEventLoop *eventLoop);
void connFree(connection *conn);
void connSetReadHandler(connection *conn);
void connSetWriteHandler(connection *conn);
void connSendReply(connection *conn, respValue *reply);
void connClose(connection *conn);

#endif

第五部分:持久化与复制

5.1 RDB快照实现

// src/persistence/rdb.c
#include "rdb.h"
#include "../datastore/dict.h"
#include "../utils/alloc.h"
#include <stdio.h>
#include <sys/mman.h>
#include <fcntl.h>
#include <unistd.h>

/* RDB文件格式:
 * "REDIS" (5字节) + 版本号(4字节) + 数据 + EOF(1字节) + 校验和(8字节)
 */

#define RDB_VERSION 9
#define RDB_OPCODE_SELECTDB 0xFE
#define RDB_OPCODE_EOF 0xFF
#define RDB_OPCODE_EXPIRETIME 0xFD
#define RDB_OPCODE_EXPIRETIME_MS 0xFC

/* 保存RDB快照(fork子进程执行,避免阻塞主线程)*/
int rdbSave(const char *filename) {
    pid_t child_pid = fork();

    if (child_pid < 0) {
        LOG_ERROR("fork失败,无法保存RDB");
        return C_ERR;
    }

    if (child_pid == 0) {
        // 子进程执行保存
        int ret = rdbSaveToFile(filename);
        exit(ret == C_OK ? 0 : 1);
    }

    // 父进程返回
    LOG_INFO("RDB保存已启动,子进程PID: %d", child_pid);
    return C_OK;
}

/* 子进程实际保存函数 */
static int rdbSaveToFile(const char *filename) {
    char tmpfile[256];
    snprintf(tmpfile, sizeof(tmpfile), "%s.tmp", filename);

    FILE *fp = fopen(tmpfile, "w");
    if (!fp) {
        LOG_ERROR("无法创建临时RDB文件: %s", tmpfile);
        return C_ERR;
    }

    // 写入魔数
    if (rdbWriteMagic(fp) != C_OK) goto error;

    // 写入版本号
    if (rdbWriteVersion(fp) != C_OK) goto error;

    // 遍历所有数据库,写入数据
    for (int i = 0; i < server.dbnum; i++) {
        if (dictSize(server.db[i].dict) == 0) continue;

        // 写入SELECTDB操作码和数据库编号
        if (rdbSaveSelectDB(fp, i) != C_OK) goto error;

        // 遍历字典,写入键值对
        dictIterator *iter = dictGetIterator(server.db[i].dict);
        dictEntry *de;
        while ((de = dictNext(iter)) != NULL) {
            robj *key = dictGetKey(de);
            robj *val = dictGetVal(de);

            // 检查过期时间
            long long expire = getExpire(server.db + i, key);
            if (expire != -1) {
                if (rdbSaveExpire(fp, expire) != C_OK) goto error;
            }

            // 保存键值对
            if (rdbSaveObject(fp, key, val) != C_OK) goto error;
        }
        dictReleaseIterator(iter);
    }

    // 写入EOF标记
    if (rdbWriteOpcode(fp, RDB_OPCODE_EOF) != C_OK) goto error;

    // 写入校验和
    if (rdbWriteChecksum(fp) != C_OK) goto error;

    fclose(fp);

    // 原子替换临时文件
    if (rename(tmpfile, filename) != 0) {
        LOG_ERROR("重命名RDB文件失败");
        return C_ERR;
    }

    LOG_INFO("RDB保存成功: %s", filename);
    return C_OK;

error:
    fclose(fp);
    unlink(tmpfile);
    LOG_ERROR("RDB保存失败");
    return C_ERR;
}

5.2 AOF日志实现

// src/persistence/aof.c
#include "aof.h"
#include "../utils/alloc.h"
#include <unistd.h>
#include <fcntl.h>

/* AOF追加写入 */
void feedAppendOnlyFile(struct redisCommand *cmd, int argc, robj **argv) {
    // 构造RESP格式的命令
    sds cmdstr = sdsnew("*");
    cmdstr = sdscatprintf(cmdstr, "%d\r\n", argc);

    for (int i = 0; i < argc; i++) {
        robj *obj = argv[i];
        cmdstr = sdscatprintf(cmdstr, "$%d\r\n", sdslen(obj->ptr));
        cmdstr = sdscatlen(cmdstr, obj->ptr, sdslen(obj->ptr));
        cmdstr = sdscatlen(cmdstr, "\r\n", 2);
    }

    // 写入AOF缓冲区
    server.aof_buf = sdscatlen(server.aof_buf, cmdstr, sdslen(cmdstr));
    sdsfree(cmdstr);
}

/* AOF后台刷新(每2秒执行一次)*/
void flushAppendOnlyFile(int force) {
    if (sdslen(server.aof_buf) == 0) return;

    // 写入文件
    ssize_t nwritten = write(server.aof_fd, server.aof_buf, sdslen(server.aof_buf));
    if (nwritten != (ssize_t)sdslen(server.aof_buf)) {
        LOG_ERROR("AOF写入失败: %s", strerror(errno));
        return;
    }

    // 清空缓冲区
    sdsfree(server.aof_buf);
    server.aof_buf = sdsempty();

    // fsync(根据配置决定是否立即同步)
    if (server.aof_fsync == AOF_FSYNC_ALWAYS) {
        fsync(server.aof_fd);
    } else if (server.aof_fsync == AOF_FSYNC_EVERYSEC && force) {
        fsync(server.aof_fd);
    }
}

来源:
https://xcfsr.cn/category/ai.html

相关文章
|
6天前
|
人工智能 JSON 监控
Claude Code 源码泄露:一份价值亿元的 AI 工程公开课
我以为顶级 AI 产品的护城河是模型。读完这 51.2 万行泄露的源码,我发现自己错了。
4358 17
|
17天前
|
人工智能 JSON 机器人
让龙虾成为你的“公众号分身” | 阿里云服务器玩Openclaw
本文带你零成本玩转OpenClaw:学生认证白嫖6个月阿里云服务器,手把手配置飞书机器人、接入免费/高性价比AI模型(NVIDIA/通义),并打造微信公众号“全自动分身”——实时抓热榜、AI选题拆解、一键发布草稿,5分钟完成热点→文章全流程!
16679 138
让龙虾成为你的“公众号分身” | 阿里云服务器玩Openclaw
|
5天前
|
人工智能 数据可视化 安全
王炸组合!阿里云 OpenClaw X 飞书 CLI,开启 Agent 基建狂潮!(附带免费使用6个月服务器)
本文详解如何用阿里云Lighthouse一键部署OpenClaw,结合飞书CLI等工具,让AI真正“动手”——自动群发、生成科研日报、整理知识库。核心理念:未来软件应为AI而生,CLI即AI的“手脚”,实现高效、安全、可控的智能自动化。
4854 8
王炸组合!阿里云 OpenClaw X 飞书 CLI,开启 Agent 基建狂潮!(附带免费使用6个月服务器)
|
7天前
|
人工智能 自然语言处理 数据挖掘
零基础30分钟搞定 Claude Code,这一步90%的人直接跳过了
本文直击Claude Code使用痛点,提供零基础30分钟上手指南:强调必须配置“工作上下文”(about-me.md+anti-ai-style.md)、采用Cowork/Code模式、建立标准文件结构、用提问式提示词驱动AI理解→规划→执行。附可复制模板与真实项目启动法,助你将Claude从聊天工具升级为高效执行系统。
|
6天前
|
人工智能 定位技术
Claude Code源码泄露:8大隐藏功能曝光
2026年3月,Anthropic因配置失误致Claude Code超51万行源码泄露,意外促成“被动开源”。代码中藏有8大未发布功能,揭示其向“超级智能体”演进的完整蓝图,引发AI编程领域震动。(239字)
2461 9