1、redis 网络协议
1.1、redis 网络
微观上:reactor
- 组成:IO 多路复用 + 非阻塞 IO
- IO 职责:IO 检测和 IO 操作
- 事件:异步事件处理流程,先注册事件,事件循环中处理事件 callback
宏观上:可以忽略其他流程,只关注数据包处理流程。当管道(连接)构成一个完整的包,处理对应的事件。
1.2、redis 协议
redis 协议设计
- 消息边界:字符流头部 + 分隔符
- 消息类型:字符串的第一个字符。
redis 采用 RESP 序列化协议,协议的不同部分使用以CRLF(\r\n)
结束。
RESP 支持的数据类型,通过第一个字符判断数据类型
+
Simple Strings
+OK\r\n-
Errors:
-Error <message>\r\n:
Integers
:<数值>\r\n$
Bulk Strings
$<数据长度>\r\n<数据内容>\r\n*
Arrays
*<元素个数n>\r\n<元素内容>...<元素n>
RESP 在 redis 请求-响应协议中的作用方式
- 客户端发送字符串数组 ( Array + Bulk Strings) 到 redis 服务器
*<参数数量>\r\n$<参数1的长度>\r\n<参数1的数据>\r\n...$<参数n的长度>\r\n<参数n的数据>\r\n - redis 服务器根据命令实现回复一种 RESP 数据类型到客户端。
来看下面例子
在 redis-cli,发送一条命令 set key value
,对应的报文为:
*3\r\n$3\r\nset\r\n$3\r\nkey\r\n$5\r\nvalue
执行成功 OK
,回应的报文为:
+OK\r\n
若执行失败,回应的报文为
-ERR unknown command `ket`, with args beginning with: `key`, `value`, \r\n
2、redis pipline
redis pipline 是 redis 客户端提供的机制,与 redis 本身无关,是为了节约网络传输时间而设计的。具体来说,客户端一次性发送多个请求,redis 服务器按序依次回复,与 http 1.1 类似。
pipeline
3、redis 事务
事务:用户定义一系列数据库操作,这些操作视为一个完整的逻辑处理工作单元,要么全部执行,要么全部不执行,是不可分割的工作单元。
探讨事务的前提:在并发连接的情况下,不同连接异步执行命令造成的不可预期的冲突。
3.1、事务的特征
- 原子性 Atomicity:事务不可分割,要么全部成功,要么全部失败,如果执行失败必须提供回滚机制。原子操作的原子性,只有执行或不执行,其他线程不可能看到其他状态。
- 一致性 Consistency:事务的前后,所有的数据都保持一个一致的状态,不能违反数据的一致性检测。这里的一致性指的是预期的一致性不是异常后的一致性。类型一致性,逻辑一致性,数据一致性(主从数据库一致)。
- 隔离性 Isolation:并发事务间的要相互隔离。redis 单线程执行,天然具备隔离性。mysql 一条连接对应一个线程。多线程环境下需要对临界资源进行加锁。
- 持久性 Durability:事务一旦提交,对数据的改变就是永久性的,即数据落盘。
3.2、事务命令
redis 客户端以 MULTI 开启一个事务,发送多个命令到服务端的队列,直到发送 EXEC 命令后redis 服务端才会执行队列中的命令,将队列作为一个整体来执行。
# 开启事务 MULTI # 提交事务 EXEC # 取消事务 DISCARD # 监视 key 的变动,在事务开启前调用,乐观锁 cas 实现。若在事务执行中,key 变动则取消事务返回 nil。 WATCH key
实际工作中不会使用,这是因为事务命令是由乐观锁实现的,失败需要重试,会增加业务逻辑的复杂程度。
3.3、* lua 脚本
redis 内置 lua 解释器来执行 lua 脚本,通过 lua 脚本实现原子性。
面试点:lua 脚本满足原子性和隔离性,不满足一致性和持久性。
- 原子性:Lua 脚本通过一个命令执行,脚本中所有的命令一起执行,具有原子性。
- 一致性:不具备一致性,lua 脚本执行失败,已经成功的命令作用数据库,无法回滚。
- 隔离性:redis 单线程执行,且 lua 脚本作为单数据包运行。
- 持久性:不具备持久性,只有在 aof 并且 appendfsync = always 才具备,实际工作不会采用该方法。
3.3.1、命令
# 测试使用 # 执行 lua 脚本 EVAL script numkeys [key...] arg [arg...] # 实际使用 # 只保存40位哈希字符串,减少数据传输量 # 1、缓存脚本,将用户给定的脚本缓存在服务器中,并返回脚本对应的SHA1校验和(40位字符串)作为结果 SCRIPT LOAD script # 2、执行缓存的脚本 EVALSHA sha1 numkeys key [key ...] arg [arg ...] # 附:脚本管理命令 # 检查脚本是否缓存 SCRIPT EXISTS sha1 [sha1...] # 清除所有脚本缓存 SCRIPT FLUSH # 强制停止正在运行的脚本,如死循环 SCRIPT KILL
3.3.2、应用
- 项目启动时,建立redis连接并验证后,先加载所有项目中使用的lua脚本 script load
- 项目中若需要热更新,通过 redis-cli 执行 script flush。然后可以通过订阅发布功能通知所有服务器重新加载lua脚本
- 若项目中lua脚本发生阻塞,可通过 script kill 暂停当前阻塞脚本的执行
例:执行加倍操作
set mark 1 # 测试使用 eval "local val=redis.call('get',KEYS[1]);if val then redis.call('set', KEYS[1], 2*val);return 2*val;end;return 0;" 1 mark (integer) 2 127.0.0.1:6379> eval "local val=redis.call('get',KEYS[1]);if val then redis.call('set', KEYS[1], 2*val);return 2*val;end;return 0;" 1 darren (integer) 0 # 实际使用 # 1、缓存脚本 script load "local val=redis.call('get',KEYS[1]);if val then redis.call('set', KEYS[1], 2*val);return 2*val;end;return 0;" "9da2e1ac090f2e1df67087370de115a4291cd0bd" # 2、执行缓存脚本 evalsha "9da2e1ac090f2e1df67087370de115a4291cd0bd" 1 mark (integer) 4
4、redia 发布订阅
为了支持消息的多播机制,redis 引入了发布订阅模块,是一种分布式消息队列机制。订阅者通过特定的频道来接收发送者发送至该频道的消息。该机制并不保证消息一定到达,可以采用 stream 方式确保可达。
存在的问题有:发送者发送一条消息,若没有订阅者,则消息直接丢弃。若发送期间,一个订阅者断开连接,那么在断开连接期间消息对于该订阅者来说彻底丢失了。此外,redis 停机重启,pubsub 的消息是不会持久化的,所有的消息被直接丢弃。
4.1、命令
# 向频道发送消息 publish channel message # 订阅频道 subscribe channel [channel...] # 取消订阅频道 unsubscribe [channel...] # 订阅模式 psubscribe pattern [pattern...] # 退订模式 punsubscribe [pattern...] # 查看发布与订阅的相关信息 PUBSUB CHANNELS [pattern]
4.2、应用
发布订阅功能一般要重新开启一个连接,这是因为命令连接严格遵循请求回应模式,pubsub 能收到 redis 主动推送的内容。所以实际项目中如果支持 pubsub 的话,需要另开一条连接用于处理发布订阅。
# 一个客户端订阅频道 SUBSCRIBE news.shanxi news.henan news.shandong # 另一个客户端订阅频道,模式匹配 PSUBSCRIBE news.* # 向频道发送信息,该频道所有订阅者收到消息 publish news.shanxi 'harmony'
5、redis 异步连接
hiredis 是一个 redis 的 C 客户端库函数,服务端可以使用它来访问 redis 服务器。
5.1、同步连接
同步连接采用阻塞 io 来实现,但是会阻塞当前线程,直至 redis 返回结果。
参考文档:hiredis 的使用
例如:访问 redis,并对 counter 实现自增1000次,统计用时。
// gcc redis-test-sync.c -o sync -lhiredis #include <stdio.h> #include <stdlib.h> #include <string.h> #include <time.h> #include <hiredis/hiredis.h> int current_tick() { int t = 0; struct timespec ti; clock_gettime(CLOCK_MONOTONIC, &ti); t = (int)ti.tv_sec * 1000; t += ti.tv_nsec / 1000000; return t; } int main(int argc, char **argv) { unsigned int j, isunix = 0; redisContext *c; redisReply *reply; const char *hostname = "127.0.0.1"; int port = 6379; struct timeval timeout = { 1, 500000 }; // 1.5 seconds c = redisConnectWithTimeout(hostname, port, timeout); if (c == NULL || c->err) { if (c) { printf("Connection error: %s\n", c->errstr); redisFree(c); } else { printf("Connection error: can't allocate redis context\n"); } exit(1); } int num = (argc > 1) ? atoi(argv[1]) : 1000; int before = current_tick(); reply = redisCommand(c, "auth 123456"); freeReplyObject(reply); for (int i = 0; i < num; ++i) { reply = redisCommand(c, "INCR counter"); printf("INCR counter: %lld\n", reply->integer); freeReplyObject(reply); } int used = current_tick() - before; printf("after %d exec redis command, used %d ms\n", num, used); /* Disconnects and frees the context */ redisFree(c); return 0; }
5.2、异步连接
异步连接采用非阻塞 io 实现,不会阻塞当前线程。缺点是代码书写异步,业务逻辑割裂,可以通过携程解决。在有大量并发请求的情况,配合 redis 6.0 以后的 io 多线程,异步连接池,能更好解决应用层的数据访问性能
5.2.1、redis 驱动
redis 驱动:服务端使用异步连接,需要自己来实现 redis 驱动,也就是说需要把 redis 连接融合自己项目中的 reactor 进行管理。
接着还需要设计 redis 适配器,其主要功能有:
- 构建 redis 事件对象,其中包括:hiredis 事件对象和 reactor 事件对象。
- 适配事件控制,复用项目中 reactor 的事件循环。
综上所述,hiredis 的封装规则有:
- reactor 的实现:所有的 IO 由用户实现。
- 适配器的实现:hiredis 提供了事件操作接口,用户需要适配这些事件接口。
// 用户需要适配的 hiredis 事件接口有 addRead // 添加读事件 delRead // 删除读事件 addWrite // 添加写事件 delWrite // 删除写事件 cleanup // 事件对象释放 scheduleTimer
5.2.2、范例
这里对 4.1 的例子使用异步的方法来实现。
第 1 步,实现 redis 驱动
#ifndef _REACTOR_ #define _REACTOR_ #include <stdio.h> #include <unistd.h> // read write #include <fcntl.h> // fcntl #include <sys/types.h> // listen #include <sys/socket.h> // socket #include <errno.h> // errno #include <arpa/inet.h> // inet_addr htons // #include <netinet/tcp.h> #include <assert.h> // assert #include <sys/epoll.h> #include <stdlib.h> // malloc #include <string.h> // memcpy memmove #include "chainbuffer/buffer.h" // #include "ringbuffer/buffer.h" #define MAX_EVENT_NUM 512 // 每次用户拷贝事件的最大数目 #define MAX_CONN ((1<<16)-1) // 事件对象的最大数目:65535 typedef struct event_s event_t; typedef void (*event_callback_fn)(int fd, int events, void *privdata); typedef void (*error_callback_fn)(int fd, char * err); // reactor对象,管理 io 全局变量 typedef struct { int epfd; // epfd int listenfd; // 监听的fd int stop; // 停止循环标记 event_t *events; // 存储监听的所有事件(event_t),存储在堆上,记得释放 int iter; // 用于遍历events,获取没有被使用的位置 struct epoll_event fire[MAX_EVENT_NUM]; // 用户态数组,用于拷贝io事件到用户态 } reactor_t; // 事件对象,sockitem,保存每个fd对应的io状态 struct event_s { int fd; // 对应的事件 fd reactor_t *r; // 指向 reactor 全局对象 buffer_t in; // 读缓冲,待读取 buffer_t out; // 写缓冲,待发送 event_callback_fn read_fn; // 读回调 event_callback_fn write_fn; // 写回调 error_callback_fn error_fn; // 错误回调 }; int event_buffer_read(event_t *e); int event_buffer_write(event_t *e, void * buf, int sz); // 创建 reactor 对象 reactor_t * create_reactor() { // 堆上申请 reactor 对象 reactor_t *r = (reactor_t *)malloc(sizeof(*r)); r->epfd = epoll_create(1); r->listenfd = 0; r->stop = 0; r->iter = 0; // 堆上申请 reactor 中的events数组 r->events = (event_t*)malloc(sizeof(event_t)*MAX_CONN); memset(r->events, 0, sizeof(event_t)*MAX_CONN); memset(r->fire, 0, sizeof(struct epoll_event) * MAX_EVENT_NUM); // init_timer(); return r; } // 释放 reactor 对象 void release_reactor(reactor_t * r) { free(r->events); // 释放reactor在堆上申请的events close(r->epfd); // 关闭epoll free(r); // 释放reactor } // 从 reactor 的事件堆上获取空闲的事件对象 event_t * _get_event_t(reactor_t *r) { r->iter ++; // 寻找没有被使用的事件对象 while (r->events[r->iter & MAX_CONN].fd > 0) { r->iter++; } return &r->events[r->iter]; } // 基于事件的操作 // 1、创建事件对象 event_t * new_event(reactor_t *R, int fd, event_callback_fn rd, event_callback_fn wt, error_callback_fn err) { assert(rd != 0 || wt != 0 || err != 0); // 获取空闲的事件对象 event_t *e = _get_event_t(R); // 初始化事件对象 e->r = R; e->fd = fd; buffer_init(&e->in, 1024*16); buffer_init(&e->out, 1024*16); e->read_fn = rd; e->write_fn = wt; e->error_fn = err; return e; } // 2、添加事件 int add_event(reactor_t *R, int events, event_t *e) { struct epoll_event ev; ev.events = events; ev.data.ptr = e; if (epoll_ctl(R->epfd, EPOLL_CTL_ADD, e->fd, &ev) == -1) { printf("add event err fd = %d\n", e->fd); return 1; } return 0; } // 释放事件所占空间 void free_event(event_t *e) { buffer_free(&e->in); buffer_free(&e->out); } // 3、删除事件 int del_event(reactor_t *R, event_t *e) { epoll_ctl(R->epfd, EPOLL_CTL_DEL, e->fd, NULL); free_event(e); return 0; } // 4、修改事件,由后面两个参数决定是读事件还是写事件 int enable_event(reactor_t *R, event_t *e, int readable, int writeable) { struct epoll_event ev; ev.events = (readable ? EPOLLIN : 0) | (writeable ? EPOLLOUT : 0); ev.data.ptr = e; if (epoll_ctl(R->epfd, EPOLL_CTL_MOD, e->fd, &ev) == -1) { return 1; } return 0; } // 一次事件循环 void eventloop_once(reactor_t * r, int timeout) { int n = epoll_wait(r->epfd, r->fire, MAX_EVENT_NUM, timeout); for (int i = 0; i < n; ++i) { struct epoll_event *e = &r->fire[i]; // 获取事件 int mask = e->events; // 获取事件类型 // 用 io 函数捕获具体的错误信息 if (e->events & EPOLLERR) mask |= EPOLLIN | EPOLLOUT; // 用 io 函数捕获断开的具体信息 if (e->events & EPOLLHUP) mask |= EPOLLIN | EPOLLOUT; event_t *et = (event_t*) e->data.ptr; // 获取事件关联的用户数据 // 处理读事件 if (mask & EPOLLIN) { if (et->read_fn) { et->read_fn(et->fd, EPOLLIN, et); // 执行读回调 } } // 处理写事件 if (mask & EPOLLOUT) { if (et->write_fn) { et->write_fn(et->fd, EPOLLOUT, et); // 执行写回调 } else { uint8_t *buf = buffer_write_atmost(&et->out); event_buffer_write(et, buf, buffer_len(&et->out)); } } } } // 停止事件循环 void stop_eventloop(reactor_t * r) { r->stop = 1; } // 事件循环 void eventloop(reactor_t * r) { while (!r->stop) { // int timeout = find_nearest_expire_timer(); eventloop_once(r, /*timeout*/ -1); // expire_timer(); } } // 设置非阻塞fd int set_nonblock(int fd) { int flag = fcntl(fd, F_GETFL, 0); return fcntl(fd, F_SETFL, flag | O_NONBLOCK); } // 创建服务器 int create_server(reactor_t *R, short port, event_callback_fn func) { // 1、socket int listenfd = socket(AF_INET, SOCK_STREAM, 0); if (listenfd < 0) { printf("create listenfd error!\n"); return -1; } struct sockaddr_in addr; memset(&addr, 0, sizeof(struct sockaddr_in)); addr.sin_family = AF_INET; addr.sin_port = htons(port); addr.sin_addr.s_addr = INADDR_ANY; // 设置地址可重用 int reuse = 1; if (setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, (void *)&reuse, sizeof(int)) == -1) { printf("reuse address error: %s\n", strerror(errno)); return -1; } // 2、bind if (bind(listenfd, (struct sockaddr*)&addr, sizeof(struct sockaddr_in)) < 0) { printf("bind error %s\n", strerror(errno)); return -1; } // 3、listen if (listen(listenfd, 5) < 0) { printf("listen error %s\n", strerror(errno)); return -1; } // 设置 listenfd 非阻塞 if (set_nonblock(listenfd) < 0) { printf("set_nonblock error %s\n", strerror(errno)); return -1; } R->listenfd = listenfd; // 注册读事件 event_t *e = new_event(R, listenfd, func, 0, 0); add_event(R, EPOLLIN, e); printf("listen port : %d\n", port); return 0; } // 读数据 int event_buffer_read(event_t *e) { int fd = e->fd; int num = 0; // 读取的数据总量 while (1) { // TODO: dont use char buf[] here char buf[1024] = {0}; int n = read(fd, buf, 1024); // 1、read=0, 服务器收到FIN包,半关闭状态 // Todo: 半关闭状态逻辑处理,参考 skynet if (n == 0) { // printf("close connection fd = %d\n", fd); if (e->error_fn) { e->error_fn(fd, "close socket"); } del_event(e->r, e); close(fd); return 0; } // 2、read=-1,读异常 else if (n < 0) { // 2.1、EINTR:中断,重试 if (errno == EINTR) { continue; } // 2.2、EWOULDBLOCK:阻塞,读缓冲区为空 if (errno == EWOULDBLOCK) { break; } // 其他错误,执行错误回调,删除该事件,关闭当前连接 printf("read error fd = %d err = %s\n", fd, strerror(errno)); if (e->error_fn) e->error_fn(fd, strerror(errno)); del_event(e->r, e); close(fd); return 0; } // 3、read>0, 正常,读取数据,处理业务逻辑 else { printf("recv data from client:%s", buf); buffer_add(&e->in, buf, n); } num += n; } return num; } // 向对端发送数据 int _write_socket(event_t *e, void * buf, int sz) { int fd = e->fd; while (1) { int n = write(fd, buf, sz); // 1、write=-1,写异常 if (n < 0) { // 2.1、EINTR:中断,重试 if (errno == EINTR) { continue; } // 2.2、EWOULDBLOCK:阻塞,需要注册写事件 if (errno == EWOULDBLOCK) { break; } // 其他错误,执行错误回调,删除该事件,关闭当前连接 if (e->error_fn) { e->error_fn(fd, strerror(errno)); } del_event(e->r, e); close(e->fd); } return n; } return 0; } // 写数据 int event_buffer_write(event_t *e, void * buf, int sz) { // 指向用户写缓冲 buffer_t *r = &e->out; // 1、用户写缓冲已满,开始发送 if (buffer_len(r) == 0) { // 向对端发送数据 int n = _write_socket(e, buf, sz); // 1.1、本次数据未发送完,未发送的数据写入缓冲,并注册写事件 if (n == 0 || n < sz) { // 1.1、将没有发送完的数据写入缓冲区 buffer_add(&e->out, (char *)buf + n, sz - n); // 1.2、注册写事件,等待下次事件触发接着发送 enable_event(e->r, e, 1, 1); return 0; } // 1.2、本次没有发送数据 else if (n < 0) { return 0; } // 1.3、本次数据发送完成 return 1; } // 2、用户写缓冲未满,写入缓冲,等待发送 buffer_add(&e->out, (char *)buf, sz); return 1; } #endif
第 2 步,实现 redis 适配器,主要是构建 redis 事件对象和适配 hiredis 的事件控制接口。
// adapter_async.h #ifndef _ADAPTER_ #define _ADAPTER_ #include <hiredis/hiredis.h> #include <hiredis/alloc.h> #include "reactor.h" // redis 事件对象 typedef struct { event_t e; // reactor 事件对象 int mask; // 存储注册的事件 redisAsyncContext *ctx; // hiredis 事件对象 } redis_event_t; // redis 对象读事件回调 static void redisReadHandler(int fd, int events, void *privdata) { ((void)fd); ((void)events); event_t *e = (event_t*)privdata; redis_event_t *re = (redis_event_t *)(char *)e; redisAsyncHandleRead(re->ctx); } // redis 对象写事件读回调 static void redisWriteHandler(int fd, int events, void *privdata) { ((void)fd); ((void)events); event_t *e = (event_t*)privdata; redis_event_t *re = (redis_event_t *)(char *)e; redisAsyncHandleWrite(re->ctx); } /** * @brief 对 reactor 管理的事件对象进行更新 * @param privdata redis 事件对象 * @param flag 要设置的 epoll 事件类型 * @param remove 1 删除该事件 0 添加该事件 */ static void redisEventUpdate(void *privdata, int flag, int remove) { redis_event_t *re = (redis_event_t *)privdata; reactor_t *r = re->e.r; int prevMask = re->mask; int enable = 0; // redis 事件对象删除该事件 if (remove) { if ((re->mask & flag) == 0) { return; } re->mask &= ~flag; enable = 0; } // redis 事件对象添加该事件 else { if (re->mask & flag) { return; } re->mask |= flag; enable = 1; } // 对 reactor 事件对象的处理 // 1、reactor 事件对象删除该事件 if (re->mask == 0) { del_event(r, &re->e); } // 2、reactor 事件对象添加该事件(第一次加入) else if (prevMask == 0) { add_event(r, re->mask, &re->e); } // 3、reactor 事件对象修改该事件 else { // 注册读事件 if (flag & EPOLLIN) { enable_event(r, &re->e, enable, 0); } // 注册写事件 else if (flag & EPOLLOUT) { enable_event(r, &re->e, 0, enable); } } } // 需要适配的 hiredis 事件接口 // 1、redis 事件对象添加读事件 static void redisAddRead(void *privdata) { redis_event_t *re = (redis_event_t *)privdata; re->e.read_fn = redisReadHandler; redisEventUpdate(privdata, EPOLLIN, 0); } // 2、redis 事件对象删除读事件 static void redisDelRead(void *privdata) { redis_event_t *re = (redis_event_t *)privdata; re->e.read_fn = 0; redisEventUpdate(privdata, EPOLLIN, 1); } // 3、redis 事件对象添加写事件 static void redisAddWrite(void *privdata) { redis_event_t *re = (redis_event_t *)privdata; re->e.write_fn = redisWriteHandler; redisEventUpdate(privdata, EPOLLOUT, 0); } // 4、redis 事件对象删除写事件 static void redisDelWrite(void *privdata) { redis_event_t *re = (redis_event_t *)privdata; re->e.write_fn = 0; redisEventUpdate(privdata, EPOLLOUT, 1); } // 5、redis 事件对象释放 static void redisCleanup(void *privdata) { redis_event_t *re = (redis_event_t *)privdata; reactor_t *r = re->e.r; del_event(r, &re->e); hi_free(re); } // redis 事件对象绑定,reactor 对象和 redis 异步上下文 static int redisAttach(reactor_t *r, redisAsyncContext *ac) { redisContext *c = &(ac->c); // redis 同步上下文 redis_event_t *re; // redis 事件对象 /* Nothing should be attached when something is already attached */ if (ac->ev.data != NULL) return REDIS_ERR; /* Create container for ctx and r/w events */ re = (redis_event_t*)hi_malloc(sizeof(*re)); if (re == NULL) { return REDIS_ERR; } // redis 事件对象绑定 reactor 对象和 redis 异步上下文 re->ctx = ac; // 绑定 redis 异步上下文 re->e.fd = c->fd; // 绑定 redis 的fd re->e.r = r; // 绑定 reacotr re->mask = 0; // 绑定事件 // redis 异步上下文设置,需要适配事件控制 // hiredis 提供事件接口,用户实现事件接口 ac->ev.addRead = redisAddRead; ac->ev.delRead = redisDelRead; ac->ev.addWrite = redisAddWrite; ac->ev.delWrite = redisDelWrite; ac->ev.cleanup = redisCleanup; ac->ev.data = re; return REDIS_OK; } #endif
接下来,实现主体代码,实现功能
// redis-test-async.c // gcc redis-test-async.c chainbuffer/buffer.c -o async -lhiredis #include <hiredis/hiredis.h> #include <hiredis/async.h> #include <time.h> #include "reactor.h" #include "adapter_async.h" static reactor_t *R; static int cnt, before, num; int current_tick() { int t = 0; struct timespec ti; clock_gettime(CLOCK_MONOTONIC, &ti); t = (int)ti.tv_sec * 1000; t += ti.tv_nsec / 1000000; return t; } void getCallback(redisAsyncContext *c, void *r, void *privdata) { redisReply *reply = r; if (reply == NULL) return; printf("argv[%s]: %lld\n", (char*)privdata, reply->integer); /* Disconnect after receiving the reply to GET */ cnt++; if (cnt == num) { int used = current_tick()-before; printf("after %d exec redis command, used %d ms\n", num, used); redisAsyncDisconnect(c); } } void connectCallback(const redisAsyncContext *c, int status) { if (status != REDIS_OK) { printf("Error: %s\n", c->errstr); stop_eventloop(R); return; } printf("Connected...\n"); } void disconnectCallback(const redisAsyncContext *c, int status) { if (status != REDIS_OK) { printf("Error: %s\n", c->errstr); stop_eventloop(R); return; } printf("Disconnected...\n"); stop_eventloop(R); } int main(int argc, char **argv) { redisAsyncContext *c = redisAsyncConnect("127.0.0.1", 6379); if (c->err) { /* Let *c leak for now... */ printf("Error: %s\n", c->errstr); return 1; } R = create_reactor(); redisAttach(R, c); redisAsyncSetConnectCallback(c, connectCallback); redisAsyncSetDisconnectCallback(c, disconnectCallback); before = current_tick(); num = (argc > 1) ? atoi(argv[1]) : 1000; redisAsyncCommand(c, NULL, NULL, "auth 123456"); for (int i = 0; i < num; i++) { redisAsyncCommand(c, getCallback, "count", "INCR counter"); } eventloop(R); release_reactor(R); return 0; }