基于 epoll 的协程调度器——零基础深入浅出 C++20 协程

📅 2026/7/2 1:49:20
基于 epoll 的协程调度器——零基础深入浅出 C++20 协程
以协程为目标涉及到的新语法会简单说明不涉及的不旁征博引* 若语法的原理非常简单也会简单展开讲讲有利于透过现象看本质用起来更得心应手上一篇文章里不光引入了初级的调度器还说明了 final_suspend 与协程自清理的关系、协程句柄通过类型擦除来屏蔽用户定义承诺对象的差异、以及 lambda 表达式的本质是仿函数等如果没有这些内容铺垫看本文时会有很多地方难以理解还没看过的小伙伴墙裂建议先看那篇。工具还是之前介绍过的 C Insights 这里不再用到 Compile Explorer主要是它的运行环境不支持像文件、网络之类的异步 IO为此需要用户自行搭建开发环境。基于 epoll 的 IO 多路复用本文演示的异步 IO 以文件操作为主相比网络操作它具有代码量少、易于测试的优点。为了简化复杂度这里没有接入任何三方库而是直接调用操作系统 raw API阅读本文需要具有 IO 多路复用 (multiplexing) 的知识基础例如 Linux 的 epoll 或 Windows 的 IOCP。在单线程时代想要处理多个 IO 事件也不是不行只要将异步 IO 句柄交给 select / poll / epoll / kqueue 等待即可当任一 IO 事件到达时控制权将从阻塞等待中返回并告知用户哪个句柄上有何种事件发生从而方便用户直接处理那个句柄上的 IO 事件并且预期将不会被阻塞。这种模型因为检测完成后还需要用户动作一下也称为 Reactor 模型相对的还有 Proactor 模型主要是基于 Windows IOCP当事件完成时相应的读、写动作已由系统完成不再需要用户动作故有此区别关于这一点后面在介绍基于 IOCP 的调度器时详述。类 Unix 系统上的 IO 多路分离器比较多早期的 select 就能监控 IO 句柄的读、写、异常三个事件集并且带超时能力后面发展的 poll 消除了 select 对句柄数量的限制Linux 上诞生的 epoll 解决了 select poll 在句柄数量增长时效能线性下降的问题主要优化了句柄集合在用户态与内核态的来回复制、返回时遍历句柄集等性能开销kqueue 则是 BSD 系统上的 epoll 平替两者都支持水平触发与边缘触发两种模式。水平触发意味着只要句柄上有事件分离器就会一直通知上述四个默认都是水平触发适合少量离散数据的场景边缘触发意味着一次通知中如果不将对应的事件处理完下次不会再通知除非有新的事件产生epoll / kqueue 可选边缘触发适合大数据量的场景可以有效缓解高频通知导致的数据传输低效问题。恶补了 IO 多路复用机制相关的知识后考虑到我们是在 Linux 上进行测试这里选取了 epoll 作为分离器。需要注意的是 epoll 不能直接处理普通文件读写需要借助 fifo 文件后面我们会看到这一点话不多说直接上 demo#include coroutine #include unordered_map #include sys/epoll.h #include unistd.h #include fcntl.h #include vector #include stdexcept #include iostream #include sstream #define MAX_EVENTS 10 struct Task { struct promise_type { Task get_return_object() { return {}; } std::suspend_never initial_suspend() { return {}; } std::suspend_never final_suspend() noexcept { return {}; } void return_void() {} void unhandled_exception() { std::terminate(); } }; }; class EpollScheduler { private: int epoll_fd; std::unordered_mapint, std::coroutine_handle io_handles; public: EpollScheduler() { epoll_fd epoll_create(MAX_EVENTS); if (epoll_fd -1) { std::stringstream ss; ss epoll_create failed, error errno; throw std::runtime_error(ss.str()); } } ~EpollScheduler() { close(epoll_fd); } void register_io(int fd, std::coroutine_handle handle) { if (io_handles.find(fd) io_handles.end()) { io_handles[fd] handle; epoll_event event{}; event.events EPOLLIN | EPOLLET; event.data.fd fd; if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd, event) -1) { std::stringstream ss; ss epoll_ctl failed, error errno; throw std::runtime_error(ss.str()); } } } void run() { while (true) { epoll_event events[MAX_EVENTS] { 0 }; int n epoll_wait(epoll_fd, events, MAX_EVENTS, -1); for (int i 0; i n; i) { int ready_fd events[i].data.fd; if (auto it io_handles.find(ready_fd); it ! io_handles.end()) { it-second.resume(); } } } } }; struct AsyncReadAwaiter { EpollScheduler sched; int fd; std::string buffer; AsyncReadAwaiter(EpollScheduler s, int file_fd, size_t buf_size) : sched(s), fd(file_fd), buffer(buf_size, \0) {} bool await_ready() const { return false; } void await_suspend(std::coroutine_handle h) { sched.register_io(fd, h); } std::string await_resume() { ssize_t n read(fd, buffer.data(), buffer.size()); if (n -1) { std::stringstream ss; ss read failed, error errno; throw std::runtime_error(ss.str()); } buffer.resize(n); return std::move(buffer); } }; Task async_read_file(EpollScheduler sched, const char* path) { int fd open(path, O_RDONLY | O_NONBLOCK); if (fd -1) { std::stringstream ss; ss open failed, error errno; throw std::runtime_error(ss.str()); } while (true) { auto data co_await AsyncReadAwaiter(sched, fd, 4096); std::cout Read data.size() bytes\n; // if (data.size() 0) // break; } close(fd); } int main(int argc, char* argv[]) { if (argc 2) { std::cout Usage: sample pipe std::endl; return 1; } EpollScheduler scheduler; async_read_file(scheduler, argv[1]); scheduler.run(); return 0; }先来看编译公司的开发环境中安装的 gcc 最高版本为 12.1$ /opt/compiler/gcc-12/bin/g --version /opt/compiler/gcc-12/bin/g (GCC) 12.1.0 Copyright (C) 2022 Free Software Foundation, Inc. This is free software; see the source for copying conditions. There is NO warranty; not even for MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.经 Compile Explorer 验证可用一点点降低版本尝试发现能编译这段代码的最低 gcc 版本是 11.1如果你需要在本地安装 gcc 的话大于等于这个版本就行。包装一个简单的 Makefileall: sample sample : sample.cpp /opt/compiler/gcc-12/bin/g -stdc20 -o $ $^ mkfifo communication.pipe clean: rm sample communication.pipemkfifo 用于管道文件 (communication.pipe) 的创建。启动 sample 程序后可以在管道另一侧用脚本写一些数据进去for ((i1;i500;i)); do echo hello communication.pipe; done写入 500 个 hello 字符串接收端的 sample 输出如下$ ./sample communication.pipe Read 6 bytes Read 60 bytes Read 6 bytes Read 54 bytes Read 6 bytes Read 6 bytes Read 6 bytes Read 6 bytes Read 6 bytes Read 6 bytes Read 6 bytes Read 6 bytes Read 12 bytes Read 0 bytes Read 6 bytes Read 6 bytes Read 6 bytes Read 6 bytes Read 6 bytes Read 6 bytes ...demo 唯一的参数是 pipe 文件路径。如果使用普通文件做同样的测试$ ./sample sample.cpp terminate called after throwing an instance of std::runtime_error what(): epoll_ctl failed, error 1 Aborted (core dumped)果然报错了这就是开头所说 epoll 不支持普通文件的特性对于普通文件Linux 认为永远可读可写没必要通过 epoll 进行等待所以 epoll_ctl 直接返回 EPERM 了。这个顺便演示了 C20 编译器会对协程体代码进行 try...catch 的逻辑任何未捕获的异常终将调用用户承诺对象的 unhandled_exception 接口这里调了 terminate 来终止进程关于这一点请参考《协程本质是函数加状态机》。代码比较长下面分段看下#include coroutine #include unordered_map #include sys/epoll.h #include unistd.h #include fcntl.h #include vector #include stdexcept #include iostream #include sstream #define MAX_EVENTS 10返回对象定义相比之前经典的定义承诺对象的 final_suspend 未中断协程、返回对象没有析构时销毁协程句柄的动作意味着协程是个启动后“不管”的类型struct Task { struct promise_type { Task get_return_object() { return {}; } std::suspend_never initial_suspend() { return {}; } std::suspend_never final_suspend() noexcept { return {}; } void return_void() {} void unhandled_exception() { std::terminate(); } }; };跳到 main果然没有接收协程体 async_read_file 的返回对象它返回的临时对象将自动析构不影响协程体正常运转int main(int argc, char* argv[]) { if (argc 2) { std::cout Usage: sample pipe std::endl; return 1; } EpollScheduler scheduler; async_read_file(scheduler, argv[1]); scheduler.run(); return 0;