目录
一、线程池:
1、池化技术:
2、线程池优点:
3、线程池应用场景:
4、线程池实现:
二、封装线程:
三、单例模式:
四、其他锁:
五、读者写者问题
一、线程池:
1、池化技术:
池化技术是以空间换时间的一种技术,上述的线程池实际上就是先提前创建一批线程用容器存储着,然后当有任务来临,线程直接从任务列表中获取并且执行任务,这样通过空间来换取时间,可以极大地提高效率
2、线程池优点:
高效,方便
- 线程池是已经创建好线程了,当有任务来临时,只需交给线程即可
- 在调度线程时保证合理性,防止线程的过度调度,保证内核的充分利用
- 避免了在处理短时间任务时创建与销毁线程的代价
但是线程池的数量也不是越多越好,更多的是要与实际开发相结合
3、线程池应用场景:
- 需要大量的线程来完成任务,并且每一个任务耗时短
- 对性能要求苛刻,要快速响应,比如打游戏时候放技能
- 当有可能突然出现大量请求的时候,但是不至于使服务器产生大量线程,也就是短时间产生大量线程使服务器内存达到最大,此时可以用线程池提升该效率问题
4、线程池实现:
#pragma once#include <iostream>
#include <vector>
#include <queue>
#include <string>
#include <pthread.h>
#include <unistd.h>
#include <ctime>struct ThreadInfo
{pthread_t _tid;std::string ThreadName;
};static int defaultnum = 5;template <class T>
class ThreadPool
{
public:void Lock(){pthread_mutex_lock(&_mutex);}void UnLock(){pthread_mutex_unlock(&_mutex);}void ThreadSleep(){pthread_cond_wait(&_cond,&_mutex);}bool IsEmpty(){return _task.empty();}std::string GetThreadName(pthread_t tid){for(auto &it : _threads){if(it._tid == tid){return it.ThreadName;}}return "NONE";}void Wakeup(){pthread_cond_signal(&_cond);}
public:ThreadPool(int num = defaultnum):_threads(num){pthread_mutex_init(&_mutex,nullptr);pthread_cond_init(&_cond,nullptr);}static void *myhander(void *args){ ThreadPool<T> * tp = static_cast<ThreadPool<T> *>(args);std::string name = tp->GetThreadName(pthread_self());while(true){tp->Lock();while(tp->IsEmpty()){tp->ThreadSleep();}T ret = tp->Pop();tp->UnLock();ret();std::cout << name << "正在运行,结果: " << ret.Getresult() << std::endl;}}T Pop(){T t = _task.front();_task.pop();return t;}void Push(const T &t){Lock();_task.push(t);Wakeup();UnLock();}void Start(){int num = _threads.size();for(int i = 0;i < num;i++){_threads[i].ThreadName = "thread-" + std::to_string(i + 1);pthread_create(&(_threads[i]._tid),nullptr,myhander,this);}}~ThreadPool(){pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_cond);}private:std::vector<ThreadInfo> _threads;std::queue<T> _task;pthread_mutex_t _mutex;pthread_cond_t _cond;
};
对于上述线程池代码,有三个要注意的点:
1、myhander函数要设计成静态的,因为如果不设计成静态的,myhander就是成员方法,而成员方法有this指针,这样导致其形参不是void*,所以不能在创建线程的时候直接传,要设置成静态的
2、但是设置成静态的函数的话,发现编译不过,因为静态的函数可以访问其静态的函数和变量,无法访问成员函数和成员变量,所以在创建线程的时候要传递当前对象,也就是this指针,这样就能够通过对象访问其成员函数了
3、这里要引入互斥锁,这是因为线程池中的任务队列会被多个执行流访问,要加锁对其进行保护,我们知道线程执行任务的时候要去任务队列中拿任务,此时就需要判断任务队列是否为空,所以加锁要在判断之前
判断就是如果队列为空,就让当前线程去条件变量中休眠,还有为了防止伪唤醒,所以这里的if要改为while
4、当线程拿到任务的时候,这个任务就是当前线程私有的了,所以处理任务就不要放在临界区了,因为要保证在处理任务的线程是并行的,如果一个线程要等待另一个线程处理完任务就不是并行的了,效率会大大降低
任务设计
我们的任务在之前章节就已经完成了,直接拿过来用即可
#pragma once
#include <iostream>
#include <string>std::string opers = "+-*/%";enum
{Divzero = 1,Modzero,Unknow
};class Task
{
public:Task(){}Task(int data1, int data2, char oper): _data1(data1), _data2(data2), _oper(oper), _exitcode(0),_result(0){}void run(){switch (_oper){case '+':_result = _data1 + _data2;break;case '-':_result = _data1 - _data2;break;case '*':_result = _data1 * _data2;break;case '/':{if (_data2 == 0)_exitcode = Divzero;else_result = _data1 / _data2;}break;case '%':{if (_data2 == 0) _exitcode = Modzero;else _result = _data1 % _data2;}break;default:_exitcode = Unknow;break;}}void operator ()(){run();}std::string Getresult(){std::string ret = std::to_string(_data1);ret += _oper;ret += std::to_string(_data2);ret += "=";ret += std::to_string(_result);ret += "[exitcode=";ret += std::to_string(_exitcode);ret += "]";return ret;}std::string GetTask(){std::string ret = std::to_string(_data1);ret += _oper;ret += std::to_string(_data2);ret += "=?";return ret;}~Task(){}private:int _data1;int _data2;char _oper;int _exitcode;int _result;
};
主函数逻辑
主线程就不断地向线程池中push任务即可,然后线程池就会从任务队列中拿任务并处理,怎么处理不用关心,所以这里对我们来说相当于一个黑盒
#include "ThreadPool.hpp"
#include "Task.hpp"int main()
{srand(time(nullptr)^getpid());ThreadPool<Task> *tp = new ThreadPool<Task>();tp->Start();while(true){int data1 = rand()%10+1;usleep(10);int data2 = rand()%5;char op = opers[rand()%opers.size()];Task t(data1,data2,op);usleep(10);tp->Push(t);std::cout<<"主线程启动:" << t.GetTask() <<std::endl;sleep(1);}return 0;
}
实验结果:
如下,并且我们发现这里的线程是有顺序性的,这是因为主线程每秒只会push一个任务,然后线程池中每次都只有一个线程获取到任务,其他线程就会在队列中等待,当这个线程处理完任务后,就会到队列的最后等待,此时若主线程有push任务,就会唤醒队列第一个线程,完成任务后又会到队列尾部,这样我们就能够看到顺序性
二、封装线程:
接下来我们尝试封装线程,就像C++中的线程库那样使用而不是使用原生接口
对于一个线程,我们可以给他设置很多属性,其中_data是传进来的参数,_cb是一个回调函数
private:pthread_t _tid;std::string _name;bool _isrunning;uint64_t _start_time;T _data;callback_t _cb;
将这些在构造函数进行初始化:
Thread(callback_t cb,T data):_tid(0),_name(""),_isrunning(false),_start_time(0),_cb(cb),_data(data){}
然后设计对外开放的接口,能够像C++库中的线程使用起来
void Run(){_name = "thread-" + std::to_string(num++);_isrunning = true;_start_time = time(nullptr);pthread_create(&_tid,nullptr,Running,this);}void Entry(){_cb(_data);}void Join(){_isrunning = false;pthread_join(_tid,nullptr);}std::string GetName(){return _name;}uint64_t Start_Time(){return _start_time;}bool IsRunning(){return _isrunning;}
其中,Running方法要设计成静态的,这是因为和线程池那里一样,如果不是静态方法就是成员函数,这样的话就会多一个形参,所以要设置成静态的,但是静态方法不能直接访问成员变量,所以在创建线程的时候要将this指针也就是当前对象传过去,这样就能够通过对象访问成员函数或成员变量了
static void* Running(void *args){Thread* thread = static_cast<Thread*>(args);thread->Entry();return nullptr;}
这里还要设置回调函数
typedef void (*callback_t)(T);
其中callback_t是 函数指针类型,他的参数是T,在定义一个Thread类的时候,需要传递一个函数给构造函数,这个cb就是一个函数指针,然后赋值给_cb,后面_cb()调用的就是传递进来的函数
在Entry中的cb(_data)调用的是这个uPrint
主函数逻辑
初始化好要传的参数,准备好回调函数所用的函数,就可以像C++中的线程库进行使用了
class udata
{
public:std::string _s;int _data;
};void uPrint(udata x)
{while(true){std::cout << "我是一个封装后的线程,我得到了参数 : " << x._s << " 和 " << x._data <<std::endl;sleep(1);}
}int main()
{udata da;da._s = "666";da._data = 111;Thread<udata> t(uPrint,da);t.Run();std::cout << "该线程的名字 : " << t.GetName() <<std::endl;std::cout << "该线程是否运行 : " << t.IsRunning() <<std::endl;std::cout << "该线程创建时间 : " << t.Start_Time() <<std::endl;t.Join();return 0;
}
三、单例模式:
具体可以看看下面这篇文章
【C++11】特殊类的设计 && 单例模式 && 类型转换-CSDN博客https://blog.csdn.net/2303_80828380/article/details/147028880?spm=1001.2014.3001.5501
在这里我们是将单例模式在代码中实现出来,也就是将线程池设计成单例模式
1、将构造函数私有化,保证不能够在外部随便创建变量
private:ThreadPool(int num = defaultnum): _threads(num){pthread_mutex_init(&_mutex, nullptr);pthread_cond_init(&_cond, nullptr);}
2、将拷贝构造和赋值重载删掉,保证不能够通过拷贝来创建新对象
ThreadPool(const ThreadPool<T>& copy) = delete;const ThreadPool<T>& operator=(const ThreadPool<T>& copy) = delete;
3、设计静态的指针指向堆空间,还要有锁,_tp要设计成静态的原因实现单例模式,保证全局唯一实例,_lock为静态的提供全局同步锁,确保多线程下安全创建单例
静态成员变量在类里面声明,在类外面定义,是语法规定的
类内定义,类外初始化,在初始化的时候不要带上static,并且要指定类域,不然的话编译器编译的时候就找不到对应的变量了
template <class T>
class ThreadPool
{
private:static ThreadPool<T>* _tp;//不可以像下面初始化//static ThreadPool<T>* _tp = nullptr;static pthread_mutex_t _lock;
};template <class T>
ThreadPool<T>* ThreadPool<T>::_tp = nullptr;template <class T>
pthread_mutex_t ThreadPool<T>::_lock = PTHREAD_MUTEX_INITIALIZER;
4、这里采用懒汉模式创建单例,所以在GetInstance接口中new对象
这里这个函数为静态的,为了访问静态变量_tp,如果不是静态函数的话就访问不了_tp了
static ThreadPool<T>* GetInstance(){if(_tp == nullptr) {pthread_mutex_lock(&_lock);if(_tp == nullptr){_tp = new ThreadPool<T>;}pthread_mutex_unlock(&_lock);}return _tp;}
还要进行两次判空:
我们发现,只有第一个线程进入的时候_tp才会为空,这样的话后面如果多次进行加锁判断释放锁会降低效率,所以这里得二次判断,保证后面不为空的时候直接返回,并且即使第一次有多个线程进入了第一个if,但是,依然是只会有一个线程成功申请锁并进入if,new空间,其余线程即使后来申请到锁了,但是if判断失效,所以就会释放锁,然后后来的线程就会在第一个if那里都进不去,进而增加效率
完整代码:
#pragma once#include <iostream>
#include <vector>
#include <queue>
#include <string>
#include <pthread.h>
#include <unistd.h>
#include <ctime>struct ThreadInfo
{pthread_t _tid;std::string ThreadName;
};static int defaultnum = 5;template <class T>
class ThreadPool
{
public:void Lock(){pthread_mutex_lock(&_mutex);}void UnLock(){pthread_mutex_unlock(&_mutex);}void ThreadSleep(){pthread_cond_wait(&_cond, &_mutex);}bool IsEmpty(){return _task.empty();}std::string GetThreadName(pthread_t tid){for (auto &it : _threads){if (it._tid == tid){return it.ThreadName;}}return "NONE";}void Wakeup(){pthread_cond_signal(&_cond);}public:static void *myhander(void *args){ThreadPool<T> *tp = static_cast<ThreadPool<T> *>(args);std::string name = tp->GetThreadName(pthread_self());while (true){tp->Lock();while (tp->IsEmpty()){tp->ThreadSleep();}T ret = tp->Pop();tp->UnLock();ret();std::cout << name << "正在运行,结果: " << ret.Getresult() << std::endl;}}T Pop(){T t = _task.front();_task.pop();return t;}void Push(const T &t){Lock();_task.push(t);Wakeup();UnLock();}void Start(){int num = _threads.size();for (int i = 0; i < num; i++){_threads[i].ThreadName = "thread-" + std::to_string(i + 1);pthread_create(&(_threads[i]._tid), nullptr, myhander, this);}}//懒汉模式static ThreadPool<T>* GetInstance(){if(_tp == nullptr) //我们发现,只有第一个线程进入的时候_tp才会为空,这样的话后面如果多次进行加锁判断释放锁会降低效率{ //所以这里得二次判断,保证后面不为空的时候直接返回,并且即使第一次有多个线程进入了235行,但是//依然是只会有一个线程成功申请锁并进入if,new空间,其余线程即使后来申请到锁了,但是if判断失效,所以就//会释放锁,然后后来的线程就会在第一个if那里都进不去,进而增加效率pthread_mutex_lock(&_lock);if(_tp == nullptr){_tp = new ThreadPool<T>;}pthread_mutex_unlock(&_lock);}return _tp;}private:ThreadPool(int num = defaultnum): _threads(num){pthread_mutex_init(&_mutex, nullptr);pthread_cond_init(&_cond, nullptr);}~ThreadPool(){pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_cond);}ThreadPool(const ThreadPool<T>& copy) = delete;const ThreadPool<T>& operator=(const ThreadPool<T>& copy) = delete;
private:std::vector<ThreadInfo> _threads;std::queue<T> _task;pthread_mutex_t _mutex;pthread_cond_t _cond;static ThreadPool<T>* _tp;//static ThreadPool<T>* _tp = nullptr;static pthread_mutex_t _lock;
};template <class T>
ThreadPool<T>* ThreadPool<T>::_tp = nullptr;template <class T>
pthread_mutex_t ThreadPool<T>::_lock = PTHREAD_MUTEX_INITIALIZER;
四、其他锁:
- 悲观锁:在每次取数据时,总是担心数据会被其他线程修改,所以会在取数据前先加锁(读锁,写锁,行锁等),当其他线程想要访问数据时,被阻塞挂起。
- 乐观锁:每次取数据时候,总是乐观的认为数据不会被其他线程修改,因此不上锁。但是在更新数据前,会判断其他数据在更新前有没有对数据进行修改。主要采用两种方式:版本号机制和CAS操作。
- CAS操作:当需要更新数据时,判断当前内存值和之前取得的值是否相等。如果相等则用新值更新。若不等则失败,失败则重试,一般是一个自旋的过程,即不断重试。
- 自旋锁:申请锁失败的时候不将自己挂起,而是不断尝试申请锁
自旋就是不把自己挂起,而是由线程不断地去周而复始的去申请锁,如果申请锁成功则进入,失败则返回重新检测锁的状态
自旋锁相关接口:
自旋锁的类型是pthread_spinlock_t
加解自旋锁:
其中:
int pthread_spin_lock(pthread_spinlock_t *lock)失败就不断重试(阻塞式)
int pthread_spin_trylock(pthread_spinlock_t *lock);失败就继续向后运行(非阻塞式)
其实是类似于之前学习的互斥锁,使用和其大差不差
什么时候使用自旋锁:
1、锁持有时间极短
当临界区代码执行时间非常短(如几个指令周期)时,自旋锁的“忙等待”不会显著浪费CPU资源
2、多核/多CPU环境
在多核系统中,等待线程可能在另一个核心上运行,避免因睡眠-唤醒带来的上下文切换开销
3、不可睡眠的上下文
在中断上下文、内核抢占禁用区域等不能睡眠的代码路径中,必须使用自旋锁
4、对实时性要求高
自旋锁的响应延迟低(无上下文切换),适合实时系统或低延迟任务
特性 | 自旋锁 | 互斥锁 |
---|---|---|
等待方式 | 忙等待(循环检测) | 睡眠等待(进入阻塞状态) |
CPU消耗 | 高(持续占用CPU) | 低(释放CPU给其他线程) |
适用场景 | 短临界区、多核、不可睡眠 | 长临界区、通用场景 |
性能优势 | 低延迟(无上下文切换) | 高吞吐量(减少CPU浪费) |
五、读者写者问题
这个也是遵循"321"原则的
3种关系:
读者和读者,写者和写者,读者和写者
2个角色:
读者和写者
1个交易场所:
阻塞队列或者缓冲区
以上基本是和生产消费者模型是差不多的,不同的是读者和读者之间是没有互斥关系的,最大的原因在于读者是只会读缓冲区中的数据,是不会将数据拿走或者修改的,所以不需要维护读者和读者之间的互斥关系
相关接口:
注意:读者和写者的锁不是同一个锁
初始化,销毁自旋锁:
读者进行加锁,解锁
写者进行加锁,解锁
解锁,这个读者写者的锁都能解
读者写者的伪代码:
理解:
读者在读数据的时候,写者是不能够写的,但是其他读者可以读
写者在写数据的时候,读者是不能够读数据的
当所有读者读完数据的时候,再让写者写数据
当写者写完数据的时候,再让读者读数据
接下来实现一个读者优先的伪代码:
void Reader()
{// 首先就是读者的加锁和解锁操作:// 首先对读者进行加锁pthread_mutex_lock(&rlock);reader_count++; //在这里将读者的数量+1// 接着判断读者是否为1,如果为1就证明这是第一个读者,此时申请写者的锁,就会有两种情况// 1、写者没有进行写入,所以写者也就没有申请到锁,所以读者申请成功锁,让以后写者无法进行写入// 2、写者正在进行写入,所以写者也就持有锁,那么第一个读者也就在这里申请失败锁,进入阻塞队列中等待if (reader_count == 1) {pthread_mutex_lock(&wlock);}// 这里是释放的读者自己的锁,如果读者正在读,一定会持有写者的锁的pthread_mutex_unlock(&rlock);// 在这里进行读取,因为不需要维护读者之间的互斥关系也就不需要锁,直接进行读取即可//当读取完后,下面就是减少读者的代码pthread_mutex_lock(&rlock);reader_count--; // 将读者的数量-1//这里进行判断,如果读者的数量为0的时候释放写者的锁,此时写者就能够进行写入了if (reader_count == 0) {pthread_mutex_lock(&wlock);}pthread_mutex_unlock(&rlock);
}
void Writer()
{// 然后是写者的加锁,写入数据,解锁pthread_mutex_lock(&wlock);//写入数据pthread_mutex_unlock(&wlock);
}