从零开始手写一个协程库(二)

📅 2026/7/2 9:30:30
从零开始手写一个协程库(二)
引言这是关于协程库的第二篇文章如果大家刚开始看这篇文章建议先看一下我之前的这一篇。从零开始手写一个协程库一-CSDN博客本篇文章是对协程fiber类的一个代码实现其中包含了对协程的重新恢复创建删除暂停重利用等操作架构简介对于一个协程它的状态分为三种一种是READY一种是TERM一种是RUNNIG。到底是哪个协程该工作什么时候工作工作完之后状态怎么样怎么处理这都需要一个调度器来决定这个调度器的任务就是来分配工作回收资源可能听起来有点像内存池做的事情。我们这个协程库里做了两手准备第一个是调度器就是单独的一个协程每一次当一个协程执行完之后都要把执行权还给调度器第二个是主协程来调度。然后就是代码的设计我们对于主协程和子协程有不同的处理方式所以自然创建方式也不同。然后我们的主协程是由GetThis创建的因为如果一开始没有任何协程在运行当我们想获取一个协程实例的时候自然就是创建一个协程而这个协程就是我们的主协程#ifndef FIBER_H #define FIBER_H #include memory #include functional #include cstdint #include ucontext.h #include mutex #include atomic #include iostream #include cassert #include vector namespace fengyue { class Fiber : public std::enable_shared_from_thisFiber { public: enum State{ READY, // 就绪状态 RUNNING, // 运行中状态 TERM // 已完成状态 }; private: Fiber(); // 细节Fiber是私有的只能被GetThis方法调用用于创建主协程 public: // 用于创建指定的回调函数栈大小和run_in_scheduler 本协程是否参与调度器的调度默认为true Fiber(bool for_pool); Fiber(std::functionvoid() func, size_t stack_size 0, bool run_in_scheduler true); ~Fiber(); public: void reset(std::functionvoid() func); // 重置协程状态和入口重复利用栈空间不用重新创建栈 void resume(); // 恢复协程执行 void yield(); // 暂停将执行权让给调度 uint64_t getId() const {return m_id;} State getState() const {return m_state;} public: static void SetThis(Fiber* fiber); // 设置当前运行的协程 static std::shared_ptrFiber GetThis(); // 获取当前运行的协程的实例指针 static void SetSchedulerFiber(Fiber* fiber); // 设置调度器协程默认主协程 static uint64_t GetFiberId(); // 获取当前运行的协程的ID static void MainFunc(); // 协程的主函数入口点 private: uint64_t m_id; // 协程的唯一ID State m_state READY; // 协程的状态 uint32_t m_stack_size 0; // 协程的栈大小默认0表示使用默认栈大小 ucontext_t m_ctx; // 协程的上下文 void* m_stack nullptr; // 协程的栈指针 std::functionvoid() m_func; // 协程的回调函数 bool m_runInScheduler true; // 是否将执行器交给调度器默认为true bool m_for_pool false; public: std::mutex m_mutex; // 协程的互斥锁 }; } #endif代码首先我们要声明几个全局变量一个是正在运行的协程指针每一次拿到这个指针就相当于拿到了这个协程里面的某一个运行的协程第二个是主协程的指针这个在创建的时候就已经确定了第三个就是调度协程我们这里默认就是主协程SetThis函数就是设置当前正在运行的协程GetThis这个函数要干两件事情首先第一个if判断如果已经有协程正在运行了那不管是主协程还是子协程说明我们的主协程已经完全创建好了那么我们直接返回一个智能指针。不过大家可能都注意到了我们返回的全部都是t_fiber-shared_from_this()这个我们上篇文章已经提过了作用就是让引用计数1底层原理就是weak_ptr.lock()目的就是防止这个指针因为被调度器拿到离开本作用域而计数-1变成0直接析构销毁再要运行的时候直接报错。而我们1就很好的避免了这个问题只有这个协程完全运行结束我们来手动删除这个后面会有。然后就是创建主线程我们创建主线程用的是Fiber()在这里面设置了t_fiber main_fiber.get()// 正在运行的协程 static thread_local Fiber* t_fiber nullptr; // 主协程 static thread_local std::shared_ptrFiber t_thread_fiber nullptr; // 调度器协程 static thread_local Fiber* t_scheduler_fiber nullptr; // 协程的ID计数器 static std::atomicuint64_t s_fiber_id{0}; // 活跃协程数量计数器 static std::atomicuint64_t s_fiber_count{0}; void Fiber::SetThis(Fiber* fiber) { t_fiber fiber; } // 运行该函数创建主协程 std::shared_ptrFiber Fiber::GetThis() { if (t_fiber) { return t_fiber-shared_from_this(); // 主要是为了让引用计数1 } std::shared_ptrFiber main_fiber(new Fiber()); // 这里面设置了t_fiber main_fiber.get() t_thread_fiber main_fiber; t_scheduler_fiber main_fiber.get(); // 除非主动设置否则主协程默认为调度协程 assert(t_fiber main_fiber.get()); return t_fiber-shared_from_this(); }接下来就是构造主协程和子协程的函数如果是构建主协程说明这个是第一个协程我们要设置状态为RUNNING然后设置为当前运行的协程并获取当前的上下文然后把这个协程的信息设置一下。创建子协程状态时READY因为创建不代表可以运行因为子协程是要真正工作的所以我们先分配栈的大小然后我们先获取上下文然后对这个上下文进行修改最后用makecontext对这个上下文完成替换工作绑定要执行的函数等待之后的激活// 作用创建主协程设置状态初始化上下文并分配ID Fiber::Fiber() { SetThis(this); // 在GetThis中使用了无参的构造函数这里相当于把thismain_fiber的指针地址赋值给了t_fiber m_state RUNNING; if (getcontext(m_ctx)) { std::cerr Fiber() failed\n; pthread_exit(NULL); } m_id s_fiber_id; s_fiber_count; if (debug) { std::cout Fiber():main id m_id std::endl; } } /* 作用创建一个新的协程初始化回调函数栈的大小和状态分配栈空间并通过make修改上下文当set或swap激活ucontext_t的时候 m_ctx上下文就会执行make的第二个参数的函数 */ Fiber::Fiber(std::functionvoid() func, size_t stack_size, bool run_in_scheduler) : m_func(func), m_runInScheduler(run_in_scheduler) { m_state READY; // 分配栈空间 m_stack_size stack_size ? stack_size : 128000; m_stack malloc(m_stack_size); if (getcontext(m_ctx)) { std::cerr Fiber(std::functionvoid() func, size_t stack_size, bool run_in_scheduler) failed\n; pthread_exit(NULL); } m_ctx.uc_link nullptr; // 因为这里没有设置后继所以在运行完mainfunc后会返回到主协程 m_ctx.uc_stack.ss_sp m_stack; // 地址 m_ctx.uc_stack.ss_size m_stack_size; // 大小 makecontext(m_ctx, Fiber::MainFunc, 0); m_id s_fiber_id; s_fiber_count; if (debug) { std::cout Fiber(std::functionvoid() func, size_t stack_size, bool run_in_scheduler):id m_id std::endl; } }对于一个协程的析构函数s_fiber_count是一个全局变量记录着所有的协程数量所以在析构一个协程的时候要--然后释放其对应的栈。我们之所以要有reset函数主要是提高了效率因为一个协程如果事情干完了完全可以再重新分配任务这样避免反复的开辟空间和销毁内存我们直接用原来的栈只需要传递一下新的函数即可所以我们的操作就是给原来协程的m_func赋值新的函数然后利用getcontext和makecontext修改上下文绑定要执行的函数等待之后的激活Fiber::~Fiber() { s_fiber_count--; if (m_stack) { free(m_stack); } if (debug) { std::cout Fiber::~Fiber():id m_id std::endl; } } // 作用重置协程的回调函数并重新设置上下文将协程从TERM状态转换为READY状态 void Fiber::reset(std::functionvoid() func) { assert(m_state TERM m_stack); m_state READY; m_func func; if (getcontext(m_ctx)) { std::cerr Fiber::reset(std::functionvoid() func) failed\n; pthread_exit(NULL); } m_ctx.uc_link nullptr; m_ctx.uc_stack.ss_sp m_stack; m_ctx.uc_stack.ss_size m_stack_size; makecontext(m_ctx, Fiber::MainFunc, 0); }接下来是两个关键的函数一个是resume恢复运行状态因为我们这个项目做了两手准备一个是主协程来调度一个是调度器单独是一个协程所以我们特地用m_runInScheduler变量来记录我们的选择如果为true那么说明我们选择的是后者不过无论是哪一种因为协程的任务已经确定了只需要我们激活因为此时执行权要么在调度器那要么在主协程那所以我们用swapcontext来切换上下文执行我们当前的上下文另一个是yield暂停状态首先这个函数是暂停不是销毁所以我们的状态是READY也就是可以再次resume然后逻辑和resume的恰好相反我们要把执行权返回给主协程或者调度器用swapcontext同时要设施当前执行的协程是调度器或者主协程// 作用将协程的状态设置为running并恢复协程的执行如果m_runInScheduler为true则将上下文切换到调度协程否则切换到主线程的协程 void Fiber::resume() { m_state RUNNING; if (m_runInScheduler) { SetThis(this); // 设置当前工作的协程为this if (swapcontext((t_scheduler_fiber-m_ctx), m_ctx)) { std::cerr Fiber::resume() failed\n; pthread_exit(NULL); } } else { SetThis(this); if (swapcontext((t_thread_fiber-m_ctx), m_ctx)) { std::cerr Fiber::resume() failed\n; pthread_exit(NULL); } } } void Fiber::yield() { assert(m_state RUNNING || m_state TERM); if (m_state ! TERM) { m_state READY; } if (m_runInScheduler) { SetThis(t_scheduler_fiber); if (swapcontext(m_ctx, (t_scheduler_fiber-m_ctx))) { std::cerr Fiber::yield() failed\n; pthread_exit(NULL); } } else { SetThis (t_thread_fiber.get()); if (swapcontext(m_ctx, (t_thread_fiber-m_ctx))) { std::cerr Fiber::yield() failed\n; pthread_exit(NULL); } } }最后一个是我们执行的主函数在代码里面我已经说明了一些细节大家看一看。主要就是对于计数1的理解我们是手动释放也就是reset然后是把一个裸指针交给调度器完成最后的销毁任务void Fiber::MainFunc() { std::shared_ptrFiber cur GetThis(); // GetThis()的shared_from_this的方法让引用计数加1触发里面的if判断 assert(cur ! nullptr); cur-m_func(); // 执行函数 cur-m_func nullptr; cur-m_state TERM; // 函数执行完毕一定要设定为term状态如果是ready说明之后会resume但是term就会被调度器删除 // 运行完毕让出执行权 /* 这里不可以是cur-yield()因为如果我们先调用yield()首先我们会进行swap把cur的上下文全部写入ucp中然后切换到主线程 主线程调度器删除curcur计数-1这个时候cur的计数是1因为cur的引用是在堆上所以即使栈被删除了cur这个指针也不会被删除 如果我们先reset因为raw_ptr是一个裸指针get得来的在栈上那么删除的时候就不会调用任何的析构函数并且把主动权让给了调度器 当栈被删除的时候这个裸指针也会被删除 */ auto raw_ptr cur.get(); cur.reset(); raw_ptr-yield(); }所以基于上述的代码我们也可以很轻松的写出一个简单的协程池而这个协程池的关键就是利用了reset这个函数先创建协程然后不断地利用这些已经创建好的协程// 协程池 class FiberPool { public: FiberPool(size_t size) { for (size_t i 0; i size; i) { fibers.push_back(std::make_sharedFiber(true)); } } std::shared_ptrFiber getFiber(std::functionvoid() func) { for (auto fiber : fibers) { if (fiber-getState() Fiber::READY) { fiber-reset(func); return fiber; } } auto new_fiber std::make_sharedFiber(func); fibers.push_back(new_fiber); return new_fiber; } private: std::vectorstd::shared_ptrFiber fibers; };总结本篇文章就到这里结束了希望可以对大家有所帮助~~~