回顾
我们可以回过头来看看skynet之前的准备
1,环境准备
2,守护进程
3,节点建立
4,服务管理
5,消息队列
6,模块加载
7,定时器
8,监视器
9,网络模块
10,日志打印
在经过一系列步骤后,我们最终还是要把这些内容交由线程去执行
后面就会触发下面的内容
11,service_logger
12,service_sblua
13,service_harbor
14,服务的实现
现在就让我们带着以上的内容,开启我们的进程
让skynet正式跑起来
创建线程
让我们看看在哪里调用的线程创建
// skynet_server.c文件,289行
start(config->thread);
// 具体实现如下
// skynet_server.c文件,182行
static void
start(int thread) {
// 可以看到,我们创建的线程数是:配置的线程字段 + 3
// 是因为 config->thread 的含义是:worker线程的数量
// 多的3个线程分别是:timer,socket,monitor
pthread_t pid[thread+3];
// 对monitor线程进程初始化
struct monitor *m = skynet_malloc(sizeof(*m));
memset(m, 0, sizeof(*m));
m->count = thread;
m->sleep = 0;
// 为每个工作线程都创建一个monitor监视器
m->m = skynet_malloc(thread * sizeof(struct skynet_monitor *));
int i;
for (i=0;i<thread;i++) {
m->m[i] = skynet_monitor_new();
}
// pthread_mutex_init函数的知识,详细见《Skynet专题之:线程》
if (pthread_mutex_init(&m->mutex, NULL)) {
fprintf(stderr, "Init mutex error");
exit(1);
}
// pthread_cond_init函数的知识,详细见《Skynet专题之:线程》
if (pthread_cond_init(&m->cond, NULL)) {
fprintf(stderr, "Init cond error");
exit(1);
}
// 直接先创建3个线程,给timer,socket,monitor
create_thread(&pid[0], thread_monitor, m); // thread_monitor看代码段1
create_thread(&pid[1], thread_timer, m); // thread_timer看代码段2
create_thread(&pid[2], thread_socket, m); // thread_timer看代码段3
// skynet为了最大化消息队列的执行效率
// 即避免频繁地对全局队列加锁以及线程频繁切换服务
// 对worker线程获取每个服务的处理消息的数量做了优化
static int weight[] = {
-1, -1, -1, -1, 0, 0, 0, 0,
1, 1, 1, 1, 1, 1, 1, 1,
2, 2, 2, 2, 2, 2, 2, 2,
3, 3, 3, 3, 3, 3, 3, 3, };
struct worker_parm wp[thread];
// 循环建立worker线程
for (i=0;i<thread;i++) {
wp[i].m = m;
wp[i].id = i;
if (i < sizeof(weight)/sizeof(weight[0])) {
wp[i].weight= weight[i];
} else {
wp[i].weight = 0;
}
create_thread(&pid[i+3], thread_worker, &wp[i]);
}
// 最终开始启动线程
for (i=0;i<thread+3;i++) {
pthread_join(pid[i], NULL);
}
free_monitor(m);
}
代码段1
// 在《Skynet源码之:监视器》中,有对本函数的说明
static void *
thread_monitor(void *p) {
struct monitor * m = p;
int i;
int n = m->count;
skynet_initthread(THREAD_MONITOR); // 初始化线程私有数据
for (;;) {
CHECK_ABORT
for (i=0;i<n;i++) {
skynet_monitor_check(m->m[i]); // 开始检查每一个worker线程是否陷入死循环
}
for (i=0;i<5;i++) {
CHECK_ABORT
sleep(1);
}
}
return NULL;
}
代码段2
// 在《Skynet源码之:定时器》中,有对本函数的 详细说明
static void *
thread_timer(void *p) {
struct monitor * m = p;
skynet_initthread(THREAD_TIMER); // 初始化线程私有数据
for (;;) {
skynet_updatetime();
skynet_socket_updatetime();
CHECK_ABORT
wakeup(m,m->count-1);
usleep(2500);
if (SIG) {
signal_hup();
SIG = 0;
}
}
// wakeup socket thread
skynet_socket_exit();
// wakeup all worker thread
pthread_mutex_lock(&m->mutex);
m->quit = 1;
pthread_cond_broadcast(&m->cond);
pthread_mutex_unlock(&m->mutex);
return NULL;
}
代码段3
// 在这里没有独立对socket线程进程分析
// 是因为《Skynet源码之:网络模块》+《Skynet源码service_gatete》+《Skynet专题之:网络》一共3个模块比较复杂
// 我还没完成这部分的内容,没有搞透彻,等待后面再来更新
//
// 可以先简单理解为:
// skynet开启独立的网络线程来处理网络数据,并通知各个服务网路数据到达
// 没有采用多线程的原因是单线程性能足够并且可以减少加锁竞争等
static void *
thread_socket(void *p) {
struct monitor * m = p;
skynet_initthread(THREAD_SOCKET); // 初始化线程私有数据
for (;;) {
int r = skynet_socket_poll();
if (r==0)
break;
if (r<0) {
CHECK_ABORT
continue;
}
wakeup(m,0);
}
return NULL;
}
worker线程
其实真正干活最多的还是worker线程,可以说是真正运行服务的
我们来看看代码
static void *
thread_worker(void *p) {
struct worker_parm *wp = p;
int id = wp->id;
int weight = wp->weight;
struct monitor *m = wp->m;
struct skynet_monitor *sm = m->m[id]; // 获取到线程对应的monitor监视器
skynet_initthread(THREAD_WORKER); // 初始化线程数据
struct message_queue * q = NULL;
// 正式进入无限循环
while (!m->quit) {
// worker线程开始不断地从全局消息队列中,获取对应服务的次级消息队列
q = skynet_context_message_dispatch(sm, q, weight);
if (q == NULL) {
if (pthread_mutex_lock(&m->mutex) == 0) {
++ m->sleep;
// "spurious wakeup" is harmless,
// because skynet_context_message_dispatch() can be call at any time.
if (!m->quit)
pthread_cond_wait(&m->cond, &m->mutex);
-- m->sleep;
if (pthread_mutex_unlock(&m->mutex)) {
fprintf(stderr, "unlock mutex error");
exit(1);
}
}
}
}
return NULL;
}
看看 skynet_context_message_dispatch() 函数的实现
skynet_context_message_dispatch() 函数可以说是整个skynet设计思路的重要实践
即:通过启动成百上千的服务,给worker线程轮流执行服务的消息
struct message_queue *
skynet_context_message_dispatch(struct skynet_monitor *sm, struct message_queue *q, int weight) {
// 从全局消息队列获取某个服务的次级消息队列
// 一旦获取到,表明这个服务即将被线程执行
if (q == NULL) {
q = skynet_globalmq_pop();
if (q==NULL)
return NULL;
}
// 获取到次级消息队列对应的handle
uint32_t handle = skynet_mq_handle(q);
// 获取到该服务的实例 skynet_context
struct skynet_context * ctx = skynet_handle_grab(handle);
if (ctx == NULL) {
// 如果没有找到服务实例ctx,需要把这个服务引用释放掉
// 详细见《Skynet源码之:消息队列》
struct drop_t d = { handle };
skynet_mq_release(q, drop_message, &d);
// 立马再次从全局消息队列获取一个次级消息队列
// 这样线程就不会空跑一次,而是立马投入到下一个服务消息的执行中去
return skynet_globalmq_pop();
}
int i,n=1;
struct skynet_message msg;
for (i=0;i<n;i++) {
// 从该服务的次级消息队列中取一个消息出来进行处理
// 假如次级消息队列没有数据,则立即释放此次引用
// 并且线程立即再次从全局消息队列获取新的一个服务
// 免得无功而返
if (skynet_mq_pop(q,&msg)) {
skynet_context_release(ctx);
return skynet_globalmq_pop();
} else if (i==0 && weight >= 0) {
n = skynet_mq_length(q);
n >>= weight;
}
// 检查此服务的消息数量是否过载
int overload = skynet_mq_overload(q);
if (overload) {
skynet_error(ctx, "May overload, message queue length = %d", overload);
}
// 触发监视器,详细见《Skynet源码之:监视器》
skynet_monitor_trigger(sm, msg.source , handle);
if (ctx->cb == NULL) { // 不存在回调函数,需要把此消息的内存释放
skynet_free(msg.data);
} else {
// 在这里正式开始处理具体的消息和任务
dispatch_message(ctx, &msg);
}
// 解除监视器,详细见《Skynet源码之:监视器》
skynet_monitor_trigger(sm, 0,0);
}
assert(q == ctx->queue);
struct message_queue *nq = skynet_globalmq_pop();
if (nq) {
// If global mq is not empty , push q back, and return next queue (nq)
// Else (global mq is empty or block, don't push q back, and return q again (for next dispatch)
skynet_globalmq_push(q);
q = nq;
}
skynet_context_release(ctx);
return q;
}
继续看 dispatch_message() 的实现
static void
dispatch_message(struct skynet_context *ctx, struct skynet_message *msg) {
assert(ctx->init);
CHECKCALLING_BEGIN(ctx)
// 设置线程私有数据,进而绑定了 线程-服务handle的关系
pthread_setspecific(G_NODE.handle_key, (void *)(uintptr_t)(ctx->handle));
int type = msg->sz >> MESSAGE_TYPE_SHIFT;
size_t sz = msg->sz & MESSAGE_TYPE_MASK;
// 这个logfile就是《Skynet源码之:日志打印》
FILE *f = (FILE *)ATOM_LOAD(&ctx->logfile);
if (f) {
skynet_log_output(f, msg->source, type, msg->session, msg->data, sz);
}
++ctx->message_count;
int reserve_msg;
// 这个profile就是《Skynet源码之:性能分析》
if (ctx->profile) {
ctx->cpu_start = skynet_thread_time();
reserve_msg = ctx->cb(ctx, ctx->cb_ud, type, msg->session, msg->source, msg->data, sz);
uint64_t cost_time = skynet_thread_time() - ctx->cpu_start;
ctx->cpu_cost += cost_time;
} else {
// 这个最终就是服务的具体执行
// 详细看《Skynet源码之:service_snlua》和《Skynet源码之:服务实现》
reserve_msg = ctx->cb(ctx, ctx->cb_ud, type, msg->session, msg->source, msg->data, sz);
}
if (!reserve_msg) {
skynet_free(msg->data);
}
CHECKCALLING_END(ctx)
}
总结
其实有了前面这么多部分的铺垫,进程启动这部分已经非常简单了
主要的内容其实都是前面所说的
更应该关注《Skynet源码之:service_snlua》和《Skynet源码之:服务实现》