定时器结构
1,首先看看是在哪里第一次调用
在skynet_start.c文件中,第275行,进行初始化skynet_timer_init(),代码如下:
void
skynet_timer_init(void) {
TI = timer_create_timer(); // 创建timer并赋值给TI
uint32_t current = 0;
systime(&TI->starttime, ¤t);
// starttime,就是以系统时间为标准,取当时的1秒记录
TI->current = current;
// current,就是以当时取得1秒为起点,以10ms为单位,过了多少个滴答
TI->current_point = gettime();
}
接着看看什么是TI
static struct timer * TI = NULL; // 声明了一个全局单例TI指针,TI指针指向一个struct timer结构体的地址,并标明static属性
看看具体的struct timer结构体是怎么样的
struct timer {
struct link_list near[TIME_NEAR]; // TIME_NEAR == 256
struct link_list t[4][TIME_LEVEL]; // TIME_LEVEL == 64
struct spinlock lock; // 自旋锁
uint32_t time;
uint32_t starttime; // 初始化时间的 开始日期点,单位为1秒
uint64_t current; // 从初始化时间到现在,总共度过了多少滴答
uint64_t current_point; // 上一次系统所处的时间点
};
// 可以看到
// near 是个结构体数组,数组的元素都是struct link_list结构体,数组大小是256
// t 是个二级数组,一共分为4组,每个组大小是64,数组的元素都是struct link_list结构体
定时器链表
2,现在让我们看看具体的struct link_list是怎么样的
struct link_list {
struct timer_node head; // 链表的头部节点
struct timer_node *tail; // 指向 链表尾部节点 的指针
};
struct timer_node {
struct timer_node *next; // 链表的下一个节点
uint32_t expire; // 过期时间
};
// 可以看到
// struct link_list结构体并不属于链表本身,而是通过 head 和 tail 来控制链表
// struct timer_node结构体才是属于链表节点,链表由一个一个节点组成
我们为什么要设置struct link_list near数组的元素为一个链表呢?而不是直接一个数组存放一个定时器struct timer_node?
现在的struct link_list near数组一共有256个,每一个数组的间隔都代表10ms
那么在256 * 10 = 2560ms,也就是2.56秒内到期的定时器,都会被放到这个struct link_list near数组中
但存在一个问题:同一个时刻到期的定时器数量可能不止有1个,而是存在多个
也就是3ms,4ms,7ms,7ms,7ms到期的定时器,都需要被存放在struct link_list near[0]中
如果使用struct timer_node,则在同一个时间轮次,只能记录一个定时器
使用了struct link_list链表后,无论同一时刻的定时器的数量有多少,都可以无限加到链表中
(这种办法也有一个问题:假设链表的定时器太多,执行时间过长,会影响下一个时间轮的执行)
由上述的知识可知,near 数组和 t 二级数组都是存放 struct link_lis t结构体元素,也就是说数组中每个元素都控制着一条链表
链表的节点由 struct timer_node 描述,即每个链表节点都代表着一个定时器
时间轮算法
3,现在的问题是:为什么要设计 struct link_list near数组 和 struct link_list t二级数组?
知识背景
Linux旧内核中动态定时器的实现算法:时间轮
linux 内核定时器详解 - 博客园 (cnblogs.com)
基于Linux内核定时器时间轮实现方式 - 知乎 (zhihu.com)
Linux新内核中定时器的实现算法:红黑树
定时器三大实现:时间轮,红黑树,最小堆
这3种方法各有优点,适合不同的场景,可以根据实际业务来选择
问题描述
32位无符号整型,取值范围为:0 ~ 4294967295,最大长度为:4294967296 11111111 11111111 11111111 11111111 也就是说:当时钟滴答到达 4294967295 + 1 时,将会发生溢出 1滴答是10ms,4294967295 滴答就是 42949672950ms,也就是 42949672.95s 42949672.95 / 86400 == 497.10269618056 天 注意:再加1滴答就是 4294967296,也就是 42949672.96s,就会溢出为 0
每1次滴答,就是一个定时节点,并且这个定时节点控制着一条链表 现在摆在我们面前一个问题: 一个 struct link_list 结构体,代表着1滴答(10ms)内到期的定时节点 struct timer_node { struct timer_node *next; // 8字节指针 uint32_t expire; // 4字节整数 }; struct link_list { struct timer_node head; // 8 + 4 = 12字节 struct timer_node *tail; // 8字节指针 }; 由计算得知:一个 struct link_list 结构体的大小为:20Byte
现在我们有:4294967296 滴答,则有 20 Byte * 4294967296 = 85899345920 Byte 85899345920 Byte / 1024 / 1024 / 1024 = 80 GByte 也就是说:我们什么都没做,一个定时器都没加,就已经消耗了80GB的内存 这显然是不能接受的,我们需要一个办法来解决这个问题
解决方案
我们现在遇到的问题和日常生活中的时钟非常相似:60秒凑成1分钟,60分钟凑成1小时,12小时又凑成1圈
假设我们要用秒的单位来描述12小时,需要的数组大小就会是:12 * 60 * 60 = 43200
(这里忽略24小时制,12小时为1圈更直观,但他们的本质都是相同的)
我们通常都只会用秒针,分针,时针来描述时间,而不是43200这么大的数组
假设1天的时间从0秒开始算起,记为time = 0,并且这个 time 是单调递增的,范围为[0,43200]
记秒针为time_second,分针为time_minute,时钟为time_hour
那我们是如何通过time_second,time_minute,time_hour记录时间的呢?
随机一个在范围[0,43200]内的数字,random = 12500
可由计算得知:
time_second = 12500 % 3600 % 60 = 20秒
time_minute = math.floor(12500 % 3600 / 60) = 28分钟
time_hour = math.floor(12500 / 3600) = 3小时
也就是说,根据time_second,time_minute,time_hour的值,通过计算就能得知time的值
time_second数组的大小是60,time_minute数组的大小是60,time_hour数组的大小是12
一共就是60 + 60 + 12 = 132个数组就能代替43200的数组,节省了非常多的内存
当秒针time_second走完60格,分针time_minute就会走1格,并且秒针time_second会重置回第一格
当分针time_minute走完60格,时针time_hour就会走1格,并且分针time_minute会重置回第一格
当时针time_hour走完12格,就算走完1圈,并且时针time_hour会重置回第一格,time重置为0(我们人为规定12小时算半天)
也就是说:time的值虽然是单调递增的,但time对应的time_second,time_minute,time_hour的值却是在循环的
time_second,time_minute,time_hour都有自己的取值范围,并且变化的速率不一样
time_second变化的最快,time_minute比较一般,time_hour变化最慢
举个例子:
当time_second走完60秒,此时数组time_second[60]中的数据就已经过期了
我们根据time的值,算出time_minute的值为1,time_second的值为0,也就是下一分钟的位置应该读取time_minute[1]的数据
因此我们把time_minute[1]当中的数据重新映射到数组time_second[60]数组中
… … … …
当time_second又走完60秒,此时数组time_second[60]中的数据就再次过期
我们根据time的值,算出time_minute的值为2,time_second的值为0,也就是下一分钟的位置应该读取time_minute[2]的数据
因此我们把time_minute[2]当中的数据重新映射到数组time_second[60]中
直到最后,连time_minute[60]也遍历完了
我们根据time的值,算出time_out的值为1,time_minute的值为0,也就是下一分钟的位置应该读取time_hour[1]的数据
因此我们把time_hour[1]当中的数据重新映射到数组time_second[60]中
我们就根据时钟的原理,开发出时间轮定时器算法
上面内存占用过大的问题也能得到解决
时间精度问题
4,时间精度问题
Skynet根据游戏的业务需求,设置定时器的时间精度为10ms,具体的时间精度设定代码:
// 计算从1970年1月1日算起的系统当前时间
static void
systime(uint32_t *sec, uint32_t *cs) {
struct timespec ti;
clock_gettime(CLOCK_REALTIME, &ti); // 系统函数调用,用于获取系统日期时间
*sec = (uint32_t)ti.tv_sec; // 记录当前日期时间的 秒,单位为1秒
*cs = (uint32_t)(ti.tv_nsec / 10000000); // 记录当前日期时间的 厘秒,单位为0.01秒
}
// 计算linux系统从启动到现在的时间,不能被设置
static uint64_t
gettime() {
uint64_t t; // 用于存储时间值
struct timespec ti;
// linux系统从启动到现在的时间,不能被设置
// CLOCK_MONOTONIC 时钟,它是一个单调递增的时钟,不受系统时间调整的影响,适合用于测量时间间隔和计时器等应用
// 通过将秒数乘以 100 并加上纳秒数的一部分,函数将时间值转换为一个更大的整数,以提供更精确的时间表示
clock_gettime(CLOCK_MONOTONIC, &ti);
// 返回的值为:
// clock_gettime() 函数将返回一个 struct timespec 结构体
// 其中 ti.tv_sec 的值为系统开启到现在的秒数部分,ti.tv_nsec 的值为上一秒到这一秒的纳秒部分
// 但时间单位是以 0.01 秒为基础,即10ms,所以:
// ti.tv_sec * 100 的单位为 10ms
// ti.tv_nsec / 10000000 的单位为 10ms
//
// 假设 ti.tv_sec 的值为 78800,ti.tv_nsec 的值为 500000000(500万纳秒),那么根据函数的计算过程
// t = (uint64_t)ti.tv_sec * 100 + ti.tv_nsec / 10000000; (10ms就是10000000ns)
// t = (uint64_t)78800 * 100 + 500000000 / 10000000;
// t = 7880000 + 50;
// t = 7880050;
// 函数返回的时间值将是 7880050,即78800.5秒
t = (uint64_t)ti.tv_sec * 100;
t += ti.tv_nsec / 10000000;
return t;
}
// c语言自带的结构体
struct timespec {
time_t tv_sec; // seconds
long tv_nsec; // and nanoseconds
};
// struct timespec有两个成员,一个是秒,一个是纳秒, 所以最高精确度是纳秒
// 一般由函int clock_gettime(clockid_t, struct timespec *)获取特定时钟的时间,常用如下4种时钟:
// CLOCK_REALTIME 从1970年1月1日算起的系统当前时间
// CLOCK_MONOTONIC linux系统从启动到现在的时间,不能被设置
// CLOCK_PROCESS_CPUTIME_ID 本进程运行时间
// CLOCK_THREAD_CPUTIME_ID 本线程运行时间
// CLOCK_MONOTONIC 时钟是从系统启动开始计算的。它不受系统时间的影响,而是提供了一个单调递增的时间值,可以用于测量时间间隔和计时
// 具体来说,CLOCK_MONOTONIC 时钟的起始时间是系统启动时刻。它的值随着时间的推移而单调递增,不受系统时间调整(例如手动更改系统时间)的影响
// 这使得它非常适合于测量时间间隔和计时,而不会受到系统时间的干扰
// CLOCK_MONOTONIC 时钟并不保证与日历时间(即日期和时间)保持同步
// 如果您需要获取当前的日历时间,可以考虑使用 CLOCK_REALTIME 时钟,它受系统时间的影响,可以提供与日历时间相关的值
// CLOCK_MONOTONIC 时钟从系统启动时刻开始计算,提供了一个单调递增的时间值,适用于测量时间间隔和计时
// CLOCK_MONOTONIC 时钟可以用来表示系统的开机时间,而 CLOCK_REALTIME 时钟可以用来表示当前的日历时间
根据时间轮的算法,也就是每当时钟滴答一次,时间轮就会向前推进一步,也就是前进10ms
定时器代码分析
5,Skynet定时器代码分析
我们需要看看定时器是如何跑动执行的?首先出场的是定时器线程,因为定时器的执行都是由线程驱动的
static void *
thread_timer(void *p) {
struct monitor * m = p;
skynet_initthread(THREAD_TIMER);
for (;;) { // 不断循环
skynet_updatetime(); // 定时器线程在这里执行 skynet_updatetime()
skynet_socket_updatetime(); // 暂时忽略
CHECK_ABORT
wakeup(m, m->count-1); // 暂时忽略
// usleep()函数的单位是微秒,2500us则表示2.5ms
// 睡眠2.5ms后将会被唤醒
// 也就是每2.5ms会执行一下 skynet_updatetime()
usleep(2500);
if (SIG) {
signal_hup();
SIG = 0;
}
}
// wakeup socket thread
skynet_socket_exit();
// wakeup all worker thread
pthread_mutex_lock(&m->mutex);
m->quit = 1;
pthread_cond_broadcast(&m->cond);
pthread_mutex_unlock(&m->mutex);
return NULL;
}
再看看skynet_updatetime()的实现:
void
skynet_updatetime(void) {
uint64_t cp = gettime();
if(cp < TI->current_point) {
// 这里时间发生了回溯,产生错误
skynet_error(NULL, "time diff error: change from %lld to %lld", cp, TI->current_point);
TI->current_point = cp;
} else if (cp != TI->current_point) { // TI->current_point 就是在最开始初始化时间的时候,通过gettime()记录的时间值
uint32_t diff = (uint32_t)(cp - TI->current_point); // 计算出时间差值,差值以10ms为准,即10000000ns
TI->current_point = cp; // 把 [开始时间] 重置为 [现在的时间]
TI->current += diff; // 记录 [完成时间初始化] 到 [现在的时间] 一共走过多少个10ms,即过了多少时钟滴答
int i;
for (i=0;i<diff;i++) {
timer_update(TI); // 逐步更新每个时间滴答
}
}
// 如果按照定时器线程2.5ms执行一次的频率
// 在前3次的过程中,cp == TI->current_point
//
// 为什么在前3次的调用过程中,会一直是 cp == TI->current_point 这种情况呢?
//
// 还是以例子来说明:
// 假设 ti.tv_sec 的值为 78800,ti.tv_nsec 的值为 500000000(500万纳秒)
// 最后我们知道 TI->current_point == 7880050
//
// 在 78800s 和 500000000(500万纳秒)的基础上,时间继续过了2.5ms
// 现在我们知道:2.5ms等于 2500000 ns
// 则有:78800s +(2500000 + 500000000)/ 10000000 == 7880050.25,则小数部分会被丢弃
// 最后的结果还是TI->current_point == 7880050
// 而到了第4次,78800s +(2500000*4 + 500000000)/ 10000000 == 510000000/10000000 == 51
// 所以会造成前3次的空转,直到第4次,cp != TI->current_point
// 特别注意:这并不是严格的保持4次就会产生一次diff
// 因为有时候timer线程被唤醒的时候,cp = gettime()获取的时间点已经越过TI->current_point好几个diff了
// 因为timer线程到时间应该被唤醒的时候,并不一定立即能够得到CPU的时间片
// 所以到下一次timer线程被唤醒的时候,会执行多个diff,追上现在的时间点
}
然后我们看看每10ms的时钟滴答,都做了些什么:
static void
timer_update(struct timer *T) {
SPIN_LOCK(T);
// try to dispatch timeout 0 (rare condition)
timer_execute(T); // 执行到期的定时器,这个我们先忽略,后面再详细说
// shift time first, and then dispatch timer message
// 在这里执行了timer_shift()函数,也就说每隔10ms,就会执行一次这个函数
// 下面我们就需要重点看看这个函数的实现
timer_shift(T);
timer_execute(T); // 这里为什么又要执行一次呢?
SPIN_UNLOCK(T);
}
继续重点看timer_shift()函数做了什么:
// 目前我们有:
// 256位的near数组
// T0数组的64位
// T1数组的64位
// T2数组的64位
// T3数组的64位
static void
timer_shift(struct timer *T) {
int mask = TIME_NEAR; // TIME_NEAR == 256
// 根据前面 timer_create_timer() 函数我们知道 TI->time 被设置为了0
uint32_t ct = ++T->time; // 这里等于过了1滴答,也就是10ms,T->time就会+1
if (ct == 0) {
move_list(T, 3, 0); // 为什么会move_list(T, 3, 0)函数传 3 和 0?后面再详细解释
} else {
uint32_t time = ct >> TIME_NEAR_SHIFT; // TIME_NEAR_SHIFT == 8
// ct = 1,则time = 0
// ct = 2,则time = 0
// ...
// ct = 255,则time = 0
// ct = 256,则time = 1
//
// 当ct = 256,time = 1的时候,代表着最新的near数组已经完成遍历
// 此时轮到范围[2.57-5.12]到期的定时节点链表,也就是ct[2570 - 5120]的范围
// 2.57s-5.12s是相对于0时刻而言,对于现在时刻来说就是下一个0-2.56s
//
// 0 < ct <= 255,time = ct >> TIME_NEAR_SHIFT为0
// 256 <= ct < 512,time = ct >> TIME_NEAR_SHIFT变为了1
// 512 <= ct < 768,time = ct >> TIME_NEAR_SHIFT变为了2
// 768 <= ct < 1024,time = ct >> TIME_NEAR_SHIFT变为了3
int i = 0;
// ((ct & (mask-1))==0) 其实就是以 ct 的值,来判断near数组是否已经遍历了256
//
// ct == 256,ct & (mask-1)结果为0,总数为256
// ct == 512,ct & (mask-1)结果为0,总数为256
// ct == 768,ct & (mask-1)结果为0,总数为256
//
// 若是已经遍历了256,此时应该把下一个数组填到near数组
while ((ct & (mask-1))==0) {
// 此时的问题就是:在T0,T1,T2,T3中,应该如何选择?
// 在选择了T0,T1,T2,T3之后,在 0 ~ 63之间又该如何选择?
// idx的值为time & TIME_LEVEL_MASK = 1
//
// 当idx!=0,表明需要把T0[1]上的定时节点链表,放入near数组
// 以此类推,当ct = 256 * 64 = 16384
// 当ct==16384,ct & (mask-1) == 0
// time = ct >> TIME_NEAR_SHIFT变为了64,idx的值为time & TIME_LEVEL_MASK = 0
int idx = time & TIME_LEVEL_MASK; // TIME_LEVEL_MASK == 63
// 当 idx != 0,表明这个级别的数组还没用完
if (idx!=0) {
move_list(T, i, idx);
break;
}
// 当 idx == 0了,表示T0这个类别的数组也用完了
// 当上一级的数组已经用完了
// 此时就需要把mask掩码移动到下一级的数组
// 那应该移动多少呢?肯定是下一级数组的数量64,即向左移动6位
// 255的二进制11111111,刚好对应near数组的0 - 255位
// 当向左移动64个数组,就是向左移动6位:11111111000000
// 以此类推
mask <<= TIME_LEVEL_SHIFT; // TIME_LEVEL_SHIFT == 6,把mask往左移动6位,读取到65-128的数组,也就是T1的64个元素
time >>= TIME_LEVEL_SHIFT; // TIME_LEVEL_SHIFT == 6,time也置为1,等于读取T1[1]上的定时节点链表
++i;// 把数组等级定位到T1
}
}
}
我们再来看看现在拥有的数组
256位的near数组 T0数组的64位 T1数组的64位 T2数组的64位 T3数组的64位
可以发现我们的数组就是跟秒针,分针,时针一样排列,每个数组代表的权重不同,速率也不同,只不过我们的层级比较多而已,原理是一样的
疑问解答
还有个疑问:
当我们的ct时间,已经落在T3[62]的部分时,此时还剩下T3[63]这一个0-2.56s的定时器链表,那么这个时候我们设置一个很大的定时器,它会被安排到哪里呢?
此时我们需要看看定时器插入的实现:
static void
add_node(struct timer *T, struct timer_node *node) {
uint32_t time = node->expire; // 先读取 过期的时间点
uint32_t current_time = T->time; // 读取当前的时间
// 时间落在near数组上
// (time|TIME_NEAR_MASK)==(current_time|TIME_NEAR_MASK)比较,两个时间数字的前24位是否一样
// 如果一样,代表这个事件的触发时间在当前256个1/100秒内,然后将这个事件节点加入到对应的队列中
// 如果两个时间数字的前24位不一样,则应该将这个事件加入相应的高位的时间队列里面
if ((time|TIME_NEAR_MASK)==(current_time|TIME_NEAR_MASK)) {
// time&TIME_NEAR_MASK 是找到对应的数组index
// link()函数就是把定时器time_node加入到对应数组位置的链表中
link(&T->near[time&TIME_NEAR_MASK],node);
} else {
int i;
// i = 0,代表 T0
// i = 1,代表 T1
// i = 2,代表 T2
// i = 3,没有下一个数组了
uint32_t mask = TIME_NEAR << TIME_LEVEL_SHIFT;
for (i=0;i<3;i++) {
// (time|(mask-1)) 算出应该位于第几等级的数组
if ((time|(mask-1))==(current_time|(mask-1))) {
break;
}
mask <<= TIME_LEVEL_SHIFT;
}
// link(&T->t[i][((time>>(TIME_NEAR_SHIFT + i*TIME_LEVEL_SHIFT)) & TIME_LEVEL_MASK)], node);
// 上面已经知道了i,即第几等级的数组
//
// 其实& TIME_LEVEL_MASK的操作就是保证不要超过63,可以先去掉,变为:
// link(&T->t[i][(time>>(TIME_NEAR_SHIFT + i*TIME_LEVEL_SHIFT))], node);
//
// 现在需要确定time应该放在该等级数组的哪个index(这个操作对于每个level的数组都一样的)
// 因为time是要过期的点,等于 time = node->expire = 参数(300) + T->time(0)
// 当time==300落在T0数组时候,time>>(TIME_NEAR_SHIFT + i*TIME_LEVEL_SHIFT)实际就是:
// 300>>8 + 0,即落在T0[1]组
// 而如果是level=1的数组,此时移动的位置就需要加上i*TIME_LEVEL_SHIFT
// 因为要移动前一个等级的数组位,因此就是TIME_NEAR_SHIFT + i*TIME_LEVEL_SHIFT
link(&T->t[i][((time>>(TIME_NEAR_SHIFT + i*TIME_LEVEL_SHIFT)) & TIME_LEVEL_MASK)], node);
}
}
其实我们根据 time = node->expire = 参数 + T->time可以得知:
当我们的ct时间,已经落在T3[62]的部分时,这个时候T->time的值是非常接近32位的最大值了
如果这个时候我们设置一个很大的定时器,node->expire = 参数(很大的定时器) + T->time,就会发生溢出
这个时候得到的time就会从0,1,2,3……开始重新分布
但轮回的地方并不是在near数组,或者是对应的T0,T1,T2等数组
这里有个非常特殊的点:
对于T0,T1,T2来说,time的值都是不可能为0的,因此time»(TIME_NEAR_SHIFT + i*TIME_LEVEL_SHIFT)也就不可能为0
这意味着T0,T1,T2就不可能会有T0[0],T1[0],T2[0]的存在
而对于落在T3的time来说,此时可能会被置为0,假设time = node->expire = 参数(4294967295) + T->time(1)
time = 参数(4294967295) + T->time(1)= 4294967296(发生溢出) = 0
0 & TIME_LEVEL_MASK = 0,这个时候就会把time放到T3[0]的位置
其实任意 参数(x)+ T->time(y)- 4294967295 < 2^26 = 67108864 的情况,都会把time放进T3[0]的位置
这个公式也就是说:凡是溢出的部分,小于67108864(也是T0 + T1 + T2的大小),都会被放进T3[0]的位置
那溢出的部分大于等于67108864呢?会被放进T3[1],T3[2]对于的位置,这个时候的定时器是不能正常工作的
我们再看看代码
if (ct == 0) {
move_list(T, 3, 0);
这个代码代表着time此时已经重置为0,此时需要把之前溢出的部分,重新映射到near数组,故有move_list(T, 3, 0)
此时如果还有新的定时器过来,就会按规则加到对应的T0,T1,T2,T3,从而完成循环
另外要注意:定时器的取值一定有个范围
根据 skynet框架中定时器实现 时间轮算法 中的时钟滴答数为: T -> time
T -> time 的类型是uint32,即表示范围为[0,4294967295]
32位无符号整型,取值范围为:0 ~ 4294967295,最大长度为:4294967296 11111111 11111111 11111111 11111111 也就是说:当时钟滴答到达 4294967295 + 1 时,将会发生溢出
1滴答是10ms,4294967295 滴答就是 42949672950ms,也就是 42949672.95s 42949672.95 / 86400 == 497.10269618056 天
在lua层调用skynet.timeout(ti, call_back_func)中,会来到lua层-c层的接口
// 文件在skynet/lualib-src/lua-skynet.c
// 代码在该文件的 static int lintcommand() 函数中
if (lua_gettop(L) == 2) {
if (lua_isnumber(L, 2)) {
int32_t n = (int32_t)luaL_checkinteger(L,2); // 这里获取到 lua层skynet.timeout()传入的ti
// 这里n的正数部分:[0, 2147483647]
sprintf(tmp, "%d", n);
parm = tmp;
} else {
parm = luaL_checkstring(L,2);
}
}
当我lua层设置的 expire > 2147483647时,即 skynet.timeout(expire,call_back_func)
会直接导致 n 发生溢出,此时 n < 0,最后在skynet的判定下 expire < 0,立马执行了
1滴答是10ms,2147483647 滴答就是 21474836470 ms,也就是 21474836.47 s 21474836.47 / 86400 == 248.55134803241 天
我的疑问:
假设我要设一个300天后到期的定时器(虽然不会这么写),按照定时器的实现,300天的时间是支持的
但是通过skynet.timeout(300 * 86400 * 100,call_back_func),传进去的300 * 86400 * 100最后溢出变为-1702967296
-1702967296 = time < 0,此定时器立即执行
既然在定时器的实现中,已经把定时器有效时间范围设定在[0,4294967295]
在调用的API层的时间范围却只能是[0,2147483647]
为什么要这么设计呢?
云风的回答:
因为实现和接口是两码事 skynet 并不支持超过 300 天的定时器 我认为这个限制是 300 天还是 600 天并无区别,终究它有一个限制
我的讨论:skynet定时器的问题 · cloudwu/skynet · Discussion #1851 (github.com)
如何使用定时器
6,在进一步探究定时器功能前,我们需要知道:Skynet上层是如何调用定时器接口的?即定时器的请求从哪里来?需要带什么参数?
Skynet提供的定时器接口如下:
skynet.timeout(time_duration,call_back_func) -- 注意:这个call_back_func是lua层的函数
在lualib目录下,skynet.lua文件,第478行就是skynet.timeout()的实现
function skynet.timeout(ti, func)
local session = auxtimeout(ti)
assert(session)
local co = co_create_for_timeout(func, ti)
assert(session_id_coroutine[session] == nil)
session_id_coroutine[session] = co
return co -- for debug
end
这里的函数实现比较复杂,auxtimeout(ti)函数是为了修复一个bug而新增的,bug详情:GitHub中的issue编号#1798 #1797
skynet.timeout()旧版本的实现和新版本的实现是一致的,不同的只是新版本对返回的值做了一下检查
我们暂时不关注这个bug的解决方案,所以回退到旧版本看看skynet.timeout()的实现:
-- 版本号93ea565bcb1754fdac3c0b6219c24206cadb9851
function skynet.timeout(ti, func)
local session = c.intcommand("TIMEOUT",ti)
assert(session)
local co = co_create_for_timeout(func, ti)
assert(session_id_coroutine[session] == nil)
session_id_coroutine[session] = co -- 建立 请求消息session 和 协程co 的映射关系
return co -- for debug
end
先看看究竟什么是c.intcommand()函数
c的实质是一份c代码,被编译成so库,这份c代码存储的目录是lualib-src,so库的存储目录在luaclib下,也就是说lua层最后引用的是so库
-- lua层引用c代码
local c = require "skynet.core" -- 在lualib目录中,skynet.lua文件,引用skynet.so库
c层的实现代码
// lualib-src源目录中的lua-skynet.c文件
// 在luaclib目录中,被编译为skynet.so
LUAMOD_API int
luaopen_skynet_core(lua_State *L) {
luaL_checkversion(L);
luaL_Reg l[] = {
{ "send" , lsend },
{ "genid", lgenid },
{ "redirect", lredirect },
{ "command" , lcommand },
{ "intcommand", lintcommand }, // 出现了我们使用的调用 c.intcommand(),intcommand 映射为 lintcommand
{ "addresscommand", laddresscommand },
{ "error", lerror },
{ "harbor", lharbor },
{ "callback", lcallback },
{ "trace", ltrace },
{ NULL, NULL },
};
// functions without skynet_context
// 这里注册的 function 是没有服务context的
luaL_Reg l2[] = {
{ "tostring", ltostring },
{ "pack", luaseri_pack },
{ "unpack", luaseri_unpack },
{ "packstring", lpackstring },
{ "trash" , ltrash },
{ "now", lnow },
{ "hpc", lhpc }, // getHPCounter
{ NULL, NULL },
};
lua_createtable(L, 0, sizeof(l)/sizeof(l[0]) + sizeof(l2)/sizeof(l2[0]) -2);
// 这个操作非常重要:把lua虚拟机中的skynet_context放入到栈里面
// lua_getfield 函数的作用是从栈中的指定表中获取指定字段的值,并将该值压入栈顶
// 如果字段不存在,或者表不是一个有效的 Lua 表,函数将会将一个 nil 值压入栈顶
lua_getfield(L, LUA_REGISTRYINDEX, "skynet_context");
// 这个操作非常重要:把栈中的数据取出来
// lua_touserdata 函数的作用是从指定位置的 Lua 值中获取用户数据,并将其解释为一个指针
// 如果指定位置的值不是用户数据类型,或者索引无效,函数将返回 NULL。
struct skynet_context *ctx = lua_touserdata(L,-1);
if (ctx == NULL) {
return luaL_error(L, "Init skynet context first");
}
// luaL_setfuncs 函数的作用是将一个 C 函数数组中的多个函数注册到 Lua 表中
// 每个函数都会被注册为表的一个字段,并且可以在 Lua 环境中直接调用这些函数
luaL_setfuncs(L,l,1);
luaL_setfuncs(L,l2,0);
return 1;
}
实际上luaclib上所有的so库函数,都是为了给lua层调用c层提供的接口
接着我们可以看到我们调用的函数,被映射到c代码的lcommand函数,实现如下:
static int
lintcommand(lua_State *L) {
// 直接获取到了lua虚拟机的context
// 即获取搞了skynet中一个服务的skynet_context
struct skynet_context * context = lua_touserdata(L, lua_upvalueindex(1));
const char * cmd = luaL_checkstring(L,1); // 读取到命令 TIMEOUT
const char * result;
const char * parm = NULL;
char tmp[64]; // for integer parm, 记录时间参数
if (lua_gettop(L) == 2) {
if (lua_isnumber(L, 2)) {
int32_t n = (int32_t)luaL_checkinteger(L,2);
sprintf(tmp, "%d", n);
parm = tmp;
} else {
parm = luaL_checkstring(L,2);
}
}
result = skynet_command(context, cmd, parm);
if (result) {
char *endptr = NULL;
lua_Integer r = strtoll(result, &endptr, 0);
if (endptr == NULL || *endptr != '\0') {
// may be real number
double n = strtod(result, &endptr);
if (endptr == NULL || *endptr != '\0') {
return luaL_error(L, "Invalid result %s", result);
} else {
lua_pushnumber(L, n);
}
} else {
lua_pushinteger(L, r);
}
return 1;
}
return 0;
}
继续看skynet_command()函数的实现:
const char *
skynet_command(struct skynet_context * context, const char * cmd , const char * param) {
struct command_func * method = &cmd_funcs[0]; // 这里是一系列命令的集合,具体实现看下面
while(method->name) {
if (strcmp(cmd, method->name) == 0) { // 把传入来的cmd命令和cmd_funcs命令的集合进行对比
return method->func(context, param); // 命令对比成功,直接执行该命令的执行函数,这里会把生成的session返回去给lua层的skynet服务
}
++method;
}
return NULL;
}
static struct command_func cmd_funcs[] = {
{ "TIMEOUT", cmd_timeout }, // lua层 c.intcommand("TIMEOUT",ti) 的调用,最终在这里进行映射,会调用cmd_timeout
{ "REG", cmd_reg },
{ "QUERY", cmd_query },
{ "NAME", cmd_name },
{ "EXIT", cmd_exit },
{ "KILL", cmd_kill },
{ "LAUNCH", cmd_launch },
{ "GETENV", cmd_getenv },
{ "SETENV", cmd_setenv },
{ "STARTTIME", cmd_starttime },
{ "ABORT", cmd_abort },
{ "MONITOR", cmd_monitor },
{ "STAT", cmd_stat },
{ "LOGON", cmd_logon },
{ "LOGOFF", cmd_logoff },
{ "SIGNAL", cmd_signal },
{ NULL, NULL },
};
接着看cmd_timeout()函数的实现
static const char *
cmd_timeout(struct skynet_context * context, const char * param) {
char * session_ptr = NULL;
// strtol() 是 C 语言标准库中的一个函数,用于将字符串转换为长整型(long)数值
// 为什么 skynet.timeout(100,call_back_func)传入的是100这个整型
// 现在 param 变为 const char * param 呢?
// 是因为 lintcommand(lua_State *L) 函数在接收lua层的参数时
// 将其放入了一个数组中进行存储,因此变为 const char * param
int ti = strtol(param, &session_ptr, 10);
// 这里根据skynet服务的context生成一个session
// 由本服务生成一个新的session,用来标记本服务的消息
// 为什么在这里能生成skynet服务的session呢?
// 因为这里是c层,可以直接调用skynet_context_newsession
// skynet服务的底层本身就是c层实现的
int session = skynet_context_newsession(context);
skynet_timeout(context->handle, ti, session); // 传递目的服务handle,过期时间,以及消息的session
sprintf(context->result, "%d", session);
return context->result; // 这里把生成的session返回去给lua层的skynet服务
}
最后我们可以看到调用了定时器模块的skynet_timeout()函数:
int
skynet_timeout(uint32_t handle, int time, int session) {
if (time <= 0) { // 如果time时间小于0,表示这个定时器立即生效
// 构建一个消息,发给调用定时器的服务
// 服务处理这个消息时,就知道时间到了
struct skynet_message message;
message.source = 0;
message.session = session;
message.data = NULL;
message.sz = (size_t)PTYPE_RESPONSE << MESSAGE_TYPE_SHIFT;
if (skynet_context_push(handle, &message)) { // 把定时消息推送给服务
return -1;
}
} else {
struct timer_event event;
event.handle = handle; // 记录消息的handle,方便后面构造消息
event.session = session; // 记录消息的session,方便后面构造消息
timer_add(TI, &event, sizeof(event), time); // 在这里增加定时器
}
return session;
}
最后来到增加定时器的API实现:
static void
timer_add(struct timer *T, void *arg, size_t sz, int time) {
// 分配struct timer_node结构体的大小 + struct timer_event结构体的大小
struct timer_node *node = (struct timer_node *)skynet_malloc(sizeof(*node)+sz);
// 把struct timer_event = {handle = handle,session = sessin}放入内存
memcpy(node+1, arg, sz);
SPIN_LOCK(T);
node->expire = time + T->time;
add_node(T,node); // 正式加入定时器链表
SPIN_UNLOCK(T);
}
现在让我们来总结一下定时器的调用流程:
skynet.timeout(ti, func) —> c.intcommand(“TIMEOUT”, ti) —> static int lcommand(lua_State *L) —> skynet_command(context, cmd, parm) —>
static const char * cmd_timeout(struct skynet_context * context, const char * param) —> skynet_timeout(context->handle, ti, session) —>
timer_add(TI, &event, sizeof(event), time) —> add_node(T, node)
现在我们知道了如何在Skynet的上层,即在Lua层调用定时器,并且需要携带time_duration + call_back_func一共2个参数
如何添加定时器
7,现在我们可以继续看看是如何把定时器加到对应的位置的?
对外的接口就是:
static void
timer_add(struct timer *T, void *arg, size_t sz, int time) {
struct timer_node *node = (struct timer_node *)skynet_malloc(sizeof(*node)+sz);
memcpy(node+1, arg, sz);
SPIN_LOCK(T);
// 实际的过期时间 = T现在的时间点 + time时长
// time时长是由用户设置,传进来的参数
// 特别需要注意的是这个node->expire可能会发生溢出
node->expire = time + T->time;
add_node(T,node);
SPIN_UNLOCK(T);
}
执行每个定时节点链表的函数
static inline void
timer_execute(struct timer *T) {
int idx = T->time & TIME_NEAR_MASK;
while (T->near[idx].head.next) {
struct timer_node *current = link_clear(&T->near[idx]);
SPIN_UNLOCK(T);
// dispatch_list don't need lock T
dispatch_list(current); // 开始执行链表上每个节点的定时器
SPIN_LOCK(T);
}
}
通过定时器上的数据,给对应的服务发送消息
static inline void
dispatch_list(struct timer_node *current) {
do {
struct timer_event * event = (struct timer_event *)(current+1);
struct skynet_message message;
message.source = 0;
message.session = event->session;
message.data = NULL;
message.sz = (size_t)PTYPE_RESPONSE << MESSAGE_TYPE_SHIFT;
skynet_context_push(event->handle, &message);
struct timer_node * temp = current;
current=current->next;
skynet_free(temp);
} while (current);
}
最后就剩下
// 把某个定时节点链表清理掉
static inline struct timer_node *
link_clear(struct link_list *list) {
struct timer_node * ret = list->head.next;
list->head.next = 0;
list->tail = &(list->head);
return ret;
}
// 把某个定时器,添加到定时节点链表的最后面
static inline void
link(struct link_list *list,struct timer_node *node) {
list->tail->next = node;
list->tail = node;
node->next = 0;
}
特意要说明一下的是:link_clear()函数并不是把整个定时器链表free掉,而是从定时节点中摘出来,返回给调用者
static void
move_list(struct timer *T, int level, int idx) {
struct timer_node *current = link_clear(&T->t[level][idx]);//把这个定时节点链表,摘下来给调用者
while (current) {
struct timer_node *temp = current->next;
add_node(T,current);// 把链表上的节点,也就是定时器,重新映射,加入到数组中
current=temp;
}
}
定时器的回调函数
8,还剩最后一个问题:当定时器把定时消息发给对应服务的时候,服务是如何找到正确的call_back_func的呢?
就是依靠处理定时器的代码
static inline void
dispatch_list(struct timer_node *current) {
do {
struct timer_event * event = (struct timer_event *)(current+1);
struct skynet_message message;
message.source = 0;
message.session = event->session;// 此时用到前面存的,该服务消息的session了
message.data = NULL;
message.sz = (size_t)PTYPE_RESPONSE << MESSAGE_TYPE_SHIFT;
// 构造一个消息,把定时消息发给对应的服务
skynet_context_push(event->handle, &message);// 此时用到前面存的,该服务的handle了
struct timer_node * temp = current;
current=current->next;
skynet_free(temp);
} while (current);
}
让我们重温一下最初的代码:
function skynet.timeout(ti, func)
local session = c.intcommand("TIMEOUT",ti) -- 直接调用c层的函数intcommand(),并传入命令TIMEOUT,参数ti
assert(session)
local co = co_create(func) -- 调用协程执行call_back_func
assert(session_id_coroutine[session] == nil)
session_id_coroutine[session] = co
return session
end
看到这里我们现在明白了:skynet服务新建了一个协程,并且把call_back_func作为该协程的执行函数
c.intcommand(“TIMEOUT”,ti)函数返回的session,这个session也是skynet.timeout(ti, func)的返回值
被记录在session_id_coroutine数组中,通过 session_id_coroutine[session] = co 把某个消息和具体的session对应起来
这样当skynet服务执行邮箱的消息时,发现该消息的session对应的协程是某个co,而该co又绑定了call_back_func
从而执行了用户设定的call_back_func,最后完成了定时器的功能!
本质就是:
1,skynet.timeout(ti, func) 函数通过调用底层c函数,获得了此服务的一个session,并把这个session放到定时器的节点中存储
2,skynet.timeout(ti, func) 函数也会把这session返回给调用者,并在lua层创建一个协程co,存储在session_id_coroutine
3,定时器到期后,会给此服务发送一个消息,消息就带有这个session,从而唤醒对应的协程co
4,最后对应的协程co会执行在创建时,传入的func
留下疑问:为什么设定了session_id_coroutine[session] = co之后,该消息就会被此协程执行呢?
这部分知识已经不属于定时器的部分了,将在服务实现模块进行讲解
另外定时器的取消,也是根据 session_id_coroutine[session] = co 的设计来把定时器取消,所以到时候一起详细说明
总结
到这里定时器timer的全部内容就在这里了
定时器timer主要内容是时间轮算法,可以根据时钟的原理进行思考
一般情况下我们都是使用定时器,并不会去改变它,但依然需要知道一些规则:
1,定时器的大小设定范围[0, 2147483647],也就是248.55134803241 天
因此我们在业务层的代码逻辑中,应该注意,不要设置过大的定时器
而是应该通过封装,不断重置定时器,循环定时
2,我们不应该在同一个时间点设置过多的定时器
一个定时节点链表如果长度太长,必定会引起执行时间过长
影响下一个节点的执行,也会导致同一时间发出的消息太多,造成卡顿
3,尽量少地设置定时器,通过业务逻辑的设计,以及跟客户端的协商来避免定时器
如果实在没办法避免定时器,也尽可能地把这些定时器做到公用
由一个公共定时器发消息给某个服务,某个服务再通知所有订阅者,而不是每个订阅者都自己设一个定时器
4,做好定时器的统一管理和记录,也要即使销毁定时器