服务实现的复杂性
服务实现是最为复杂的一个部分,其复杂不是在于本身的代码,而是服务几乎把所有的模块都联系到了一起
从消息队列message_queue,服务管理handle_storage,模块加载modules,监视器monitor,Skynet中API的实现
以及c-lua的调用,lualib的调用等,各种命令的传递,都深度关涉到服务实现
可以说服务实现是使用这些部分的一个综合协调体
服务会使用模块加载的创建,初始化,释放,通知
服务会使用服务管理的创建,释放,查找,命名
服务会使用消息队列的发消息,收消息,处理消息
服务还会被worker线程获取执行,被监视器monitor监控
服务更需要实现对Skynet中各类API接口的封装
同时服务也被网络通信绑定
也关系到lua协程、以及任务的执行
对此,我会尽最大的努力来描述,并尽可能地把上面的模块串联起来
第一个启动的服务
随着以下步骤的实现,我们迎来了第一个服务的启动
if (config->daemon) { // 守护进程
if (daemon_init(config->daemon)) {
exit(1);
}
}
skynet_harbor_init(config->harbor); // 节点建立
skynet_handle_init(config->harbor); // 服务管理
skynet_mq_init(); // 消息队列
skynet_module_init(config->module_path);// 模块加载
skynet_timer_init(); // 定时器
skynet_socket_init(); // 网络
skynet_profile_enable(config->profile); // 性能监控
第一个启动的就是logger服务
struct skynet_context *ctx = skynet_context_new(config->logservice, config->logger);
if (ctx == NULL) {
fprintf(stderr, "Can't launch %s service\n", config->logservice);
exit(1);
}
服务启动函数
skynet_context_new() 函数用来新建一个服务,实现如下:
// 位于skynet_server.c文件中
struct skynet_context *
skynet_context_new(const char * name, const char *param) {
// 第一步:查找模块,详细见《Skynet源码之:模块加载》
struct skynet_module * mod = skynet_module_query(name);
if (mod == NULL)
return NULL;
// 第二步:模块实例化,详细见《Skynet源码之:模块加载》
void *inst = skynet_module_instance_create(mod);
if (inst == NULL)
return NULL;
// 第三步:分配服务内存
struct skynet_context * ctx = skynet_malloc(sizeof(*ctx));
// 这个宏是在开发阶段检查框架是否有bug而存在的
// 详细见:https://github.com/cloudwu/skynet/discussions/1753
CHECKCALLING_INIT(ctx)
// 第四步:对服务结构体进行一些赋值
ctx->mod = mod;
ctx->instance = inst;
// 对服务的引用进程初始化,并设置为2
// 详细见《Skynet专题之:原子操作》
// 这里为什么要设置为2呢?后面会再说
ATOM_INIT(&ctx->ref, 2);
ctx->cb = NULL;
ctx->cb_ud = NULL;
ctx->session_id = 0;
// 对服务的日志打印进行初始化
// 详细见《Skynet专题之:原子操作》
// 这里可以全局搜索:logfile,也能了解其用法
// 或参考:https://github.com/cloudwu/skynet/discussions/1851
// 这是有关debug_console中logon和logoff命令的实现
ATOM_INIT(&ctx->logfile, (uintptr_t)NULL);
ctx->init = false;
// 有关debug_console中endless命令的实现
ctx->endless = false;
ctx->cpu_cost = 0;
ctx->cpu_start = 0;
ctx->message_count = 0;
ctx->profile = G_NODE.profile;
// Should set to 0 first to avoid skynet_handle_retireall get an uninitialized handle
ctx->handle = 0;
ctx->handle = skynet_handle_register(ctx);
struct message_queue * queue = ctx->queue = skynet_mq_create(ctx->handle);
// init function maybe use ctx->handle, so it must init at last
// 往全局管理中添加服务节点,详细见《Skynet源码之:环境准备》
context_inc();
CHECKCALLING_BEGIN(ctx)
int r = skynet_module_instance_init(mod, inst, ctx, param);
CHECKCALLING_END(ctx)
if (r == 0) {
struct skynet_context * ret = skynet_context_release(ctx);
if (ret) {
ctx->init = true;
}
// 这里为什么需要把次级消息队列放入全局队列中呢?
// 在skynet_mq.c文件中,第85行,有明确的注释
// When the queue is create (always between service create and service init) ,
// set in_global flag to avoid push it to global queue .
// If the service init success, skynet_context_new will call skynet_mq_push to push it to global queue.
// 详细请看《Skynet源码之:消息队列》
skynet_globalmq_push(queue);
if (ret) {
skynet_error(ret, "LAUNCH %s %s", name, param ? param : "");
}
return ret;
} else {
skynet_error(ctx, "FAILED launch %s", name);
uint32_t handle = ctx->handle;
skynet_context_release(ctx);
skynet_handle_retire(handle);
struct drop_t d = { handle };
skynet_mq_release(queue, drop_message, &d);
return NULL;
}
}
查找服务模块
函数:skynet_module_query()
首先是skynet_module结构体,在skynet_module.h文件中定义
详细见《Skynet源码之:模块加载》,里面有详细的分析和说明
最后,我们得到一个指向 struct skynet_module * mod 的指针,被赋值给mod
代码:
struct skynet_module * mod = skynet_module_query(name);
if (mod == NULL)
return NULL;
创建模块实例化
函数:skynet_module_instance_create()
详细见《Skynet源码之:模块加载》,里面有详细的分析和说明
但是不同的模块具有不同的 module_create() 函数
重点可以看看《Skynet源码之:service_logger》+ 《Skynet源码之:service_snlua》
最后,我们得到一个指向 该服务的实例 的指针,被赋值给inst
代码:
void *inst = skynet_module_instance_create(mod);
if (inst == NULL)
return NULL;
服务结构体
先来看看服务的结构体以及它的赋值
// 服务结构体
struct skynet_context {
void * instance; // 指向服务的实例,即前面的inst
struct skynet_module * mod; // 指向服务所属的模块,即前面的mod
void * cb_ud;
skynet_cb cb; // 属于服务的回调函数
struct message_queue *queue; // 属于服务的次级消息队列
ATOM_POINTER logfile; // 用于日志打印,详细见《Skynet源码之:日志打印》
uint64_t cpu_cost; // in microsec,统计此服务花费的cpu时间总数
uint64_t cpu_start; // in microsec,记录服务处理某条消息时的开始时间,等消息处理完毕时,会根据 cpu_end - cpu_start 来计算
char result[32]; // 用于存放stat命令的结果,展示给控制台
uint32_t handle; // 属于此服务的handle,在一个Skynet节点中,handle是唯一的
int session_id; // 此服务消息序列的id
ATOM_INT ref; // 原子性操作,用于记录服务的引用(非常重要,引用计数关系着服务内存的释放)
int message_count; // 服务消息的总数
bool init; // 服务是否完成初始化
bool endless; // 服务是否陷入死循环
bool profile; // 服务是否开启性能监控
CHECKCALLING_DECL
};
注册服务
函数:skynet_handle_register()
详细见《Skynet源码之:服务管理》,这里面管理着所有的服务
代码:
// Should set to 0 first to avoid skynet_handle_retireall get an uninitialized handle
// 需要先把handle置为0,表明这是个空置的服务
// 因为在skynet_handle_register(ctx)注册的时候,可能遇到服务全部退出的情况
// 所以设置为0之后,不至于skynet_handle_retireall()找不到对应的handle
ctx->handle = 0;
ctx->handle = skynet_handle_register(ctx);
创建消息队列
函数:skynet_mq_create()
详细见《Skynet源码之:消息队列》,这里负责全局消息队列和次级消息队列的管理
代码:
struct message_queue * queue = ctx->queue = skynet_mq_create(ctx->handle);
模块初始化
函数:skynet_module_instance_init()
详细见《Skynet源码之:模块加载》,里面有详细的分析和说明
但是不同的模块具有不同的 module_init() 函数
重点可以看看《Skynet源码之:service_logger》+ 《Skynet源码之:service_snlua》
最后,可以判断该模块是否初始化成功
代码:
// 这里传入了几个参数
// mod:刚才skynet_module_query()查出的模板
// inst:刚才skynet_module_instance_create()创建的实列化
// ctx:就是本服务的结构体
// param:需要传给模板实列的参数
int r = skynet_module_instance_init(mod, inst, ctx, param);
// 注意:
// param:这个参数的是根据不同的模板,具有不同的值
// 例如:
// service_logger模板,传入的参数是:config->logger,即是用户配置的日志路径和名字
// service_snlua模板,传入的参数是:需要启动的服务脚本的名字,即是name.lua中的名字
总结
至此,一个服务的启动就已经完成了
但这还是远远不够的:
1,skynet_context结构体中的 void * cb_ud 和 skynet_cb cb字段是什么作用?如何赋值的?
2,我们启动一个服务,都是通过skynet.newservice()实现的,底层的机制是什么呢?
3,启动后的服务,是如何处理消息的呢?
4,服务中调用lua层,c层的命令,又是如何实现的呢?
一切的一切,都需要归结到《Skynet源码之:service_snlua》中去解释
Skynet的API
在写完本章和《Skynet源码之:service_snlua》之后,我的脑子已经一团浆糊
主要是关系的东西太多太多了,不知道从何写起了,即使自己心中知道实现,但就是凑不起来
因为确实很多很杂,有时候关系lua-c的交互,有时候又到定时器去了
接着又是协程的执行,以及服务之间各种调用和封装,还有消息的注册打包之类的
再加上消息队列等,还有各种坑点和使用注意事项
我想了一个办法:围绕skynet的API来描述,应该是最为清晰的
主要特别关注:skynet.lua 文件 和 manager.lua
计算机里没有黑魔法,不要急,慢慢来,总有一天我会搞懂的
有关服务启动
1,skynet.newservice()
2,skynet.lanuch()
3,bootstrap.lua
4,skynet.init_service
5,skynet.init()
6,skynet.start()
7,skynet.require()
有关消息
1,skynet.call()
2,skynet.send()
3,skynet.pack()
4,skynet.register_protocol()
5,skynet.dispatch_message()
6,raw_dispatch_message()
7,skynet.response()
有关功能
1,skynet.timeout()
2,skynet.error()
流程总结
对于服务的实现来说,有太多的东西需要说,除了有关模块的准备外
下面我总结一下重要的流程
1,服务是如何被worker线程获取的:《Skynet源码之:进程启动》
2,服务的创建是怎么样进行的:《Skynet源码之:服务实现》
3,服务是如何工作的:《Skynet源码之:service_snlua》
4,服务的消息是如何流转的
1,worker线程在全局消息队列中,获取到服务的次级消息队列
2,从次级消息队列中,获取到一个消息msg
3,通过 dispatch_message() 函数(skynet_server.c文件),调用到服务建立时绑定的cb函数
4,服务的cb函数跟服务创建时的模板有关,一般为snlua
5,而snlua模板启动的服务,cb函数一般会绑定为在lua层的 skynet.dispatch_message() 函数(skynet.lua文件)
6,最后交由 raw_dispatch_message() 函数解析消息,并根据消息session_id,获得对应的协程
注意:
这里要着重提一下 raw_dispatch_message() 函数的实现
详情可见《Skynet源码之:service_snlua》中的dispatch_message部分
1,很多服务在启动时,会使用skynet.register_protocol() 函数来注册协议,例如
skynet.register_protocol {
name = "SYSTEM",
id = skynet.PTYPE_SYSTEM,
unpack = function(...) return ... end,
dispatch = function()
-- reopen signal
print("SIGHUP")
end
}
此时能,就会在skynet.lua中注册相对类型的处理函数
function skynet.register_protocol(class)
local name = class.name
local id = class.id
assert(proto[name] == nil and proto[id] == nil)
assert(type(name) == "string" and type(id) == "number" and id >=0 and id <=255)
proto[name] = class
proto[id] = class
end
2,最后在 raw_dispatch_message() 函数就会作出区分
-- 根据注册的消息类型,获取用户的注册函数
local p = proto[prototype]
if p == nil then
if prototype == skynet.PTYPE_TRACE then
-- trace next request
trace_source[source] = c.tostring(msg,sz)
elseif session ~= 0 then
c.send(source, skynet.PTYPE_ERROR, session, "")
else
unknown_request(session, source, msg, sz, prototype)
end
return
end
-- 那么这个p.dispatch又是哪里注册的呢?
-- 就是我们经常在main.lua文件中看到的
-- 某个main.lua文件
skynet.start(function()
skynet.dispatch("lua", function(_,_, command, ...)
skynet.trace()
local f = CMD[command]
skynet.ret(skynet.pack(f(...)))
end)
end)
-- 具体实现
-- skynet.lua文件
function skynet.dispatch(typename, func)
local p = proto[typename]
if func then
local ret = p.dispatch
p.dispatch = func
return ret
else
return p and p.dispatch
end
end
-- skynet.lua文件
-- raw_dispatch_message()中的部分代码
local f = p.dispatch
if f then
local co = co_create(f)
session_coroutine_id[co] = session
session_coroutine_address[co] = source
local traceflag = p.trace
if traceflag == false then
-- force off
trace_source[source] = nil
session_coroutine_tracetag[co] = false
else
local tag = trace_source[source]
if tag then
trace_source[source] = nil
c.trace(tag, "request")
session_coroutine_tracetag[co] = tag
elseif traceflag then
-- set running_thread for trace
running_thread = co
skynet.trace()
end
end
suspend(co, coroutine_resume(co, session,source, p.unpack(msg,sz)))
else
trace_source[source] = nil
if session ~= 0 then
c.send(source, skynet.PTYPE_ERROR, session, "")
else
unknown_request(session, source, msg, sz, proto[prototype].name)
end
end
以上,整个服务的消息的执行流程都总结完毕了