Skynet 源码学习 -- Socket Server 和 Skynet_socket
摘要:异步 IO 的底层接口按平台选择 —— Linux 使用 epoll,其他 Unix 变种使用 kqueue/kevent,通过条件编译宏选择(详见下文)。
异步IO
选用底层接口
- Linux -> epoll
- 其他Unix变种 -> kevent
宏选择如下 :
#ifdef __linux__
#include "socket_epoll.h"
#endif
#if defined(__APPLE__) || defined(__FreeBSD__) || defined(__OpenBSD__) || defined (__NetBSD__)
#include "socket_kqueue.h"
#endif
提供接口
// 由于所有的接口实现都写在头文件里面,为了防止重定义而选择全部声明为static。
// 添加一个新的socket句柄
static int sp_add(poll_fd fd, int sock, void *ud);
// 删除对一个socket句柄的维护
static void sp_del(poll_fd fd, int sock);
// 对一个socket句柄的write属性进行维护
static void sp_write(poll_fd, int sock, void *ud, bool enable);
// 询问当前的触发事件
static int sp_wait(poll_fd, struct event *e, int max);
// 设置一个socket为非阻塞。
static void sp_nonblocking(int sock);
socket 结构体定义源码
// Per-connection bookkeeping: one slot in socket_server's slot[] array.
struct socket {
uintptr_t opaque; // ID of the service unit that owns this socket
struct wb_list high; // high-priority write queue
struct wb_list low; // low-priority write queue
int64_t wb_size; // total bytes buffered in the write queues
int fd; // OS socket file descriptor
int id; // slot ID inside the socket_server
uint16_t protocol; // socket protocol: TCP / UDP
uint16_t type; // socket state (read / write / listen ...)
union {
int size; // estimated read-buffer size needed (TCP)
uint8_t udp_address[UDP_ADDRESS_SIZE]; // peer address (UDP)
} p;
};
socket_server
定义源码
// The central socket manager: owns the command pipe, the epoll/kqueue
// handle, and the fixed-size array of managed sockets.
struct socket_server {
int recvctrl_fd; // read end of the internal command pipe
int sendctrl_fd; // write end of the internal command pipe
int checkctrl; // when nonzero, check the command pipe before IO events (see socket_server_poll)
poll_fd event_fd; // epoll/kqueue handle
int alloc_id; // NOTE(review): presumably the cursor for allocating slot ids — confirm in socket_server.c
int event_n; // number of events returned by the last sp_wait call
int event_index; // next unprocessed entry in ev[] (see socket_server_poll)
struct socket_object_interface soi; // NOTE(review): looks like callbacks for user-owned send buffers — confirm
struct event ev[MAX_EVENT]; // events fetched from epoll/kqueue
struct socket slot[MAX_SOCKET]; // array of managed sockets
char buffer[MAX_INFO]; // scratch buffer (e.g. for peer address text)
uint8_t udpbuffer[MAX_UDP_PACKAGE]; // scratch buffer for one UDP packet
fd_set rfds; // fd set for polling the command pipe — presumably via select(); confirm in has_cmd
};
工作模式 :
服务单元 -> socket_server 发送命令
服务单元利用socket_server提供的接口对socket进行操作 :
// 发起tcp连接
int
socket_server_connect(struct socket_server *ss, uintptr_t opaque, const char * addr, int port);
// 发送数据
int64_t
socket_server_send(struct socket_server *ss, int id, const void * buffer, int sz);
// 关闭 socket_server
void
socket_server_exit(struct socket_server *ss);
// 关闭socket
void
socket_server_close(struct socket_server *ss, uintptr_t opaque, int id);
void
socket_server_shutdown(struct socket_server *ss, uintptr_t opaque, int id);
// 打开监听socket
int
socket_server_listen(struct socket_server *ss, uintptr_t opaque, const char * addr, int port, int backlog);
// 绑定socket 到服务ID
int
socket_server_bind(struct socket_server *ss, uintptr_t opaque, int fd);
// ....
而上面的全部接口都是在向socket_server的管道写端发送命令。比如 :
// Issue an asynchronous TCP connect on behalf of service `opaque`.
// Builds a request packet for addr:port and pushes it into the control
// pipe; the poll loop performs the actual connect when it drains the pipe.
// Returns the reserved socket id, or -1 if the request could not be built.
int
socket_server_connect(struct socket_server *ss, uintptr_t opaque, const char * addr, int port) {
struct request_package request;
int len = open_request(ss, &request, opaque, addr, port);
if (len < 0)
return -1;
// Send the 'O' (Open) command through the write end of the pipe,
// asking the poll thread to start a TCP connection.
send_request(ss, &request, 'O', sizeof(request.u.open) + len);
return request.u.open.id;
}
命令格式
//8bit 命令代码字符
/*
The first byte is TYPE
S Start socket
B Bind socket
L Listen socket
K Close socket
O Connect to (Open)
X Exit
D Send package (high)
P Send package (low)
A Send UDP package
T Set opt
U Create UDP socket
C set udp address
*/
//8bit 命令buffers大小
//命令buffers
socket_server 响应命令
源码:
//socket_server 响应各种异步IO事件的```socket_server_poll```中有以下代码 :
if (ss->checkctrl) { // 每次处理完socket的IO事件会设置 checkctrl 为 1
if (has_cmd(ss)) { // 检测管道读端是否可读。
int type = ctrl_cmd(ss, result); //响应命令。
if (type != -1) {
clear_closed_event(ss, result, type);
return type;
} else
continue;
} else {
ss->checkctrl = 0;
}
}
...
// 命令处理接口
static int
ctrl_cmd(struct socket_server *ss, struct socket_message *result) {
int fd = ss->recvctrl_fd;
// the length of message is one byte, so 256+8 buffer size is enough.
uint8_t buffer[256];
uint8_t header[2];
block_readpipe(fd, header, sizeof(header));
int type = header[0];
int len = header[1];
block_readpipe(fd, buffer, len);
// ctrl command only exist in local fd, so don't worry about endian.
switch (type) {
case 'S':
return start_socket(ss,(struct request_start *)buffer, result);
case 'B':
return bind_socket(ss,(struct request_bind *)buffer, result);
case 'L':
return listen_socket(ss,(struct request_listen *)buffer, result);
case 'K':
return close_socket(ss,(struct request_close *)buffer, result);
case 'O':
return open_socket(ss, (struct request_open *)buffer, result);
case 'X':
...
各个对应的响应命令代码则是实际的维护socket数组的信息和维护对应的epoll/kevent 。
socket 的异步IO事件switch
首先异步IO是由epoll或者kevent实现的, Skynet也仅仅是做了简单的包装 (以epoll为例):
// Thin wrapper over epoll_wait: block until events arrive, then translate
// the native epoll events into the generic struct event array used by
// socket_server. Returns the number of events filled into e[].
static int
sp_wait(int efd, struct event *e, int max) {
	struct epoll_event native[max];
	int count = epoll_wait(efd, native, max, -1);
	for (int idx = 0; idx < count; idx++) {
		unsigned mask = native[idx].events;
		e[idx].s = native[idx].data.ptr;
		e[idx].read = (mask & EPOLLIN) != 0;
		e[idx].write = (mask & EPOLLOUT) != 0;
	}
	return count;
}
拿到的 event会在socket_server_poll 中简单处理 :
struct event *e = &ss->ev[ss->event_index++];
struct socket *s = e->s;
if (s == NULL) {
// dispatch pipe message at beginning
continue;
}
switch (s->type) {
case SOCKET_TYPE_CONNECTING:
return report_connect(ss, s, result);
case SOCKET_TYPE_LISTEN: {
int ok = report_accept(ss, s, result);
if (ok > 0) {
return SOCKET_ACCEPT;
} if (ok < 0 ) {
return SOCKET_ERROR;
}
// when ok == 0, retry
break;
}
case SOCKET_TYPE_INVALID:
fprintf(stderr, "socket-server: invalid socket\n");
break;
default:
if (e->read) {
int type;
if (s->protocol == PROTOCOL_TCP) {
type = forward_message_tcp(ss, s, result);
} else {
type = forward_message_udp(ss, s, result);
if (type == SOCKET_UDP) {
// try read again
--ss->event_index;
return SOCKET_UDP;
}
}
if (e->write && type != SOCKET_CLOSE && type != SOCKET_ERROR) {
// Try to dispatch write message next step if write flag set.
e->read = false;
--ss->event_index;
}
if (type == -1)
break;
return type;
}
if (e->write) {
int type = send_buffer(ss, s, result);
if (type == -1)
break;
return type;
}
break;
}
- connection 命令
- 生成对应命令包
- 打断处理循环 直接退出
- Accept 事件
- 新的socket会自动和这个Listen socket所属的服务单元关联
- 生成对应命令包
- 打断处理循环 直接退出
- Read
- 读出socket的可读数据
- 生成对应命令包
- 打断处理循环 直接退出
- Write
- 写socket所属的写队列缓存数据。
- 生成对应命令包
- 打断处理循环 直接退出
Skynet_socket
包装 socket_server 的接口:
// Forward the exit request to the process-wide socket server instance.
void
skynet_socket_exit() {
socket_server_exit(SOCKET_SERVER);
}
...
处理socket_server 的输出socket_message,包装成通用的 skynet_message 加入二级消息队列
// Convert a socket_server output (socket_message) into a heap-allocated
// skynet_socket_message and push it onto the owning service's message queue.
//   type    - SKYNET_SOCKET_TYPE_* tag stored in the outgoing message
//   padding - when true, result->data is treated as a C string (e.g. error
//             text) copied inline after the struct, truncated to 128 bytes;
//             when false, the result->data pointer itself is handed over.
static void
forward_message(int type, bool padding, struct socket_message * result) {
struct skynet_socket_message *sm;
size_t sz = sizeof(*sm);
if (padding) {
if (result->data) {
size_t msg_sz = strlen(result->data);
if (msg_sz > 128) {
msg_sz = 128; // cap the inline payload at 128 bytes
}
sz += msg_sz;
} else {
result->data = ""; // normalize NULL to an empty string for the memcpy below
}
}
sm = (struct skynet_socket_message *)skynet_malloc(sz);
sm->type = type;
sm->id = result->id;
sm->ud = result->ud;
if (padding) {
// Inline copy: buffer stays NULL, the text lives right after the struct.
sm->buffer = NULL;
memcpy(sm+1, result->data, sz - sizeof(*sm));
} else {
// Pointer handoff: result->data now belongs to the message — presumably
// freed by the consuming service; confirm against the receive path.
sm->buffer = result->data;
}
struct skynet_message message;
message.source = 0;
message.session = 0;
message.data = sm;
// Pack the payload size together with the PTYPE_SOCKET tag in the high bits.
message.sz = sz | ((size_t)PTYPE_SOCKET << MESSAGE_TYPE_SHIFT);
// If the push fails (target service gone), free what we allocated here.
if (skynet_context_push((uint32_t)result->opaque, &message)) {
// todo: report somewhere to close socket
// don't call skynet_socket_close here (It will block mainloop)
skynet_free(sm->buffer);
skynet_free(sm);
}
}
int
skynet_socket_poll() {
struct socket_server *ss = SOCKET_SERVER;
assert(ss);
struct socket_message result;
int more = 1;
int type = socket_server_poll(ss, &result, &more);
switch (type) {
case SOCKET_EXIT:
return 0;
case SOCKET_DATA:
forward_message(SKYNET_SOCKET_TYPE_DATA, false, &result);
break;
case SOCKET_CLOSE:
forward_message(SKYNET_SOCKET_TYPE_CLOSE, false, &result);
break;
//...
更多推荐
已为社区贡献1条内容
所有评论(0)