目录

一. 应用层协议

二. 序列化与反序列化

1. 何为序列化与反序列化

2. 重新理解read,write,recv,send为什么支持全双工

3. 计算器代码实现

3.1 Socket 封装

3.2 定制协议

3.3 NetCal 计算处理

3.4 代码结构

三. 进程间关系与守护进程

1. 进程组

2. 会话

3. 守护进程


一. 应用层协议

前面我们提到了五层模型,其中网络层传输层链路层都是负责进行数据传输的,而应用层是根据我们不同的需求进行个性化开发的。应用层通过接收传输的数据,对数据进行处理完成不同的功能。但是,我们在数据传输的过程中,传递的都是字符串,那么应用层该如何识别这些字符串呢?

由此,我们需要一份协议来规定好,字符串传递分别代表什么内容。

协议是为实现网络数据交换而建立的规则、约定或标准,用于规范通信行为,定义了通信双方如何进行数据交换,包括数据格式、通信过程中的操作和错误处理等。

二. 序列化与反序列化

1. 何为序列化与反序列化

上文我们得知传输间需要协议,网络传输是以字符串形式传输的,那我们如何来管理这些字符串呢?这里我们就需要对数据进行结构化,我们可以约定以 \r\n 为每个数据的分隔,或者索性传结构体,总之需要双份共同有一份存储格式。对于将数据打包后传给网络的行为我们叫做序列化,而从网络接收到数据后进行拆包的行为我们叫做反序列化。

所以简单说序列化与反序列化就是打包和拆包。

2. 重新理解read,write,recv,send为什么支持全双工

我们首先来理解下图

全双工是指双方可以同时接收发送数据,数据传输的本质就是拷贝,将数据以字符串的形式拷贝到缓冲区中,再由缓冲区发送出去,这样底层的传输就无需关注传输的内容,因为它们都是字符串。我们使用这些接口时,都是先将数据拷贝到缓冲区当中,并不是直接进行传输。所以read,write,recv,send这些函数本质就是拷贝函数。

3. 计算器代码实现

3.1 Socket 封装

这里我们将 socket 套接字接口进行封装使用。目前我们熟知的套接字有 Udp 和 Tcp ,它们两者的接口有共同点也有不同点,所以我们可以使用虚函数继承的方式来写类。首先若服务端使用 Udp 套接字,只需要socket和bind即可,客户端只需要socket;若服务端使用 Tcp 那么就要在 Udp 的基础上进行 listen ,客户端不变。两者都有的接口为 send,recv,close等。

我们创建一个 Socket 类作为父类,让 Tcp 和 Udp 继承 Socket 作为子类。后续进行接口封装即可。

#pragma once

#include <iostream>
#include <string>
#include <unistd.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <cstdlib>
#include "Common.hpp"
#include "Log.hpp"
#include "InetAddr.hpp"

namespace SocketModule
{
    using namespace LogModule;
    using namespace std;

    const static int gbacklog = 16;

    class Socket
    {
    public:
        virtual ~Socket() {}
        virtual void SocketOrDie() = 0;
        virtual void BindOrDie(uint16_t port) = 0;
        virtual void ListenOrDie(int backlog) = 0;
        virtual void Close() = 0;
        virtual shared_ptr<Socket> accept(InetAddr *client) = 0;
        virtual int Recv(string *out) = 0;
        virtual int Send(const string &message) = 0;
        virtual int Connect(const string &server_ip, uint16_t port) = 0;

    public:
        void BuildTcpSocketMethod(uint16_t port, int backlog = gbacklog)
        {
            SocketOrDie();
            //cout<<"2";
            BindOrDie(port);//为啥错?
            //cout<<"1"<<endl;
            ListenOrDie(backlog);
        }
        void TcpClientSocket()
        {
            SocketOrDie();
        }
        void BuildUdpSocketMethod(uint16_t port)
        {
            SocketOrDie();
            BindOrDie(port);
        }
        void UdpClientSocket()
        {
            SocketOrDie();
        }
    };

    const static int defaultnum = -1;
    class TcpSocket : public Socket
    {
    public:
        TcpSocket() : _sockfd(defaultnum)
        {
        }
        TcpSocket(int fd) : _sockfd(fd)
        {
        }
        void SocketOrDie() override
        {
            _sockfd = ::socket(AF_INET, SOCK_STREAM, 0);
            if (_sockfd < 0)
            {
                LOG(LogLevel::FATAL) << "socket error";
                exit(SOCKET_ERR);
            }
            LOG(LogLevel::INFO) << "socket success";

            int opt = 1;
            setsockopt(_sockfd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt));
        }

        void BindOrDie(uint16_t port) override
        {
            InetAddr localaddr(port);
            // 这里报错
            int n = ::bind(_sockfd, localaddr.NetAddrPtr(), localaddr.NetAddrLen());
            if (n < 0)
            {
                LOG(LogLevel::FATAL) << "bind error";
                //cout<<"1";
                exit(BIND_ERR);
            }
            LOG(LogLevel::INFO) << "bind success";
        }

        void ListenOrDie(int backlog) override
        {
            int n = ::listen(_sockfd, gbacklog);
            if (n < 0)
            {
                LOG(LogLevel::FATAL) << "listen error";
                exit(LISTEN_ERR);
            }
            LOG(LogLevel::INFO) << "listen success";
        }

        shared_ptr<Socket> accept(InetAddr *client) override
        {
            sockaddr_in peer;
            socklen_t len = sizeof(peer);
            int fd = ::accept(_sockfd, CONV(peer), &len);
            if (fd < 0)
            {
                LOG(LogLevel::WARNING) << "accept error";
                return nullptr;
            }
            client->SetAddr(peer);
            return make_shared<TcpSocket>(fd);
        }

        void Close() override
        {
            if (_sockfd > 0)
                ::close(_sockfd);
        }

        int Recv(string *out) override
        {
            char buffer[1024];
            ssize_t n = ::recv(_sockfd, buffer, sizeof(buffer)-1, 0);
            if (n > 0)
            {
                buffer[n] = 0;
                *out += buffer;
            }
            return n;
        }

        int Send(const string &message) override
        {
            return ::send(_sockfd, message.c_str(), message.size(), 0);
        }

        int Connect(const string &server_ip, uint16_t port) override
        {
            InetAddr server(server_ip, port);
            int n = connect(_sockfd, server.NetAddrPtr(), server.NetAddrLen());
            if (n < 0)
            {
                LOG(LogLevel::FATAL) << "connect error";
                exit(CONNECT_ERR);
            }
            LOG(LogLevel::INFO) << "connect success";
            return n;
        }

    private:
        int _sockfd;
    };
}

3.2 定制协议

我们这里的协议格式设置的较为简单,报头 + 报文的组合,报头为协议报文内容的长度,报文是我们想要传输的内容。我们用 /r/n 作为分隔符。

接下来我们就可以定制协议了

我们协议需要两个载体,Request 和 Response ,客户端发送请求将数据内容序列化打包到 Request 中,发送给服务端;服务端反序列化接收 Request 拿到数据;服务端进行上层应用层处理计算,将 Request 得到的结果存储到 Response 中,服务端对 Response 打包序列化操作发送回给客户端,最后客户端反序列化接收 Response 得到最终结果。

了解了大致的框架结构我们就开始细究内部的细节。

 首先Response和Request都需要序列化和反序列化函数,这里我们用到了Jsoncpp来快速进行键值对应输入和提取。接下里就是协议 Protocol ,首先需要对报文进行处理,给报文添加报头发送到网络中,在网络接收到报文后确定报文是否完整进行验证。然后是获得Response和Request后该如何操作等等

#pragma once
#include <iostream>
#include <string>
#include <memory>
#include <jsoncpp/json/json.h>
#include <functional>
#include "Socket.hpp"

using namespace SocketModule;
using namespace std;

class Request
{
public:
    Request()
    {
    }

    Request(int x, int y, char oper)
        : _x(x),
          _y(y),
          _oper(oper)
    {
    }

    string Serialize()
    {
        Json::Value root;
        root["x"] = _x;
        root["y"] = _y;
        root["oper"] = _oper;
        Json::FastWriter writer;
        string s = writer.write(root);
        return s;
    }

    bool DeSerialize(string &in)
    {
        Json::Value root;
        Json::Reader reader;
        bool ok = reader.parse(in, root);
        if (ok)
        {
            _x = root["x"].asInt();
            _y = root["y"].asInt();
            _oper = root["oper"].asInt();
        }
        return ok;
    }

    ~Request() {}
    int X() { return _x; }
    int Y() { return _y; }
    char Oper() { return _oper; }

private:
    int _x;
    int _y;
    char _oper;
};

class Response
{
public:
    Response()
    {
    }
    Response(int code, int result)
        : _code(code),
          _result(result)
    {
    }

    string Serialize()
    {
        Json::Value root;
        root["code"] = _code;
        root["result"] = _result;
        Json::FastWriter writer;
        string s = writer.write(root);
        return s;
    }

    bool DeSerialize(string &in)
    {
        Json::Value root;
        Json::Reader reader;
        bool ok = reader.parse(in, root);
        if (ok)
        {
            _code = root["code"].asInt();
            _result = root["result"].asInt();
        }
        return ok;
    }

    ~Response() {}
    void SetResult(int res)
    {
        _result = res;
    }
    void SetCode(int code)
    {
        _code = code;
    }
    void ShowResult()
    {
        std::cout << "计算结果是: " << _result << "[" << _code << "]" << std::endl;
    }

private:
    int _code;
    int _result;
};

const string sep = "/r/n";

using func_t = function<Response(Request &req)>;

class Protocol
{
public:
    Protocol() {}
    Protocol(func_t func)
        : _func(func)
    {
    }

    string Encode(const string &jsonstr)
    {
        string len = to_string(jsonstr.size());
        return len + sep + jsonstr + sep;
    }

    bool Decode(string &buffer, string *package)
    {
        ssize_t pos = buffer.find(sep);
        if (pos == string::npos)
            return false;
        string package_len = buffer.substr(0, pos);
        int package_len_int = stoi(package_len);
        int target_len = package_len_int + sep.size() * 2 + package_len.size();
        if (buffer.size() < target_len)
            return false;
        *package = buffer.substr(pos + sep.size(), package_len_int);
        buffer.erase(0, target_len);
        return true;
    }

    void GetRequest(shared_ptr<Socket> &sock, InetAddr &client)
    {
        string buffer_queue;
        while (true)
        {
            int n = sock->Recv(&buffer_queue); // 成功
            if (n > 0)
            {
                std::cout << "-----------request_buffer--------------" << std::endl;
                std::cout << buffer_queue << std::endl;
                std::cout << "------------------------------------" << std::endl;
                string json_package;
                while (Decode(buffer_queue, &json_package))
                {
                    cout << "-----------request_json--------------" << std::endl;
                    cout << json_package << std::endl;
                    cout << "------------------------------------" << std::endl;

                    cout << "-----------request_buffer--------------" << std::endl;
                    cout << buffer_queue << std::endl;
                    cout << "------------------------------------" << std::endl;

                    LOG(LogLevel::DEBUG) << client.StringAddr() << " 请求: " << json_package;
                    Request resq;
                    // 这里错误
                    bool ok = resq.DeSerialize(json_package);
                    if (!ok)
                        continue;

                    Response rep = _func(resq); // 应该是计算结果然后状态码返回给Response

                    string jsonstr = rep.Serialize();

                    string send_str = Encode(jsonstr);

                    sock->Send(send_str); // 失败
                }
            }
            else if (n == 0)
            {
                LOG(LogLevel::INFO) << "client:" << client.StringAddr();
                break;
            }
            else
            {
                LOG(LogLevel::FATAL) << "client" << client.StringAddr() << ",recv error";
                break;
            }
        }
    }

    bool GetResponse(shared_ptr<Socket> &client, string &resp_buff, Response *resp)
    {
        while (true)
        {
            int n = client->Recv(&resp_buff);
            if (n > 0)
            {
                string json_package;
                while (Decode(resp_buff, &json_package))
                {
                    resp->DeSerialize(json_package);
                }
                return true;
            }
            else if (n == 0)
            {
                std::cout << "server quit " << std::endl;
                return false;
            }
            else
            {
                std::cout << "recv error" << std::endl;
                return false;
            }
        }
    }

    string BuildRequestString(int x, int y, char oper)
    {
        // 1. 构建一个完整的请求
        Request req(x, y, oper);

        // 2. 序列化
        std::string json_req = req.Serialize();

        return Encode(json_req);
    }

    ~Protocol()
    {
    }

private:
    func_t _func;
};

3.3 NetCal 计算处理

计算的步骤很简单,我们可以将计算划分为功能给到上层的应用层进行解耦操作,Cal 接收Request类型返回Response类型。

#pragma once
#include <iostream>
#include "Protocol.hpp"

class Cal
{
public:
    Response Excute(Request &req)
    {
        Response resp(0, 0);
        switch (req.Oper())
        {
        case '+':
            resp.SetResult(req.X() + req.Y());
            break;
        case '-':
            resp.SetResult(req.X() - req.Y());
            break;
        case '*':
            resp.SetResult(req.X() * req.Y());
            break;
        case '/':
        {
            if (req.Y() == 0)
            {
                resp.SetCode(1);
            }
            else
            {
                resp.SetResult(req.X() / req.Y());
            }
        }
        break;
        case '%':
        {
            if (req.Y() == 0)
            {
                resp.SetCode(2);
            }
            else
            {
                resp.SetResult(req.X() % req.Y());
            }
        }
        break;
        default:
            resp.SetCode(3);
            break;
        }
        return resp;
    }
};

3.4 代码结构

在服务端视角,代码分为三份。第一份是最上层的应用层 NetCal 负责接收协议传上来的数据进行计算操作,接下来是协议层负责设置数据传输的格式设置,最后是网络层负责客户端与服务端之间的通信。

#include "NetCal.hpp"
#include "Protocol.hpp"
#include "TcpServer.hpp"
#include "Daemon.hpp"
#include <memory>

void Usage(std::string proc)
{
    std::cerr << "Usage: " << proc << " port" << std::endl;
}

// 我的代码为什么要这样写???
// ./tcpserver 8080
int main(int argc, char *argv[])
{
    if (argc != 2)
    {
        Usage(argv[0]);
        exit(USAGE_ERR);
    }
    //Daemon(0, 0);
    //Enable_File_Log_Strategy();
    Enable_Console_Log_Strategy();

    // 1. 顶层
    std::unique_ptr<Cal> cal = std::make_unique<Cal>();

    // 2. 协议层
    std::unique_ptr<Protocol> protocol = std::make_unique<Protocol>([&cal](Request &req)->Response{
        return cal->Excute(req);
    });

    // 3. 服务器层
    std::unique_ptr<TcpServer> tsvr = std::make_unique<TcpServer>(std::stoi(argv[1]),
        [&protocol](std::shared_ptr<Socket> &sock, InetAddr &client){
            protocol->GetRequest(sock, client);
    });

    tsvr->Start();

    return 0;
}

三. 进程间关系与守护进程

1. 进程组

进程组是一个或者多个进程的集合,一个进程组内可以包括多个进程。进程我们用 pid 表示,ppid 表示父进程 id ,进程组我们用 pgid 表示。

这里我们用 ps -o pid,pgid,ppid,comm来查看当前的信息,其中 -o 表示以逗号为分隔

每一个进程组都有一个组长进程,该进程组的 id 就等于组长进程的 id ,我们通过命令查看进程组信息

该进程组组长为 ps进程,cat 进程与ps为同一个组。

组长可以创建一个进程组或创建进程组当中的进程。

进程组的生命周期由最后一个进程离开才算结束,若组长先离开,那么会有新的进程成为组长。

2. 会话

终端是用户窗口,而会话是管理终端与进程组的一个工具。简单理解,终端是手机那么会话就是手机的后台。每个会话中至少存在一个或者多个进程组,我们用 sid 来表示会话的 ID 。

会话分为前台进程和后台进程,前台进程支持用户与终端直接进行交互,但只能存在一个前台进程,后台进程不接受用户输入也不收命令影响。

我们创建进程组后,在后面添加&符号,会将该进程组放到后台运行,若不加&符号,就默认在前台运行。当我们不进行任何操作时,前台进程默认为 bash 进程,bash 进程支持我们执行系统命令。当我们运行文件后,就默认将 bash 进程移到后台,此时我们输入系统命令,bash 就无法进行解析。

了解了会话,接下来我们看看如何创建会话。

我们通常是调用 setseid 函数来创建一个会话

setsid:

#include <unistd.h>
pid_t setsid(void);

返回值:

成功返回 sid ,失败返回-1

我们创建会话的进程不能是当前进程组的组长进程,当其他进程调用了 setsid 后,调用进程会重新创建一个进程组,并成为新进程组的组长进程。创建的新进程组会与原来的控制终端进行切割。

由于一个进程组默认为组长进程,所以若我们需要调用函数,我们首先进行 fork 创建子进程,将父进程终止,子进程会继承父进程的进程组 ID ,子进程执行 setsid ,这样就不会导致错误。

3. 守护进程

守护进程是运行在后台的进程,它脱离终端既不依赖终端输入也不依赖终端输出,它是脱离独立终端与用户会话的进程,我们通常用其当做垃圾清理。

下面我们就来实现一个守护进程

#pragma once

#include <iostream>
#include <cstdlib>
#include <signal.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>

const char *dev_null = "/dev/null/";
const char *root = "/";

void Daemon(bool chdirec, bool isclose)
{
    // 关闭可能引起异常退出的信号
    signal(SIGCHLD, SIG_IGN);
    signal(SIGPIPE, SIG_IGN);

    //创建父进程退出,留下子进程
    if (fork() > 0)
        exit(1);

    // 创建守护进程
    setsid();

    // 是否更改工作目录
    if (chdirec)
        chdir(root);

    // 关闭终端输入输出,重定向文件
    if (isclose)
    {
        ::close(0);
        ::close(1);
        ::close(2);
    }
    else
    {
        int fd = open(dev_null, O_RDWR);
        if (fd > 0)
        {
            dup2(fd, 0);
            dup2(fd, 1);
            dup2(fd, 2);
            close(fd);
        }
    }
}

首先我们需要关闭可能会引起信号异常退出的,接着创建父进程退出留下子进程当做守护进程,更改工作目录,关闭终端输入输出并且重定向文件。这样就完成了守护进程。

Logo

一起探索未来云端世界的核心,云原生技术专区带您领略创新、高效和可扩展的云计算解决方案,引领您在数字化时代的成功之路。

更多推荐