四、利用timefd实现定时器
timefd实现方式跟红黑树很相似,主要区别在于驱动方式。
4.1 流程
1)创建一个 timerfd 文件描述符。可以使用 timerfd_create 函数来创建它。
2)设置定时器参数。构建一个 struct itimerspec 结构体,指定定时器的起始时间和间隔时间。
3)使用 timerfd_settime 函数将定时器参数应用到 timerfd 文件描述符上
4)等待定时器事件触发。可以使用 select、poll、epoll 等函数来等待文件描述符上的可读事件。当定时器事件触发,timerfd 文件描述符会变为可读,你可以在相应的事件处理逻辑中进行处理。
5)当不再需要定时器时,关闭 timerfd 文件描述符。使用 close 函数来关闭它。
4.2 代码实现
#include <sys/epoll.h> #include <sys/timerfd.h> #include <time.h> // for timespec itimerspec #include <unistd.h> // for close #include <functional> #include <chrono> #include <set> #include <memory> #include <iostream> using namespace std; struct TimerNodeBase { time_t expire; uint64_t id; }; struct TimerNode : public TimerNodeBase { using Callback = std::function<void(const TimerNode &node)>; Callback func; TimerNode(int64_t id, time_t expire, Callback func) : func(func) { this->expire = expire; this->id = id; } }; bool operator < (const TimerNodeBase &lhd, const TimerNodeBase &rhd) { if (lhd.expire < rhd.expire) { return true; } else if (lhd.expire > rhd.expire) { return false; } else return lhd.id < rhd.id; } class Timer { public: static inline time_t GetTick() { return chrono::duration_cast<chrono::milliseconds>(chrono::steady_clock::now().time_since_epoch()).count(); } TimerNodeBase AddTimer(int msec, TimerNode::Callback func) { time_t expire = GetTick() + msec; if (timeouts.empty() || expire <= timeouts.crbegin()->expire) { auto pairs = timeouts.emplace(GenID(), expire, std::move(func)); return static_cast<TimerNodeBase>(*pairs.first); } auto ele = timeouts.emplace_hint(timeouts.crbegin().base(), GenID(), expire, std::move(func)); return static_cast<TimerNodeBase>(*ele); } void DelTimer(TimerNodeBase &node) { auto iter = timeouts.find(node); if (iter != timeouts.end()) timeouts.erase(iter); } void HandleTimer(time_t now) { auto iter = timeouts.begin(); while (iter != timeouts.end() && iter->expire <= now) { iter->func(*iter); iter = timeouts.erase(iter); } } public: virtual void UpdateTimerfd(const int fd) { struct timespec abstime; // 获取最小触发时间 auto iter = timeouts.begin(); if (iter != timeouts.end()) { abstime.tv_sec = iter->expire / 1000; abstime.tv_nsec = (iter->expire % 1000) * 1000000; } else { abstime.tv_sec = 0; abstime.tv_nsec = 0; } struct itimerspec its = { .it_interval = {}, .it_value = abstime }; // 通过文件描述符 fd 设置定时器的参数 timerfd_settime(fd, TFD_TIMER_ABSTIME, &its, nullptr); } private: static inline uint64_t GenID() { return gid++; } static uint64_t gid; set<TimerNode, std::less<>> timeouts; }; uint64_t Timer::gid = 0; int main() { int epfd = epoll_create(1); // 创建一个定时器文件描述符 int timerfd = timerfd_create(CLOCK_MONOTONIC, 0); struct epoll_event ev = {.events=EPOLLIN | EPOLLET}; epoll_ctl(epfd, EPOLL_CTL_ADD, timerfd, &ev); unique_ptr<Timer> timer = make_unique<Timer>(); int i = 0; timer->AddTimer(1000, [&](const TimerNode &node) { cout << Timer::GetTick() << " node id:" << node.id << " revoked times:" << ++i << endl; }); timer->AddTimer(1000, [&](const TimerNode &node) { cout << Timer::GetTick() << " node id:" << node.id << " revoked times:" << ++i << endl; }); timer->AddTimer(3000, [&](const TimerNode &node) { cout << Timer::GetTick() << " node id:" << node.id << " revoked times:" << ++i << endl; }); auto node = timer->AddTimer(2100, [&](const TimerNode &node) { cout << Timer::GetTick() << " node id:" << node.id << " revoked times:" << ++i << endl; }); timer->DelTimer(node); cout << "now time:" << Timer::GetTick() << endl; struct epoll_event evs[64] = {0}; while (true) { //设置定时器的参数 timer->UpdateTimerfd(timerfd); int n = epoll_wait(epfd, evs, 64, -1); time_t now = Timer::GetTick(); for (int i = 0; i < n; i++) { // for network event handle } timer->HandleTimer(now); } epoll_ctl(epfd, EPOLL_CTL_DEL, timerfd, &ev); close(timerfd); close(epfd); return 0; }
五、利用多级时间轮实现定时器
时间轮 timewheel 是一个环形结构,使用 hash + list 实现。高层级的每一个格子,存储底层级的一个list。
根据轮子的类型,可以分为主动轮(层级1)和从动轮。
- 主动轮:当刻度指针指向当前槽的时候,槽内的任务被顺序执行。
- 从动轮:当对应轮的刻度指针指向当前槽位的时候,槽内的任务链依次向低级轮(序号较高的轮)转移,从动轮没有执行任务权限,只是对任务进行记录与缓存。
以 skynet 为例,skynet 作为单 reactor模型,适用于 cpu 密集型的场景。timer 由 timer 线程管理,当有定时任务时将任务派发给其他线程执行。
5.1 数据结构
如上图,定义了5个链表数组,每个数组里包含多个定时器链表,near 数组大小为256 =,其余数组大小为64 = ,表示的时间范围 。
我们只关注主动轮near数组,因为里面的任务是最近要触发执行的。
注意到 −1刚好是uint32_t的最大值。因此我们只需要一个time指针,就可以根据位运算,得到任务触发时间在各层级的位置。当 time 溢出时,32位无符号循环,再次从0开始计数。
举个例子,如果触发时间是562568,对应二进制是100010 010101 10001000。near位的十进制是136,t[0]位的十进制是41,t[1]位的十进制是67。
1)定时器的结构
typedef struct timer { link_list_t near[TIME_NEAR]; // 最低级的时间轮,主动轮 link_list_t t[4][TIME_LEVEL]; // 其他层级的时间轮,从动轮 struct spinlock lock; // 自旋锁,O(1) uint32_t time; // tick 指针,当前时间片 uint64_t current; // 从系统开始时刻到现在的时长,timer运行时间,时间精度10ms uint64_t current_point; // 系统启动时长,时间精度10ms }s_timer_t;
2)任务结点的设计
任务结点使用链表存储,链表中存储同一时间触发的任务结点
struct timer_node { struct timer_node *next; // 指向的下一个任务 uint32_t expire; // 任务触发时间 handler_pt callback; // 任务回调函数 uint8_t cancel; // 删除任务的标记,取消任务的执行 int id; // 执行该任务的线程 id };
5.2 接口设计
5.2.1 初始化定时器
// 初始化定时器 void init_timer(void) { TI = timer_create_timer(); // 创建定时器 TI->current_point = gettime(); // 获取系统当前运行时间 } // 创建定时器 s_timer_t* timer_create_timer() { s_timer_t *r = (s_timer_t *)malloc(sizeof(s_timer_t)); memset(r, 0, sizeof(*r)); int i, j; // 创建主动轮,最低级时间轮 for (i = 0; i < TIME_NEAR; ++i) { //清除指定链表,并返回指向链表的第一个节点的指针 link_clear(&r->near[i]); } // 创建从动轮,高层级时间轮 for (i = 0; i < 4; ++i) { for (j = 0;j < TIME_LEVEL; ++j) { link_clear(&r->t[i][j]); } } // 初始化自旋锁 spinlock_init(&r->lock); r->current = 0; return r; } // 获得从系统启动开始计时的时间,不受系统时间被用户改变的影响,精确到1/100秒 uint64_t gettime() { uint64_t t; #if !defined(__APPLE__) || defined(AVAILABLE_MAC_OS_X_VERSION_10_12_AND_LATER) struct timespec ti; clock_gettime(CLOCK_MONOTONIC, &ti); // CLOCK_MONOTONIC t = (uint64_t)ti.tv_sec * 1000; t += ti.tv_nsec / 1000000; #else struct timeval tv; gettimeofday(&tv, NULL); t = (uint64_t)tv.tv_sec * 100; t += tv.tv_usec / 10000; #endif return t; }
5.2.2 添加定时器
32位无符号整数time记录时间片分别对应数组near[256]和t[4][64],每次添加节点时,如果expire - time < 256则将节点添加到near数组对应元素的链表中,否则从高位往低位依次比较expire的第i个6位二进制的值n与time的第i个6位二进制的值m,哪个不相等则将节点添加到数组t[4-i][n]对应的元素链表中)
具体来说:
- 首先检查节点的expire与time的高24位是否相等,相等则将该节点添加到expire低8位值对应数组near的元素的链表中,不相等则进行下一步。
- 检查expire与time的高18位是否相等,相等则将该节点添加到expire低第9位到第14位对应的6位二进制值对应数组t[0]的元素的链表中,如果不相等则进行下一步。
- 检查expire与time的高12位是否相等,相等则将该节点添加到expire低第15位到第20位对应的6位二进制值对应数组t[1]的元素的链表中,如果不相等则进行下一步。
- 检查expire与time的高6位是否相等,相等则将该节点添加到expire低第21位到第26位对应的6位二进制值对应数组t[2]的元素的链表中,如果不相等则进行下一步。
- 将该节点添加到expire低第27位到第32位对应的6位二进制值对应数组t[3]的元素的链表中
先看一下位操作:
1 << n:表示将二进制数 1 向左移动 n 位,即 。
(time>>TIME_NEAR_SHIFT) & TIME_LEVEL_MASK):表示time右移8位,然后与TIME_LEVEL_MASK相与,得到的是t[0]处的位置。
// 添加任务结点到定时器中 // 根据 msec 判断结点应该放入时间轮的层级 void add_node(s_timer_t *T, timer_node_t *node) { uint32_t time = node->expire; // 过期时间 uint32_t current_time=T->time; // 当前时间 uint32_t msec = time - current_time; // 剩余时间 //根据 expire-time 的差值将结点放入相应的层级 //[0, 2^8) if (msec < TIME_NEAR) { link(&T->near[time&TIME_NEAR_MASK],node); } //[2^8, 2^14) else if (msec < (1 << (TIME_NEAR_SHIFT+TIME_LEVEL_SHIFT))) { link(&T->t[0][((time>>TIME_NEAR_SHIFT) & TIME_LEVEL_MASK)],node); } //[2^14, 2^20) else if (msec < (1 << (TIME_NEAR_SHIFT+2*TIME_LEVEL_SHIFT))) { link(&T->t[1][((time>>(TIME_NEAR_SHIFT + TIME_LEVEL_SHIFT)) & TIME_LEVEL_MASK)],node); } //[2^20, 2^26) else if (msec < (1 << (TIME_NEAR_SHIFT+3*TIME_LEVEL_SHIFT))) { link(&T->t[2][((time>>(TIME_NEAR_SHIFT + 2*TIME_LEVEL_SHIFT)) & TIME_LEVEL_MASK)],node); } //[2^26, 2^32) else { link(&T->t[3][((time>>(TIME_NEAR_SHIFT + 3*TIME_LEVEL_SHIFT)) & TIME_LEVEL_MASK)],node); } } // 添加定时任务 timer_node_t* add_timer(int time, handler_pt func, int threadid) { timer_node_t *node = (timer_node_t *)malloc(sizeof(*node)); spinlock_lock(&TI->lock); // 设置定时任务结点的属性 node->expire = time + TI->time; // 添加触发时间 = 触发时间间隔 + 当前时间 node->callback = func; // 添加任务回调函数 node->id = threadid; // 添加执行该任务的线程id // 判断是否需要立即执行任务 if (time <= 0) { spinlock_unlock(&TI->lock); node->callback(node); free(node); return NULL; } // 添加任务结点到定时器中 add_node(TI, node); spinlock_unlock(&TI->lock); return node; }
5.2.3 删除定时器
由于结点位置可能发生变化(重新映射),不能找到任务结点的位置,无法删除。
在结点中添加一个 cancel 字段,任务触发碰到该标记则不执行任务,之后统一释放空间。
void del_timer(timer_node_t *node) { node->cancel = 1; }
5.2.4 更新定时器
主要包括对到期任务的处理和对从动轮任务(time高24位对应的链表)的重新映射。
void timer_update(s_timer_t *T) { spinlock_lock(&T->lock); // shift time first, and then dispatch timer message timer_execute(T); //检查当前的时间片的低8位对应的数组元素的链表是否为空,不为空则取出 // shift time first, and then dispatch timer message timer_shift(T); //检查当前的时间片的低8位对应的数组元素的链表是否为空,不为空则取出 // 若发生重新映射,若time的指向有任务,则需要执行 timer_execute(T); spinlock_unlock(&T->lock); }
5.2.5 到期任务的处理
以当前 tick 值的低8位作为索引,取出 near 数组中对应的 list。list 里面包含了所有在该 tick 到期的定时器任务
//检查当前的时间片的低8位对应的数组元素的链表是否为空,不为空则取出 void timer_execute(s_timer_t *T) { // 取出time低8位对应的索引值 int idx = T->time & TIME_NEAR_MASK; // 如果低8位值对应的near数组元素有链表,则取出 while (T->near[idx].head.next) { // 取出对应的定时器list timer_node_t *current = link_clear(&T->near[idx]); spinlock_unlock(&T->lock); // 将链表各结点的任务派发出去 dispatch_list(current); spinlock_lock(&T->lock); } } // 任务派发 void dispatch_list(timer_node_t *current) { do { timer_node_t *temp = current; current=current->next; // cancel 标记为0,执行任务回调函数;否则,不执行任务回调 if (temp->cancel == 0) temp->callback(temp); free(temp); } while (current); }
5.2.6 刷新时间片,重新映射
只有主动轮的结点要执行,从动轮只是存储结点,主动轮结点执行完后,需要从动轮补充。
具体操作是
- 检查time是否溢出,如果溢出则将t[3][0]这个链表取出并依次将该链表中的节点添加(即实现该链表的移动操作),如果time未溢出,则进行下一步。
- 检查time低8位是否溢出产生进位,没有则结束,有则检查time的低第9位到第14位是否产生溢出,没有则将time的低第9位到第14位对应的值对应数组t[0]中的链表取出,并依次将该链表中的节点添加(即实现该链表的移动操作),如果有溢出,则进行下一步。
- 检查time低14位是否溢出产生进位,没有则结束,有则检查time的低第15位到第20位是否产生溢出,没有则将time的低第15位到第20位对应的值对应数组t[1]中的链表取出,并依次将该链表中的节点添加(即实现该链表的移动操作),如果有溢出,则进行下一步。
- 检查time低20位是否溢出产生进位,没有则结束,有则检查time的低第21位到第26位是否产生溢出,没有则将time的低第21位到第26位对应的值对应数组t[2]中的链表取出,并依次将该链表中的节点添加(即实现该链表的移动操作),如果有溢出,则进行下一步。
- 检查time低26位是否溢出产生进位,没有则结束,有则检查time的低第27位到第32位是否产生溢出,没有则将time的低第27位到第32位对应的值对应数组t[3]中的链表取出,并依次将该链表中的节点添加(即实现该链表的移动操作)。
所谓溢出,就是移动一轮了,就跟秒针转动一圈,重新计数一样。重新计数,分针也需要移动。也就是主动轮移动一轮,现在要移动下一轮,此时需要把从动轮的任务映射过去。
比如。低八位11111111对应是255,加1就是1 00000000对应是256,低八位都是0溢出。
// 重新映射,判断是否需要重新映射 // 时间片time自加1,将高24位对应的4个6位的数组中的各个元素的链表往低位移 void timer_shift(s_timer_t *T) { int mask = TIME_NEAR; // 时间片+1 uint32_t ct = ++T->time; // 时间片溢出,无符号整数,循环,time重置0 if (ct == 0) { // 将对应的t[3][0]链表取出,重新移动到定时器中 move_list(T, 3, 0); } else { // ct右移8位,进入到从动轮 uint32_t time = ct >> TIME_NEAR_SHIFT; // 第 i 层时间轮 int i = 0; // 判断是否需要重新映射? // 即循环判断当前层级对应的数位是否全0,即溢出产生进位 while ((ct & (mask-1))==0) { // 取当前层级的索引值 int idx = time & TIME_LEVEL_MASK; // idx=0 说明当前层级溢出,继续循环判断直至当前层级不溢出 if (idx != 0) { // 将对应的t[i][idx]链表取出,依次移动到定时器中 move_list(T, i, idx); break; } mask <<= TIME_LEVEL_SHIFT; // mask 右移 time >>= TIME_LEVEL_SHIFT; // time 左移 ++i; // 时间轮层级增加 } } }
5.3 定时器的驱动
// timer 线程中,每过1/4时间精度,即2.5ms,执行一次定时器的检测 while (!ctx.quit) { expire_timer(); usleep(2500); }
刷新定时器,每过1/4时间精度执行一次
// 原因是 dispatch 分发任务花费时间,影响精度 void expire_timer(void) { // 获取当前系统运行时间,不受用户的影响 uint64_t cp = gettime(); // 当前系统启动时间与定时器记录的系统启动时间不相等 if (cp != TI->current_point) { // 获取上述两者的差值 uint32_t diff = (uint32_t)(cp - TI->current_point); // 更新定时器记录的系统运行时间 TI->current_point = cp; // 更新timer的运行时间 TI->current += diff; // 更新定时器的时间(time的值),并执行对应的过期任务 int i; for (i=0; i<diff; i++) { // 每执行一次timer_update,其内部的函数 // timer_shift: time+1,time代表从timer启动后至今一共经历了多少次tick // timer_execute: 执行near中的定时器 timer_update(TI); } } }
5.4 代码实现
1)timer_wheel.h
#ifndef _MARK_TIMEWHEEL_ #define _MARK_TIMEWHEEL_ #include <stdint.h> #define TIME_NEAR_SHIFT 8 #define TIME_NEAR (1 << TIME_NEAR_SHIFT) // 1 << 8 表示将二进制数 1 向左移动 8 位,即 2^8 = 256 #define TIME_LEVEL_SHIFT 6 #define TIME_LEVEL (1 << TIME_LEVEL_SHIFT) // 1 << 6 表示将二进制数 1 向左移动 6 位,即 2^6 = 64 #define TIME_NEAR_MASK (TIME_NEAR-1) // 255 #define TIME_LEVEL_MASK (TIME_LEVEL-1) // 63 typedef struct timer_node timer_node_t; typedef void (*handler_pt) (struct timer_node *node); // 任务结点 struct timer_node { struct timer_node *next; // 相同过期时间的待执行的下一个任务 uint32_t expire; // 任务过期时间 handler_pt callback; // 任务回调函数 uint8_t cancel; // 删除任务,遇到该标记则取消任务的执行 int id; // 此时携带参数 }; timer_node_t* add_timer(int time, handler_pt func, int threadid); void expire_timer(void); void del_timer(timer_node_t* node); void init_timer(void); void clear_timer(); #endif
2)timer_wheel.c
// timer_wheel.c #include "spinlock.h" #include "timewheel.h" #include <string.h> #include <stddef.h> #include <stdlib.h> #if defined(__APPLE__) #include <AvailabilityMacros.h> #include <sys/time.h> #include <mach/task.h> #include <mach/mach.h> #else #include <time.h> #endif typedef struct link_list { timer_node_t head; timer_node_t *tail; }link_list_t; // 定时器的数据结构 typedef struct timer { link_list_t near[TIME_NEAR]; // 最低级的时间轮,主动轮 link_list_t t[4][TIME_LEVEL]; // 其他层级的时间轮,从动轮 struct spinlock lock; // 自旋锁,O(1) uint32_t time; // tick 指针,当前时间片 uint64_t current; // timer运行时间,精度10ms uint64_t current_point; // 系统运行时间,精度10ms }s_timer_t; static s_timer_t * TI = NULL; // 清空链表 // 并返回指向链表的第一个结点的指针 timer_node_t* link_clear(link_list_t *list) { // 指向头指针的下一个位置 timer_node_t * ret = list->head.next; // 头结点断链 list->head.next = 0; // 尾指针指向头结点 list->tail = &(list->head); return ret; } // 尾插法 void link(link_list_t *list, timer_node_t *node) { list->tail->next = node; list->tail = node; node->next=0; } // 添加任务结点到定时器中 // 根据 time 判断结点应该放入时间轮的层级 void add_node(s_timer_t *T, timer_node_t *node) { uint32_t time = node->expire; // 过期时间 uint32_t current_time=T->time; // 当前时间 uint32_t msec = time - current_time; // 剩余时间 //根据 expire-time 的差值将结点放入相应的层级 //[0, 2^8) if (msec < TIME_NEAR) { link(&T->near[time&TIME_NEAR_MASK],node); } //[2^8, 2^14) else if (msec < (1 << (TIME_NEAR_SHIFT+TIME_LEVEL_SHIFT))) { link(&T->t[0][((time>>TIME_NEAR_SHIFT) & TIME_LEVEL_MASK)],node); } //[2^14, 2^20) else if (msec < (1 << (TIME_NEAR_SHIFT+2*TIME_LEVEL_SHIFT))) { link(&T->t[1][((time>>(TIME_NEAR_SHIFT + TIME_LEVEL_SHIFT)) & TIME_LEVEL_MASK)],node); } //[2^20, 2^26) else if (msec < (1 << (TIME_NEAR_SHIFT+3*TIME_LEVEL_SHIFT))) { link(&T->t[2][((time>>(TIME_NEAR_SHIFT + 2*TIME_LEVEL_SHIFT)) & TIME_LEVEL_MASK)],node); } //[2^26, 2^32) else { link(&T->t[3][((time>>(TIME_NEAR_SHIFT + 3*TIME_LEVEL_SHIFT)) & TIME_LEVEL_MASK)],node); } } // 添加定时任务 timer_node_t* add_timer(int time, handler_pt func, int threadid) { timer_node_t *node = (timer_node_t *)malloc(sizeof(*node)); spinlock_lock(&TI->lock); // 设置定时任务结点的属性 node->expire = time + TI->time; // 添加任务触发时间 expire = time + tick node->callback = func; // 添加任务回调函数 node->id = threadid; // 添加执行该任务的线程id // 判断是否需要立即执行任务 if (time <= 0) { spinlock_unlock(&TI->lock); node->callback(node); free(node); return NULL; } // 添加任务结点到定时器中 add_node(TI, node); spinlock_unlock(&TI->lock); return node; } // 移动链表 // 第level层第idx个位置的链表结点重新添加到定时器T中 void move_list(s_timer_t *T, int level, int idx) { timer_node_t *current = link_clear(&T->t[level][idx]); while (current) { timer_node_t *temp = current->next; add_node(T, current); current = temp; } } // 重新映射,判断是否需要重新映射 // 时间片time自加1,将高24位对应的4个6位的数组中的各个元素的链表往低位移 void timer_shift(s_timer_t *T) { int mask = TIME_NEAR; // 时间片+1 uint32_t ct = ++T->time; // 时间片溢出,无符号整数,循环,time重置0 if (ct == 0) { // 将对应的t[3][0]链表取出,重新移动到定时器中 move_list(T, 3, 0); } else { // ct右移8位,进入到从动轮 uint32_t time = ct >> TIME_NEAR_SHIFT; // 第 i 层时间轮 int i = 0; // 判断是否需要重新映射? // 即循环判断当前层级对应的数位是否全0,即溢出产生进位 while ((ct & (mask-1))==0) { // 取当前层级的索引值 int idx = time & TIME_LEVEL_MASK; // idx=0 说明当前层级溢出,继续循环判断直至当前层级不溢出 if (idx != 0) { // 将对应的t[i][idx]链表取出,依次移动到定时器中 move_list(T, i, idx); break; } mask <<= TIME_LEVEL_SHIFT; // mask 右移 time >>= TIME_LEVEL_SHIFT; // time 左移 ++i; // 时间轮层级增加 } } } // 任务派发给其他线程执行 void dispatch_list(timer_node_t *current) { do { timer_node_t *temp = current; current=current->next; // cancel 标记为0,执行任务回调函数 if (temp->cancel == 0) temp->callback(temp); free(temp); } while (current); } // 执行任务 // 以time的低8位对应的near数组的索引,取出该位置对应的list void timer_execute(s_timer_t *T) { // 取出time低8位对应的值 int idx = T->time & TIME_NEAR_MASK; // 如果低8位值对应的near数组元素有链表,则取出 while (T->near[idx].head.next) { // 取出对应的定时器list timer_node_t *current = link_clear(&T->near[idx]); spinlock_unlock(&T->lock); // 将链表各结点的任务派发出去 dispatch_list(current); spinlock_lock(&T->lock); } } // 定时器更新 void timer_update(s_timer_t *T) { spinlock_lock(&T->lock); // 执行任务 timer_execute(T); /// time+1,并判断是否进行重新映射 timer_shift(T); // 若发生重新映射,若time的指向有任务,则需要执行 timer_execute(T); spinlock_unlock(&T->lock); } // 删除定时器任务 void del_timer(timer_node_t *node) { node->cancel = 1; } // 创建定时器 s_timer_t * timer_create_timer() { s_timer_t *r = (s_timer_t *)malloc(sizeof(s_timer_t)); memset(r, 0, sizeof(*r)); int i, j; // 创建主动轮,最低级时间轮 for (i = 0; i < TIME_NEAR; ++i) { link_clear(&r->near[i]); } // 创建从动轮,高层级时间轮 for (i = 0; i < 4; ++i) { for (j = 0;j < TIME_LEVEL; ++j) { link_clear(&r->t[i][j]); } } // 初始化自旋锁 spinlock_init(&r->lock); r->current = 0; return r; } // 获取当前时间,时间精度10ms uint64_t gettime() { uint64_t t; #if !defined(__APPLE__) || defined(AVAILABLE_MAC_OS_X_VERSION_10_12_AND_LATER) struct timespec ti; clock_gettime(CLOCK_MONOTONIC, &ti);// CLOCK_MONOTONIC,从系统启动这一刻起开始计时,不受系统时间被用户改变的影响 t = (uint64_t)ti.tv_sec * 1000; t += ti.tv_nsec / 1000000; #else struct timeval tv; gettimeofday(&tv, NULL); t = (uint64_t)tv.tv_sec * 100; t += tv.tv_usec / 10000; #endif return t; } // 检测定时器,时间精度10ms,每过1/4时间精度2.5ms执行1次 // 原因是dispatch分发任务花费时间,影响精度 void expire_timer(void) { // 获取当前系统运行时间,不受系统时间被用户的影响 uint64_t cp = gettime(); // 当前系统启动时间与定时器记录的系统启动时间不相等 if (cp != TI->current_point) { // 获取上述两者的差值 uint32_t diff = (uint32_t)(cp - TI->current_point); // 更新定时器记录的系统运行时间 TI->current_point = cp; // 更新timer的运行时间 TI->current += diff; // 更新定时器的时间(time的值),并执行对应的过期任务 int i; for (i=0; i<diff; i++) { // 每执行一次timer_update,其内部的函数 // timer_shift: time+1,time代表从timer启动后至今一共经历了多少次tick // timer_execute: 执行near中的定时器 timer_update(TI); } } } // 初始化定时器 void init_timer(void) { TI = timer_create_timer(); // 创建定时器 TI->current_point = gettime(); // 获取当前时间 } void clear_timer() { int i,j; for (i=0;i<TIME_NEAR;i++) { link_list_t * list = &TI->near[i]; timer_node_t* current = list->head.next; while(current) { timer_node_t * temp = current; current = current->next; free(temp); } link_clear(&TI->near[i]); } for (i=0;i<4;i++) { for (j=0;j<TIME_LEVEL;j++) { link_list_t * list = &TI->t[i][j]; timer_node_t* current = list->head.next; while (current) { timer_node_t * temp = current; current = current->next; free(temp); } link_clear(&TI->t[i][j]); } } }
3)tw-timer.c
// tw-timer.c // gcc tw-timer.c timewheel.c -o tw -I./ -lpthread #include <stdio.h> #include <unistd.h> #include <pthread.h> #include <time.h> #include <stdlib.h> #include "timewheel.h" struct context { int quit; int thread; }; struct thread_param { struct context *ctx; int id; }; static struct context ctx = {0}; void do_timer(timer_node_t *node) { printf("do_timer expired:%d - thread-id:%d\n", node->expire, node->id); add_timer(100, do_timer, node->id); } void do_clock(timer_node_t *node) { static int time; time ++; printf("---time = %d ---\n", time); add_timer(100, do_clock, node->id); } void* thread_worker(void *p) { struct thread_param *tp = p; int id = tp->id; struct context *ctx = tp->ctx; int expire = rand() % 200; add_timer(expire, do_timer, id); while (!ctx->quit) { usleep(1000); } printf("thread_worker:%d exit!\n", id); return NULL; } void do_quit(timer_node_t * node) { ctx.quit = 1; } int main() { srand(time(NULL)); ctx.thread = 2; pthread_t pid[ctx.thread]; init_timer(); add_timer(6000, do_quit, 100); add_timer(0, do_clock, 100); struct thread_param task_thread_p[ctx.thread]; int i; for (i = 0; i < ctx.thread; i++) { task_thread_p[i].id = i; task_thread_p[i].ctx = &ctx; if (pthread_create(&pid[i], NULL, thread_worker, &task_thread_p[i])) { fprintf(stderr, "create thread failed\n"); exit(1); } } while (!ctx.quit) { expire_timer(); usleep(2500); // 2.5ms } clear_timer(); for (i = 0; i < ctx.thread; i++) { pthread_join(pid[i], NULL); } printf("all thread is closed\n"); return 0; }
5.5 总结
skynet 是怎么样运转定时器的?
skynet的 timer 线程会不断触发 expire_timer函数,在该函数中会不断执行timer_execute对 near 中的时器执行超时操作。执行完毕后,调用 timer_shift 从t[0] ~ t[3]中选择合适的定时器节点加入到 near 中,这过程就相当于提高了定时器节点的紧急程度(因为随着时间的流逝,定时器节点的紧急程度会越来越向near逼近)。
参考资料