Skynet源码之:监视器(10)

JavenLaw

monitor初始化

1,首先让我们看看监视器monitor在那里调用

监视器monitor不像 服务管理handle_storage,消息队列message_queue,模块加载modules 等具有复杂的结构,它的功能和流程比较简单。因此没有特别的函数用于监视器monitor的初始化。像服务管理有skynet_handle_init(),消息队列有skynet_mq_init(),模块加载有skynet_module_init(),定时器模块也有属于自己的skynet_timer_init(),网络模块则有skynet_socket_init()

因此监视器monitor的初始化,就发生在被第一次调用时才进行。在skynet_start.c文件,第 186 行

// config->thread 又是通过skynet_main.c 中读取启动配置获得的
// thread == config->thread
// 因此 thread 就是启动配置的数值

struct monitor *m = skynet_malloc(sizeof(*m)); // 申请内存
memset(m, 0, sizeof(*m)); // 把内存都置为0
m->count = thread; // 把开启的worker线程数记录在count
m->sleep = 0; // 初始值为0

m->m = skynet_malloc(thread * sizeof(struct skynet_monitor *)); // 很显然,老做法了
// 申请一段内存,实际就是一个数组,数组大小为需要监控的线程数量
// 数组的元素是什么呢?是指针
// 指针的类型是什么呢?指向 struct skynet_monitor 结构体的地址
int i;
for (i=0;i<thread;i++) {
	m->m[i] = skynet_monitor_new(); // 最后为每个数组分配一个 指向struct skynet_monitor 结构体的指针
}

让我们来看看struct monitor结构体是什么?

struct monitor {
	int count;// 需要监控线程的数量
	struct skynet_monitor ** m; // 指向 存放struct skynet_monitor *指针 地址的指针,属于二级指针
    // 即m指向的地址,里面存放的是一个指针数组的开头
    // 而数组元素里面存放的指针,才实际指向 struct skynet_monitor 的地址
	pthread_cond_t cond;
	pthread_mutex_t mutex;
	int sleep;
	int quit;
};


skynet_monitor结构体

2,那skynet_monitor_new()返回了啥呢?

肯定就是 struct skynet_monitor结构体 的地址啦

struct skynet_monitor * 
skynet_monitor_new() {
	struct skynet_monitor * ret = skynet_malloc(sizeof(*ret)); // 申请一块内存
	memset(ret, 0, sizeof(*ret)); // 把所有值都设为 0 
	return ret;
}

那struct skynet_monitor结构体又是啥?

struct skynet_monitor {
	ATOM_INT version; // 基础版本,0
	int check_version; // 检查版本,0
	uint32_t source; // 值为0
	uint32_t destination; // 值为0
};

就这么简单


监视器作用

3,所以,我们该怎么使用这个监视器monitor呢?有什么作用呢?

在回答这个问题前,我们先来看看 监视器monitor 其他的2个操作

因为在刚开始初始化的时候,需要监控的线程数就是固定的,也就说worker线程一旦启动,它的数量就确定了

也就是说不需要像服务管理handle_storage,消息队列message_queue动态增加对象,因此monitor 只剩下释放监控这个选择,接口如下:

void 
skynet_monitor_delete(struct skynet_monitor *sm) {
	skynet_free(sm); // free掉监控的线程所属的skynet_monitor就行了
}

另外一个比较重要的就是,触发监控,也就是把 monitor 打开,代码如下

void 
skynet_monitor_trigger(struct skynet_monitor *sm, uint32_t source, uint32_t destination) {
	sm->source = source; // 记录消息的源地址
	sm->destination = destination; // 记录消息的目的地址
	ATOM_FINC(&sm->version);
}

让我们看看在那里触发的:

struct message_queue * 
skynet_context_message_dispatch(struct skynet_monitor *sm, struct message_queue *q, int weight) {
	
	// .....代码省略
	
	for (i=0;i<n;i++) {
		if (skynet_mq_pop(q,&msg)) {
			skynet_context_release(ctx);
			return skynet_globalmq_pop();
		} else if (i==0 && weight >= 0) {
			n = skynet_mq_length(q);
			n >>= weight;
		}
		int overload = skynet_mq_overload(q);
		if (overload) {
			skynet_error(ctx, "May overload, message queue length = %d", overload);
		}
		
        // 注意看这段代码:在消息处理前执行
		// 把 source 置为消息的source,即消息的源地址服务
        // 把 destination 置为消息的目的地址,即本服务
        // 并且把 sm->version 基础版本的值+1
		skynet_monitor_trigger(sm, msg.source, handle);

        // 此时时间进入 消息处理时段
		if (ctx->cb == NULL) {
			skynet_free(msg.data);
		} else {
			dispatch_message(ctx, &msg);
		}
        // 注意看这段代码:在消息处理后执行
        // 而skynet_monitor_trigger(sm, 0, 0)的作用是
        // 把 source 置为0,destination置为0
        // 并且把 sm->version 基础版本的值+1
        // 为什么要这么做呢?接着往下看
		skynet_monitor_trigger(sm, 0, 0);
	}
    
    // .....代码省略
	
	return q;

}

monitor的设置为什么要这么做呢?为什么要设置2遍 skynet_monitor_trigger() 呢? 接着往下看:


monitor线程的启动

4,monitor线程的启动!

在skynet_start.c中,第205行,monitor线程启动了!让我们看看实现的代码:

// 调用接口
struct monitor *m;
create_thread(&pid[0], thread_monitor, m);

static void *
thread_monitor(void *p) {
	struct monitor * m = p;
	int i;
	int n = m->count; // 获得需要监控的线程数量
	skynet_initthread(THREAD_MONITOR);
	for (;;) { // 死循环检测
		CHECK_ABORT
		for (i=0;i<n;i++) {
			skynet_monitor_check(m->m[i]); // 调用检测函数,检查worker线程是否陷入死循环
		}
        // 这里等待5秒,也就是5秒之后再检查worker线程
        // 为什么不是直接 sleep(5) ?
        // CHECK_ABORT 又是什么?  
        // #define CHECK_ABORT if (skynet_context_total()==0) break; 即
        // if (skynet_context_total()==0) 
        // 	{
        //		break;
    	//	}
        // 也就是说,没有服务的时候,monitor就不用检测了,可以退出
        // 因此需要for来循环5秒,这样万一(skynet_context_total()==0)才能及时退出
        // 如果直接sleep(5),会导致5秒之后才退出
		for (i=0;i<5;i++) {
			CHECK_ABORT
			sleep(1);
		}
	}

	return NULL;
}

看看skynet_monitor_check()的实现

void 
skynet_monitor_check(struct skynet_monitor *sm) {
	if (sm->version == sm->check_version) {// 版本没更新,出现问题了
		if (sm->destination) {
			skynet_context_endless(sm->destination); // 发现服务进入死循环,调用 服务的退出逻辑,具体代码看 服务模块
			skynet_error(NULL, "A message from [ :%08x ] to [ :%08x ] maybe in an endless loop (version = %d)",
                         sm->source , sm->destination, sm->version);
            // 特别注意看:A message from [ :%08x ] to [ :%08x ]
            // 如果真的打出这句话,这意味着第二个的 [ :%08x ] 服务才是出问题的服务
            // 第一个 [ :%08x ] 只是发出请求消息的服务
		}
	} else {
		sm->check_version = sm->version;
	}
}

第1次调用 skynet_monitor_trigger(sm, msg.source, handle) 的时候,进入函数,此时的变化就像上面说的:

把 source 置为消息的source,把 destination 置为本服务的handle,并且把 sm->version 基础版本的值+1

随着monitor线程5秒到了,就会触发skynet_monitor_check()函数

因为 source != destination ,则把 sm->check_version = sm->version 置为相同

又随着monitor线程5秒到了,再次触发skynet_monitor_check()函数,有2种情况:

情况1:

worker线程正在处理消息,并且5秒还没返回,source == destination ,并且 source == destination 并且值不等于0

此时立马打印worker线程可能陷入死循环的警告

情况2:

worker线程处理完了消息,并返回,再次调用skynet_monitor_trigger(sm, 0, 0),重置 source 和 destination 为0

这个时候虽然 source == destination 但值等于 0,不会触发警告

其实这个设计这样理解就很清晰:

服务第一次调用 skynet_monitor_trigger(sm, msg.source, handle) 等于给 worker线程 处理此条消息设置了一个定时器开始倒计时

如果在下一次monitor来检查前(执行skynet_monitor_check()前),worker线程已经把消息处理完了

worker线程 自然会 调用skynet_monitor_trigger(sm, 0, 0) 把这个定时器销毁

这个时候monitor来检查,就不会发出告警

如果在下一次monitor来检查前(执行skynet_monitor_check()前),worker线程还在处理消息(则很可能陷入死循环了)

这个时候monitor来检查,就会把sm->check_version = sm->version

等monitor又一次检查,就会触发 sm->check_version == sm->version,从而发出告警

极端情况:在监视器monitor刚检查完,就来了消息使得 source != destination,此时:sm->version != sm->check_version

需要等到5秒后,monitor才会把:sm->check_version = sm->version

又过了5秒后,monitor才检查到:sm->check_version = sm->version,并且不为0,这时候才会发出警告

也就是说最迟可能需要等待10秒,监视器monitor才会发现这个线程已经进入死循环

这个也是合理的设置:任何请求消息的处理,都不太可能超过10s,如果超过10s就意味着上层业务逻辑出现了重大问题


总结

到这里监视器monitor的全部内容就在这里了

struct monitor监视器的字段很简单,主要是记录一下监控的线程数 和 用来实现监控的结构体数组

这个数组的元素就是struct skynet_monitor,而struct skynet_monitor是实现监控的关键

数组的数量则是工作线程的数量

特别注意:

struct monitor监视器只会监控worker线程

而工作线程也只会在处理消息的时候调用监控