monitor初始化
1,首先让我们看看监视器monitor在那里调用
监视器monitor不像 服务管理handle_storage,消息队列message_queue,模块加载modules 等具有复杂的结构,它的功能和流程比较简单。因此没有特别的函数用于监视器monitor的初始化。像服务管理有skynet_handle_init(),消息队列有skynet_mq_init(),模块加载有skynet_module_init(),定时器模块也有属于自己的skynet_timer_init(),网络模块则有skynet_socket_init()
因此监视器monitor的初始化,就发生在被第一次调用时才进行。在skynet_start.c文件,第 186 行
// config->thread 又是通过skynet_main.c 中读取启动配置获得的
// thread == config->thread
// 因此 thread 就是启动配置的数值
struct monitor *m = skynet_malloc(sizeof(*m)); // 申请内存
memset(m, 0, sizeof(*m)); // 把内存都置为0
m->count = thread; // 把开启的worker线程数记录在count
m->sleep = 0; // 初始值为0
m->m = skynet_malloc(thread * sizeof(struct skynet_monitor *)); // 很显然,老做法了
// 申请一段内存,实际就是一个数组,数组大小为需要监控的线程数量
// 数组的元素是什么呢?是指针
// 指针的类型是什么呢?指向 struct skynet_monitor 结构体的地址
int i;
for (i=0;i<thread;i++) {
m->m[i] = skynet_monitor_new(); // 最后为每个数组分配一个 指向struct skynet_monitor 结构体的指针
}
让我们来看看struct monitor结构体是什么?
struct monitor {
int count;// 需要监控线程的数量
struct skynet_monitor ** m; // 指向 存放struct skynet_monitor *指针 地址的指针,属于二级指针
// 即m指向的地址,里面存放的是一个指针数组的开头
// 而数组元素里面存放的指针,才实际指向 struct skynet_monitor 的地址
pthread_cond_t cond;
pthread_mutex_t mutex;
int sleep;
int quit;
};
skynet_monitor结构体
2,那skynet_monitor_new()返回了啥呢?
肯定就是 struct skynet_monitor结构体 的地址啦
struct skynet_monitor *
skynet_monitor_new() {
struct skynet_monitor * ret = skynet_malloc(sizeof(*ret)); // 申请一块内存
memset(ret, 0, sizeof(*ret)); // 把所有值都设为 0
return ret;
}
那struct skynet_monitor结构体又是啥?
struct skynet_monitor {
ATOM_INT version; // 基础版本,0
int check_version; // 检查版本,0
uint32_t source; // 值为0
uint32_t destination; // 值为0
};
就这么简单
监视器作用
3,所以,我们该怎么使用这个监视器monitor呢?有什么作用呢?
在回答这个问题前,我们先来看看 监视器monitor 其他的2个操作
因为在刚开始初始化的时候,需要监控的线程数就是固定的,也就说worker线程一旦启动,它的数量就确定了
也就是说不需要像服务管理handle_storage,消息队列message_queue动态增加对象,因此monitor 只剩下释放监控这个选择,接口如下:
void
skynet_monitor_delete(struct skynet_monitor *sm) {
skynet_free(sm); // free掉监控的线程所属的skynet_monitor就行了
}
另外一个比较重要的就是,触发监控,也就是把 monitor 打开,代码如下
void
skynet_monitor_trigger(struct skynet_monitor *sm, uint32_t source, uint32_t destination) {
sm->source = source; // 记录消息的源地址
sm->destination = destination; // 记录消息的目的地址
ATOM_FINC(&sm->version);
}
让我们看看在那里触发的:
struct message_queue *
skynet_context_message_dispatch(struct skynet_monitor *sm, struct message_queue *q, int weight) {
// .....代码省略
for (i=0;i<n;i++) {
if (skynet_mq_pop(q,&msg)) {
skynet_context_release(ctx);
return skynet_globalmq_pop();
} else if (i==0 && weight >= 0) {
n = skynet_mq_length(q);
n >>= weight;
}
int overload = skynet_mq_overload(q);
if (overload) {
skynet_error(ctx, "May overload, message queue length = %d", overload);
}
// 注意看这段代码:在消息处理前执行
// 把 source 置为消息的source,即消息的源地址服务
// 把 destination 置为消息的目的地址,即本服务
// 并且把 sm->version 基础版本的值+1
skynet_monitor_trigger(sm, msg.source, handle);
// 此时时间进入 消息处理时段
if (ctx->cb == NULL) {
skynet_free(msg.data);
} else {
dispatch_message(ctx, &msg);
}
// 注意看这段代码:在消息处理后执行
// 而skynet_monitor_trigger(sm, 0, 0)的作用是
// 把 source 置为0,destination置为0
// 并且把 sm->version 基础版本的值+1
// 为什么要这么做呢?接着往下看
skynet_monitor_trigger(sm, 0, 0);
}
// .....代码省略
return q;
}
monitor的设置为什么要这么做呢?为什么要设置2遍 skynet_monitor_trigger() 呢? 接着往下看:
monitor线程的启动
4,monitor线程的启动!
在skynet_start.c中,第205行,monitor线程启动了!让我们看看实现的代码:
// 调用接口
struct monitor *m;
create_thread(&pid[0], thread_monitor, m);
static void *
thread_monitor(void *p) {
struct monitor * m = p;
int i;
int n = m->count; // 获得需要监控的线程数量
skynet_initthread(THREAD_MONITOR);
for (;;) { // 死循环检测
CHECK_ABORT
for (i=0;i<n;i++) {
skynet_monitor_check(m->m[i]); // 调用检测函数,检查worker线程是否陷入死循环
}
// 这里等待5秒,也就是5秒之后再检查worker线程
// 为什么不是直接 sleep(5) ?
// CHECK_ABORT 又是什么?
// #define CHECK_ABORT if (skynet_context_total()==0) break; 即
// if (skynet_context_total()==0)
// {
// break;
// }
// 也就是说,没有服务的时候,monitor就不用检测了,可以退出
// 因此需要for来循环5秒,这样万一(skynet_context_total()==0)才能及时退出
// 如果直接sleep(5),会导致5秒之后才退出
for (i=0;i<5;i++) {
CHECK_ABORT
sleep(1);
}
}
return NULL;
}
看看skynet_monitor_check()的实现
void
skynet_monitor_check(struct skynet_monitor *sm) {
if (sm->version == sm->check_version) {// 版本没更新,出现问题了
if (sm->destination) {
skynet_context_endless(sm->destination); // 发现服务进入死循环,调用 服务的退出逻辑,具体代码看 服务模块
skynet_error(NULL, "A message from [ :%08x ] to [ :%08x ] maybe in an endless loop (version = %d)",
sm->source , sm->destination, sm->version);
// 特别注意看:A message from [ :%08x ] to [ :%08x ]
// 如果真的打出这句话,这意味着第二个的 [ :%08x ] 服务才是出问题的服务
// 第一个 [ :%08x ] 只是发出请求消息的服务
}
} else {
sm->check_version = sm->version;
}
}
第1次调用 skynet_monitor_trigger(sm, msg.source, handle) 的时候,进入函数,此时的变化就像上面说的:
把 source 置为消息的source,把 destination 置为本服务的handle,并且把 sm->version 基础版本的值+1
随着monitor线程5秒到了,就会触发skynet_monitor_check()函数
因为 source != destination ,则把 sm->check_version = sm->version 置为相同
又随着monitor线程5秒到了,再次触发skynet_monitor_check()函数,有2种情况:
情况1:
worker线程正在处理消息,并且5秒还没返回,source == destination ,并且 source == destination 并且值不等于0
此时立马打印worker线程可能陷入死循环的警告
情况2:
worker线程处理完了消息,并返回,再次调用skynet_monitor_trigger(sm, 0, 0),重置 source 和 destination 为0
这个时候虽然 source == destination 但值等于 0,不会触发警告
其实这个设计这样理解就很清晰:
服务第一次调用 skynet_monitor_trigger(sm, msg.source, handle) 等于给 worker线程 处理此条消息设置了一个定时器开始倒计时
如果在下一次monitor来检查前(执行skynet_monitor_check()前),worker线程已经把消息处理完了
worker线程 自然会 调用skynet_monitor_trigger(sm, 0, 0) 把这个定时器销毁
这个时候monitor来检查,就不会发出告警
如果在下一次monitor来检查前(执行skynet_monitor_check()前),worker线程还在处理消息(则很可能陷入死循环了)
这个时候monitor来检查,就会把sm->check_version = sm->version
等monitor又一次检查,就会触发 sm->check_version == sm->version,从而发出告警
极端情况:在监视器monitor刚检查完,就来了消息使得 source != destination,此时:sm->version != sm->check_version
需要等到5秒后,monitor才会把:sm->check_version = sm->version
又过了5秒后,monitor才检查到:sm->check_version = sm->version,并且不为0,这时候才会发出警告
也就是说最迟可能需要等待10秒,监视器monitor才会发现这个线程已经进入死循环
这个也是合理的设置:任何请求消息的处理,都不太可能超过10s,如果超过10s就意味着上层业务逻辑出现了重大问题
总结
到这里监视器monitor的全部内容就在这里了
struct monitor监视器的字段很简单,主要是记录一下监控的线程数 和 用来实现监控的结构体数组
这个数组的元素就是struct skynet_monitor,而struct skynet_monitor是实现监控的关键
数组的数量则是工作线程的数量
特别注意:
struct monitor监视器只会监控worker线程
而工作线程也只会在处理消息的时候调用监控