【Linux】应用层自定义协议与序列化
前面我们提到了五层模型,其中网络层传输层链路层都是负责进行数据传输的,而应用层是根据我们不同的需求进行个性化开发的。应用层通过接收传输的数据,对数据进行处理完成不同的功能。但是,我们在数据传输的过程中,传递的都是字符串,那么应用层该如何识别这些字符串呢?由此,我们需要一份协议来规定好,字符串传递分别代表什么内容。协议是为实现网络数据交换而建立的规则、约定或标准,用于规范通信行为,定义了通信双方如何
目录
2. 重新理解read,write,recv,send为什么支持全双工
一. 应用层协议
前面我们提到了五层模型,其中网络层传输层链路层都是负责进行数据传输的,而应用层是根据我们不同的需求进行个性化开发的。应用层通过接收传输的数据,对数据进行处理完成不同的功能。但是,我们在数据传输的过程中,传递的都是字符串,那么应用层该如何识别这些字符串呢?
由此,我们需要一份协议来规定好,字符串传递分别代表什么内容。
协议是为实现网络数据交换而建立的规则、约定或标准,用于规范通信行为,定义了通信双方如何进行数据交换,包括数据格式、通信过程中的操作和错误处理等。
二. 序列化与反序列化
1. 何为序列化与反序列化
上文我们得知传输间需要协议,网络传输是以字符串形式传输的,那我们如何来管理这些字符串呢?这里我们就需要对数据进行结构化,我们可以约定以 \r\n 为每个数据的分隔,或者索性传结构体,总之需要双份共同有一份存储格式。对于将数据打包后传给网络的行为我们叫做序列化,而从网络接收到数据后进行拆包的行为我们叫做反序列化。
所以简单说序列化与反序列化就是打包和拆包。
2. 重新理解read,write,recv,send为什么支持全双工
我们首先来理解下图

全双工是指双方可以同时接收发送数据,数据传输的本质就是拷贝,将数据以字符串的形式拷贝到缓冲区中,再由缓冲区发送出去,这样底层的传输就无需关注传输的内容,因为它们都是字符串。我们使用这些接口时,都是先将数据拷贝到缓冲区当中,并不是直接进行传输。所以read,write,recv,send这些函数本质就是拷贝函数。
3. 计算器代码实现
3.1 Socket 封装
这里我们将 socket 套接字接口进行封装使用。目前我们熟知的套接字有 Udp 和 Tcp ,它们两者的接口有共同点也有不同点,所以我们可以使用虚函数继承的方式来写类。首先若服务端使用 Udp 套接字,只需要socket和bind即可,客户端只需要socket;若服务端使用 Tcp 那么就要在 Udp 的基础上进行 listen ,客户端不变。两者都有的接口为 send,recv,close等。
我们创建一个 Socket 类作为父类,让 Tcp 和 Udp 继承 Socket 作为子类。后续进行接口封装即可。
#pragma once
#include <iostream>
#include <string>
#include <unistd.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <cstdlib>
#include "Common.hpp"
#include "Log.hpp"
#include "InetAddr.hpp"
namespace SocketModule
{
using namespace LogModule;
using namespace std;
const static int gbacklog = 16;
class Socket
{
public:
virtual ~Socket() {}
virtual void SocketOrDie() = 0;
virtual void BindOrDie(uint16_t port) = 0;
virtual void ListenOrDie(int backlog) = 0;
virtual void Close() = 0;
virtual shared_ptr<Socket> accept(InetAddr *client) = 0;
virtual int Recv(string *out) = 0;
virtual int Send(const string &message) = 0;
virtual int Connect(const string &server_ip, uint16_t port) = 0;
public:
void BuildTcpSocketMethod(uint16_t port, int backlog = gbacklog)
{
SocketOrDie();
//cout<<"2";
BindOrDie(port);//为啥错?
//cout<<"1"<<endl;
ListenOrDie(backlog);
}
void TcpClientSocket()
{
SocketOrDie();
}
void BuildUdpSocketMethod(uint16_t port)
{
SocketOrDie();
BindOrDie(port);
}
void UdpClientSocket()
{
SocketOrDie();
}
};
const static int defaultnum = -1;
class TcpSocket : public Socket
{
public:
TcpSocket() : _sockfd(defaultnum)
{
}
TcpSocket(int fd) : _sockfd(fd)
{
}
void SocketOrDie() override
{
_sockfd = ::socket(AF_INET, SOCK_STREAM, 0);
if (_sockfd < 0)
{
LOG(LogLevel::FATAL) << "socket error";
exit(SOCKET_ERR);
}
LOG(LogLevel::INFO) << "socket success";
int opt = 1;
setsockopt(_sockfd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt));
}
void BindOrDie(uint16_t port) override
{
InetAddr localaddr(port);
// 这里报错
int n = ::bind(_sockfd, localaddr.NetAddrPtr(), localaddr.NetAddrLen());
if (n < 0)
{
LOG(LogLevel::FATAL) << "bind error";
//cout<<"1";
exit(BIND_ERR);
}
LOG(LogLevel::INFO) << "bind success";
}
void ListenOrDie(int backlog) override
{
int n = ::listen(_sockfd, gbacklog);
if (n < 0)
{
LOG(LogLevel::FATAL) << "listen error";
exit(LISTEN_ERR);
}
LOG(LogLevel::INFO) << "listen success";
}
shared_ptr<Socket> accept(InetAddr *client) override
{
sockaddr_in peer;
socklen_t len = sizeof(peer);
int fd = ::accept(_sockfd, CONV(peer), &len);
if (fd < 0)
{
LOG(LogLevel::WARNING) << "accept error";
return nullptr;
}
client->SetAddr(peer);
return make_shared<TcpSocket>(fd);
}
void Close() override
{
if (_sockfd > 0)
::close(_sockfd);
}
int Recv(string *out) override
{
char buffer[1024];
ssize_t n = ::recv(_sockfd, buffer, sizeof(buffer)-1, 0);
if (n > 0)
{
buffer[n] = 0;
*out += buffer;
}
return n;
}
int Send(const string &message) override
{
return ::send(_sockfd, message.c_str(), message.size(), 0);
}
int Connect(const string &server_ip, uint16_t port) override
{
InetAddr server(server_ip, port);
int n = connect(_sockfd, server.NetAddrPtr(), server.NetAddrLen());
if (n < 0)
{
LOG(LogLevel::FATAL) << "connect error";
exit(CONNECT_ERR);
}
LOG(LogLevel::INFO) << "connect success";
return n;
}
private:
int _sockfd;
};
}
3.2 定制协议
我们这里的协议格式设置的较为简单,报头 + 报文的组合,报头为协议报文内容的长度,报文是我们想要传输的内容。我们用 /r/n 作为分隔符。
接下来我们就可以定制协议了
我们协议需要两个载体,Request 和 Response ,客户端发送请求将数据内容序列化打包到 Request 中,发送给服务端;服务端反序列化接收 Request 拿到数据;服务端进行上层应用层处理计算,将 Request 得到的结果存储到 Response 中,服务端对 Response 打包序列化操作发送回给客户端,最后客户端反序列化接收 Response 得到最终结果。

了解了大致的框架结构我们就开始细究内部的细节。
首先Response和Request都需要序列化和反序列化函数,这里我们用到了Jsoncpp来快速进行键值对应输入和提取。接下里就是协议 Protocol ,首先需要对报文进行处理,给报文添加报头发送到网络中,在网络接收到报文后确定报文是否完整进行验证。然后是获得Response和Request后该如何操作等等
#pragma once
#include <iostream>
#include <string>
#include <memory>
#include <jsoncpp/json/json.h>
#include <functional>
#include "Socket.hpp"
using namespace SocketModule;
using namespace std;
class Request
{
public:
Request()
{
}
Request(int x, int y, char oper)
: _x(x),
_y(y),
_oper(oper)
{
}
string Serialize()
{
Json::Value root;
root["x"] = _x;
root["y"] = _y;
root["oper"] = _oper;
Json::FastWriter writer;
string s = writer.write(root);
return s;
}
bool DeSerialize(string &in)
{
Json::Value root;
Json::Reader reader;
bool ok = reader.parse(in, root);
if (ok)
{
_x = root["x"].asInt();
_y = root["y"].asInt();
_oper = root["oper"].asInt();
}
return ok;
}
~Request() {}
int X() { return _x; }
int Y() { return _y; }
char Oper() { return _oper; }
private:
int _x;
int _y;
char _oper;
};
class Response
{
public:
Response()
{
}
Response(int code, int result)
: _code(code),
_result(result)
{
}
string Serialize()
{
Json::Value root;
root["code"] = _code;
root["result"] = _result;
Json::FastWriter writer;
string s = writer.write(root);
return s;
}
bool DeSerialize(string &in)
{
Json::Value root;
Json::Reader reader;
bool ok = reader.parse(in, root);
if (ok)
{
_code = root["code"].asInt();
_result = root["result"].asInt();
}
return ok;
}
~Response() {}
void SetResult(int res)
{
_result = res;
}
void SetCode(int code)
{
_code = code;
}
void ShowResult()
{
std::cout << "计算结果是: " << _result << "[" << _code << "]" << std::endl;
}
private:
int _code;
int _result;
};
const string sep = "/r/n";
using func_t = function<Response(Request &req)>;
class Protocol
{
public:
Protocol() {}
Protocol(func_t func)
: _func(func)
{
}
string Encode(const string &jsonstr)
{
string len = to_string(jsonstr.size());
return len + sep + jsonstr + sep;
}
bool Decode(string &buffer, string *package)
{
ssize_t pos = buffer.find(sep);
if (pos == string::npos)
return false;
string package_len = buffer.substr(0, pos);
int package_len_int = stoi(package_len);
int target_len = package_len_int + sep.size() * 2 + package_len.size();
if (buffer.size() < target_len)
return false;
*package = buffer.substr(pos + sep.size(), package_len_int);
buffer.erase(0, target_len);
return true;
}
void GetRequest(shared_ptr<Socket> &sock, InetAddr &client)
{
string buffer_queue;
while (true)
{
int n = sock->Recv(&buffer_queue); // 成功
if (n > 0)
{
std::cout << "-----------request_buffer--------------" << std::endl;
std::cout << buffer_queue << std::endl;
std::cout << "------------------------------------" << std::endl;
string json_package;
while (Decode(buffer_queue, &json_package))
{
cout << "-----------request_json--------------" << std::endl;
cout << json_package << std::endl;
cout << "------------------------------------" << std::endl;
cout << "-----------request_buffer--------------" << std::endl;
cout << buffer_queue << std::endl;
cout << "------------------------------------" << std::endl;
LOG(LogLevel::DEBUG) << client.StringAddr() << " 请求: " << json_package;
Request resq;
// 这里错误
bool ok = resq.DeSerialize(json_package);
if (!ok)
continue;
Response rep = _func(resq); // 应该是计算结果然后状态码返回给Response
string jsonstr = rep.Serialize();
string send_str = Encode(jsonstr);
sock->Send(send_str); // 失败
}
}
else if (n == 0)
{
LOG(LogLevel::INFO) << "client:" << client.StringAddr();
break;
}
else
{
LOG(LogLevel::FATAL) << "client" << client.StringAddr() << ",recv error";
break;
}
}
}
bool GetResponse(shared_ptr<Socket> &client, string &resp_buff, Response *resp)
{
while (true)
{
int n = client->Recv(&resp_buff);
if (n > 0)
{
string json_package;
while (Decode(resp_buff, &json_package))
{
resp->DeSerialize(json_package);
}
return true;
}
else if (n == 0)
{
std::cout << "server quit " << std::endl;
return false;
}
else
{
std::cout << "recv error" << std::endl;
return false;
}
}
}
string BuildRequestString(int x, int y, char oper)
{
// 1. 构建一个完整的请求
Request req(x, y, oper);
// 2. 序列化
std::string json_req = req.Serialize();
return Encode(json_req);
}
~Protocol()
{
}
private:
func_t _func;
};
3.3 NetCal 计算处理
计算的步骤很简单,我们可以将计算划分为功能给到上层的应用层进行解耦操作,Cal 接收Request类型返回Response类型。
#pragma once
#include <iostream>
#include "Protocol.hpp"
class Cal
{
public:
Response Excute(Request &req)
{
Response resp(0, 0);
switch (req.Oper())
{
case '+':
resp.SetResult(req.X() + req.Y());
break;
case '-':
resp.SetResult(req.X() - req.Y());
break;
case '*':
resp.SetResult(req.X() * req.Y());
break;
case '/':
{
if (req.Y() == 0)
{
resp.SetCode(1);
}
else
{
resp.SetResult(req.X() / req.Y());
}
}
break;
case '%':
{
if (req.Y() == 0)
{
resp.SetCode(2);
}
else
{
resp.SetResult(req.X() % req.Y());
}
}
break;
default:
resp.SetCode(3);
break;
}
return resp;
}
};
3.4 代码结构
在服务端视角,代码分为三份。第一份是最上层的应用层 NetCal 负责接收协议传上来的数据进行计算操作,接下来是协议层负责设置数据传输的格式设置,最后是网络层负责客户端与服务端之间的通信。
#include "NetCal.hpp"
#include "Protocol.hpp"
#include "TcpServer.hpp"
#include "Daemon.hpp"
#include <memory>
void Usage(std::string proc)
{
std::cerr << "Usage: " << proc << " port" << std::endl;
}
// 我的代码为什么要这样写???
// ./tcpserver 8080
int main(int argc, char *argv[])
{
if (argc != 2)
{
Usage(argv[0]);
exit(USAGE_ERR);
}
//Daemon(0, 0);
//Enable_File_Log_Strategy();
Enable_Console_Log_Strategy();
// 1. 顶层
std::unique_ptr<Cal> cal = std::make_unique<Cal>();
// 2. 协议层
std::unique_ptr<Protocol> protocol = std::make_unique<Protocol>([&cal](Request &req)->Response{
return cal->Excute(req);
});
// 3. 服务器层
std::unique_ptr<TcpServer> tsvr = std::make_unique<TcpServer>(std::stoi(argv[1]),
[&protocol](std::shared_ptr<Socket> &sock, InetAddr &client){
protocol->GetRequest(sock, client);
});
tsvr->Start();
return 0;
}
三. 进程间关系与守护进程
1. 进程组
进程组是一个或者多个进程的集合,一个进程组内可以包括多个进程。进程我们用 pid 表示,ppid 表示父进程 id ,进程组我们用 pgid 表示。
这里我们用 ps -o pid,pgid,ppid,comm来查看当前的信息,其中 -o 表示以逗号为分隔

每一个进程组都有一个组长进程,该进程组的 id 就等于组长进程的 id ,我们通过命令查看进程组信息

该进程组组长为 ps进程,cat 进程与ps为同一个组。
组长可以创建一个进程组或创建进程组当中的进程。
进程组的生命周期由最后一个进程离开才算结束,若组长先离开,那么会有新的进程成为组长。
2. 会话
终端是用户窗口,而会话是管理终端与进程组的一个工具。简单理解,终端是手机那么会话就是手机的后台。每个会话中至少存在一个或者多个进程组,我们用 sid 来表示会话的 ID 。
会话分为前台进程和后台进程,前台进程支持用户与终端直接进行交互,但只能存在一个前台进程,后台进程不接受用户输入也不收命令影响。
我们创建进程组后,在后面添加&符号,会将该进程组放到后台运行,若不加&符号,就默认在前台运行。当我们不进行任何操作时,前台进程默认为 bash 进程,bash 进程支持我们执行系统命令。当我们运行文件后,就默认将 bash 进程移到后台,此时我们输入系统命令,bash 就无法进行解析。
了解了会话,接下来我们看看如何创建会话。
我们通常是调用 setseid 函数来创建一个会话
setsid:
#include <unistd.h> pid_t setsid(void);返回值:
成功返回 sid ,失败返回-1
我们创建会话的进程不能是当前进程组的组长进程,当其他进程调用了 setsid 后,调用进程会重新创建一个进程组,并成为新进程组的组长进程。创建的新进程组会与原来的控制终端进行切割。
由于一个进程组默认为组长进程,所以若我们需要调用函数,我们首先进行 fork 创建子进程,将父进程终止,子进程会继承父进程的进程组 ID ,子进程执行 setsid ,这样就不会导致错误。
3. 守护进程
守护进程是运行在后台的进程,它脱离终端既不依赖终端输入也不依赖终端输出,它是脱离独立终端与用户会话的进程,我们通常用其当做垃圾清理。
下面我们就来实现一个守护进程
#pragma once
#include <iostream>
#include <cstdlib>
#include <signal.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
const char *dev_null = "/dev/null/";
const char *root = "/";
void Daemon(bool chdirec, bool isclose)
{
// 关闭可能引起异常退出的信号
signal(SIGCHLD, SIG_IGN);
signal(SIGPIPE, SIG_IGN);
//创建父进程退出,留下子进程
if (fork() > 0)
exit(1);
// 创建守护进程
setsid();
// 是否更改工作目录
if (chdirec)
chdir(root);
// 关闭终端输入输出,重定向文件
if (isclose)
{
::close(0);
::close(1);
::close(2);
}
else
{
int fd = open(dev_null, O_RDWR);
if (fd > 0)
{
dup2(fd, 0);
dup2(fd, 1);
dup2(fd, 2);
close(fd);
}
}
}
首先我们需要关闭可能会引起信号异常退出的,接着创建父进程退出留下子进程当做守护进程,更改工作目录,关闭终端输入输出并且重定向文件。这样就完成了守护进程。
更多推荐




所有评论(0)