《cached模式线程池:线程池析构之后,线程资源回收问题》-CSDN博客
在完成线程池的完整工作之后,实际在测试的时候,发现程序有问题。
线程池项目测试.cpp:
//线程池项目.cpp:此文件包含"main"函数。程序将在此处开始并结束。
//#include<iostream>
#include<chrono>
#include<thread>
using namespace std;#include"threadpool.hpp"/*
有些场景,线程池是希望能够获取线程执行完任务以后的返回值的
举例:
计算 1 + ...+ 30000 的和
thread1 1 + ... + 10000
thread2 10000 + ... + 20000
thread3 20000 + ... + 30000
...
main thread: 给每一个线程分配计算的区间,并等待他们计算完返回结果,合并最终的结果即可。
*/using ulong = unsigned long long;class MyTask : public Task
{
public:MyTask(int begin, int end):begin_(begin), end_(end){}// 问题一:怎么设计run函数的返回值,可以表示任意类型。Any run(){std::cout << "tid: " << std::this_thread::get_id() << "begin!" << std::endl;std::this_thread::sleep_for(std::chrono::seconds(3));ulong sum = 0;for (int i = begin_; i <= end_; i++)sum += i;std::cout << "tid: " << std::this_thread::get_id() << "end!" << std::endl;return sum;}
private:int begin_;int end_;
};int main()
{{ThreadPool pool;pool.start(4);Result res1 = pool.submitTask(std::make_shared<MyTask>(1, 100000000));Result res2 = pool.submitTask(std::make_shared<MyTask>(1, 100000000));ulong sum1 = res1.get().cast_<ulong>();cout << sum1 << endl;}cout << "main over!" << endl;getchar();#if 0{ThreadPool pool;// 用户自己设置线程池的工作模式pool.setMode(PoolMode::MODE_CACHED);pool.start(4);srand(time(0));// 如何设计这里的Result机制呢?size_t begin1 = clock();Result res1 = pool.submitTask(std::make_shared<MyTask>(1, 100000000));Result res2 = pool.submitTask(std::make_shared<MyTask>(100000001, 200000000));Result res3 = pool.submitTask(std::make_shared<MyTask>(200000001, 300000000));pool.submitTask(std::make_shared<MyTask>(200000001, 300000000));pool.submitTask(std::make_shared<MyTask>(200000001, 300000000));pool.submitTask(std::make_shared<MyTask>(200000001, 300000000));ulong sum1 = res1.get().cast_<ulong>();ulong sum2 = res2.get().cast_<ulong>();ulong sum3 = res3.get().cast_<ulong>();size_t end1 = clock();// Master - Slave线程模型// Master线程用来分解任务,然后给各个Slaver线程分配任务// 等待各个Slaver线程执行完成任务,返回结果// Master线程合并各个任务结果,输出// cout << "测试结果:" << (sum1 + sum2 + sum3) << "花费时间:" << end1 - begin1 << endl;1+...300000000正确结果//size_t begin2 = clock();//ulong sum = 0;//for (int i = 1; i <= 300000000; i++)// sum += i;//size_t end2 = clock();//cout << "正确结果:" << sum << "花费时间:" << end2 - begin2 << endl;}/*pool.submitTask(std::make_shared<MyTask>());pool.submitTask(std::make_shared<MyTask>());pool.submitTask(std::make_shared<MyTask>());pool.submitTask(std::make_shared<MyTask>());pool.submitTask(std::make_shared<MyTask>());pool.submitTask(std::make_shared<MyTask>());pool.submitTask(std::make_shared<MyTask>());pool.submitTask(std::make_shared<MyTask>());pool.submitTask(std::make_shared<MyTask>());pool.submitTask(std::make_shared<MyTask>());*/getchar();
#endif
}
问题出现:
只退出了三个线程,正在执行的线程没有退出。主线程getchar并没有被执行,被阻塞了。
分析一下原因,为什么有一个线程没有退出呢?
这是我们之前写的代码,我们来分析一下。
线程池在析构的时候,此时,在线程池中的线程,我们分析了两种情况。针对这两种情况来讲,是没什么问题的。线程池在析构的时候将自己的运行状态已经改为false。
情况一:线程在等待
- 针对第一种情况:如果线程此时在等待状态。
线程池要析构,notEmpty_.notify_all()唤醒所有正在等待的线程,主线程exitCond_wait获取锁,由就绪状态进入等待状态,释放锁。线程notEmpty_.wait()被唤醒,线程状态由等待状态变为阻塞状态,再获取锁,就绪状态,继续执行,释放锁。发现线程池未启动,释放资源,唤醒主线程。主线程被唤醒,由等待状态变为阻塞状态,获取锁,就进入绪状态,判断谓词条件是否成立,若成立,则退出,不成立,则继续释放锁,由就绪状态进入等待状态。
情况二:线程在执行任务
- 针对第二种情况:如果线程此时在执行任务。
线程池要析构,获取锁,主线程由就绪状态进入等待状态。线程执行完任务之后,释放资源,唤醒主线程,同时发现while循环条件不满足,线程函数结束,线程退出。主线程被唤醒,由等待状态进入阻塞状态,获取锁,进入就绪状态,判断谓词条件是否成立,若成立,则退出,不成立,则继续释放锁,由就绪状态进入等待状态。
这两种情况的分析是没什么问题的,但是线程池内的线程还有一种情况,线程在获取锁!
那么,这就会导致一个问题,如果主线程也在获取锁呢?就会竞争同一把锁。有两种情况,如果主线程先竞争到锁或者线程池的线程执行函数线程先竞争到锁。
情况三:线程在竞争锁
- 第一种情况:主线程先竞争到锁。
主线程先竞争到锁,线程池中的线程处于阻塞状态,主线程接着exitCond_wiat()由就绪状态进入等待状态,同时释放锁(注意:此时谓词条件肯定不满足,因为线程池中还有线程没回收呢)。线程池中的线程再获取到锁,notEmpty_.wait()由就绪状态进入等待状态,但是此时,已经没有条件变量来唤醒了线程池中的线程了(线程池出了作用域,线程列表也已经没有任务了)。
所以,线程池中的线程就一直处于等待状态,主线程也一直处于等待状态。此时就造成了死锁!
- 第二种情况:线程池中的线程先竞争到锁。
线程池中的线程先竞争到锁,此时,主线程在阻塞。线程竞争到锁之后,notEmpty_.wait()由就绪状态进入等待状态,同时,释放锁。主线程获取到锁,由阻塞状态进入就绪状态,继续往后执行,exitCond_.wait()由就绪状态进入等待状态,同时释放锁。但是,此时,已经没有条件变量来唤醒主线程了。
所以,线程池中的线程一直处于等待状态,主线程也一直处于等待状态。此时就造成了死锁!
最终的问题就是,线程池中的线程在notEmpty_.wait()一直处于等待状态,得不到notify。
那么,我们之前写的代码还是有缺陷的。
将noetEmpty_.notify_all()放在获取锁之后,能不能解决死锁问题呢?
- 第一种情况:如果主线程先获取到锁。
主线程先获取到锁,紧接着notEmpty_.notfiy_all()唤醒线程池中的线程,但是没起什么作用,因为线程池中的线程还处于阻塞状态。然后,exitCond_.wait()由就绪状态进入等待状态,同时释放锁。线程池中的线程获取到锁,由阻塞状态进入就绪状态,继续向后运行,notEmpty_.wait()由就绪状态进入等待状态,同时释放锁。
还是同样的问题,主线程和线程池中的线程都一直处在等待状态,而没有被唤醒。死锁!
- 第二种情况:如果线程池中线程先获取到锁。
线程池中线程先竞争到锁,notEmpty_.wait()进入等待状态,同时释放锁。此时,主线程由阻塞状态进入就绪状态,之后notEmpty_.notify_all()唤醒线程中线程,主线程exitCond_.wait()由就绪状态进入等待状态,同时释放锁。线程池中线程被唤醒,由等待状态进入阻塞状态,同时获取到锁,进入就绪状态,继续向后运行,同时释放锁。释放资源,再唤醒主线程。主线程被唤醒,由等待状态进入阻塞状态,同时获取到锁,进入就绪状态,判断谓词条件成立,退出。
此时,第二种情况下的死锁问题解决了。
但是第一种情况如何解决呢?线程池在析构的时候,已经将运行状态改为false。所以,在线程池中线程的线程函数中使用双重判断即可。
此时,无论是主线程先获取锁,还是线程池中线程函数线程先获取锁。线程池中线程都会先判断第一个while条件不满足,跳出while循环,再次判断第二个while条件不满足。释放资源,唤醒主线程。
来看一下如何设计的。
// 线程池析构
ThreadPool:: ~ThreadPool()
{isPoolRunning_ = false;// 等待线程池里面所有的线程返回 std::unique_lock<std::mutex> lock(taskQueMtx_);// 唤醒所有阻塞等待的线程notEmpty_.notify_all();exitCond_.wait(lock, [&]()->bool {return threads_.size() == 0; });
}
// 定义线程函数 线程池的所有线程从任务队列里面消费任务
void ThreadPool::threadFunc(int threadid)
{auto lastTime = std::chrono::high_resolution_clock().now();while (isPoolRunning_){std::shared_ptr<Task> task;{// 先获取锁std::unique_lock<std::mutex> lock(taskQueMtx_);// 下面测试用的std::cout << "tid: " << std::this_thread::get_id() << "尝试获取任务..." << std::endl;// cached模式下,有可能已经创建了很多线程,但是空闲线程如果超过了60s,就应该把多余的线 程// 结束回收掉(超过initThreadSize_的线程要进行回收)// 当前时间 - 上一次线程执行完的时间 > 60s// 每秒钟返回一次 怎么区分超时返回,还是有任务待执行返回?// 锁+双重判断while (isPoolRunning_ && taskQue_.size() == 0){if (poolMode_ == PoolMode::MODE_CACHED){// 条件变量 超时返回了if (std::cv_status::timeout == notEmpty_.wait_for(lock, std::chrono::seconds(1))){// 当前时间auto now = std::chrono::high_resolution_clock().now();// 空闲时间auto dur = std::chrono::duration_cast<std::chrono::seconds>(now - lastTime);if (dur.count() >= THREAD_MAX_IDLE_TIME && curThreadSize_ > initThreadSize_){// 开始回收当前线程// 记录线程数量的相关变量的值修改// 把线程对象从线程列表删除threads_.erase(threadid);curThreadSize_--;idleThreadSize_--;std::cout << "threadid:" << std::this_thread::get_id() << " exit" << std::endl;return;}}}else{// 等待任务队列不为空 notEmpty条件 notEmpty_.wait(lock); }// 1.线程本身在阻塞等待 线程池要结束 回收线程资源//if (!isPoolRunning_)//{// 线程回收// //threads_.erase(threadid);// //std::cout << "threadid:" << std::this_thread::get_id() << " exit" << std::endl;// 唤醒线程池// //exitCond_.notify_all();// //return;//}}if (!isPoolRunning_){break;}idleThreadSize_--;// 下面测试用的std::cout << "tid: " << std::this_thread::get_id() << "获取任务成功..." << std::endl;// 从任务队列获取任务task = taskQue_.front();taskQue_.pop();taskSize_--;// 如果依然有剩余任务,继续通知其他线程来获取执行任务,提高了多线程同时并发获取任务、处理任务的能力if (taskQue_.size() > 0){notEmpty_.notify_all();}// 取出一个任务之后,进行通知,通知其他用户可以继续提交任务notFull_.notify_all();}// 一个线程取完任务之后,此刻就应该把锁释放掉,让多线程可以同时并发获取任务、处理任务// 当前线程负责执行该任务if (task != nullptr){// task->run(); // 执行任务,把任务的返回值通过setVal方法给Resulttask->exec();} idleThreadSize_++;lastTime = std::chrono::high_resolution_clock().now(); // 更新线程执行完任务的时间}// 2.线程正在执行任务 线程池要结束 回收线程资源threads_.erase(threadid);std::cout << "threadid:" << std::this_thread::get_id() << " exit" << std::endl;// 唤醒线程池exitCond_.notify_all();
}
这样设计非常巧妙,无论是fixed模式还是cached模式,这三种情况最终都在一个地方释放资源,并唤醒主线程。
再来测试一下。
这样,就解决了死锁问题。
线程池完整代码
threadpool.hpp:
#ifndef THREADPOOL_H
#define THREADPOOL_H#include<iostream>
#include<vector>
#include<queue>
#include<memory>
#include<atomic>
#include<mutex>
#include<condition_variable>
#include<functional>
#include<unordered_map>// Any类型:可以接收任意数据的类型
class Any
{
public:Any() = default;~Any() = default;Any(const Any&) = delete;Any& operator=(const Any&) = delete;Any(Any&&) = default;Any& operator=(Any&&) = default;// 写不写都可以// 这个构造函数可以让Any类型接收任意其他类型的数据template<typename T>Any(T data) : base_(std::make_unique<Derive<T>>(data)){}// 这个方法可以把Any对象里面存储的data数据提取出来template<typename T>T cast_(){// 我们怎么从base_找到它所指向的Derive对象,从它里面取出data成员变量// 基类指针转换成派生类指针,向下转换。RTTI// 要确定基类指针确实指向了一个派生类对象。Derive<T>* pd = dynamic_cast<Derive<T>*>(base_.get());if (pd == nullptr){throw "type is unmatch!";}return pd->data_;}
private:// 基类类型class Base{public:virtual ~Base() = default;// 基类析构函数设置为虚函数};// 派生类类型template<typename T>class Derive : public Base{public:Derive(T data) : data_(data){}T data_;// 保存了任意其他类型的数据};private:// 定义一个基类的指针std::unique_ptr<Base> base_;
};// 实现一个信号量类
class Semaphore
{
public:Semaphore(int limit = 0):resLimit_(limit){}// 获取一个信号量资源void wait(){// 等待信号量有资源,没有资源,会阻塞当前线程std::unique_lock<std::mutex> lock(mtx_);cond_.wait(lock, [&]()->bool {return resLimit_ > 0; });resLimit_--;}// 增加一个信号量资源void post(){std::unique_lock<std::mutex> lock(mtx_);resLimit_++;cond_.notify_all();}private:int resLimit_;// 初始信号量资源个数std::mutex mtx_;std::condition_variable cond_;
};// Task类型的前置声明
class Task;
// 实现Result:接收提交到线程池的task任务执行完成后的返回值类型Result
class Result
{
public:Result(std::shared_ptr<Task> task, bool isValid = true);~Result() = default;// 问题一:setVal方法,获取任务执行完的返回值void setVal(Any any);// 问题二:get方法,用户调用这个方法获取返回值Any get();
private: Any any_; // 存储任务的返回值Semaphore sem_; // 线程通信信号量std::shared_ptr<Task> task_; // 指向将来对应获取返回值的任务对象std::atomic_bool isValid_; // 返回值是否有效
};// 任务抽象基类
class Task
{
public:Task();~Task() = default;void exec();void setResult(Result* res);virtual Any run() = 0;// 用户可以自定义任务类型,从Task继承,重写run方法,实现自定义任务处理
private:Result* result_; // Result对象的生命周期是长于Task的,不能使用智能指针,会出现智能指针交叉引用问题。
};// 线程池支持的模式
enum PoolMode
{MODE_FIXED, // 1.fixed模式 线程数量固定MODE_CACHED,// 2.cached模式 线程数量可动态增长
};// 线程类型
class Thread
{
public:// 线程函数对象类型using ThreadFunc = std::function<void(int)>;// 线程构造Thread(ThreadFunc func);// 线程启动void start();// 线程析构~Thread();// 获取线程idint getId() const;
private:static int generateId_;ThreadFunc func_;int threadId_; // 保存线程id
};/*
example:
ThreadPool poo;
pool.start(4);class MyTask : public Task
{public:void run(){ //线程代码... }
};pool.submitTask(std::make_shared<MyTask>());
*/// 线程池类型
class ThreadPool
{
public:// 线程池构造ThreadPool();// 线程池析构~ThreadPool();// 设置线程池的工作模式void setMode(PoolMode mode);// 设置task任务队列的上线阈值void setTaskQueMaxThreshHold(int threshhold);// 设置线程池cached模式下线程阈值void setThreadSizeThreshHold(int threshhold);// 给线程池提交任务Result submitTask(std::shared_ptr<Task> sp);// 开启线程池void start(int initThreadSize = std::thread::hardware_concurrency());ThreadPool(const ThreadPool&) = delete;ThreadPool& operator=(const ThreadPool&) = delete;
private:// 线程函数void threadFunc(int threadid);// 检查pool的运行状态bool checkRunningState() const;
private:// std::vector<std::unique_ptr<Thread>> threads_; // 线程列表std::unordered_map<int, std::unique_ptr<Thread>> threads_; // 线程列表int initThreadSize_; // 初始的线程数量int threadSizeHold_; // 线程数量上限阈值std::atomic_int curThreadSize_; // 记录当前线程池里面的线程总数量std::atomic_int idleThreadSize_; // 记录空闲线程的数量std::queue<std::shared_ptr<Task>> taskQue_; // 任务队列std::atomic_int taskSize_; // 任务的数量int taskQueMaxThreshHold_; // 任务队列数量的上线阈值std::mutex taskQueMtx_; // 保证任务队列的线程安全std::condition_variable notFull_; // 表示任务队列不满std::condition_variable notEmpty_; // 表示任务队列不空std::condition_variable exitCond_; // 等待线程资源全部回收PoolMode poolMode_; // 当前线程池的工作模式std::atomic_bool isPoolRunning_; // 表示线程池的启动状态};#endif
threadpool.cpp:
#include"threadpool.hpp"
#include<functional>
#include<thread>// 任务队列数量的上限阈值
const int TASK_MAX_THRESHHOLD = INT32_MAX;
// 线程数量的上限阈值
const int THREAD_MAX_THRESHHOLD = 10;
// 线程最大空闲时间
const int THREAD_MAX_IDLE_TIME = 10; // 单位:秒// 线程池构造
ThreadPool::ThreadPool(): initThreadSize_(0), taskSize_(0), idleThreadSize_(0), curThreadSize_(0), taskQueMaxThreshHold_(TASK_MAX_THRESHHOLD), threadSizeHold_(THREAD_MAX_THRESHHOLD), poolMode_(PoolMode::MODE_FIXED), isPoolRunning_(false)
{
}// 线程池析构
ThreadPool:: ~ThreadPool()
{isPoolRunning_ = false;// 等待线程池里面所有的线程返回 std::unique_lock<std::mutex> lock(taskQueMtx_);notEmpty_.notify_all();// 唤醒所有阻塞等待的线程exitCond_.wait(lock, [&]()->bool {return threads_.size() == 0; });
}// 设置线程池的工作模式
void ThreadPool::setMode(PoolMode mode)
{if (checkRunningState())return;poolMode_ = mode;
}// 设置task任务队列的上线阈值
void ThreadPool::setTaskQueMaxThreshHold(int threshhold)
{if (checkRunningState())return;taskQueMaxThreshHold_ = threshhold;
}// 设置线程池cached模式下线程阈值
void ThreadPool::setThreadSizeThreshHold(int threshhold)
{if (checkRunningState())return;if (poolMode_ == PoolMode::MODE_CACHED){threadSizeHold_ = THREAD_MAX_THRESHHOLD;}}// 给线程池提交任务 用户调用接口,传入任务对象,生产任务
Result ThreadPool::submitTask(std::shared_ptr<Task> sp)
{// 获取锁std::unique_lock<std::mutex> lock(taskQueMtx_);// 线程通信 等待任务队列有空余 wait wait_for wait_until// 用户提交任务,最长不能阻塞超过1s,否则判断提交任务失败,返回if (!notFull_.wait_for(lock, std::chrono::seconds(1), [&]()->bool { return taskQue_.size() < (size_t)taskQueMaxThreshHold_; })){// 表示notFull_等待1s钟,条件仍然不满足std::cerr << "task queue is full, submit task faile." << std::endl;// return task->getResult(); // 线程执行完,task对象就被析构了return Result(sp,false);}// 如果有空余,把任务放进任务队列taskQue_.emplace(sp);taskSize_++;// 因为新放了任务,任务队列肯定不为空了,在notEmpty_上进行通知,赶快分配线程执行任务吧notEmpty_.notify_all();// cached模式 任务处理比较紧急 场景:小而快的任务 需要根据任务数量和空闲线程数量,判断是否需要创建新的线程出来if (poolMode_ == PoolMode::MODE_CACHED&& taskSize_ > idleThreadSize_&& curThreadSize_ < threadSizeHold_){// 这行打印用于测试std::cout << ">>> create new thread..." << std::endl;// 创建新线程auto ptr = std::make_unique<Thread>(std::bind(&ThreadPool::threadFunc, this, std::placeholders::_1));// threads_.emplace_back(std::move(ptr));// threads_[curThreadSize_]->start();int threadId = ptr->getId();threads_.emplace(threadId, std::move(ptr));// 启动线程threads_[threadId]->start();// 修改线程数量相关变量curThreadSize_++;idleThreadSize_++;}// 返回任务的Result对象// return task->getResult();return Result(sp);
}// 开启线程池 初始线程数量为当前系统的CPU核心数量
void ThreadPool::start(int initThreadSize)
{// 设置线程池的运行状态isPoolRunning_ = true;// 记录初始线程个数initThreadSize_ = initThreadSize;// 记录线程池里面线程的总数量curThreadSize_ = initThreadSize;// 创建线程对象 std::vector<Thread*> threads_;for (int i = 0; i < initThreadSize_; i++){// 创建thread线程对象的时候,把线程函数给到thread线程对象auto ptr = std::make_unique<Thread>(std::bind(&ThreadPool::threadFunc, this, std::placeholders::_1));int threadId = ptr->getId();threads_.emplace(threadId, std::move(ptr));// threads_.emplace_back(std::move(ptr));}// 启动所有线程 std::vector<Thread*> threads_;for (int i = 0; i < initThreadSize_; i++){threads_[i]->start();// 需要去执行一个线程函数idleThreadSize_++;}
}
// 定义线程函数 线程池的所有线程从任务队列里面消费任务
void ThreadPool::threadFunc(int threadid)
{auto lastTime = std::chrono::high_resolution_clock().now();while (isPoolRunning_){std::shared_ptr<Task> task;{// 先获取锁std::unique_lock<std::mutex> lock(taskQueMtx_);// 下面测试用的std::cout << "tid: " << std::this_thread::get_id() << "尝试获取任务..." << std::endl;// cached模式下,有可能已经创建了很多线程,但是空闲线程如果超过了60s,就应该把多余的线 程// 结束回收掉(超过initThreadSize_的线程要进行回收)// 当前时间 - 上一次线程执行完的时间 > 60s// 每秒钟返回一次 怎么区分超时返回,还是有任务待执行返回?// 锁+双重判断while (isPoolRunning_ && taskQue_.size() == 0){if (poolMode_ == PoolMode::MODE_CACHED){// 条件变量 超时返回了if (std::cv_status::timeout == notEmpty_.wait_for(lock, std::chrono::seconds(1))){// 当前时间auto now = std::chrono::high_resolution_clock().now();// 空闲时间auto dur = std::chrono::duration_cast<std::chrono::seconds>(now - lastTime);if (dur.count() >= THREAD_MAX_IDLE_TIME && curThreadSize_ > initThreadSize_){// 开始回收当前线程// 记录线程数量的相关变量的值修改// 把线程对象从线程列表删除threads_.erase(threadid);curThreadSize_--;idleThreadSize_--;std::cout << "threadid:" << std::this_thread::get_id() << " exit" << std::endl;return;}}}else{// 等待任务队列不为空 notEmpty条件 notEmpty_.wait(lock); }// 1.线程本身在阻塞等待 线程池要结束 回收线程资源//if (!isPoolRunning_)//{// // 线程回收// threads_.erase(threadid);// std::cout << "threadid:" << std::this_thread::get_id() << " exit" << std::endl;// // 唤醒线程池// exitCond_.notify_all();// return;//}}if (!isPoolRunning_){break;}idleThreadSize_--;// 下面测试用的std::cout << "tid: " << std::this_thread::get_id() << "获取任务成功..." << std::endl;// 从任务队列获取任务task = taskQue_.front();taskQue_.pop();taskSize_--;// 如果依然有剩余任务,继续通知其他线程来获取执行任务,提高了多线程同时并发获取任务、处理任务的能力if (taskQue_.size() > 0){notEmpty_.notify_all();}// 取出一个任务之后,进行通知,通知其他用户可以继续提交任务notFull_.notify_all();}// 一个线程取完任务之后,此刻就应该把锁释放掉,让多线程可以同时并发获取任务、处理任务// 当前线程负责执行该任务if (task != nullptr){// task->run(); // 执行任务,把任务的返回值通过setVal方法给Resulttask->exec();} idleThreadSize_++;lastTime = std::chrono::high_resolution_clock().now(); // 更新线程执行完任务的时间}// 2.线程正在执行任务 线程池要结束 回收线程资源threads_.erase(threadid);std::cout << "threadid:" << std::this_thread::get_id() << " exit" << std::endl;// 唤醒线程池exitCond_.notify_all();
}bool ThreadPool::checkRunningState() const
{return isPoolRunning_;
}/ 线程实现方法int Thread::generateId_ = 0;// 线程构造
Thread::Thread(ThreadFunc func):func_(func),threadId_(generateId_++)
{}// 线程析构
Thread::~Thread()
{}// 启动线程
void Thread::start()
{// 创建一个线程来执行线程函数std::thread t(func_,threadId_);t.detach();// 设置线程分离
}// 获取线程id
int Thread::getId()const
{return threadId_;
}/ Task方法的实现
Task::Task():result_(nullptr)
{}void Task::exec()
{if (result_ != nullptr){result_->setVal(run()); // 这里发生多态调用}
}void Task::setResult(Result* res)
{result_ = res;
}/ Result方法的实现Result::Result(std::shared_ptr<Task> task, bool isValid):task_(task),isValid_(isValid)
{task_->setResult(this);
}Any Result::get() // 用户调用的
{if (!isValid_){return "";}sem_.wait(); // task任务如果没有执行完,这里会阻塞用户的线程return std::move(any_);// Any std::unique_ptr<Base> base_;
}void Result::setVal(Any any) // 谁调用的呢?
{// 存储task的返回值this->any_ = std::move(any);sem_.post(); // 已经获取的任务的返回值,增加信号量资源
}
线程池项目测试.cpp:
//线程池项目.cpp:此文件包含"main"函数。程序将在此处开始并结束。
//#include<iostream>
#include<chrono>
#include<thread>
using namespace std;#include"threadpool.hpp"/*
有些场景,线程池是希望能够获取线程执行完任务以后的返回值的
举例:
计算 1 + ...+ 30000 的和
thread1 1 + ... + 10000
thread2 10000 + ... + 20000
thread3 20000 + ... + 30000
...
main thread: 给每一个线程分配计算的区间,并等待他们计算完返回结果,合并最终的结果即可。
*/using ulong = unsigned long long;class MyTask : public Task
{
public:MyTask(int begin, int end):begin_(begin), end_(end){}// 问题一:怎么设计run函数的返回值,可以表示任意类型。Any run(){std::cout << "tid: " << std::this_thread::get_id() << "begin!" << std::endl;std::this_thread::sleep_for(std::chrono::seconds(3));ulong sum = 0;for (int i = begin_; i <= end_; i++)sum += i;std::cout << "tid: " << std::this_thread::get_id() << "end!" << std::endl;return sum;}
private:int begin_;int end_;
};int main()
{{ThreadPool pool;pool.start(4);Result res1 = pool.submitTask(std::make_shared<MyTask>(1, 100000000));Result res2 = pool.submitTask(std::make_shared<MyTask>(1, 100000000));ulong sum1 = res1.get().cast_<ulong>();cout << sum1 << endl;}cout << "main over!" << endl;getchar();#if 0{ThreadPool pool;// 用户自己设置线程池的工作模式pool.setMode(PoolMode::MODE_CACHED);pool.start(4);srand(time(0));// 如何设计这里的Result机制呢?size_t begin1 = clock();Result res1 = pool.submitTask(std::make_shared<MyTask>(1, 100000000));Result res2 = pool.submitTask(std::make_shared<MyTask>(100000001, 200000000));Result res3 = pool.submitTask(std::make_shared<MyTask>(200000001, 300000000));pool.submitTask(std::make_shared<MyTask>(200000001, 300000000));pool.submitTask(std::make_shared<MyTask>(200000001, 300000000));pool.submitTask(std::make_shared<MyTask>(200000001, 300000000));ulong sum1 = res1.get().cast_<ulong>();ulong sum2 = res2.get().cast_<ulong>();ulong sum3 = res3.get().cast_<ulong>();size_t end1 = clock();// Master - Slave线程模型// Master线程用来分解任务,然后给各个Slaver线程分配任务// 等待各个Slaver线程执行完成任务,返回结果// Master线程合并各个任务结果,输出// cout << "测试结果:" << (sum1 + sum2 + sum3) << "花费时间:" << end1 - begin1 << endl;1+...300000000正确结果//size_t begin2 = clock();//ulong sum = 0;//for (int i = 1; i <= 300000000; i++)// sum += i;//size_t end2 = clock();//cout << "正确结果:" << sum << "花费时间:" << end2 - begin2 << endl;}/*pool.submitTask(std::make_shared<MyTask>());pool.submitTask(std::make_shared<MyTask>());pool.submitTask(std::make_shared<MyTask>());pool.submitTask(std::make_shared<MyTask>());pool.submitTask(std::make_shared<MyTask>());pool.submitTask(std::make_shared<MyTask>());pool.submitTask(std::make_shared<MyTask>());pool.submitTask(std::make_shared<MyTask>());pool.submitTask(std::make_shared<MyTask>());pool.submitTask(std::make_shared<MyTask>());*/getchar();
#endif
}