基于完成端口的 IO 多路复用

上文中提到了 Unix 系统中多路复用接口的发展历程:分别经历了 select -> poll -> epoll/kqueue,Windows 则通过完成端口一统江山,其实它俩调用方式差不太多:

epoll IOCP
初始化 epoll_create

CreateIoCompletionPort

关联句柄 epoll_ctl

CreateIoCompletionPort

等待并获取下一个事件 epoll_wait

GetQueuedCompletionStatus

投递事件 n/a (self pipe trick) PostQueuedCompletionStatus
销毁 close CloseHandle

而在可等待对象上,IOCP 则丰富的多:

* 文件 I/O 事件​​
* 文件系统变更
* 套接字(Socket)事件​​
* 命名管道(Named Pipe)事件​​
* 设备 I/O 事件​​
* 定时器事件(结合 Waitable Timer)​​

这方面能与它相提并论的恐怕只有 kqueue 了。有了上面的铺垫再参考之前 epoll 的实现,直接上 demo 源码:

#include <coroutine>
#include <unordered_map>
#include <windows.h>
#include <vector>
#include <stdexcept>
#include <iostream>
#include <sstream>
#include <memory>

struct Task {
    struct promise_type {
        Task get_return_object() { return {}; }
        std::suspend_never initial_suspend() { return {}; }
        std::suspend_never final_suspend() noexcept { return {}; }
        void return_void() {}
        void unhandled_exception() { std::terminate(); }
    };
};

class IocpScheduler {
private:
    HANDLE iocp_handle;
    std::unordered_map<HANDLE, std::coroutine_handle<>> io_handles;

public:
    IocpScheduler() {
        iocp_handle = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
        if (iocp_handle == NULL) {
            throw std::runtime_error("CreateIoCompletionPort failed");
        }
    }

    ~IocpScheduler() {
        CloseHandle(iocp_handle);
    }

    void register_io(HANDLE file_handle, std::coroutine_handle<> handle) {
        if (io_handles.find(file_handle) == io_handles.end()) {
            io_handles[file_handle] = handle;

            if (CreateIoCompletionPort(file_handle, iocp_handle, (ULONG_PTR)file_handle, 0) == NULL) {
                throw std::runtime_error("CreateIoCompletionPort failed to associate file handle");
            }
        }
    }

    void run() {
        while (true) {
            DWORD bytes_transferred = 0;
            ULONG_PTR completion_key = 0;
            LPOVERLAPPED overlapped = nullptr;

            BOOL success = GetQueuedCompletionStatus(
                iocp_handle,
                &bytes_transferred,
                &completion_key,
                &overlapped,
                INFINITE);

            if (completion_key != 0) {
                HANDLE ready_handle = (HANDLE)completion_key;
                if (auto it = io_handles.find(ready_handle); it != io_handles.end()) {
                    it->second.resume();
                }
            }
        }
    }
};

struct AsyncReadAwaiter {
    IocpScheduler& sched;
    HANDLE file_handle;
    std::unique_ptr<char[]> buffer;
    DWORD buffer_size;
    OVERLAPPED overlapped;
    DWORD bytes_read;

    AsyncReadAwaiter(IocpScheduler& s, HANDLE file, DWORD size)
        : sched(s), file_handle(file), buffer_size(size), bytes_read(0) {
        buffer = std::make_unique<char[]>(size);
        ZeroMemory(&overlapped, sizeof(OVERLAPPED));
    }

    bool await_ready() const {
        return false;
    }

    void await_suspend(std::coroutine_handle<> h) {
        sched.register_io(file_handle, h);
        
        if (!ReadFile(file_handle, buffer.get(), buffer_size, &bytes_read, &overlapped)) {
            DWORD error = GetLastError();
            if (error != ERROR_IO_PENDING) {
                std::stringstream ss;
                ss << "ReadFile failed, error " << error;
                throw std::runtime_error(ss.str());
            }
        }
    }

    std::string await_resume() {
        DWORD bytes_transferred = 0;
        if (!GetOverlappedResult(file_handle, &overlapped, &bytes_transferred, FALSE)) {
            DWORD error = GetLastError();
            std::stringstream ss;
            ss << "GetOverlappedResult failed, error " << error;
            throw std::runtime_error(ss.str());
        }

        return std::string(buffer.get(), bytes_transferred);
    }
};

Task async_read_file(IocpScheduler& sched, const char* path) {
    HANDLE file_handle = CreateFileA(
        path,
        GENERIC_READ,
        FILE_SHARE_READ,
        NULL,
        OPEN_EXISTING,
        FILE_FLAG_OVERLAPPED,
        NULL);

    if (file_handle == INVALID_HANDLE_VALUE) {
        std::stringstream ss;
        ss << "CreateFile failed, error " << GetLastError();
        throw std::runtime_error(ss.str());
    }

    while (true) {
        auto data = co_await AsyncReadAwaiter(sched, file_handle, 4096);
        std::cout << "Read " << data.size() << " bytes\n";
        if (data.size() == 0) {
            break;
        }
    }

    CloseHandle(file_handle);
}

int main(int argc, char* argv[]) {
    if (argc < 2) {
        std::cout << "Usage: sample file_path" << std::endl;
        return 1;
    }

    IocpScheduler scheduler;
    async_read_file(scheduler, argv[1]);
    scheduler.run();
    return 0;
}

先看编译:

image

Compile Explorer 中指定最新的 msvc 编译器和 C++20 选项可以编译通过,注意在 Windows 中选项指定的语法与 Unix 大相径庭,别弄错了。

一点一点降低版本尝试,发现能编译这段代码的最低版本是 msvc19.29,对应 vs16.11,如果你需要在本地安装测试环境的话,稳妥起见安装 msvc19.30、对应 vs17.0 也就是  VS2022 比较好,如果本地只有 VS2019,需要升级到第五个也就是最后一个发行版才可以。

image

接下来创建一个简单的控制台应用包含上面的源文件,需要配置一下 C++ 语言标准:

image

更多推荐