C++线程池

📅 2026/7/5 15:19:30
C++线程池
一、知识储备1.与线程相关知识点无论是单核还是多核IO密集型更适合设计多线程程序不会浪费资源在多核里CPU密集型适合设计多线程程序。2.与C相关知识点2.1 智能指针1、std::make_unique1使用std::make_unique的原因异常安全风险如果在new操作和unique_ptr构造之间发生异常例如在计算构造函数参数时那么new分配的内存可能无法被unique_ptr接管从而发生内存泄漏。make_unique可以解决这个风险make_unique在内部一次性完成内存分配和对象构造。如果构造函数参数的计算过程中抛出异常或者构造函数本身抛出异常因为此时还没有返回unique_ptr所以不会有内存泄漏分配的内存会被自动释放。2关键特性所有权独占std::make unique 创建的 std::unique ptr 拥有对所创建对象的独占所有权。这意味着同一时刻只有一个unique ptr可以指向该对象。所有权可以通过std::move 转移。自动内存管理当unique_ptr被销毁例如离开作用域时它所管理的对象会被自动删除(调用其析构函数并释放内存不支持自定义删除器std::make_unique 不支持指定自定义删除器。如果需要自定义删除器必须直接使用 std::unique_ptr 的构造函数例如 std::unique_ptrT,D ptr(new T,custom_deleter);o不能用于std::shared_ptrstd::make unique 只创建 std::unique ptr。要创建std::shared ptr,应使用 std::make shared。二、需要注意的点1、创建线程列表时为了避免手动释放创建的线程需要使用unique ptr2、线程对象被取消时防止线程函数同时被取消需要设置分离线程3、在设置提交任务返回机制的时候不能用task-getResult()要用Result(task)因为线程执行完tasktask对象就被析构掉了4、Any类型Any类型接受任意数据类型思路1、任意的其他类型——使用模板2、让一个类型指向其他任意类型基类类型可以指向所有派生类型3、Any类型中有一个基类指针成员变量基类指针就可以指向继承这个基类的所以有派生类型4、将派生类型写成模板派生类型任意类型的数据作为成员变量//Any类型可以接收任意数据的类型 class Any { public: templatetypename T Any(T data) :base_(new DeriveT(data)) {} private: //基类类型 class Base { public: virtual ~Base() default; }; //派生类类型 templatetypename T class Derive : public Base { public: Derive(T data):data_(data) {} private: T data_; }; private: //定义一个基类的指针 std::unique_ptrBase base_; };5、死锁问题分析1死锁问题一当线程池结束运行之后main()程序无法正常退出原因分析2死锁问题二在Linux环境下信号量类的析构函数没有释放资源导致死锁问题三、主要代码结构1.数据结构1任务相关class Task { public: Task(); ~Task() default; void exec(); void setResult(Result* res); //用户自定义任意任务类型从Task继承重写run方法实现任务自定义 virtual Any run()0; private: Result * result_; };//实现接收提交到线程池的task任务执行完成后的返回值类型Result class Result { public: Result(std::shared_ptrTask task,bool isValid true); ~Result() default; //问题一setVal方法获取任务执行完的返回值的 void setVal(Any any); //问题二get方法用户调用这个方法获取task的返回值 Any get(); private: Any any_;//储存任务的返回值 Semaphore sem_;//线程通信信号量 std::shared_ptrTask task_;//指向对应获取返回值的任务对象 std::atomic_bool isVaild_;//返回值是否有效 };//实现一个信号量类 class Semaphore { public: Semaphore(int limit 0) :resLimit_(limit), isExit_(false) {} ~Semaphore() { isExit_ true; } //获取一个信号量资源 void wait() { if(isExit_) return; std::unique_lockstd::mutex lock(mtx_); //等待信号量有资源没有资源的话会阻塞当前线程 cond_.wait(lock,[]()-bool {return resLimit_ 0;}); resLimit_--;//消耗一个信号量资源 } //增加一个信号量资源 void post() { if(isExit_) return; std::unique_lockstd::mutex lock(mtx_); resLimit_;//增加信号量资源 cond_.notify_all();//增加信号量资源之后通知 } private: int resLimit_;//资源计数 std::mutex mtx_; std::condition_variable cond_; std::atomic_bool isExit_; };2线程相关class Thread { public: //线程函数对象类型 using ThreadFunc std::functionvoid(int); //线程构造函数 Thread(ThreadFunc func); //析构函数 ~Thread(); //启动线程 void start(); //获取线程id int getId()const; private: ThreadFunc func_; static int generateId_; int threadId_;//保存线程id };3与线程池相关enum class PoolMode //线程池工作模式 { MODE_FIXED,//固定数量的线程 MODE_CACHED//线程数量可动态增长 };//线程池类 class ThreadPool { public: //线程池构造 ThreadPool(); //析构函数 ~ThreadPool(); //设置线程池的工作模式 void setMode(PoolMode mode); //设置task任务队列上限阈值 void settaskQueMaxThreshHold(int threshhold); //设置cached模式下线程数量上限阈值 void setthreadMaxThreshHold(int threshhold); //给线程池提交任务 Result submitTask(std::shared_ptrTask sp); //开启线程池 void start(int initThreadSize std::thread::hardware_concurrency());//hardware_concurrency()系统核心数量 ThreadPool(const ThreadPool)delete; ThreadPool operator(const ThreadPool)delete; private: //定义线程函数 void threadFunc(int threadid); bool checkRunningState() const; private: /* 如果new一个Thread需要delete,避免手动delete, 将vectorThread*改为vectorstd::unique_ptrThread, std::make_unique 是在C14中引入的一个功能。如果你使用的是C11标准 则需要升级到C14或更高版本。 */ //std::vectorstd::unique_ptrThreadthreads_;//线程列表 std::unordered_mapint,std::unique_ptrThreadthreads_; int initThreadSize_;//线程初始数量 std::atomic_int curThreadSize_; //记录当前线程池里面的线程数量总数 std::atomic_int idleThreadSize_;//空闲的线程数量 long unsigned int threadMaxThreshHold_;//线程数量上限阈值 std::queuestd::shared_ptrTasktaskQue_;//任务队列 std::atomic_int taskSize_;//任务初始数量 long unsigned int taskQueMaxThreshHold_;//任务队列数量上限阈值 std::mutex taskQueMtx_;//保证任务队列的线程安全 std::condition_variable notFull_;//表示任务队列的不满 std::condition_variable notEmpty_;//表示任务队列的不空 std::condition_variable exitCond_;//表示等待线程资源全部回收 PoolMode poolMode_;//当前线程池的工作模式 std::atomic_bool isPoolRunning_;//当前线程池的启动状态 };2.函数实现1与任务相关——提交任务//给线程池提交任务 用户调用该接口传入任务对象生产任务 void ThreadPool::submitTask(std::shared_ptrTask sp) { //获取锁 std::unique_lockstd::mutex lock(taskQueMtx_); //线程的通信 等待任务队列有空余 //用户提交任务最长不能阻塞超过1s,否则判断提交任务失败返回 if(!notFull_.wait_for(lock,std::chrono::seconds(1), []()-bool{if(taskQue_.size() taskQueMaxThreshHold_) {std::cout任务队列已满需要等待...std::endl;} return taskQue_.size() taskQueMaxThreshHold_;})) { //表示notFull_等待1s,条件依然没有满足 std::cerrtask queue is full,submit task fail.std::endl; return; } //如果有空余把任务放入任务队列中 taskQue_.emplace(sp); taskSize_; std::cout任务提交成功std::endl; notEmpty_.notify_all(); }2、与线程相关——线程执行函数//定义线程函数 线程池的所有线程从任务队列里面消费任务 void ThreadPool::threadFunc() { for (;;) { std:: shared_ptrTask task; { //先获取锁 std::unique_lockstd::mutexlock(taskQueMtx_); std::coutstd::this_thread::get_id()尝试获取任务...std::endl; //等待notEmpty条件 notEmpty_.wait(lock,[]()-bool{return taskQue_.size() 0;}); std::coutstd::this_thread::get_id()任务获取成功开始执行任务...std::endl; //从任务队列中取一个任务出来 task taskQue_.front(); taskQue_.pop(); taskSize_--; //如果依然有剩余任务继续通知其它的线程执行任务 if (taskQue_.size() 0) { notEmpty_.notify_all(); } //取出一个任务进行通知 notFull_.notify_all(); }//增加作用域当出了作用域锁就会释放掉 //当前线程负责执行这个任务 if(task ! nullptr) { task-run(); } } }3、与线程池相关——开启线程池//开启线程池 void ThreadPool::start(int initThreadSize) { //记录初始线程数量 initThreadSize_initThreadSize; //创建线程对象 for (int i 0; i initThreadSize_; i) { //创建thread线程对象的时候把线程函数给到thred线程对象 auto ptr std::make_uniqueThread(std::bind(ThreadPool::threadFunc,this)); threads_.emplace_back(std::move(ptr));// } //启动所有线程 for (int i 0; i initThreadSize_; i) { threads_[i]-start(); } }4、如何使用线程池