service_harbor 的实现和《Skynet源码之:节点建立》密切相关
可以先了解一下 harbor 的设计和历史
让我们看看 service_harbor 的 harbor_create 创建函数
struct harbor *
harbor_create(void) {
struct harbor * h = skynet_malloc(sizeof(*h)); // 创建了一个 struct harbor 结构体的内存
memset(h, 0, sizeof(*h));
h->map = hash_new();
return h; // 返回 struct harbor 结构体的句柄
}
// 下面开始都是一些结构体的定义
// 远程消息的头部
struct remote_message_header {
uint32_t source;
uint32_t destination;
uint32_t session;
};
// 远程消息
struct harbor_msg {
struct remote_message_header header;
void * buffer;
size_t size;
};
// 远程消息队列
struct harbor_msg_queue {
int size;
int head;
int tail;
struct harbor_msg * data; // 实质是一个 结构体数组,参考《Skynet消息队列》中二级消息队列的实现
};
struct keyvalue {
struct keyvalue * next;
char key[GLOBALNAME_LENGTH];
uint32_t hash;
uint32_t value;
struct harbor_msg_queue * queue;
};
// slave结构体
struct slave {
int fd;
struct harbor_msg_queue *queue;
int status;
int length;
int read;
uint8_t size[4];
char * recv_buffer;
};
// hashmap结构体
struct hashmap {
struct keyvalue *node[HASH_SIZE];
};
// harbor结构体
struct harbor {
struct skynet_context *ctx;
int id;
uint32_t slave;
struct hashmap * map;
struct slave s[REMOTE_MAX];
};
// 将 hashmap 初始化为0
static struct hashmap *
hash_new() {
struct hashmap * h = skynet_malloc(sizeof(struct hashmap));
memset(h, 0, sizeof(*h));
return h;
}
看看 harbor_init 函数的做了什么
int
harbor_init(struct harbor *h, struct skynet_context *ctx, const char * args) {
h->ctx = ctx;
int harbor_id = 0;
uint32_t slave = 0;
// 读取args字符串中的值,存储在 harbor_id 和 slave
sscanf(args, "%d %u", &harbor_id, &slave);
// 若 harbor == 0,则harbor_id == 0,slave == 4,这个4是指cdummy的服务地址
// 若 harbor ~= 0,则harbor_id == harbor,slave == 16777221,这个16777221是指cslave的服务地址
if (slave == 0) {
return 1;
}
h->id = harbor_id;
// 若 harbor == 0,记录的就是cdummy的服务地址
// 若 harbor ~= 0,记录的就是cslave的服务地址
h->slave = slave;
if (harbor_id == 0) {
close_all_remotes(h);
}
skynet_callback(ctx, h, mainloop); // 把 harbor服务 的 cb 函数绑定为 mainloop
skynet_harbor_start(ctx); // 对 harbor 进行初始设置
return 0;
}
skynet源码分析之master/salve集群模式 - RainRill - 博客园 (cnblogs.com)
看下第一种情况,当 harbor == 0 的时候,此时会把 slave 关掉
static void
close_all_remotes(struct harbor *h) {
int i;
for (i=1;i<REMOTE_MAX;i++) {
close_harbor(h,i);
// don't call report_harbor_down.
// never call skynet_send during module exit, because of dead lock
}
}
// 把 struct slave s[REMOTE_MAX] 数组中的 slave 对象的状态 s->status 都设置为关闭
// 这 256 个对象,其实就是表示网络中全部的256个节点
// 根据Skynet的设计,master-slave模式中,其中一个节点是 master,剩余节点都是 slave
// 当 harbor == 0 的时候,表示不使用 master-slave 模式,因此关闭全部 slave 对象
static void
close_harbor(struct harbor *h, int id) {
struct slave *s = &h->s[id];
s->status = STATUS_DOWN;
if (s->fd) {
skynet_socket_close(h->ctx, s->fd); // 关掉 slave 对象的连接,即关闭 本地节点 和 外部节点 的连接
s->fd = 0;
}
if (s->queue) {
release_queue(s->queue); // 释放 slave 对象的消息队列
s->queue = NULL;
}
}
再详细了解一下,如何组织使用 slave 的消息队列的:
// 消息释放操作
static void
release_queue(struct harbor_msg_queue *queue) {
if (queue == NULL)
return;
struct harbor_msg * m;
while ((m=pop_queue(queue)) != NULL) {
skynet_free(m->buffer); // 释放掉所有发给此节点的消息
}
skynet_free(queue->data);
skynet_free(queue);
}
// 把消息加入到消息队列中
static void
push_queue_msg(struct harbor_msg_queue * queue, struct harbor_msg * m) {
// If there is only 1 free slot which is reserved to distinguish full/empty
// of circular buffer, expand it.
if (((queue->tail + 1) % queue->size) == queue->head) {
struct harbor_msg * new_buffer = skynet_malloc(queue->size * 2 * sizeof(struct harbor_msg));
int i;
for (i=0;i<queue->size-1;i++) {
new_buffer[i] = queue->data[(i+queue->head) % queue->size];
}
skynet_free(queue->data);
queue->data = new_buffer;
queue->head = 0;
queue->tail = queue->size - 1;
queue->size *= 2;
}
struct harbor_msg * slot = &queue->data[queue->tail];
*slot = *m;
queue->tail = (queue->tail + 1) % queue->size;
}
// 消息压入操作
static void
push_queue(struct harbor_msg_queue * queue, void * buffer, size_t sz, struct remote_message_header * header) {
struct harbor_msg m;
m.header = *header;
m.buffer = buffer;
m.size = sz;
push_queue_msg(queue, &m);
}
// 消息弹出操作
static struct harbor_msg *
pop_queue(struct harbor_msg_queue * queue) {
if (queue->head == queue->tail) {
return NULL;
}
struct harbor_msg * slot = &queue->data[queue->head]; // 这里是一个数组队列
queue->head = (queue->head + 1) % queue->size;
return slot;
}
现在我们知道,在 master-slave 模式,每个节点在本进程中,都用一个 struct slave 结构体来表示
每个节点发往本节点的消息,都会存储在 struct slave 结构体中的 queue 字段
接着我们看看 harbor 是如何接收消息,处理消息的?
看看 harbor 服务的 cb 实现
static int
mainloop(struct skynet_context * context, void * ud, int type, int session, uint32_t source, const void * msg, size_t sz) {
struct harbor * h = ud;
switch (type) {
// 处理网络消息,即:外部节点 发给 本地节点 的消息
case PTYPE_SOCKET: {
const struct skynet_socket_message * message = msg;
switch(message->type) {
case SKYNET_SOCKET_TYPE_DATA:
push_socket_data(h, message); // 把网络来的消息,push进对应服务的信箱中
skynet_free(message->buffer);
break;
case SKYNET_SOCKET_TYPE_ERROR:
case SKYNET_SOCKET_TYPE_CLOSE: {
int id = harbor_id(h, message->id);
if (id) {
report_harbor_down(h, id);
} else {
skynet_error(context, "Unknown fd (%d) closed", message->id);
}
break;
}
case SKYNET_SOCKET_TYPE_CONNECT:
// fd forward to this service
break;
case SKYNET_SOCKET_TYPE_WARNING: {
int id = harbor_id(h, message->id);
if (id) {
skynet_error(context, "message havn't send to Harbor (%d) reach %d K", id, message->ud);
}
break;
}
default:
skynet_error(context, "recv invalid socket message type %d", type);
break;
}
return 0;
}
// 这个还不知道是做什么的
case PTYPE_HARBOR: {
harbor_command(h, msg, sz, session, source);
return 0;
}
// 处理系统消息,即:本地节点 发往 外部节点 的消息
// 例如 skynet.send 或 skynet.call
// 根据Skynet的设计,所有发往外部节点的消息,都会先发往本地节点的harbor服务中
// 再由harbor服务统一发给外部节点
case PTYPE_SYSTEM : {
// remote message out
const struct remote_message *rmsg = msg;
// 当传来的是 外部节点的名字,则调用 remote_send_name()
if (rmsg->destination.handle == 0) {
if (remote_send_name(h, source, rmsg->destination.name, rmsg->type, session, rmsg->message, rmsg->sz)) {
return 0;
}
} else {
// 当传来的是 外部节点的handle,则调用 remote_send_handle()
if (remote_send_handle(h, source, rmsg->destination.handle, rmsg->type, session, rmsg->message, rmsg->sz)) {
return 0;
}
}
skynet_free((void *)rmsg->message);
return 0;
}
// 处理一下错误类型
default:
skynet_error(context, "recv invalid message from %x, type = %d", source, type);
if (session != 0 && type != PTYPE_ERROR) {
skynet_send(context, 0, source, PTYPE_ERROR, session, NULL, 0);
}
return 0;
}
}
我们看看 外部节点 发给 本地节点 的消息 处理流程
第一,判断消息的类型:
SKYNET_SOCKET_TYPE_DATA(有数据来)
SKYNET_SOCKET_TYPE_ERROR(连接错误)
SKYNET_SOCKET_TYPE_CLOSE(连接关闭)
SKYNET_SOCKET_TYPE_CONNECT(连接中)
SKYNET_SOCKET_TYPE_WARNING(连接警告)
第二,重点关注如何处理数据:
// 网络消息
struct skynet_socket_message {
int type;
int id;
int ud;
char * buffer;
};
static void
push_socket_data(struct harbor *h, const struct skynet_socket_message * message) {
assert(message->type == SKYNET_SOCKET_TYPE_DATA);
int fd = message->id;
int i;
int id = 0;
// 在 struct slave s[REMOTE_MAX] 数组中
// 找到 此消息 对应的 slave 节点
struct slave * s = NULL;
for (i=1;i<REMOTE_MAX;i++) {
if (h->s[i].fd == fd) {
s = &h->s[i];
id = i;
break;
}
}
if (s == NULL) {
skynet_error(h->ctx, "Invalid socket fd (%d) data", fd);
return;
}
uint8_t * buffer = (uint8_t *)message->buffer;
int size = message->ud;
for (;;) {
// 这里需要重点搞清楚:
// 对于网络消息 SKYNET_SOCKET_TYPE_DATA 来说
// STATUS_HANDSHAKE
// STATUS_HEADER
// STATUS_CONTENT
// 分别代表着什么?
switch(s->status) {
case STATUS_HANDSHAKE: {
// check id
// 检查一下remote_id
// remote_id 就是 外部节点 在 struct slave s[REMOTE_MAX] 数组中的 index
uint8_t remote_id = buffer[0];
if (remote_id != id) {
skynet_error(h->ctx, "Invalid shakehand id (%d) from fd = %d , harbor = %d", id, fd, remote_id);
close_harbor(h, id);
return;
}
++buffer;
--size;
s->status = STATUS_HEADER;
dispatch_queue(h, id); // *************************重点实现函数:dispatch_queue()*************************
if (size == 0) {
break;
}
// go though
}
case STATUS_HEADER: {
// big endian 4 bytes length, the first one must be 0.
int need = 4 - s->read;
if (size < need) {
memcpy(s->size + s->read, buffer, size);
s->read += size;
return;
} else {
memcpy(s->size + s->read, buffer, need);
buffer += need;
size -= need;
if (s->size[0] != 0) {
skynet_error(h->ctx, "Message is too long from harbor %d", id);
close_harbor(h,id);
return;
}
s->length = s->size[1] << 16 | s->size[2] << 8 | s->size[3];
s->read = 0;
s->recv_buffer = skynet_malloc(s->length);
s->status = STATUS_CONTENT;
if (size == 0) {
return;
}
}
}
// go though
case STATUS_CONTENT: {
int need = s->length - s->read;
if (size < need) {
memcpy(s->recv_buffer + s->read, buffer, size);
s->read += size;
return;
}
memcpy(s->recv_buffer + s->read, buffer, need);
forward_local_messsage(h, s->recv_buffer, s->length); // *******重点实现函数:forward_local_messsage()*******
s->length = 0;
s->read = 0;
s->recv_buffer = NULL;
size -= need;
buffer += need;
s->status = STATUS_HEADER;
if (size == 0)
return;
break;
}
default:
return;
}
}
}
看看 dispatch_queue() 函数做了什么?
static void
dispatch_queue(struct harbor *h, int id) {
struct slave *s = &h->s[id];
int fd = s->fd;
assert(fd != 0);
struct harbor_msg_queue *queue = s->queue; // 获取到 struct slave s[id] 的消息队列 s->queue
if (queue == NULL)
return;
struct harbor_msg * m;
while ((m = pop_queue(queue)) != NULL) { // 把 struct slave s[id] 消息队列 s->queue 上的所有消息都发送出去
send_remote(h->ctx, fd, m->buffer, m->size, &m->header); // 把消息发给外部节点
skynet_free(m->buffer);
}
release_queue(queue); // 释放 struct slave s[id] 的消息队列 s->queue
s->queue = NULL; // 重置 struct slave s[id] 的消息队列 s->queue
}
看看是如何把消息发给外部节点的
static void
send_remote(struct skynet_context * ctx, int fd, const char * buffer, size_t sz, struct remote_message_header * cookie) {
size_t sz_header = sz + sizeof(*cookie);
if (sz_header > UINT32_MAX) {
skynet_error(ctx, "remote message from :%08x to :%08x is too large.", cookie->source, cookie->destination);
return;
}
uint8_t sendbuf[sz_header+4];
to_bigendian(sendbuf, (uint32_t)sz_header);
memcpy(sendbuf+4, buffer, sz);
header_to_message(cookie, sendbuf+4+sz);
struct socket_sendbuffer tmp;
tmp.id = fd;
tmp.type = SOCKET_BUFFER_RAWPOINTER;
tmp.buffer = sendbuf;
tmp.sz = sz_header+4;
// ignore send error, because if the connection is broken, the mainloop will recv a message.
skynet_socket_sendbuffer(ctx, &tmp); // 调用网络接口,向 外部节点 发送消息
}
看看 forward_local_messsage() 函数做了什么?
static void
forward_local_messsage(struct harbor *h, void *msg, int sz) {
const char * cookie = msg;
cookie += sz - HEADER_COOKIE_LENGTH;
struct remote_message_header header;
message_to_header((const uint32_t *)cookie, &header);
uint32_t destination = header.destination;
int type = destination >> HANDLE_REMOTE_SHIFT;
destination = (destination & HANDLE_MASK) | ((uint32_t)h->id << HANDLE_REMOTE_SHIFT);
// 把消息发给本地节点的其他服务
if(skynet_send(h->ctx, header.source, destination, type | PTYPE_TAG_DONTCOPY, (int)header.session, (void *)msg,
sz-HEADER_COOKIE_LENGTH) < 0) {
if (type != PTYPE_ERROR) {
// don't need report error when type is error
skynet_send(h->ctx, destination, header.source , PTYPE_ERROR, (int)header.session, NULL, 0);
}
skynet_error(h->ctx, "Unknown destination :%x from :%x type(%d)", destination, header.source, type);
}
}