第四部分:网络层实现
4.1 事件循环(epoll封装)
// src/server/event_loop.h
#ifndef CCACHE_EVENT_LOOP_H
#define CCACHE_EVENT_LOOP_H
#include <stdint.h>
/* File event mask bits. They may be OR-ed together when registering a handler
 * and in the mask passed to a handler when it is invoked. */
#define AE_NONE 0
#define AE_READABLE 1
#define AE_WRITABLE 2
/* Return value an aeTimeProc uses to signal that its time event must not fire
 * again. aeProcessEvents compares handler results against this constant, so it
 * has to be visible to both the loop implementation and handler authors. */
#define AE_NOMORE (-1)
/* Forward declaration: without it, `struct aeEventLoop` inside the parameter
 * lists below would declare a new prototype-scoped type that is incompatible
 * with the real struct defined later in this header. */
struct aeEventLoop;
/* File event handler.
 * Declared as a function type (not a pointer type) so that the `aeFileProc *`
 * fields and parameters used by the implementation are ordinary function
 * pointers. A parameter written as `aeFileProc proc` is adjusted by the
 * compiler to `aeFileProc *proc`, so both spellings stay compatible. */
typedef void aeFileProc(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask);
/* Time event handler; same function-type convention as aeFileProc. */
typedef int aeTimeProc(struct aeEventLoop *eventLoop, long long id, void *clientData);
/* Event loop state shared by all ae* functions. */
typedef struct aeEventLoop {
int epfd; // epoll file descriptor
int stop; // non-zero makes aeMain leave its loop
struct aeFileEvent *events; // file event table, indexed by fd
struct aeFiredEvent *fired; // ready (fired) event table
struct aeTimeEvent *timeEventHead; // head of the time event linked list
long long timeEventNextId; // ID to hand out to the next time event
} aeEventLoop;
/* Public API */
/* Allocates and initializes an event loop; returns NULL on failure. */
aeEventLoop *aeCreateEventLoop(int setsize);
/* NOTE(review): implementation not shown here — presumably releases the loop and its tables. */
void aeDeleteEventLoop(aeEventLoop *eventLoop);
/* Registers `proc` for the AE_READABLE/AE_WRITABLE bits in `mask` on `fd`.
 * Returns 0 on success, -1 on failure. */
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask, aeFileProc proc, void *clientData);
/* NOTE(review): implementation not shown here — presumably unregisters the given mask bits for fd. */
void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask);
/* NOTE(review): implementation not shown here — presumably schedules `proc` after `milliseconds` and returns the event ID. */
long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds, aeTimeProc proc, void *clientData);
/* NOTE(review): implementation not shown here — presumably removes the time event with the given ID. */
int aeDeleteTimeEvent(aeEventLoop *eventLoop, long long id);
/* Runs one iteration: waits on epoll, dispatches file events, then time events. */
int aeProcessEvents(aeEventLoop *eventLoop);
/* Calls aeProcessEvents repeatedly until eventLoop->stop becomes non-zero. */
void aeMain(aeEventLoop *eventLoop);
#endif
// src/server/event_loop.c
#include "event_loop.h"
#include <errno.h>
#include <limits.h>
#include <stdlib.h>
#include <string.h>
#include <sys/epoll.h>
#include <unistd.h>
#define AE_SETSIZE 10240 // exclusive upper bound on fds accepted by aeCreateFileEvent
#define AE_EPOLL_EVENTS 64 // max events fetched by one epoll_wait call in aeProcessEvents
/* Per-fd registration record; one entry of aeEventLoop.events. */
typedef struct aeFileEvent {
int mask; // AE_READABLE / AE_WRITABLE bits currently registered
aeFileProc *rfileProc; // handler called when the fd is readable
aeFileProc *wfileProc; // handler called when the fd is writable
void *clientData; // opaque pointer handed back to the handlers
} aeFileEvent;
/* Ready (fired) event record: an fd together with the mask of events that fired on it. */
typedef struct aeFiredEvent {
int fd;
int mask;
} aeFiredEvent;
/* Time event: one node of the singly linked list rooted at aeEventLoop.timeEventHead. */
typedef struct aeTimeEvent {
long long id; // event ID, passed to the handler
long long when; // trigger time in milliseconds
aeTimeProc *timeProc; // handler called once `when` has been reached
void *clientData; // opaque pointer handed back to the handler
struct aeTimeEvent *next; // next node in the list, or NULL
} aeTimeEvent;
/* 创建事件循环 */
aeEventLoop *aeCreateEventLoop(int setsize) {
aeEventLoop *eventLoop = malloc(sizeof(aeEventLoop));
if (!eventLoop) return NULL;
eventLoop->epfd = epoll_create(1024);
if (eventLoop->epfd == -1) {
free(eventLoop);
return NULL;
}
eventLoop->events = calloc(setsize, sizeof(aeFileEvent));
eventLoop->fired = calloc(setsize, sizeof(aeFiredEvent));
eventLoop->stop = 0;
eventLoop->timeEventHead = NULL;
eventLoop->timeEventNextId = 1;
return eventLoop;
}
/* Register a file event handler (intended for non-blocking I/O).
 *
 * fd:         descriptor to watch; must satisfy 0 <= fd < AE_SETSIZE.
 * mask:       AE_READABLE and/or AE_WRITABLE.
 * proc:       handler stored for each direction selected in mask.
 * clientData: opaque pointer passed back to the handler.
 *
 * The new mask is merged with whatever is already registered for fd, and the
 * epoll registration is modified rather than re-added when the fd is already
 * watched (EPOLL_CTL_ADD on a registered fd fails with EEXIST).
 *
 * Returns 0 on success, -1 if fd is out of range or epoll_ctl fails. */
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask, aeFileProc *proc, void *clientData) {
    if (fd < 0 || fd >= AE_SETSIZE) return -1;
    aeFileEvent *fe = &eventLoop->events[fd];

    int merged = fe->mask | mask;
    struct epoll_event ee = {0};
    ee.data.fd = fd;
    if (merged & AE_READABLE) ee.events |= EPOLLIN;
    if (merged & AE_WRITABLE) ee.events |= EPOLLOUT;

    int op = (fe->mask == AE_NONE) ? EPOLL_CTL_ADD : EPOLL_CTL_MOD;
    if (epoll_ctl(eventLoop->epfd, op, fd, &ee) == -1) {
        /* The table and the kernel can disagree (e.g. the fd was closed and
         * reused without being unregistered); retry once with the other op. */
        int retry;
        if (op == EPOLL_CTL_ADD && errno == EEXIST) {
            retry = EPOLL_CTL_MOD;
        } else if (op == EPOLL_CTL_MOD && errno == ENOENT) {
            retry = EPOLL_CTL_ADD;
        } else {
            return -1;
        }
        if (epoll_ctl(eventLoop->epfd, retry, fd, &ee) == -1) return -1;
    }

    fe->mask = merged;
    if (mask & AE_READABLE) fe->rfileProc = proc;
    if (mask & AE_WRITABLE) fe->wfileProc = proc;
    fe->clientData = clientData;
    return 0;
}
/* Run one iteration of the event loop.
 *
 * 1. Finds the earliest time event and derives the epoll_wait timeout from it
 *    (blocks indefinitely when there are no time events).
 * 2. Waits for file events and dispatches the read/write handlers.
 * 3. Runs every time event whose deadline has passed. A handler returning
 *    AE_NOMORE has its event unlinked and freed; any other return value is
 *    taken as the number of milliseconds until the event fires again (the
 *    convention of the Redis ae library this API mirrors). Without this
 *    rescheduling an expired event would fire on every iteration and the
 *    loop would spin with a zero timeout.
 *
 * Returns the number of file events reported by epoll_wait, 0 if the wait was
 * interrupted by a signal, or -1 on any other epoll_wait error. Time events
 * are processed in all three cases.
 *
 * NOTE(review): getMonotonicUs() is not declared in this file — presumably it
 * comes from a project utility header; confirm the include.
 * NOTE(review): a time handler must end its own event by returning AE_NOMORE;
 * removing the currently running event from inside the handler by other means
 * would leave this function holding a dangling pointer. */
int aeProcessEvents(aeEventLoop *eventLoop) {
    struct epoll_event events[AE_EPOLL_EVENTS];

    /* Earliest deadline among the time events; -1 means there are none. */
    long long shortest = -1;
    for (aeTimeEvent *scan = eventLoop->timeEventHead; scan; scan = scan->next) {
        if (shortest == -1 || scan->when < shortest) {
            shortest = scan->when;
        }
    }

    /* epoll_wait takes an int timeout: clamp instead of truncating. */
    int timeout = -1;
    if (shortest != -1) {
        long long delta = shortest - getMonotonicUs() / 1000;
        if (delta <= 0) {
            timeout = 0;
        } else if (delta > INT_MAX) {
            timeout = INT_MAX;
        } else {
            timeout = (int)delta;
        }
    }

    int numevents = epoll_wait(eventLoop->epfd, events, AE_EPOLL_EVENTS, timeout);
    int result = numevents;
    if (numevents < 0) {
        /* A signal interrupting the wait is not an error. */
        result = (errno == EINTR) ? 0 : -1;
        numevents = 0;
    }

    /* Dispatch file events. */
    for (int i = 0; i < numevents; i++) {
        int fd = events[i].data.fd;
        int mask = 0;
        if (events[i].events & EPOLLIN) mask |= AE_READABLE;
        if (events[i].events & EPOLLOUT) mask |= AE_WRITABLE;
        aeFileEvent *fe = &eventLoop->events[fd];
        if ((fe->mask & mask & AE_READABLE) && fe->rfileProc) {
            fe->rfileProc(eventLoop, fd, fe->clientData, mask);
        }
        /* fe->mask is re-read on purpose: the read handler may have changed it. */
        if ((fe->mask & mask & AE_WRITABLE) && fe->wfileProc) {
            fe->wfileProc(eventLoop, fd, fe->clientData, mask);
        }
    }

    /* Dispatch time events. */
    long long now = getMonotonicUs() / 1000;
    aeTimeEvent *te = eventLoop->timeEventHead;
    while (te) {
        if (te->when > now) {
            te = te->next;
            continue;
        }
        int retval = te->timeProc(eventLoop, te->id, te->clientData);
        /* Read the successor only after the callback: it may have added events. */
        aeTimeEvent *next = te->next;
        if (retval == AE_NOMORE) {
            /* Search from the head again, since the callback may have pushed
             * new events in front of this one. */
            aeTimeEvent **link = &eventLoop->timeEventHead;
            while (*link && *link != te) link = &(*link)->next;
            if (*link) {
                *link = next;
                free(te);
            }
        } else {
            te->when = now + retval;
        }
        te = next;
    }
    return result;
}
/* Main loop: clear the stop flag, then process events until something sets it. */
void aeMain(aeEventLoop *eventLoop) {
    for (eventLoop->stop = 0; eventLoop->stop == 0;) {
        aeProcessEvents(eventLoop);
    }
}
4.2 连接管理
// src/server/connection.h
#ifndef CCACHE_CONNECTION_H
#define CCACHE_CONNECTION_H
/* struct connection embeds a struct sockaddr_in by value and a time_t, so the
 * header must pull in their definitions itself to be self-contained. */
#include <netinet/in.h>
#include <time.h>
#include "event_loop.h"
#include "../protocol/resp.h"
/* Connection state */
enum conn_state {
CONN_STATE_READING, // reading a request
CONN_STATE_WRITING, // writing a response
CONN_STATE_CLOSE // closing the connection
};
/* Per-client connection record. */
typedef struct connection {
int fd; // socket file descriptor
struct sockaddr_in addr; // client address
aeEventLoop *eventLoop; // event loop this connection belongs to
enum conn_state state; // connection state
// read buffer
char *readBuffer;
int readBufferSize;
int readBufferUsed;
// write buffer
char *writeBuffer;
int writeBufferSize;
int writeBufferUsed;
// protocol parsing
respParser *parser; // RESP protocol parser
respValue *currentCommand; // command currently being parsed
// statistics
time_t createTime;
long long totalRequests;
} connection;
/* Connection API. NOTE(review): implementations are not shown in this section;
 * the one-line descriptions below are inferred from names and signatures. */
/* Presumably creates a connection for an accepted socket `fd` on `eventLoop`. */
connection *connCreate(int fd, struct sockaddr_in *addr, aeEventLoop *eventLoop);
/* Presumably releases the connection and the buffers it owns. */
void connFree(connection *conn);
/* Presumably registers the read handler for conn->fd. */
void connSetReadHandler(connection *conn);
/* Presumably registers the write handler for conn->fd. */
void connSetWriteHandler(connection *conn);
/* Presumably queues `reply` on the connection for sending. */
void connSendReply(connection *conn, respValue *reply);
/* Presumably closes the connection. */
void connClose(connection *conn);
#endif
第五部分:持久化与复制
5.1 RDB快照实现
// src/persistence/rdb.c
#include "rdb.h"
#include "../datastore/dict.h"
#include "../utils/alloc.h"
#include <stdio.h>
#include <sys/mman.h>
#include <fcntl.h>
#include <unistd.h>
/* RDB file layout:
 * "REDIS" (5 bytes) + version (4 bytes) + data + EOF (1 byte) + checksum (8 bytes)
 */
#define RDB_VERSION 9
/* Opcodes. EOF is written after the last database by rdbSaveToFile.
 * NOTE(review): the uses of SELECTDB and the two EXPIRETIME opcodes are inside
 * helpers not shown here — presumably they tag the database selector and
 * expiry records (seconds / milliseconds). */
#define RDB_OPCODE_SELECTDB 0xFE
#define RDB_OPCODE_EOF 0xFF
#define RDB_OPCODE_EXPIRETIME 0xFD
#define RDB_OPCODE_EXPIRETIME_MS 0xFC
/* rdbSaveToFile is defined after rdbSave; it must be declared before use or
 * the call below becomes an implicit declaration that conflicts with the
 * later static definition. */
static int rdbSaveToFile(const char *filename);

/* Save an RDB snapshot in a forked child so the main thread is not blocked.
 *
 * filename: destination path of the snapshot.
 *
 * Returns C_OK once the child has been started (not when the save has
 * finished), or C_ERR if fork() fails.
 *
 * NOTE(review): the child PID is only logged here; the parent still has to
 * reap the child (waitpid / SIGCHLD) elsewhere or it will remain a zombie. */
int rdbSave(const char *filename) {
    /* Flush stdio before forking so the child does not inherit, and later
     * write out a second time, output buffered by the parent. */
    fflush(NULL);

    pid_t child_pid = fork();
    if (child_pid < 0) {
        LOG_ERROR("fork失败,无法保存RDB");
        return C_ERR;
    }
    if (child_pid == 0) {
        /* Child: perform the save. Leave with _exit() so the parent's atexit
         * handlers do not run in the child; flush the child's own buffered
         * output first because _exit() does not. */
        int ret = rdbSaveToFile(filename);
        fflush(NULL);
        _exit(ret == C_OK ? 0 : 1);
    }
    /* Parent: return immediately. pid_t is cast because its width is not
     * guaranteed to match %d. */
    LOG_INFO("RDB保存已启动,子进程PID: %d", (int)child_pid);
    return C_OK;
}
/* Write the whole dataset to `filename` (runs in the forked child).
 *
 * Data is written to "<filename>.tmp" and renamed over the target only after
 * every write and the final close have succeeded, so a failed save never
 * replaces an existing snapshot.
 *
 * Returns C_OK on success. On any failure returns C_ERR after releasing the
 * dict iterator, closing the file and removing the temporary file. */
static int rdbSaveToFile(const char *filename) {
    char tmpfile[256];
    dictIterator *iter = NULL;

    /* Reject paths that do not fit instead of silently using a truncated name. */
    int len = snprintf(tmpfile, sizeof(tmpfile), "%s.tmp", filename);
    if (len < 0 || (size_t)len >= sizeof(tmpfile)) {
        LOG_ERROR("无法创建临时RDB文件: %s", filename);
        return C_ERR;
    }

    /* Binary mode: the RDB format is not text. */
    FILE *fp = fopen(tmpfile, "wb");
    if (!fp) {
        LOG_ERROR("无法创建临时RDB文件: %s", tmpfile);
        return C_ERR;
    }

    /* Header: magic string, then version. */
    if (rdbWriteMagic(fp) != C_OK) goto error;
    if (rdbWriteVersion(fp) != C_OK) goto error;

    /* Body: every non-empty database. */
    for (int i = 0; i < server.dbnum; i++) {
        if (dictSize(server.db[i].dict) == 0) continue;
        /* SELECTDB opcode and database number. */
        if (rdbSaveSelectDB(fp, i) != C_OK) goto error;

        iter = dictGetIterator(server.db[i].dict);
        dictEntry *de;
        while ((de = dictNext(iter)) != NULL) {
            robj *key = dictGetKey(de);
            robj *val = dictGetVal(de);
            /* An expire record, if any, precedes the key/value pair. */
            long long expire = getExpire(server.db + i, key);
            if (expire != -1) {
                if (rdbSaveExpire(fp, expire) != C_OK) goto error;
            }
            if (rdbSaveObject(fp, key, val) != C_OK) goto error;
        }
        dictReleaseIterator(iter);
        iter = NULL;
    }

    /* Trailer: EOF marker, then checksum. */
    if (rdbWriteOpcode(fp, RDB_OPCODE_EOF) != C_OK) goto error;
    if (rdbWriteChecksum(fp) != C_OK) goto error;

    /* fclose flushes buffered data, so a late write error shows up here. */
    if (fclose(fp) != 0) {
        fp = NULL;
        goto error;
    }
    fp = NULL;

    /* Atomically replace the target with the finished temporary file. */
    if (rename(tmpfile, filename) != 0) {
        LOG_ERROR("重命名RDB文件失败");
        unlink(tmpfile);
        return C_ERR;
    }
    LOG_INFO("RDB保存成功: %s", filename);
    return C_OK;

error:
    if (iter) dictReleaseIterator(iter);
    if (fp) fclose(fp);
    unlink(tmpfile);
    LOG_ERROR("RDB保存失败");
    return C_ERR;
}
5.2 AOF日志实现
// src/persistence/aof.c
#include "aof.h"
#include "../utils/alloc.h"
#include <unistd.h>
#include <fcntl.h>
/* Append one command to the in-memory AOF buffer.
 *
 * cmd:  command descriptor (currently unused).
 * argc: number of arguments.
 * argv: the arguments; each obj->ptr is passed to sdslen() and is therefore
 *       expected to be an sds string.
 *
 * The command is encoded as a RESP array of bulk strings
 * ("*<argc>\r\n" followed by "$<len>\r\n<bytes>\r\n" per argument) and
 * appended to server.aof_buf; flushAppendOnlyFile writes it to disk later. */
void feedAppendOnlyFile(struct redisCommand *cmd, int argc, robj **argv) {
    (void)cmd;

    sds cmdstr = sdsnew("*");
    cmdstr = sdscatprintf(cmdstr, "%d\r\n", argc);
    for (int i = 0; i < argc; i++) {
        robj *obj = argv[i];
        /* The length is cast to unsigned long and printed with %lu so the
         * format specifier always matches the argument type, whatever
         * integer type sdslen() returns. */
        unsigned long len = (unsigned long)sdslen(obj->ptr);
        cmdstr = sdscatprintf(cmdstr, "$%lu\r\n", len);
        cmdstr = sdscatlen(cmdstr, obj->ptr, len);
        cmdstr = sdscatlen(cmdstr, "\r\n", 2);
    }

    /* Queue the encoded command in the AOF buffer. */
    server.aof_buf = sdscatlen(server.aof_buf, cmdstr, sdslen(cmdstr));
    sdsfree(cmdstr);
}
/* Write the pending AOF buffer to server.aof_fd.
 *
 * force: when non-zero, fsync is also performed under AOF_FSYNC_EVERYSEC
 *        (under AOF_FSYNC_ALWAYS it is performed on every successful flush).
 *
 * write() may accept fewer bytes than requested. The loop keeps writing until
 * the whole buffer is on its way to the file, retrying on EINTR. If a write
 * fails after partial progress, the bytes already written are removed from
 * the buffer so the next flush does not append them a second time, which
 * would corrupt the command stream. */
void flushAppendOnlyFile(int force) {
    size_t total = sdslen(server.aof_buf);
    if (total == 0) return;

    size_t done = 0;
    while (done < total) {
        ssize_t n = write(server.aof_fd, server.aof_buf + done, total - done);
        if (n < 0) {
            if (errno == EINTR) continue;
            break;
        }
        if (n == 0) break; /* no progress; avoid spinning */
        done += (size_t)n;
    }

    if (done < total) {
        int saved_errno = errno; /* sdsrange below may clobber errno */
        if (done > 0) sdsrange(server.aof_buf, (ssize_t)done, -1);
        LOG_ERROR("AOF写入失败: %s", strerror(saved_errno));
        return;
    }

    /* Everything was written: reset the buffer. */
    sdsfree(server.aof_buf);
    server.aof_buf = sdsempty();

    /* fsync according to the configured policy. */
    if (server.aof_fsync == AOF_FSYNC_ALWAYS) {
        fsync(server.aof_fd);
    } else if (server.aof_fsync == AOF_FSYNC_EVERYSEC && force) {
        fsync(server.aof_fd);
    }
}