Skynet源码之:进程启动(15)

JavenLaw

回顾

我们可以回过头来看看skynet之前的准备

​ 1,环境准备

​ 2,守护进程

​ 3,节点建立

​ 4,服务管理

​ 5,消息队列

​ 6,模块加载

​ 7,定时器

​ 8,监视器

​ 9,网络模块

​ 10,日志打印

在经过一系列步骤后,我们最终还是要把这些内容交由线程去执行

后面就会触发下面的内容

​ 11,service_logger

​ 12,service_sblua

​ 13,service_harbor

​ 14,服务的实现

现在就让我们带着以上的内容,开启我们的进程

让skynet正式跑起来


创建线程

让我们看看在哪里调用的线程创建

// skynet_server.c文件,289行
start(config->thread);

// 具体实现如下
// skynet_server.c文件,182行
static void
start(int thread) {
    // 可以看到,我们创建的线程数是:配置的线程字段 + 3
    // 是因为 config->thread 的含义是:worker线程的数量
    // 多的3个线程分别是:timer,socket,monitor
	pthread_t pid[thread+3];

    // 对monitor线程进程初始化
	struct monitor *m = skynet_malloc(sizeof(*m));
	memset(m, 0, sizeof(*m));
	m->count = thread;
	m->sleep = 0;
	// 为每个工作线程都创建一个monitor监视器
	m->m = skynet_malloc(thread * sizeof(struct skynet_monitor *));
	int i;
	for (i=0;i<thread;i++) {
		m->m[i] = skynet_monitor_new();
	}
    
    // pthread_mutex_init函数的知识,详细见《Skynet专题之:线程》
	if (pthread_mutex_init(&m->mutex, NULL)) {
		fprintf(stderr, "Init mutex error");
		exit(1);
	}
    
    // pthread_cond_init函数的知识,详细见《Skynet专题之:线程》
	if (pthread_cond_init(&m->cond, NULL)) {
		fprintf(stderr, "Init cond error");
		exit(1);
	}

    // 直接先创建3个线程,给timer,socket,monitor
	create_thread(&pid[0], thread_monitor, m); // thread_monitor看代码段1
	create_thread(&pid[1], thread_timer, m); // thread_timer看代码段2
	create_thread(&pid[2], thread_socket, m); // thread_timer看代码段3
	
    // skynet为了最大化消息队列的执行效率
    // 即避免频繁地对全局队列加锁以及线程频繁切换服务
    // 对worker线程获取每个服务的处理消息的数量做了优化
	static int weight[] = { 
		-1, -1, -1, -1, 0, 0, 0, 0,
		1, 1, 1, 1, 1, 1, 1, 1, 
		2, 2, 2, 2, 2, 2, 2, 2, 
		3, 3, 3, 3, 3, 3, 3, 3, };
	struct worker_parm wp[thread];
    // 循环建立worker线程
	for (i=0;i<thread;i++) {
		wp[i].m = m;
		wp[i].id = i;
		if (i < sizeof(weight)/sizeof(weight[0])) {
			wp[i].weight= weight[i];
		} else {
			wp[i].weight = 0;
		}
		create_thread(&pid[i+3], thread_worker, &wp[i]);
	}

    // 最终开始启动线程
	for (i=0;i<thread+3;i++) {
		pthread_join(pid[i], NULL); 
	}

	free_monitor(m);
}

代码段1

// 在《Skynet源码之:监视器》中,有对本函数的说明
static void *
thread_monitor(void *p) {
	struct monitor * m = p;
	int i;
	int n = m->count;
	skynet_initthread(THREAD_MONITOR); // 初始化线程私有数据
	for (;;) {
		CHECK_ABORT
		for (i=0;i<n;i++) {
			skynet_monitor_check(m->m[i]); // 开始检查每一个worker线程是否陷入死循环
		}
		for (i=0;i<5;i++) {
			CHECK_ABORT
			sleep(1);
		}
	}

	return NULL;
}

代码段2

// 在《Skynet源码之:定时器》中,有对本函数的 详细说明
static void *
thread_timer(void *p) {
	struct monitor * m = p;
	skynet_initthread(THREAD_TIMER); // 初始化线程私有数据
	for (;;) {
		skynet_updatetime();
		skynet_socket_updatetime();
		CHECK_ABORT
		wakeup(m,m->count-1);
		usleep(2500);
		if (SIG) {
			signal_hup();
			SIG = 0;
		}
	}
	// wakeup socket thread
	skynet_socket_exit();
	// wakeup all worker thread
	pthread_mutex_lock(&m->mutex);
	m->quit = 1;
	pthread_cond_broadcast(&m->cond);
	pthread_mutex_unlock(&m->mutex);
	return NULL;
}

代码段3

// 在这里没有独立对socket线程进程分析
// 是因为《Skynet源码之:网络模块》+《Skynet源码service_gatete》+《Skynet专题之:网络》一共3个模块比较复杂
// 我还没完成这部分的内容,没有搞透彻,等待后面再来更新
// 
// 可以先简单理解为:
// 		skynet开启独立的网络线程来处理网络数据,并通知各个服务网路数据到达
// 		没有采用多线程的原因是单线程性能足够并且可以减少加锁竞争等
static void *
thread_socket(void *p) {
	struct monitor * m = p;
	skynet_initthread(THREAD_SOCKET); // 初始化线程私有数据
	for (;;) {
		int r = skynet_socket_poll();
		if (r==0)
			break;
		if (r<0) {
			CHECK_ABORT
			continue;
		}
		wakeup(m,0);
	}
	return NULL;
}


worker线程

其实真正干活最多的还是worker线程,可以说是真正运行服务的

我们来看看代码

static void *
thread_worker(void *p) {
	struct worker_parm *wp = p;
	int id = wp->id;
	int weight = wp->weight;
	struct monitor *m = wp->m;
	struct skynet_monitor *sm = m->m[id]; // 获取到线程对应的monitor监视器
	skynet_initthread(THREAD_WORKER); // 初始化线程数据
	struct message_queue * q = NULL;
    
    // 正式进入无限循环
	while (!m->quit) {
        // worker线程开始不断地从全局消息队列中,获取对应服务的次级消息队列
		q = skynet_context_message_dispatch(sm, q, weight);
		if (q == NULL) {
			if (pthread_mutex_lock(&m->mutex) == 0) {
				++ m->sleep;
				// "spurious wakeup" is harmless,
				// because skynet_context_message_dispatch() can be call at any time.
				if (!m->quit)
					pthread_cond_wait(&m->cond, &m->mutex);
				-- m->sleep;
				if (pthread_mutex_unlock(&m->mutex)) {
					fprintf(stderr, "unlock mutex error");
					exit(1);
				}
			}
		}
	}
	return NULL;
}

看看 skynet_context_message_dispatch() 函数的实现

skynet_context_message_dispatch() 函数可以说是整个skynet设计思路的重要实践

即:通过启动成百上千的服务,给worker线程轮流执行服务的消息

struct message_queue * 
skynet_context_message_dispatch(struct skynet_monitor *sm, struct message_queue *q, int weight) {
	// 从全局消息队列获取某个服务的次级消息队列
    // 一旦获取到,表明这个服务即将被线程执行
    if (q == NULL) {
		q = skynet_globalmq_pop();
		if (q==NULL)
			return NULL;
	}

    // 获取到次级消息队列对应的handle
    uint32_t handle = skynet_mq_handle(q);

    // 获取到该服务的实例 skynet_context
    struct skynet_context * ctx = skynet_handle_grab(handle);
    if (ctx == NULL) {
        // 如果没有找到服务实例ctx,需要把这个服务引用释放掉
        // 详细见《Skynet源码之:消息队列》
        struct drop_t d = { handle };
        skynet_mq_release(q, drop_message, &d);
        // 立马再次从全局消息队列获取一个次级消息队列
        // 这样线程就不会空跑一次,而是立马投入到下一个服务消息的执行中去
        return skynet_globalmq_pop();
    }

    int i,n=1;
    struct skynet_message msg;

    for (i=0;i<n;i++) {
        // 从该服务的次级消息队列中取一个消息出来进行处理
        // 假如次级消息队列没有数据,则立即释放此次引用
        // 并且线程立即再次从全局消息队列获取新的一个服务
        // 免得无功而返
        if (skynet_mq_pop(q,&msg)) {
            skynet_context_release(ctx);
            return skynet_globalmq_pop();
        } else if (i==0 && weight >= 0) {
            n = skynet_mq_length(q);
            n >>= weight;
        }
        
        // 检查此服务的消息数量是否过载
        int overload = skynet_mq_overload(q);
        if (overload) {
            skynet_error(ctx, "May overload, message queue length = %d", overload);
        }

        // 触发监视器,详细见《Skynet源码之:监视器》
        skynet_monitor_trigger(sm, msg.source , handle);

        if (ctx->cb == NULL) { // 不存在回调函数,需要把此消息的内存释放
            skynet_free(msg.data);
        } else {
            // 在这里正式开始处理具体的消息和任务
            dispatch_message(ctx, &msg);
        }

        // 解除监视器,详细见《Skynet源码之:监视器》
        skynet_monitor_trigger(sm, 0,0);
    }

    assert(q == ctx->queue);
    struct message_queue *nq = skynet_globalmq_pop();
    if (nq) {
        // If global mq is not empty , push q back, and return next queue (nq)
        // Else (global mq is empty or block, don't push q back, and return q again (for next dispatch)
        skynet_globalmq_push(q);
        q = nq;
    } 
    skynet_context_release(ctx);

    return q;
}

继续看 dispatch_message() 的实现

static void
dispatch_message(struct skynet_context *ctx, struct skynet_message *msg) {
	assert(ctx->init);
	CHECKCALLING_BEGIN(ctx)
    // 设置线程私有数据,进而绑定了 线程-服务handle的关系
	pthread_setspecific(G_NODE.handle_key, (void *)(uintptr_t)(ctx->handle));
	int type = msg->sz >> MESSAGE_TYPE_SHIFT;
	size_t sz = msg->sz & MESSAGE_TYPE_MASK;
    // 这个logfile就是《Skynet源码之:日志打印》
	FILE *f = (FILE *)ATOM_LOAD(&ctx->logfile);
	if (f) {
		skynet_log_output(f, msg->source, type, msg->session, msg->data, sz);
	}
	++ctx->message_count;
	int reserve_msg;
    // 这个profile就是《Skynet源码之:性能分析》
	if (ctx->profile) {
		ctx->cpu_start = skynet_thread_time();
		reserve_msg = ctx->cb(ctx, ctx->cb_ud, type, msg->session, msg->source, msg->data, sz);
		uint64_t cost_time = skynet_thread_time() - ctx->cpu_start;
		ctx->cpu_cost += cost_time;
	} else {
    // 这个最终就是服务的具体执行
    // 详细看《Skynet源码之:service_snlua》和《Skynet源码之:服务实现》
		reserve_msg = ctx->cb(ctx, ctx->cb_ud, type, msg->session, msg->source, msg->data, sz);
	}
	if (!reserve_msg) {
		skynet_free(msg->data);
	}
	CHECKCALLING_END(ctx)
}


总结

其实有了前面这么多部分的铺垫,进程启动这部分已经非常简单了

主要的内容其实都是前面所说的

更应该关注《Skynet源码之:service_snlua》和《Skynet源码之:服务实现》