Skynet源码之:服务实现(14)

JavenLaw

服务实现的复杂性

服务实现是最为复杂的一个部分,其复杂不是在于本身的代码,而是服务几乎把所有的模块都联系到了一起

从消息队列message_queue,服务管理handle_storage,模块加载modules,监视器monitor,Skynet中API的实现

以及c-lua的调用,lualib的调用等,各种命令的传递,都深度关涉到服务实现

可以说服务实现是使用这些部分的一个综合协调体

​ 服务会使用模块加载的创建,初始化,释放,通知

​ 服务会使用服务管理的创建,释放,查找,命名

​ 服务会使用消息队列的发消息,收消息,处理消息

​ 服务还会被worker线程获取执行,被监视器monitor监控

​ 服务更需要实现对Skynet中各类API接口的封装

​ 同时服务也被网络通信绑定

​ 也关系到lua协程、以及任务的执行

对此,我会尽最大的努力来描述,并尽可能地把上面的模块串联起来


第一个启动的服务

随着以下步骤的实现,我们迎来了第一个服务的启动

if (config->daemon) { // 守护进程
	if (daemon_init(config->daemon)) {
		exit(1);
	}
}
skynet_harbor_init(config->harbor); // 节点建立
skynet_handle_init(config->harbor); // 服务管理
skynet_mq_init(); // 消息队列
skynet_module_init(config->module_path);// 模块加载
skynet_timer_init(); // 定时器
skynet_socket_init(); // 网络
skynet_profile_enable(config->profile); // 性能监控

第一个启动的就是logger服务

struct skynet_context *ctx = skynet_context_new(config->logservice, config->logger);
if (ctx == NULL) {
	fprintf(stderr, "Can't launch %s service\n", config->logservice);
	exit(1);
}


服务启动函数

skynet_context_new() 函数用来新建一个服务,实现如下:

// 位于skynet_server.c文件中

struct skynet_context * 
skynet_context_new(const char * name, const char *param) {
    
    // 第一步:查找模块,详细见《Skynet源码之:模块加载》
	struct skynet_module * mod = skynet_module_query(name);

	if (mod == NULL)
		return NULL;

    // 第二步:模块实例化,详细见《Skynet源码之:模块加载》
    void *inst = skynet_module_instance_create(mod);
    if (inst == NULL)
        return NULL;
    
    // 第三步:分配服务内存
    struct skynet_context * ctx = skynet_malloc(sizeof(*ctx));
    
    // 这个宏是在开发阶段检查框架是否有bug而存在的
    // 详细见:https://github.com/cloudwu/skynet/discussions/1753
    CHECKCALLING_INIT(ctx)
	
    // 第四步:对服务结构体进行一些赋值
    ctx->mod = mod;
    ctx->instance = inst;
    
    // 对服务的引用进程初始化,并设置为2
    // 详细见《Skynet专题之:原子操作》
    // 这里为什么要设置为2呢?后面会再说
    ATOM_INIT(&ctx->ref, 2);
    ctx->cb = NULL;
    ctx->cb_ud = NULL;
    ctx->session_id = 0;
    
    // 对服务的日志打印进行初始化
    // 详细见《Skynet专题之:原子操作》
    // 这里可以全局搜索:logfile,也能了解其用法
    // 或参考:https://github.com/cloudwu/skynet/discussions/1851
    // 这是有关debug_console中logon和logoff命令的实现
    ATOM_INIT(&ctx->logfile, (uintptr_t)NULL);
	
    ctx->init = false;
    
    // 有关debug_console中endless命令的实现
    ctx->endless = false;

    ctx->cpu_cost = 0;
    ctx->cpu_start = 0;
    ctx->message_count = 0;
    ctx->profile = G_NODE.profile;
    // Should set to 0 first to avoid skynet_handle_retireall get an uninitialized handle
    ctx->handle = 0;	
    ctx->handle = skynet_handle_register(ctx);
    struct message_queue * queue = ctx->queue = skynet_mq_create(ctx->handle);
    
    // init function maybe use ctx->handle, so it must init at last
    // 往全局管理中添加服务节点,详细见《Skynet源码之:环境准备》
    context_inc();

    CHECKCALLING_BEGIN(ctx)
    int r = skynet_module_instance_init(mod, inst, ctx, param);
    CHECKCALLING_END(ctx)
    
    if (r == 0) {
        struct skynet_context * ret = skynet_context_release(ctx);
        if (ret) {
            ctx->init = true;
        }
        // 这里为什么需要把次级消息队列放入全局队列中呢?
        // 在skynet_mq.c文件中,第85行,有明确的注释
        // When the queue is create (always between service create and service init) ,
		// set in_global flag to avoid push it to global queue .
		// If the service init success, skynet_context_new will call skynet_mq_push to push it to global queue.
        // 详细请看《Skynet源码之:消息队列》
        skynet_globalmq_push(queue);
        if (ret) {
            skynet_error(ret, "LAUNCH %s %s", name, param ? param : "");
        }
        return ret;
    } else {
        skynet_error(ctx, "FAILED launch %s", name);
        uint32_t handle = ctx->handle;
        skynet_context_release(ctx);
        skynet_handle_retire(handle);
        struct drop_t d = { handle };
        skynet_mq_release(queue, drop_message, &d);
        return NULL;
    }
}


查找服务模块

函数:skynet_module_query()

​ 首先是skynet_module结构体,在skynet_module.h文件中定义

详细见《Skynet源码之:模块加载》,里面有详细的分析和说明

​ 最后,我们得到一个指向 struct skynet_module * mod 的指针,被赋值给mod

代码:

struct skynet_module * mod = skynet_module_query(name);
if (mod == NULL)
		return NULL;


创建模块实例化

函数:skynet_module_instance_create()

详细见《Skynet源码之:模块加载》,里面有详细的分析和说明

​ 但是不同的模块具有不同的 module_create() 函数

​ 重点可以看看《Skynet源码之:service_logger》+ 《Skynet源码之:service_snlua》

​ 最后,我们得到一个指向 该服务的实例 的指针,被赋值给inst

代码:

void *inst = skynet_module_instance_create(mod);
if (inst == NULL)
	return NULL;


服务结构体

先来看看服务的结构体以及它的赋值

// 服务结构体
struct skynet_context {
	void * instance; // 指向服务的实例,即前面的inst
	struct skynet_module * mod; // 指向服务所属的模块,即前面的mod
	void * cb_ud;
	skynet_cb cb; // 属于服务的回调函数
	struct message_queue *queue; // 属于服务的次级消息队列
	ATOM_POINTER logfile; // 用于日志打印,详细见《Skynet源码之:日志打印》
	uint64_t cpu_cost;	// in microsec,统计此服务花费的cpu时间总数
	uint64_t cpu_start;	// in microsec,记录服务处理某条消息时的开始时间,等消息处理完毕时,会根据 cpu_end - cpu_start 来计算
	char result[32]; // 用于存放stat命令的结果,展示给控制台
	uint32_t handle; // 属于此服务的handle,在一个Skynet节点中,handle是唯一的
	int session_id; // 此服务消息序列的id
	ATOM_INT ref; // 原子性操作,用于记录服务的引用(非常重要,引用计数关系着服务内存的释放)
	int message_count; // 服务消息的总数
	bool init; // 服务是否完成初始化
	bool endless; // 服务是否陷入死循环
	bool profile; // 服务是否开启性能监控
	
	CHECKCALLING_DECL
};


注册服务

函数:skynet_handle_register()

详细见《Skynet源码之:服务管理》,这里面管理着所有的服务

代码:

// Should set to 0 first to avoid skynet_handle_retireall get an uninitialized handle
// 需要先把handle置为0,表明这是个空置的服务
// 因为在skynet_handle_register(ctx)注册的时候,可能遇到服务全部退出的情况
// 所以设置为0之后,不至于skynet_handle_retireall()找不到对应的handle
ctx->handle = 0;
ctx->handle = skynet_handle_register(ctx);


创建消息队列

函数:skynet_mq_create()

详细见《Skynet源码之:消息队列》,这里负责全局消息队列和次级消息队列的管理

代码:

struct message_queue * queue = ctx->queue = skynet_mq_create(ctx->handle);


模块初始化

函数:skynet_module_instance_init()

详细见《Skynet源码之:模块加载》,里面有详细的分析和说明

​ 但是不同的模块具有不同的 module_init() 函数

​ 重点可以看看《Skynet源码之:service_logger》+ 《Skynet源码之:service_snlua》

​ 最后,可以判断该模块是否初始化成功

代码:

// 这里传入了几个参数
// mod:刚才skynet_module_query()查出的模板
// inst:刚才skynet_module_instance_create()创建的实列化
// ctx:就是本服务的结构体
// param:需要传给模板实列的参数
int r = skynet_module_instance_init(mod, inst, ctx, param);

// 注意:
// 		param:这个参数的是根据不同的模板,具有不同的值
// 例如:
// 		service_logger模板,传入的参数是:config->logger,即是用户配置的日志路径和名字
// 		service_snlua模板,传入的参数是:需要启动的服务脚本的名字,即是name.lua中的名字


总结

至此,一个服务的启动就已经完成了

但这还是远远不够的:

​ 1,skynet_context结构体中的 void * cb_ud 和 skynet_cb cb字段是什么作用?如何赋值的?

​ 2,我们启动一个服务,都是通过skynet.newservice()实现的,底层的机制是什么呢?

​ 3,启动后的服务,是如何处理消息的呢?

​ 4,服务中调用lua层,c层的命令,又是如何实现的呢?

一切的一切,都需要归结到《Skynet源码之:service_snlua》中去解释


Skynet的API

在写完本章和《Skynet源码之:service_snlua》之后,我的脑子已经一团浆糊

主要是关系的东西太多太多了,不知道从何写起了,即使自己心中知道实现,但就是凑不起来

因为确实很多很杂,有时候关系lua-c的交互,有时候又到定时器去了

接着又是协程的执行,以及服务之间各种调用和封装,还有消息的注册打包之类的

再加上消息队列等,还有各种坑点和使用注意事项

我想了一个办法:围绕skynet的API来描述,应该是最为清晰的

主要特别关注:skynet.lua 文件 和 manager.lua

计算机里没有黑魔法,不要急,慢慢来,总有一天我会搞懂的


有关服务启动

​ 1,skynet.newservice()

​ 2,skynet.lanuch()

​ 3,bootstrap.lua

​ 4,skynet.init_service

​ 5,skynet.init()

​ 6,skynet.start()

​ 7,skynet.require()


有关消息

​ 1,skynet.call()

​ 2,skynet.send()

​ 3,skynet.pack()

​ 4,skynet.register_protocol()

​ 5,skynet.dispatch_message()

​ 6,raw_dispatch_message()

​ 7,skynet.response()


有关功能

​ 1,skynet.timeout()

​ 2,skynet.error()

流程总结

对于服务的实现来说,有太多的东西需要说,除了有关模块的准备外

下面我总结一下重要的流程

1,服务是如何被worker线程获取的:《Skynet源码之:进程启动》

2,服务的创建是怎么样进行的:《Skynet源码之:服务实现》

3,服务是如何工作的:《Skynet源码之:service_snlua》

4,服务的消息是如何流转的

​ 1,worker线程在全局消息队列中,获取到服务的次级消息队列

​ 2,从次级消息队列中,获取到一个消息msg

​ 3,通过 dispatch_message() 函数(skynet_server.c文件),调用到服务建立时绑定的cb函数

​ 4,服务的cb函数跟服务创建时的模板有关,一般为snlua

​ 5,而snlua模板启动的服务,cb函数一般会绑定为在lua层的 skynet.dispatch_message() 函数(skynet.lua文件)

​ 6,最后交由 raw_dispatch_message() 函数解析消息,并根据消息session_id,获得对应的协程

注意:

​ 这里要着重提一下 raw_dispatch_message() 函数的实现

​ 详情可见《Skynet源码之:service_snlua》中的dispatch_message部分

​ 1,很多服务在启动时,会使用skynet.register_protocol() 函数来注册协议,例如

skynet.register_protocol {
    name = "SYSTEM",
    id = skynet.PTYPE_SYSTEM,
    unpack = function(...) return ... end,
    dispatch = function()
        -- reopen signal
        print("SIGHUP")
    end
}

​ 此时能,就会在skynet.lua中注册相对类型的处理函数

function skynet.register_protocol(class)
	local name = class.name
	local id = class.id
	assert(proto[name] == nil and proto[id] == nil)
	assert(type(name) == "string" and type(id) == "number" and id >=0 and id <=255)
	proto[name] = class
	proto[id] = class
end

​ 2,最后在 raw_dispatch_message() 函数就会作出区分

-- 根据注册的消息类型,获取用户的注册函数
local p = proto[prototype]
if p == nil then
    if prototype == skynet.PTYPE_TRACE then
        -- trace next request
        trace_source[source] = c.tostring(msg,sz)
    elseif session ~= 0 then
        c.send(source, skynet.PTYPE_ERROR, session, "")
    else
        unknown_request(session, source, msg, sz, prototype)
    end
    return
end

-- 那么这个p.dispatch又是哪里注册的呢?
-- 就是我们经常在main.lua文件中看到的
-- 某个main.lua文件
skynet.start(function()
	skynet.dispatch("lua", function(_,_, command, ...)
		skynet.trace()
		local f = CMD[command]
		skynet.ret(skynet.pack(f(...)))
	end)
end)

-- 具体实现
-- skynet.lua文件
function skynet.dispatch(typename, func)
	local p = proto[typename]
	if func then
		local ret = p.dispatch
		p.dispatch = func
		return ret
	else
		return p and p.dispatch
	end
end

-- skynet.lua文件
-- raw_dispatch_message()中的部分代码
local f = p.dispatch
if f then
    local co = co_create(f)
    session_coroutine_id[co] = session
    session_coroutine_address[co] = source
    local traceflag = p.trace
    if traceflag == false then
        -- force off
        trace_source[source] = nil
        session_coroutine_tracetag[co] = false
    else
        local tag = trace_source[source]
        if tag then
            trace_source[source] = nil
            c.trace(tag, "request")
            session_coroutine_tracetag[co] = tag
        elseif traceflag then
            -- set running_thread for trace
            running_thread = co
            skynet.trace()
        end
    end
    suspend(co, coroutine_resume(co, session,source, p.unpack(msg,sz)))
else
    trace_source[source] = nil
    if session ~= 0 then
        c.send(source, skynet.PTYPE_ERROR, session, "")
    else
        unknown_request(session, source, msg, sz, proto[prototype].name)
    end
end

以上,整个服务的消息的执行流程都总结完毕了