目录
- 线程同步概念
- 条件变量
- 条件变量相关接口
- pthread_cond_init
- pthread_cond_wait
- pthread_cond_signal与pthread_cond_broadcast
- 测试样例
- 生产消费模型
- 生产者消费者模型的特点
- 基于BlockingQueue的生产者消费者模型
线程同步概念
线程同步同样是需要互斥锁来实现的,但不同的是:线程同步,程之间所具有的一种制约关系,一个线程的执行依赖另一个线程的消息或状态。当某个线程需要等待另一个线程完成某项任务后才能继续执行时,就需要进行线程同步。同步的目的是确保线程之间按照一定的顺序或规则来访问共享资源或执行特定操作。
线程同步是要强调顺序性的,而非单一的竞争,这就为访问临界资源提供了一定的合理性,不会让同一个线程连续多次的访问临界资源。这就与线程互斥有区别了,线程互斥只完成了排他性,但同步在排他的基础上,完成了线程访问资源的顺序性。同步的概念更为广泛,它不仅包括互斥,还包括其他形式的线程间协作和顺序控制。可以说互斥是同步的一种特殊形式。
条件变量
当一个线程互斥地访问某个变量时,它可能发现在其它线程改变状态之前,它什么也做不了。例如一个线程访问队列时,发现队列为空,它只能等待,只到其它线程将一个节点添加到队列中。这种情况就需要用到条件变量。
条件变量是一种同步原语,它提供了一种线程间通信的方式。当线程需要等待某个条件成立时,它可以使用条件变量将自己挂起并进入等待状态。一旦条件成立,另一个线程会通知条件变量,从而唤醒等待的线程。
条件变量必须与互斥锁结合使用,以确保线程在检查条件和等待条件变量时的原子性。这意味着,在调用条件变量的等待函数之前,线程必须已经持有与条件变量关联的互斥锁。当线程被条件变量唤醒后,它会重新获取互斥锁,并再次检查条件是否真正满足。
条件变量提供了两种基本操作:
- 等待(wait):线程调用条件变量的等待函数时,会释放已持有的互斥锁并进入等待状态。此时,线程不再消耗CPU资源,直到被其他线程唤醒。唤醒后,线程会重新获取互斥锁,并继续执行后续操作。
- 通知(notify/notifyAll):当条件满足时,另一个线程会调用条件变量的通知函数来唤醒一个或所有等待的线程。通知操作必须在持有互斥锁的情况下进行,以确保线程同步的正确性。
条件变量通常是与线程队列相关联的,因为可能有多个线程等待同一个条件,条件满足时,条件变量会从队列中唤醒一个或多个线程,使它们能够继续执行。
设想这样一个场景,当多个线程想去访问一个临界资源的时候,但是这个临界资源又需要另一个线程改变到某个值才行,在没有改变的时候多个线程会不断的去申请锁,检查,然后释放锁,此时能改变这个临界资源的线程竞争到锁的可能性就变小了,此时其他线程不断的竞争锁然后检查再释放,在没有改变前是做的无用功。条件变量的作用就是避免这些无用功,多个线程在检查到临界资源没有到达期望值的时候在一个队列里面去等待这个条件变量,改变临界资源的线程在做出修改后唤醒条件变量。这就避免多个线程不断的去竞争这个锁。这也是条件变量必须与互斥锁结合使用的原因,在改变临界资源的过程中,其他线程因为没有锁去访问了临界资源就造成的数据错误。
条件变量相关接口
pthread_cond_init
pthread_cond_init
函数是用于初始化一个条件变量。
int pthread_cond_init(pthread_cond_t *restrict cond,const pthread_condattr_t *restrict attr);
cond
是指向条件变量对象的指针,该对象将被初始化。
attr
是指向条件变量属性的指针,用于指定条件变量的属性。如果此参数为 NULL,则使用默认属性。在大多数应用中,通常传递 NULL。
函数成功时返回 0;出错时返回错误码。
在实际使用中,静态初始化的方式(如示例中 PTHREAD_COND_INITIALIZER
和 PTHREAD_MUTEX_INITIALIZER
)通常用于全局或静态的条件变量和互斥锁。对于局部变量或需要动态配置属性的情况,应使用 pthread_cond_init
和 pthread_mutex_init
函数进行动态初始化。
pthread_cond_t
,该类型用于表示条件变量。pthread_cond_t 的使用中,通常需要与互斥锁(pthread_mutex_t)一起工作,以确保对共享数据的访问是同步的。条件变量本身不直接管理或保护任何数据;它们必须与互斥锁结合使用,以确保在检查条件(即“等待”条件)和修改条件(即“通知”或“广播”条件)时,数据的完整性得到保护。
pthread_cond_wait
pthread_cond_wait
,用于使线程在条件变量上等待,直到该条件变量被另一个线程的信号(pthread_cond_signal
)或广播(pthread_cond_broadcast
)唤醒。
int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);
cond
是指向条件变量对象的指针。
mutex
是指向互斥锁对象的指针,该互斥锁必须在调用 pthread_cond_wait 之前被当前线程持有(即锁定状态)。
在调用该函数时,互斥锁mutex必须被锁住,并等待唤醒。唤醒之后线程重新锁住互斥锁并继续执行。值得注意的是,pthread_cond_wait函数首先会解锁与之关联的互斥锁mutex,这也是为什么使用该函数时mutex必须是被锁住的。然后调用该函数的线程进入阻塞状态,直到被唤醒。最后,条件变量被通知之后,pthread_cond_wait重新锁定互斥锁mutex。
为什么要在pthread_cond_wait中传入互斥锁?
- 在调用 pthread_cond_wait 时,互斥锁是已经锁住的,确保没有其他线程可以修改共享资源。
- pthread_cond_wait 在进入等待状态之前会自动释放互斥锁,使得其他线程可以修改条件。
- 当线程被唤醒后,pthread_cond_wait 会重新获得互斥锁,然后再继续执行,因为此时还在临界区,还会访问临界资源。
pthread_cond_signal与pthread_cond_broadcast
接口:pthread_cond_signal(唤醒一个线程)和pthread_cond_broadcast(唤醒所有线程),是用于唤醒等待条件变
int pthread_cond_signal(pthread_cond_t *cond);
int pthread_cond_broadcast(pthread_cond_t *cond);
pthread_cond_signal函数如果有多个线程在等待条件变量,在等待队列里先等待的先唤醒,后等待的后唤醒。
pthread_cond_broadcast函数唤醒所有等待在条件变量 cond 上的线程。
cond 是指向要唤醒线程的条件变量的指针。
函数成功时返回 0;失败时返回错误码。
测试样例
#include <iostream>
#include <string>
#include <unistd.h>
#include <pthread.h>const int num = 5;
pthread_mutex_t gmutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t gcond = PTHREAD_COND_INITIALIZER;void *Wait(void *args)
{std::string name = static_cast<const char *>(args);while (true){pthread_mutex_lock(&gmutex);pthread_cond_wait(&gcond, &gmutex /*?*/); // 这里就是线程等待的位置usleep(10000);std::cout << "I am : " << name << std::endl;pthread_mutex_unlock(&gmutex);// usleep(100000);}
}int main()
{pthread_t threads[num];for (int i = 0; i < num; i++){char *name = new char[1024];snprintf(name, 1024, "thread-%d", i + 1);pthread_create(threads + i, nullptr, Wait, (void *)name);usleep(10000);}sleep(1);// 唤醒其他线程while (true){pthread_cond_signal(&gcond);//pthread_cond_broadcast(&gcond);//std::cout << "唤醒所有线程...." << std::endl;std::cout << "唤醒一个线程...." << std::endl;sleep(2);}for (int i = 0; i < num; i++){pthread_join(threads[i], nullptr);}return 0;
}
可以看到线程被唤醒的顺序也是固定的。
上述代码中,主要就是线程执行的函数中的while循环,其中调用了pthread_cond_wait,执行到这里的线程就会进入到该函数所指定的条件变量的等待队列里。等待被唤醒,被唤醒后接着就执行下面的代码。
可以发现的是,pthread_cond_wait函数是在临界区里面的,这是不是就是带着锁去等待呢?如果这样的话,其他线程怎么进入临界区然后在进入到等待队列呢?设计者当然也知道这个问题,所有这个函数第二个参数是锁的地址,这就保证了在这个函数也可以释放锁。当唤醒条件变量返回时,需要再次参与锁的竞争,当竞争到锁后就会再次从等待的地方向下执行。如果没有竞争到锁就会阻塞在锁那里,等得到锁再次执行仍然是在pthread_cond_wait函数处开始执行的。这并不会造成效率降低,因为就算阻塞了也不是一直像原来一直加锁然后释放锁,这样做无用功。大部分情况下都是相互制约的,当条件成立的时候,另一个线程唤醒此线程,如果另一个线程竞争到锁,其实这个线程大概率会陷入等待,等待此线程唤醒他,此时就释放锁没有线程再竞争,此时此线程就得到了锁。
通过下面的生产消费模型会理解的更深刻。
生产消费模型
通过上述了解,也可以知道为什么条件变量和锁要一起使用,因为锁是为了多个线程不能同时访问同一份资源。但是如果只有锁,在某种条件不成立的情况下会不停的加锁检查然后释放锁,效率很低。于是使用条件变量来进行等待和通知。条件不成立直接去等待,等待条件满足,然后其他线程唤醒。
生产者消费模型是一种常见的多线程编程模式,广泛应用于解决并发编程中的数据共享和任务调度问题。在该模型中,我们将生产数据并放入缓冲区的线程称为生产者线程,反之,从缓冲区中获取数据的线程就称为消费者线程。生产者和消费者通过共享的缓冲区进行通信,并且使用同步机制(如互斥锁和条件变量)来协调操作,防止数据竞争和资源浪费。
为了便于记忆,可以将生产者消费模型看成是由1个共享区(缓冲区),2个角色(生产者和消费者)和3种关系(生产者和生产者,消费者和生产者,消费者和消费者)组成。
生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。
生产者消费者模型的特点
解耦:生产者和消费者不直接接触,彼此解耦,独立性强
效率高:通过缓冲区和同步机制,避免了过忙和过闲。
灵活性:可以通过调整缓冲区大小和线程数量来优化系统性能,即支持并发性。
基于BlockingQueue的生产者消费者模型
在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)
BlockingQueue.hpp:
#pragma once
#include <iostream>
#include <string>
#include <queue>
#include <pthread.h>template<typename T>
class BlockQueue
{
private:bool IsFull(){return _block_queue.size() == _max_cap;}bool IsEmpty(){return _block_queue.empty();}public:BlockQueue(int cap = 5) : _max_cap(cap){pthread_mutex_init(&_mutex, nullptr);pthread_cond_init(&_p_cond, nullptr);pthread_cond_init(&_c_cond, nullptr);}// 假设:2个消费者void Pop(T *out){pthread_mutex_lock(&_mutex);while (IsEmpty()) //while可以保证代码的鲁棒性,避免两个进程同时被唤醒导致数据访问数据出错{// 添加尚未满足,但是线程被异常唤醒的情况,叫做伪唤醒!pthread_cond_wait(&_c_cond, &_mutex); // 两个消费者都在这里等待了}// 1. 没有空 || 2. 被唤醒了*out = _block_queue.front();_block_queue.pop();pthread_mutex_unlock(&_mutex);//由生产者唤醒pthread_cond_signal(&_p_cond);}// 一个生产者void Equeue(const T &in){pthread_mutex_lock(&_mutex);while (IsFull()){// 满了,生产者不能生产,必须等待// 可是在临界区里面啊!!!pthread_cond_wait,被调用的时候:除了让自己继续排队等待,还会自己释放传入的锁// 函数返回的时候,不就还在临界区了!,返回时:必须先参与锁的竞争,重新加上锁,该函数才会返回!pthread_cond_wait(&_p_cond, &_mutex);}// 1. 没有满 || 2. 被唤醒了_block_queue.push(in); // 生产到阻塞队列pthread_mutex_unlock(&_mutex);// 让消费者消费,由消费者唤醒pthread_cond_signal(&_c_cond); // pthread_cond_broadcast : 一种场景}~BlockQueue(){pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_p_cond);pthread_cond_destroy(&_c_cond);}
private:std::queue<T> _block_queue;//临界资源int _max_cap;//最大容量pthread_mutex_t _mutex;//锁pthread_cond_t _p_cond;//生产者的条件变量pthread_cond_t _c_cond; // 消费者条件变量
};
- 构造函数 (BlockQueue): 初始化队列的最大容量、互斥锁和两个条件变量(一个用于生产者,一个用于消费者)。
- Pop 方法: 供消费者线程调用,从队列中移除并返回队列前端的元素。如果队列为空,则消费者线程将阻塞在pthread_cond_wait调用上,直到生产者向队列中添加了元素并通知了消费者。
- Equeue 方法: 供生产者线程调用,向队列中添加一个新元素。如果队列已满,则生产者线程将阻塞在pthread_cond_wait调用上,直到消费者从队列中移除了元素并通知了生产者。
- 析构函数 (~BlockQueue): 清理资源,销毁互斥锁和条件变量
main.cpp:
#include <iostream>
#include "BlockingQueue.hpp"
#include <unistd.h>
#include <ctime>using namespace std;void *Consumer(void *arg)
{BlockQueue<int> *bq = static_cast<BlockQueue<int> *>(arg); // 类型转换while (true){sleep(1);int val = 0;bq->Pop(&val);cout << "消费者取出数据->" << val << endl;}
}void *Produce(void *arg)
{BlockQueue<int> *bq = static_cast<BlockQueue<int> *>(arg); // 类型转换while (true){int val = rand() % 100;bq->Equeue(val);cout << "生产者生产数据->" << val << endl;//sleep(1);}
}int main()
{BlockQueue<int> *bq = new BlockQueue<int>();srand(time(nullptr));pthread_t t1, t2;pthread_create(&t1, NULL, Consumer, bq);pthread_create(&t2, NULL, Produce, bq);pthread_join(t1, nullptr);pthread_join(t2, nullptr);return 0;
}
在判断的时候不用if而用while,如果有两个消费者A和B,A竞争锁成功了,他就会执行临界区代码,先判断商品是否为空,如果空了就要wait;此时B可以持有锁,B也可以执行临界区代码,因为商品空了,所有B也要等待。
此时生产者生产了一个商品,使用 pthread_cond_broadcast把A和B都唤醒,那么如果A被唤醒拿走了一个商品,B也被唤醒,B现在锁处等待A释放锁,B拿到锁后从wait处返回,继续执行下面代码,去拿商品的时候却没有商品:添加尚未满足,但是线程被异常唤醒的情况,叫做伪唤醒!为了避免这种情况的发生,使用while判断商品是否为空,如果为空那么重新等待,对于生产者同样如此!
所以要使用while
while (IsEmpty())while (IsFull())
上面演示的只是一个简单的生产消费模型,生产消费模型不只是可以传简单的数字,同样可以传我们自己定的任务。
task.hpp:
#pragma once
#include<iostream>
// 要做加法
class task
{
public:task(){}task(int x, int y) : _x(x), _y(y){}void Excute(){_result = _x + _y;}void operator ()(){Excute();}std::string debug(){std::string msg = std::to_string(_x) + "+" + std::to_string(_y) + "=?";return msg;}std::string result(){std::string msg = std::to_string(_x) + "+" + std::to_string(_y) + "=" + std::to_string(_result);return msg;}private:int _x;int _y;int _result;
};
main.cpp:
#include "BlockingQueue.hpp"
#include "task.hpp"
#include <pthread.h>
#include <ctime>
#include <unistd.h>void *Consumer(void *args)
{BlockQueue<task> *bq = static_cast<BlockQueue<task> *>(args);while(true){// 1. 获取数据task t;bq->Pop(&t);// 2. 处理数据t.Excute();std::cout << "Consumer -> " << t.result() << std::endl;}
}void *Productor(void *args)
{srand(time(nullptr) ^ getpid());BlockQueue<task> *bq = static_cast<BlockQueue<task> *>(args);while(true){// 1. 构建数据/任务int x = rand() % 10 + 1; // [1, 10]usleep(x * 1000);int y = rand() % 10 + 1; // [1, 10]task t(x, y);// 2. 生产数据bq->Equeue(t);std::cout << "Productor ->" <<t.debug()<< std::endl;sleep(1);}
}int main()
{BlockQueue<task> *bq = new BlockQueue<task>();pthread_t c1,c2, p1,p2,p3;pthread_create(&c1, nullptr, Consumer, bq);pthread_create(&c2, nullptr, Consumer, bq);pthread_create(&p1, nullptr, Productor, bq);pthread_create(&p2, nullptr, Productor, bq);pthread_create(&p3, nullptr, Productor, bq);pthread_join(c1, nullptr);pthread_join(c2, nullptr);pthread_join(p1, nullptr);pthread_join(p2, nullptr);pthread_join(p3, nullptr);return 0;
}
其实认真思考可以知道,任何时刻,队列都只有一个生产者或者消费者。这怎么提高了效率呢?
其实不能只看到生产者向队列里面加数据,消费者取数据。在实际应用中,生产者得到的数据可能来自很多地方,比如网卡,产生数据的时间是很费时间的,此时就可以让其他生产者传入他们已经有的任务。消费者执行也是很费时间的,此时就可以让其他消费者来执行任务。这就提高了效率。