全局消息队列
1,看下消息队列首先被调用的地方
在skynet_start.c文件中,第273行被调用skynet_mq_init(),代码如下:
void
skynet_mq_init() {
struct global_queue *q = skynet_malloc(sizeof(*q)); // 分配内存给global_queue结构体
memset(q, 0, sizeof(*q)); // 初始化全局消息队列,把所有的数据置为0
SPIN_INIT(q); // 初始化自旋锁
Q = q; // 赋值给全局单列Q
}
其中Q是skynet_mq.c中定义的global_queue结构体,代码如下:
static struct global_queue *Q = NULL; // static 表示Q指针这个全局变量只对定义在同一文件中的函数可见
那么global_queue结构体的定义是什么呢?
struct global_queue {
struct message_queue *head; // 指向全局消息队列中,头部的消息
struct message_queue *tail; // 指向全局消息队列中,尾部的消息
struct spinlock lock; // 自旋锁结构定义
};
该如何理解这个global_queue结构体的作用呢?
假设你管理了200个班级,并且每个班级的人数都不相同,此时你要批改200个班级所有学生的作业,你会如何安排每个班级的作业批改呢?
这个global_queue结构体就是用来管理这200个班级的批改记录
struct message_queue *head用来记录现在你批改到第几个班级,struct message_queue *tail用来记录最后一个班级是哪个
刚开始的时候,head记录的是第1个班级,tail记录的是第100个班级。随着作业的批改,head记录的是第50个班级
在批改作业的时候剩余班级的作业也都交了上来,tail记录变为最后第200个班级
global_queue结构体通过记录全局消息队列的头部和尾部来达到控制全局消息队列的目的,但其本身不属于全局消息队列
全局消息队列:入队和出队
2,既然是队列,那自然就是有进队列和出队列的操作,全局消息队列的操作实现如下:
// 入队列,把消息 插入 全局消息队列
void
skynet_globalmq_push(struct message_queue * queue) {
struct global_queue *q = Q; // 获取到全局单列Q
SPIN_LOCK(q) // 上锁,免得被别人打扰
assert(queue->next == NULL);
if(q->tail) {
q->tail->next = queue; // 最后一个节点的 next 节点设置为queue
q->tail = queue; // 把 最后一个节点 设置为queue
} else {
q->head = q->tail = queue; // 这种是只有一个节点的情况
}
SPIN_UNLOCK(q)
}
// 出队列,把消息 弹出 全局消息队列
struct message_queue *
skynet_globalmq_pop() {
struct global_queue *q = Q; // 获取到全局单列Q
SPIN_LOCK(q) // 上锁,免得被别人打扰
struct message_queue *mq = q->head; // 记录好现在的头部节点
if(mq) {
q->head = mq->next; // 把现在头部节点的下一个节点 设置为 新的头部节点
if(q->head == NULL) { // 假设新的头部节点为空,则表示全局队列空了
assert(mq == q->tail);
q->tail = NULL;// 把最后的节点也置为NULL
}
mq->next = NULL; // 把弹出节点的 next 去掉,免得被人获得
}
SPIN_UNLOCK(q)
return mq;
}
总结
对于全局消息队列的操作只有3个
1,初始化:skynet_mq_init(),初始化用于控制全局消息队列的global_queue结构体
2,插入操作:skynet_globalmq_push(struct message_queue * queue)
3,弹出操作:struct message_queue * skynet_globalmq_pop()
次级消息队列
3,那我们插入全局消息队列到底是什么呢?
实际插入到全局消息队列中的节点,就是message_queue结构体,代码如下:
struct message_queue {
struct spinlock lock; // 自旋锁结构定义
uint32_t handle; // 用于标记本条次级消息队列属于哪个服务,每个服务都有自己唯一的handle
int cap; // 次级消息队列的容量
int head; // 次级消息队列头部消息的位置
int tail; // 次级消息队列尾部消息的位置
int release; // 用于标记本条次级消息队列是否需要释放,即:用于标记是否需要把本条次级消息队列的内存释放
int in_global; // 用于标记本条次级消息队列是否已经在全局消息队列global_queue中排队
int overload; // 用于标记本条次级消息队列是否已经过载,即:本条次级消息队列的消息数量是不是太多了
int overload_threshold; // 用于设置本条次级消息队列的过载的阈值
struct skynet_message *queue; // 指向本条次级消息队列 实际存放消息的数组
struct message_queue *next; // 指向下一个全局队列消息节点
};
该如何理解这个全局消息队列message_queue结构体的作用呢?
global_queue结构体用于记录200个班级的顺序,谁是head,谁是tail,但global_queue结构体本身不属于此队列
我们只是用global_queue结构体来描述:1班级–2班级–3班级–…–199班级–200班级这个全局消息队列而已
按照这个全局消息队列的排序,你先批改1班级的作业,问题是:如何安排班级内每个学生的作业呢?
message_queue结构体的head用于记录批改到本班级内第几个学生的作业,tail记录本班级最后一个学生的作业
实际存放消息的queue是以数组的形式存在,自然只需要int型的head和tail就能找到
那为什么不像global_queue结构体那样记录指针head和指针tail呢?(后面会解答)
next是指向下一个班级的指针,即:全局消息队列中的下一个节点
因此我们把数组 struct skynet_message *queue 称为次级消息队列
此时让我们看看skynet_message结构体的构成:
struct skynet_message {
uint32_t source; // 用于记录此消息来源于哪个服务,即:另一个服务的handle
int session; // 记录本服务内消息的id,一般都是递增
void * data; // 直接传递指向消息地址的指针,这样可以不用复制消息,提高效率和节省内存
size_t sz; // 因为指向消息的指针是void,没有具体类型,即:指针指到某个地址,却不知道往后面读取多少字节的数据,因此需要size_t
};
我们看看次级消息队列是如何初始化这个数组的,代码如下:
/* 创建64位的数组来实现队列 */
struct message_queue *
skynet_mq_create(uint32_t handle) {
struct message_queue *q = skynet_malloc(sizeof(*q)); // 为message_queque初始化内存
q->handle = handle; // 绑定此消息队列属于哪个服务
q->cap = DEFAULT_QUEUE_SIZE; // 默认消息队列容量DEFAULT_QUEUE_SIZE = 64
q->head = 0; // 头部指向0
q->tail = 0; // 尾部指向0
SPIN_INIT(q)
// When the queue is create (always between service create and service init) ,
// set in_global flag to avoid push it to global queue .
// If the service init success, skynet_context_new will call skynet_mq_push to push it to global queue.
// 原本这里的正常的流程是:
// 把q->in_global = MQ_IN_GLOBAL设置为在全局消息队列,再接着调用skynet_globalmq_push(q)把次级消息队列放入全局队列
// 这里却虚假的标记q->in_global = MQ_IN_GLOBAL已经在全局消息队列了,实际却没有执行skynet_globalmq_push(q)
// 没有执行skynet_globalmq_push(q),工作线程便不会有机会获取到 本条次级消息队列
// 而是等待服务初始化成功,会再一次调用skynet_globalmq_push(q),才把次级消息队列真正放入全局消息队列
// 在skynet_server.c中第169行,服务初始化完成后,执行:skynet_globalmq_push(queue)
q->in_global = MQ_IN_GLOBAL; // MQ_IN_GLOBAL == 1, 0表示不在全局消息队列,1表示在全局消息队列或者此消息已经被处理
q->release = 0;
q->overload = 0;
q->overload_threshold = MQ_OVERLOAD; // 消息负载的阈值 MQ_OVERLOAD == 1024
q->queue = skynet_malloc(sizeof(struct skynet_message) * q->cap); // 在此被分配了64个skynet_message结构体
q->next = NULL;
return q;
}
可以看到,q->queue = skynet_malloc(sizeof(struct skynet_message) * q->cap) 为次级消息队列分配了默认的64个skynet_message结构体
次级消息队列:入队和出队
4,既然说skynet_message结构体也是消息队列(由数组实现),那它应该也有进队列和出队列的操作
再看看进队列和出队列的操作,代码如下:
// 入队列,把消息 插入 次级消息队列queue,queue由skynet_mq_create()初始化了64位的数组
void
skynet_mq_push(struct message_queue *q, struct skynet_message *message) {
assert(message);
SPIN_LOCK(q) // 上锁
q->queue[q->tail] = *message;
// ++ q->tail 表示把tail+1,再与容量cap比较
// 我们假设次级消息队列一直不消费消息,累计到了tail=64,本次消息还有容量,但下一个消息就没位置了
// 此时++ q->tail为65,65 > q->cap,此时会导致q->tail == 0
// tail == 0表明已经没有空间可用了
if (++ q->tail >= q->cap) {
q->tail = 0;
}
// 又因为我们假设次级消息队列一直不消费消息,则表明head一直为0
// 此时tail == 0 == head,触发扩容
// 另一种情况是:head不等于0,假设head == n,但是tail已经达到最大值,则会导致tail是0,占位发生回溯
// 当发生回溯时,tail没有赶上head,新的消息就会占用旧的位置,不会触发扩容,可以重复利用
// 当发生回溯时,tail赶上了head,此时就会触发扩容,此时不要求 tail == 0 == head
if (q->head == q->tail) {
expand_queue(q); // 扩大次级消息队列的容量
}
if (q->in_global == 0) { // 如果次级消息队列没有在全局消息队列,就把它加入进去
q->in_global = MQ_IN_GLOBAL; // 设置已经在全局消息队列的标记
skynet_globalmq_push(q); // 加入全局消息队列
}
SPIN_UNLOCK(q)
}
// 出队列,把消息 弹出 次级消息队列queue,queue由skynet_mq_create()初始化了64位的数组
int
skynet_mq_pop(struct message_queue *q, struct skynet_message *message) {
int ret = 1;
SPIN_LOCK(q) // 上锁
if (q->head != q->tail) {
// q->head++ 表示先使用head,再把head+1
// 当head == 0,tail为1,即次级消息队列只有1条消息时,q->queue[q->head++]才不会读取错误
// 因为C的数组下标是从0开始的
*message = q->queue[q->head++];
ret = 0; // q->head != q->tail表明还有消息,把ret 设置为0,后面需要根据ret的值设置是否其在全局消息队列
int head = q->head;
int tail = q->tail;
int cap = q->cap;
// 在上面我们假设次级消息队列一直不消费消息,累计到了tail == 64,导致了扩容
// 这里的情况刚好相反:次级消息队列一直消费,却没有新的消息到来
// 此时head一定会慢慢增加到最大容量,也就是说所有消息都被消费完了
// 这里把这种情况q->head设置为0,而不是主动去扩容
// 因为很可能这个服务不会有消息再来,不必为它申请新内存
// 而是把扩容的操作,统一到skynet_mq_push中,即如果有新的消息到来,且容量不足才扩容,
// 消费消息不应该导致扩容,做到功能分离
// 新的消息到来,上面的skynet_mq_push实现代码会进行自动扩容,代码如下:
// 此时tail == head == 0,触发扩容
// if (q->head == q->tail) {
// expand_queue(q); // 扩大次级消息队列的容量
// }
// 将head指回最开始的地方
if (head >= cap) {
q->head = head = 0;
}
int length = tail - head; // 计算出消息的长度
if (length < 0) {
length += cap; // length小于0,表明发生了回溯,此时length = length(负数) + cap(画图就很好理解了)
}
while (length > q->overload_threshold) { // 假设消息长度大于设置的阈值
q->overload = length;
q->overload_threshold *= 2; // 把消息过载的阈值调大
}
} else {
// reset overload_threshold when queue is empty
q->overload_threshold = MQ_OVERLOAD; // 把消息过载的阈值调回默认值
}
if (ret) {
q->in_global = 0; // q->head == q->tail,表示没有消息了,退出全局消息队列
}
SPIN_UNLOCK(q)
return ret;
}
次级消息队列:扩容
5,最后,我们来看看队列扩容的实现代码:
static void
expand_queue(struct message_queue *q) {
struct skynet_message *new_queue = skynet_malloc(sizeof(struct skynet_message) * q->cap * 2); // 申请新的内存块
int i;
for (i=0;i<q->cap;i++) {
// 在这里只把没有读取的消息复制到新的内存
// 即只需要关注head即可,而不用管tail
// 为什么不用管tail呢?难道tail发生回溯,tail < head都不用管吗?
// 答案是肯定的,因为发生扩容的前提是: head == tail
// tail发生回溯,tail < head的情况,只会占用已经读取消息的位置
new_queue[i] = q->queue[(q->head + i) % q->cap];
// 那么为什么还要做 % q-cap的取模操作呢?
// 因为取模运算 (q->head + i) % q->cap 是为了确保索引在合法范围内,防止越界
// 对任意一个大于0的数K取模,它的结果都限定在[0,k-1]
// 这样即使(q->head + i)无论是什么数值,最后都会落在[0, q->cap -1]
}
q->head = 0;
// 此时就是 tail == q-cap,而不是新的q->cap
// 因为扩容本来的前提就是:head == tail
// head == tail意味着:
// 1,tail发生回溯,刚上了head,此时(head 到 队列尾部的长度)+ (回溯的tail 到 head的长度) == q->cap
// 2,消息一直不消费,head == 0,tail累计到64后,发生回溯,tail == 0
// 这个时候也是 head == 0 == tail,发生扩容,长度刚好等于 q->cap
q->tail = q->cap;
q->cap *= 2;
skynet_free(q->queue); // 释放掉旧的消息队列
q->queue = new_queue; // 获得新的消息队列内存
}
代码看到这里,心中的一个疑问随之也解开了:为什么全局消息队列使用列表的形式,而次级消息队列使用数组的形式呢?
我们可以看到全局消息队列中一个节点message_queue是属于某个服务的消息邮箱,而服务是不会经常性退出的,换句话说这个服务的消息邮箱存在时间会比较长,不会经常性的skynet_free,因此使用skynet_malloc分配内存作为节点,比较适用。而次级消息队列中,一旦来了一条消息就需要skynet_malloc分配内存,每条消息一旦读取完毕,就不再使用,是需要skynet_free释放的。考虑到消息的数量将会非常多,那么如果同样采用链表来存储数据的话,就需要经常分配内存和释放内存。因此次级消息队列一次性使用skynet_malloc获取一定数量的数组,还可以重复利用数组。等到head和tail使用完之后,也就是数组容量实在不够的时候,再统一skynet_free释放掉数组的内存,并skynet_malloc新的更大的数组。这样设计次级消息队列还有个好处,任何需要读取消息的线程,只需要读取数据的地址就行了,而不用负责释放这部分内存,而是统一交给expand_queue扩容时再释放即可。另外,全局消息对列是链表结构,也就没有扩容的操作
释放消息内存
6,那skynet_message结构体中,data的内存由谁释放呢?
struct skynet_message {
uint32_t source; // 消息来源
int session; // 消息编号
void * data; // 消息数据,这块内存由谁管理呢?
size_t sz; // 消息大小
};
这里就关涉到Skynet的另一个模块了:服务模块
简单来说:就是Skynet的服务模块,也就是工作线程,在执行skynet_mq_pop(q,&msg)后,就把次级队列的消息读取出来了
这个时候有段代码:
// 此段代码处于skynet_server.c文件,skynet_context_message_dispatch()函数的实现,文件第353行
if (ctx->cb == NULL) {
skynet_free(msg.data); // 由此可见,skynet_message结构体中的data由谁读取,就由谁释放
} else {
dispatch_message(ctx, &msg);
}
// 进入dispatch_message(ctx, &msg)函数的实现
// 此段代码处于skynet_server.c文件,dispatch_message()函数的实现,文件第302行
if (!reserve_msg) {
skynet_free(msg->data); // 也是由谁使用,就由谁释放
}
释放次级消息队列
7,现在我们还有个疑问,既然次级消息队列有释放内存的操作,也就是销毁次级队列,那么为什么不见全局消息队列的释放操作呢?
实际我们要销毁全局消息队列的某个节点,也就是销毁某个message_queue的时候,会调用以下代码来执行:
void
skynet_mq_release(struct message_queue *q, message_drop drop_func, void *ud) {
SPIN_LOCK(q) // 上锁
if (q->release) { // 被标记能释放 那是由谁标记呢?下面会讲到
SPIN_UNLOCK(q)
_drop_queue(q, drop_func, ud); // 把次级消息队列从全局消息队列中去除
} else {
skynet_globalmq_push(q); // 没有释放标记的话,为什么又把本该释放的的节点重新加入全局队列?
SPIN_UNLOCK(q)
}
}
让我们看看最后的执行代码:
static void
_release(struct message_queue *q) {
assert(q->next == NULL);
SPIN_DESTROY(q)
skynet_free(q->queue); // 因为次级消息队列的数组,也是skynet_malloc分配的,所以最后一定要通过skynet_free释放掉
skynet_free(q);// 最后再释放掉本身节点message_queue,因为在skynet_mq_create()中,也是通过skynet_malloc分配的message_queue
}
static void
_drop_queue(struct message_queue *q, message_drop drop_func, void *ud) {
struct skynet_message msg;
while(!skynet_mq_pop(q, &msg)) {
drop_func(&msg, ud);
}
_release(q);
}
辅助操作
8,剩下的就是辅助操作:skynet_mq_mark_release(),skynet_mq_length(),skynet_mq_overload()
skynet_mq_length()的作用比较简单,就是返回某个服务的消息队列的长度,实现如下:
int
skynet_mq_length(struct message_queue *q) {
int head, tail,cap;
SPIN_LOCK(q)
head = q->head;
tail = q->tail;
cap = q->cap;
SPIN_UNLOCK(q)
if (head <= tail) {
return tail - head;
}
return tail + cap - head;
}
skynet_mq_overload()的作用也简单,实现如下:
int
skynet_mq_overload(struct message_queue *q) {
if (q->overload) {
int overload = q->overload;
q->overload = 0;
return overload;
}
return 0;
}
服务退出流程
9,比较存有疑问的是:skynet_mq_mark_release()函数的作用是什么?另外根据代码:
void
skynet_mq_release(struct message_queue *q, message_drop drop_func, void *ud) {
SPIN_LOCK(q)
if (q->release) { // 为什么要先检查释放标记,才直接释放?这个标记是由谁标记的呢?
SPIN_UNLOCK(q)
_drop_queue(q, drop_func, ud); // 把次级消息队列从全局消息队列中去除
} else {
skynet_globalmq_push(q); // 没有释放标记的话,为什么又把本该释放的的节点重新加入全局队列?
SPIN_UNLOCK(q)
}
}
其实这个做法跟 服务退出 有很深的关系。服务和消息是很紧密的关系,每个服务一定有自己的消息队列,因此服务的退出,消息队列的销毁需要相互配合,妥善处理没有处理的消息,详细的bug问题可以看看这篇文章:云风的 BLOG: 记录一个并发引起的 bug (codingnow.com)
让我们看看服务的结构体:
struct skynet_context {
void * instance;
struct skynet_module * mod;
void * cb_ud;
skynet_cb cb;
struct message_queue *queue; // 我们只关注这个:此服务绑定了一个message_queue节点
ATOM_POINTER logfile;
uint64_t cpu_cost;
uint64_t cpu_start;
char result[32];
uint32_t handle;
int session_id;
ATOM_INT ref;
int message_count;
bool init;
bool endless;
bool profile;
CHECKCALLING_DECL
};
再回顾message_queue的结构:
struct message_queue {
struct spinlock lock;
uint32_t handle; // 我们只关注这个:标记本条消息队列属于哪个服务,记录该服务的handle
int cap;
int head;
int tail;
int release;
int in_global;
int overload;
int overload_threshold;
struct skynet_message *queue;
struct message_queue *next;
};
从上面可以知道
服务通过message_queue指针找到消息队列,消息队列通过handle找到服务
struct skynet_context *
skynet_handle_grab(uint32_t handle) { // 传入handle,查找到服务
struct handle_storage *s = H;
struct skynet_context * result = NULL;
rwlock_rlock(&s->lock);
uint32_t hash = handle & (s->slot_size-1);
struct skynet_context * ctx = s->slot[hash];
if (ctx && skynet_context_handle(ctx) == handle) {
result = ctx;
skynet_context_grab(result);
}
rwlock_runlock(&s->lock);
return result; // 返回服务地址
}
现在我们考虑下面的情况:
假设服务ctx需要退出,但是属于该服务的消息队列message_queue,被放在了global全局消息队列中了,请问谁来最后处理这个消息队列?
云风一开始的做法:当 ctx 销毁的那一刻,检查 mq 是否在 globalmq 中。如果不在,就重压入 globalmq 。如果在,就什么都不用做了。等工作线程从 globalmq 中取出 mq ,发现其中的 handle 找不到配对的 ctx 后,再将 mq 销毁掉。可以看到,在服务ctx退出时,只做了非常简单的检查操作,等于是把消息队列的销毁工作扔给了工作线程。(其实这么设计没毛病,这个消息队列本就该由消息模块去维护,而不归服务模块管)
问题出现在:handle 和 ctx 的绑定关系是在 ctx 模块外部实现的,当工作线程从 globalmq 中取出 mq ,发现 handle 找不到配对的 ctx ,立即将 mq 销毁掉了。但是:服务ctx并没有被销毁!简单说就是:handle—-ctx原来是绑定的,ctx服务退出的时候,把handle和ctx解绑了,但此时ctx因为别的原因,暂时没有被释放掉内存!ctx 还活着!(另一个工作线程还持有其引用),而持有这个 ctx 的工作线程可能正在它生命的最后一刻,向其发送消息,结果属于本服务的消息队列 mq 已经被销毁了,这导致出现了问题
解决办法:当 ctx 销毁时,会先检查其引用,如果此时ctx是最后一个引用,表明服务ctx可以被销毁,则由它向其 mq 设置一个清理标记。然后工作线程在 globalmq 取出 mq ,发现已经找不到 handle 对应的 ctx 时,先判断是否有清理标记,如果有则立即清理。如果没有,再将 mq 重放进 globalmq ,直到清理标记有效,再销毁 mq
让我们看看用户在lua层调用了skynet.exit()后发生了什么?
在用户层调用skynet.exit(),是通过在lua-skynet.c文件中的接口,lua层调用最终会调用到c层面的函数
static struct command_func cmd_funcs[] = {
{ "TIMEOUT", cmd_timeout },
{ "REG", cmd_reg },
{ "QUERY", cmd_query },
{ "NAME", cmd_name },
{ "EXIT", cmd_exit }, // 最终会调用这个退出函数
{ "KILL", cmd_kill },
{ "LAUNCH", cmd_launch },
{ "GETENV", cmd_getenv },
{ "SETENV", cmd_setenv },
{ "STARTTIME", cmd_starttime },
{ "ABORT", cmd_abort },
{ "MONITOR", cmd_monitor },
{ "STAT", cmd_stat },
{ "LOGON", cmd_logon },
{ "LOGOFF", cmd_logoff },
{ "SIGNAL", cmd_signal },
{ NULL, NULL },
};
static const char *
cmd_exit(struct skynet_context * context, const char * param) {
handle_exit(context, 0); // 直接调用handle_exit(),会传入是哪个服务context让哪个服务handle(就是传入的param)退出
return NULL;
}
static void
handle_exit(struct skynet_context * context, uint32_t handle) {
if (handle == 0) {
handle = context->handle;
skynet_error(context, "KILL self");// 自己杀自己
} else {
skynet_error(context, "KILL :%0x", handle); // 别人杀自己
}
if (G_NODE.monitor_exit) {
skynet_send(context, handle, G_NODE.monitor_exit, PTYPE_CLIENT, 0, NULL, 0); //给杀自己的人,发出自己退出的消息
}
skynet_handle_retire(handle); // 这里开始解绑handle和ctx的连接
}
// 这里正式来到 skynet_handle.c 文件中的 skynet_handle_retire() 函数
int
skynet_handle_retire(uint32_t handle) {
int ret = 0;
struct handle_storage *s = H; // 拿到服务管理的地址
rwlock_wlock(&s->lock); // 上锁,免得被人打扰
uint32_t hash = handle & (s->slot_size-1); // 计算出handle的hash值
struct skynet_context * ctx = s->slot[hash]; // 根据hash值拿到服务ctx
if (ctx != NULL && skynet_context_handle(ctx) == handle) { // 最后再确认一下,取出的ctx中的handle和传入的handle是否一致
s->slot[hash] = NULL; // 把存放 指向服务ctx地址 的指针置为空
ret = 1;
int i;
int j=0, n=s->name_count;
for (i=0; i<n; ++i) {
if (s->name[i].handle == handle) { // 假如这个服务注册了名字,也需要清理名字的内存
skynet_free(s->name[i].name);
continue;
} else if (i!=j) {
// 一旦清理了名字,那么下一步i++,就会空出一个位置
// 因此需要往前移动一格
s->name[j] = s->name[i];
}
++j;
}
s->name_count = j; // 最后更新现有命名的数量
} else {
ctx = NULL;
}
rwlock_wunlock(&s->lock); // 解锁
if (ctx) {
// release ctx may call skynet_handle_* , so wunlock first.
skynet_context_release(ctx); // 上面代码完成解绑,skynet_context_release()直接去释放ctx服务的内存
}
return ret;
}
服务退出的代码如下:
struct skynet_context *
skynet_context_release(struct skynet_context *ctx) {
if (ATOM_FDEC(&ctx->ref) == 1) { // 这个判断非常重要!!!
delete_context(ctx);
return NULL;
}
return ctx;
}
// 特别注意流程:用户调用skynet.exit() ---> handle_exit() ---> skynet_handle_retire()(在这一步解开了handle和ctx的关联)
// 最后由skynet_handle_retire() ---> skynet_context_release(),问题是skynet_context_release()并不一定会释放服务!
// skynet_context_release()为什么不直接释放服务呢?
// 因为skynet_context_release()会检查服务ctx的引用,只有当引用剩余1个时候
// 才会真正触发释放,如果引用不是ref==1,表明还有别的对象需要用到服务ctx
// 因此每个引用过服务ctx的对象,都会执行 skynet_context_release()
// 由最后遇到引用等于1的对象,真正释放服务ctx
// 这段代码很重要!!
// if (ATOM_FDEC(&ctx->ref) == 1) { // 很有可能别的服务引用了这个ctx,这就导致ref不会是0,则ctx此时没办法被释放掉!
// delete_context(ctx);
// return NULL;
// }
// 而上一步却已经解开了handle和ctx的关联,导致问题出现
static void
delete_context(struct skynet_context *ctx) {
FILE *f = (FILE *)ATOM_LOAD(&ctx->logfile);
if (f) {
fclose(f);
}
skynet_module_instance_release(ctx->mod, ctx->instance); // 把服务加载的模块内存释放掉
skynet_mq_mark_release(ctx->queue);// 标记此服务需要退出
CHECKCALLING_DESTROY(ctx)
skynet_free(ctx); // 直接释放掉ctx的内存
context_dec(); // 同时告知服务的总数量G_NODE.total减少1
}
再看看工作线程找不到 handle 对应的 ctx 时的检查代码
struct message_queue *
skynet_context_message_dispatch(struct skynet_monitor *sm, struct message_queue *q, int weight) {
if (q == NULL) {
q = skynet_globalmq_pop();
if (q==NULL)
return NULL;
}
uint32_t handle = skynet_mq_handle(q);
struct skynet_context * ctx = skynet_handle_grab(handle);
if (ctx == NULL) { // 找不到对应的ctx
struct drop_t d = { handle };
skynet_mq_release(q, drop_message, &d); // 进入skynet_mq_release判断,如果release不是1,会把这个消息队列重新压回队列
return skynet_globalmq_pop();
}
// 省略的代码
return q;
}
最后我们再回头来看看,就清楚了:
void
skynet_mq_release(struct message_queue *q, message_drop drop_func, void *ud) {
SPIN_LOCK(q) // 上锁
if (q->release) {
SPIN_UNLOCK(q)
_drop_queue(q, drop_func, ud);
} else {
skynet_globalmq_push(q); // 还不能释放,压回全局队列
SPIN_UNLOCK(q)
}
}
当用户命令ctx服务退出时候,先解开了handle和ctx的关联,明面上服务已经退出了,但实际服务ctx内存其实还存活
假设ctx服务的引用还有剩余,那么即使命令ctx服务退出,服务ctx内存也不会释放,而是等待着最后一个引用来释放
引用还存在时,命令ctx服务退出,ctx服务退出的时候只是把次级消息队列插入到全局消息队列里面
最后由工作线程取出次级消息队列,准备处理消息
当工作线程找不到handle和ctx的关系时,就去释放消息队列,但释放前检查一下,release是否被标记
当ctx服务的最后一个引用还是存在,delete_context不会被执行,release一直是0,消息队列就不能被释放,需要再次把它放入全局消息队列,等待下一次检查
当ctx服务的最后一个引用不存在了,delete_context被执行,release变为1,消息队列可以被释放,此时工作线程再真正地把消息队列销毁
假设有个很坏的情况,当ctx服务的最后一个引用因为某种原因,一直存在,消息队列将会一直被压入全局消息队列,一直检查
处理未读消息
10,另外前面说到的,哪个服务读取了skynet_meaasge消息,虽然不用负责skynet_meaasge结构体本身的释放,但是skynet_meaasge.data是需要读取该data的服务去释放的。那问题来了:当服务退出,消息队列也被工作线程销毁,谁妥善处理剩余没读取的消息呢?答案是:
struct message_queue *
skynet_context_message_dispatch(struct skynet_monitor *sm, struct message_queue *q, int weight) {
if (q == NULL) {
q = skynet_globalmq_pop();
if (q==NULL)
return NULL;
}
uint32_t handle = skynet_mq_handle(q);
struct skynet_context * ctx = skynet_handle_grab(handle);
if (ctx == NULL) {
struct drop_t d = { handle };
// 注意这个drop_message函数
skynet_mq_release(q, drop_message, &d); // 进入skynet_mq_release判断,如果release不是1,会把这个消息队列重新压回队列
return skynet_globalmq_pop();
}
// 省略的代码
return q;
}
// 把drop_message作为drop_func传给skynet_mq_release
static void
drop_message(struct skynet_message *msg, void *ud) {
struct drop_t *d = ud;
skynet_free(msg->data); // 这里释放掉属于该服务消息的data
uint32_t source = d->handle;
assert(source);
// report error to the message source
skynet_send(NULL, source, msg->source, PTYPE_ERROR, 0, NULL, 0); // 向给自己发过消息的服务,都通知一下,自己退出了
}
void
skynet_mq_release(struct message_queue *q, message_drop drop_func, void *ud) {
SPIN_LOCK(q)
if (q->release) {
SPIN_UNLOCK(q)
_drop_queue(q, drop_func, ud); // 把drop_message作为drop_func传给_drop_queue
} else {
skynet_globalmq_push(q);
SPIN_UNLOCK(q)
}
}
static void
_drop_queue(struct message_queue *q, message_drop drop_func, void *ud) {
struct skynet_message msg;
while(!skynet_mq_pop(q, &msg)) {
drop_func(&msg, ud); // 不断地执行 drop_message() 函数,直至把所有消息消息处理完
}
_release(q);
}
可以看到服务模块和消息队列的销毁是紧密关联的,等到服务模块再详细讲解服务模块的东西
而消息队列的退出已经讲解清楚了
总结
到这里消息队列message_queue的全部内容就在这里了
全局消息队列由struct global_queue结构体进行管理,但组成的节点是message_queue
每一个message_queue节点都代表着一个服务的邮箱
多个服务,也就是多个message_queue节点,连接在一起构成了全局消息队列
在每个单独的message_queue节点中,又有次级消息队列
次级消息队列由message_queue.head和message_queue.tail管理
次级消息队列实质是一个数组,数组元素就是skynet_message结构体
使用数组的好处非常明显:
可以重复使用内存,并且不用经常申请或释放内存
减少迁移数据的频率,只在扩容数组时候才会发生
每一个数组元素skynet_message结构体都按顺序存放着发给此服务的消息
全局消息队列有创建(初始化struct global_queue结构体),插入节点,销毁节点的操作
次级消息队列有创建(分配数组内存),插入,扩容,销毁的操作
全局消息队列节点的销毁最简单,只是把次级消息队列pop出全局消息队列即可,并没有大的操作
而次级消息队列的销毁,本身就是因为服务的退出才会触发,因此跟服务模块紧密关联
次级消息队列的销毁需要严格按流程来,一步一步释放内存