1 内存序
2 简介
`std::memory_order` 是 C++11 引入的枚举类型(C++20 起改为 enum class),与 `<atomic>` 中的原子操作配合使用,用于控制多线程环境下内存访问的可见性和顺序。
它的主要作用是:告诉编译器和 CPU,在执行某个原子操作时,哪些内存读写可以重排,哪些必须保持顺序。
3 理论
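略。核心结论:若线程 A 以 `memory_order_release` 写入某个原子变量,线程 B 以 `memory_order_acquire` 读到了这次写入的值,那么 A 在这次写入之前的所有内存写操作,对 B 在这次读取之后的操作都是可见的(二者建立 happens-before 关系)。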
4 实践
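- 实现一个拆分数据的测试例子:主线程把数据写入内存,将标识设置为 false,等待另外 2 个线程处理原始数据;处理完成后,工作线程再把标识设置为 true。
- 发现在 Debug 模式下拆分正常,在 RelWithDebInfo(开启优化)模式下拆分结果错误。
- 猜测是内存序的问题:把标识改为 std::atomic<bool>,并在读写时指定内存序参数(acquire/release)后,结果正确。根本原因在于多个线程不加同步地读写同一个普通 bool 构成数据竞争(未定义行为),开启优化后编译器可以把循环中的读取提到循环外,或重排对数据和标识的写入。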
4.1 主线程拆分
4.2 拆分线程
4.3 结果
4.4 解决
// 在读取时,使用memory_order_acquire,表示在此之后的代码不会被编译器重排到前面if (info_arr_[i].is_finished.load(std::memory_order::memory_order_acquire) == false) {split(i);}// 在设置时,使用memory_order_release,表示在此之前的代码不会被编译器重排到后面
info.is_finished.store(false, std::memory_order::memory_order_release);
5 源码
#include <iostream>
#include <fstream>
#include <vector>
#include <bitset>
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <string>
#include <thread>
#include "xdebug.h"
#include "chan_split.h"

// Size of one read block in bytes; also passed to chan_split as the
// per-channel output buffer size. A typed constant instead of an
// unparenthesised macro so it is safe inside any expression.
constexpr size_t kBufferSize = 1024 * 1024 * 2;

#if 0
// Alternative entry point: builds a synthetic input (each 32-byte group is
// filled with (group index & 255)), saves it to file.bin, and pushes the same
// buffer 150 times through chan_split.
int main() {
    // Declared before chan_split_ so it outlives the worker threads.
    std::vector<uint8_t> data(kBufferSize);
    chan_split chan_split_(0, kBufferSize);

    const size_t loop_count = kBufferSize / 32;
    for (size_t i = 0; i < loop_count; i++) {
        std::memset(data.data() + i * 32, static_cast<int>(i & 255), 32);
    }

    std::string filename = "file.bin";
    std::ofstream file(filename, std::ios::binary);
    if (file) {
        file.write(reinterpret_cast<const char*>(data.data()), static_cast<std::streamsize>(data.size()));
    }

    chan_split_.start_split_async();
    for (int i = 0; i < 150; i++) {
        chan_split_.push_data(data.data(), data.size(), 2);
    }
    std::this_thread::sleep_for(std::chrono::seconds(1));
    chan_split_.stop_split_async();
    xdebug("done.");
    return 0;
}
#else
// Reads "xdma_test.dat" in kBufferSize chunks and pushes each chunk to
// chan_split as 2-channel interleaved data.
int main() {
    // Declared before chan_split_ so it outlives the worker threads.
    std::vector<uint8_t> buffer(kBufferSize);
    chan_split chan_split_(0, kBufferSize);

    std::ifstream file("xdma_test.dat", std::ios::binary);
    if (!file) {
        xdebug("open file failed.");
        return 1;
    }

    chan_split_.start_split_async();
    int count = 0;
    for (;;) {
        file.read(reinterpret_cast<char*>(buffer.data()), static_cast<std::streamsize>(buffer.size()));
        // gcount() reports the bytes actually read even when read() hit EOF
        // part-way. Stopping on 0 avoids pushing an empty chunk when the file
        // size is an exact multiple of kBufferSize.
        const std::streamsize byte_read_len = file.gcount();
        if (byte_read_len <= 0) {
            break;
        }
        chan_split_.push_data(buffer.data(), static_cast<size_t>(byte_read_len), 2);
        count++;
        // EOF or a read error: the partial chunk above was the last one.
        // (Polling eof() alone would spin forever on a read error.)
        if (!file) {
            break;
        }
    }
    xdebug("count=%d", count);

    std::this_thread::sleep_for(std::chrono::seconds(1));
    chan_split_.stop_split_async();
    return 0;
}
#endif
#pragma once
#include <thread>
#include <shared_mutex>
#include <future>
#include <atomic>
#include <cstddef>
#include <cstdint>
#include <vector>
#include "ThreadPool.h"

// When defined, the hand-off flag is a std::atomic<bool> accessed with
// acquire/release ordering. Comment the #define out to fall back to a plain
// bool; that variant is a data race and is kept only to reproduce the failure
// seen in optimised builds.
#define USE_ATIMIC_BOOL_C 1

// Per-channel work item shared between the producer (push_data) and one
// worker thread: the producer fills pdata/pdata_len/chan_count and clears
// is_finished; the worker processes the data and sets is_finished again.
struct split_info_t {
    int chan_index{0};         /* index of the channel this item extracts */
    int dma_index{0};          /* DMA index */
    int chan_count{1};         /* number of interleaved channels */
    uint8_t* pdata{nullptr};   /* input data to split (not owned) */
    size_t pdata_len{0};       /* length of pdata in bytes */
    std::vector<uint8_t> data; /* output buffer for this channel */
    size_t data_len{0};        /* number of valid bytes in data */
#ifdef USE_ATIMIC_BOOL_C
    // Starts as true ("nothing pending"). Before C++20 a default-constructed
    // std::atomic<bool> holds an indeterminate value, so workers started
    // before the first push_data() could otherwise process null data.
    std::atomic<bool> is_finished{true}; /* whether splitting is complete */
#else
    bool is_finished{true};
#endif
};

// De-interleaves data into per-channel buffers. Each push_data() call hands
// the buffer to up to two worker threads (one per channel) and blocks until
// they report completion.
class chan_split {
public:
    chan_split(int dma_index, size_t read_dma_len);
    ~chan_split();
    void push_data(uint8_t* pdata, size_t len, int chan_count);
    void start_split_async();
    void stop_split_async();

private:
    void split(int chan_index);
    void extract(split_info_t& pinfo);

private:
    split_info_t info_arr_[2];
    // Read by the worker threads and written by the controlling thread, so it
    // must be atomic; a plain bool here is a data race like is_finished was.
    std::atomic<bool> is_running_{true};
    ThreadPool pool_;
    std::vector<std::future<int>> results_;
};
#include "chan_split.h"
#include <algorithm>
#include <cstring>
#include <string>
#include <fstream>
#include <thread>
#include "xdebug.h"

namespace {
// Number of channel slots / worker tasks; must equal the length of
// chan_split::info_arr_ (checked by a static_assert in the constructor).
constexpr int kChanSlots = 2;
}  // namespace

// Prepares one slot per channel and sizes each output buffer to read_dma_len.
chan_split::chan_split(int dma_index, size_t read_dma_len) : pool_(kChanSlots) {
    static_assert(static_cast<int>(sizeof(info_arr_) / sizeof(info_arr_[0])) == kChanSlots,
                  "kChanSlots must match the size of info_arr_");
    for (int i = 0; i < kChanSlots; i++) {
        split_info_t& info = info_arr_[i];
        info.chan_index = i;
        info.dma_index = dma_index;
        info.data.resize(read_dma_len);
    }
}

// Asks running worker loops to exit; pool_'s destructor (run after this body)
// joins the worker threads.
chan_split::~chan_split() {
    is_running_ = false;
}

// Starts one polling worker per channel slot. No-op if already started.
void chan_split::start_split_async() {
    if (!results_.empty()) {
        return;  // workers already running; queuing more would starve the pool
    }
    is_running_ = true;  // allow a restart after stop_split_async()
    for (int i = 0; i < kChanSlots; i++) {
        results_.emplace_back(pool_.enqueue([this, i] {
            while (is_running_) {
#ifdef USE_ATIMIC_BOOL_C
                // Acquire pairs with the release store in push_data(), so the
                // pdata/pdata_len/chan_count written before it are visible here.
                if (info_arr_[i].is_finished.load(std::memory_order_acquire) == false) {
                    split(i);
                } else {
                    std::this_thread::yield();  // nothing pending: give up the core
                }
#else
                // Deliberately racy variant (plain bool), kept for comparison.
                if (info_arr_[i].is_finished == false) {
                    split(i);
                }
#endif
            }
            return 0;
        }));
    }
}

// Signals the workers to stop and waits for every started task to return.
void chan_split::stop_split_async() {
    is_running_ = false;
    // Iterate over what was actually started; fixed indices would run out of
    // bounds when start_split_async() was never called.
    for (std::future<int>& result : results_) {
        if (result.valid()) {
            result.wait();
        }
    }
    results_.clear();
}

// Hands `len` bytes at `pdata` to the first `chan_count` channel slots and
// blocks until each of them has finished. `pdata` must stay valid until this
// returns. chan_count outside [1, kChanSlots] is ignored.
void chan_split::push_data(uint8_t* pdata, size_t len, int chan_count) {
    if (chan_count < 1 || chan_count > kChanSlots) {
        return;  // would index past info_arr_
    }
    // Write the job fields first, then clear the flag with release semantics
    // so a worker that observes is_finished == false also sees these fields.
    for (int i = 0; i < chan_count; i++) {
        split_info_t& info = info_arr_[i];
        info.chan_count = chan_count;
        info.pdata = pdata;
        info.pdata_len = len;
#ifdef USE_ATIMIC_BOOL_C
        info.is_finished.store(false, std::memory_order_release);
#else
        info.is_finished = false;
#endif
    }
    // Wait for completion; the acquire load makes the worker's output
    // (data/data_len) visible to this thread afterwards.
    for (int i = 0; i < chan_count; i++) {
#ifdef USE_ATIMIC_BOOL_C
        while (info_arr_[i].is_finished.load(std::memory_order_acquire) == false) {
            std::this_thread::yield();
        }
#else
        while (info_arr_[i].is_finished == false) {
        }
#endif
    }
}

// Processes one channel slot, then marks it finished.
void chan_split::split(int chan_index) {
    split_info_t& info = info_arr_[chan_index];
    switch (info.chan_count) {
        case 1: {
            /* single channel: intended to be passed to an external function directly; nothing is done here */
            break;
        }
        default: {
            /* de-interleave this channel's data */
            extract(info);
            break;
        }
    }
    // Release: everything extract() wrote (data, data_len) becomes visible to
    // push_data() once it observes is_finished == true with acquire.
#ifdef USE_ATIMIC_BOOL_C
    info.is_finished.store(true, std::memory_order_release);
#else
    info.is_finished = true;
#endif
}

// Copies this channel's 16-byte elements out of the round-robin interleaved
// input (element j belongs to channel j % chan_count) into info.data, then
// appends them to "xdma_test_<chan_index>.dat".
// Assumes 1 <= chan_count, which push_data() enforces.
void chan_split::extract(split_info_t& info) {
    const size_t ele_size = 16;
    const size_t chan_count = static_cast<size_t>(info.chan_count);
    const size_t chan_index = static_cast<size_t>(info.chan_index);

    // Only complete elements belonging to this channel, so the source reads
    // never go past pdata + pdata_len (the old loop stepped i twice per
    // iteration, which only matched chan_count == 2 and over-read channel 1
    // when the element count was odd).
    const size_t total_elems = info.pdata_len / ele_size;
    size_t elems = total_elems > chan_index ? (total_elems - chan_index + chan_count - 1) / chan_count : 0;
    // Never write past the output buffer.
    elems = std::min(elems, info.data.size() / ele_size);

    uint8_t* dst = info.data.data();
    for (size_t k = 0; k < elems; ++k) {
        const size_t src_offset = (k * chan_count + chan_index) * ele_size;
        std::memcpy(dst + k * ele_size, info.pdata + src_offset, ele_size);
    }
    // Matches the bytes actually copied.
    info.data_len = elems * ele_size;

    /* invoke the callback (currently: append this channel's output to a file) */
    const std::string filename = "xdma_test_" + std::to_string(info.chan_index) + ".dat";
    std::ofstream file(filename, std::ios::binary | std::ios::app);
    if (file) {
        file.write(reinterpret_cast<const char*>(info.data.data()), static_cast<std::streamsize>(info.data_len));
    }
}
#ifndef THREAD_POOL_H
#define THREAD_POOL_H

#include <vector>
#include <queue>
#include <memory>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <future>
#include <functional>
#include <stdexcept>
#include <type_traits>

// Fixed-size pool of worker threads consuming a FIFO task queue.
class ThreadPool {
public:
    // Starts `threads_count` worker threads immediately.
    ThreadPool(size_t threads_count);
    // Lets the workers drain every queued task, then joins them.
    ~ThreadPool();

    // Queues f(args...) and returns a future for its result. The return type
    // is deduced as std::future<std::result_of<F(Args...)>::type>.
    // Throws std::runtime_error if the pool is already stopping.
    template <class F, class... Args>
    auto enqueue(F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type>;

private:
    // need to keep track of threads so we can join them
    std::vector<std::thread> workers_;
    // the task queue
    std::queue<std::function<void()>> tasks_;
    // synchronization
    std::mutex queue_mutex_;
    std::condition_variable condition_;
    bool stop_;
};

// the constructor just launches some amount of workers
inline ThreadPool::ThreadPool(size_t threads_count) : stop_(false) {
    for (size_t i = 0; i < threads_count; ++i) {
        workers_.emplace_back([this] {
            for (;;) {
                std::function<void()> task;
                {
                    std::unique_lock<std::mutex> lock(this->queue_mutex_);
                    // Releases the lock while blocked; wakes once stopping or a task is queued.
                    this->condition_.wait(lock, [this] { return this->stop_ || !this->tasks_.empty(); });
                    // On stop, keep running until the queue is drained.
                    if (this->stop_ && this->tasks_.empty()) {
                        return;
                    }
                    task = std::move(this->tasks_.front());
                    this->tasks_.pop();
                }
                task();  // run outside the lock
            }
        });
    }
}

// add new work item to the pool
template <class F, class... Args>
auto ThreadPool::enqueue(F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type> {
    using return_type = typename std::result_of<F(Args...)>::type;

    // packaged_task is move-only while std::function needs a copyable target,
    // so the task lives in a shared_ptr and the queued lambda copies the pointer.
    auto task = std::make_shared<std::packaged_task<return_type()>>(
        std::bind(std::forward<F>(f), std::forward<Args>(args)...));
    std::future<return_type> res = task->get_future();
    {
        std::unique_lock<std::mutex> lock(queue_mutex_);
        // don't allow enqueueing after stopping the pool
        if (stop_) {
            throw std::runtime_error("enqueue on stopped ThreadPool");
        }
        tasks_.emplace([task]() { (*task)(); });
    }
    condition_.notify_one();
    return res;
}

// the destructor joins all threads
inline ThreadPool::~ThreadPool() {
    {
        std::unique_lock<std::mutex> lock(queue_mutex_);
        stop_ = true;
    }
    condition_.notify_all();
    for (std::thread& worker : workers_) {
        worker.join();
    }
}

#endif  // THREAD_POOL_H