在上一节中我们实现了同步日志,并输出到stdout。这节我们来实现异步。
在上一节中,添加了AppendFile类,这是对文件操作的一个底层的类,我们需要一个更加上层的,使用更便捷的接口来操控磁盘文件。比如说每输出了1000条日志消息,就需要冲刷AppendFile类中的缓冲区buffer_;还有日志文件大小若达到了预定的大小,就要新开一个日志文件等等。所以我们新创一个类,更加方便我们使用。
1.LogFile类
// 日志文件类,用于管理日志文件的输出
class LogFile
{
public: // 构造函数,初始化 LogFile 对象 // 参数 file_name: 日志文件名,flushEveryN: 每写入多少条日志消息就冲刷一次缓冲区 LogFile(const std::string& file_name, int flushEveryN) : filename_(file_name), // 初始化日志文件名 flushEveryN_(flushEveryN), // 设置每 n 条日志后强制冲刷一次 count_(0), // 初始化计数器为 0,记录写入的日志条数 file_(std::make_unique<AppendFile>(file_name)) // 创建 AppendFile 对象并初始化 unique_ptr { // 在这里可以添加其他初始化操作 } // 添加日志消息到 file_ 的缓冲区 // 参数 str: 指向日志消息的字符数组,len: 日志消息的长度 void Append(const char* str, int len) { file_->Append(str, len); // 将日志消息添加到 AppendFile 的缓冲区 count_++; // 增加计数器 // 检查计数器是否达到了冲刷的条件 if (count_ >= flushEveryN_) { count_ = 0; // 达到条件后重置计数器 file_->Flush(); // 冲刷文件的缓冲区,将日志写入磁盘 } } // 冲刷 file_ 的缓冲区 void Flush() { file_->Flush(); // 调用 AppendFile 的 Flush 方法 } private: // 日志文件名,存储日志输出的文件路径 const std::string filename_; // 每写 flushEveryN_ 条日志消息就强制冲刷 file_ 的缓冲区内容到磁盘中 const int flushEveryN_; // 计数器,记录调用 Append() 的次数 int count_; // 智能指针,用于管理 AppendFile 对象的生命周期 std::unique_ptr<AppendFile> file_;
};
LogFile类中有个AppendFile的智能指针成员变量file_,LogFile类通过操控file_就可以更加方便实现我们上述的需求。
该构造函数中创建一个AppendFile类对象的智能指针赋给成员变量file_。
在LogFile::Append()中调用AppendFile::Append()函数,并记录Append()的使用次数,(即也是记录添加了多少条日志消息)。达到次数后,就进行fflush()冲刷,即把日志消息写到磁盘文件中。
// LogFile 类构造函数
LogFile::LogFile(const std::string& file_name, int flushEveryN) : filename_(file_name) // 初始化日志文件名,使用传入的参数 file_name , flushEveryN_(flushEveryN) // 设置每 n 条日志后需要冲刷的频率 , count_(0) // 初始化记录已写入日志消息数量的计数器为 0
{ // 使用智能指针创建 AppendFile 对象,负责管理对日志文件的操作 file_ = std::make_unique<AppendFile>(filename_); // 将文件名传递给 AppendFile 的构造函数
} // 添加日志消息到 file_ 的缓冲区
void LogFile::Append(const char* logline, int len)
{ // 将日志消息添加到 AppendFile 的缓冲区 file_->Append(logline, len); // 增加计数器,记录已写入的日志条数 if (++count_ >= flushEveryN_) { count_ = 0; // 达到冲刷条件后重置计数器 file_->Flush(); // 强制冲刷缓冲区,将日志写入磁盘文件 }
}
这里可能又有疑惑了:为什么不把这些操作直接放到AppendFile类中呢,为什么就非要创建多一个类呢。
这是为了方便我们的使用,AppendFile类封装了对磁盘文件的操作,LogFile类就只需要操控AppendFile的智能指针file_就行,这样就能很大便利我们去编写代码。
封装起来方便我们编写代码,这也是为了让类的责任分工明确。
或者你也可以去试试把这两个类写成一个类看看是什么效果。
接着来看看重头戏,实现异步的类。
2.AsyncLogger类
// 异步日志类,用于在单独的线程中处理日志输出
class AsyncLogger
{
public: // 构造函数,参数1为日志文件名,参数2为每隔多少秒冲刷缓冲区,默认为3秒 AsyncLogger(const std::string& fileName, int flushInterval = 3) : flushInterval_(flushInterval), // 设置冲刷时间间隔 is_running_(false), // 初始化运行状态 fileName_(fileName), // 保存日志文件名 currentBuffer_(std::make_unique<Buffer>()), // 初始化当前缓冲区 nextBuffer_(std::make_unique<Buffer>()) // 初始化下一个缓存区 { // 这里可以添加其他初始化操作 } // 析构函数 ~AsyncLogger() { stop(); // 确保在析构时停止线程 } // 将日志消息添加到缓冲区 void Append(const char* logline, int len); // 启动异步日志线程 void start() { is_running_ = true; // 设置运行状态为真 thread_ = std::thread([this]() { ThreadFunc(); }); // 创建并启动日志处理线程 } // 停止异步日志线程 void stop(); public: // 后端日志线程函数 void ThreadFunc(); // 定义固定大小的缓冲区 using Buffer = FixedBuffer<KLargeBuffer>; using BufferPtr = std::unique_ptr<Buffer>; // 缓冲区智能指针 using BufferVector = std::vector<BufferPtr>; // 缓冲区向量类型 // 冲刷时间间隔 const int flushInterval_; bool is_running_; // 日志线程运行状态 std::string fileName_; // 日志文件名 // 后端线程 std::thread thread_; // 互斥锁,用于保护共享数据的线程安全 std::mutex mutex_; std::condition_variable cond_; // 条件变量,用于线程同步 // 当前和下一个缓冲区指针 BufferPtr currentBuffer_; BufferPtr nextBuffer_; // 等待写入文件的已填满的缓冲向量 BufferVector buffers_;
};
这里的异步主要是用空间换时间。该类里面的成员变量很多。
muduo中的日志使用的是双缓冲技术,这里讲解的也主要是参考陈硕书籍《Linux多线程服务端编程》。基本思路是先准备好两块buffer:currentBuffer_和nextBuffer_。前端负责往currentBuffer_中填写日志消息,后端负责将nextBuffer_的数据写入文件中(磁盘)。
当currentBuffer_写满后,交换currentBuffer_和nextBuffer_,(交换之后,那么前端就是往nextBuffer_中填写日志消息了),让后端将currentBuffer_的数据写入文件,而前端就往nextBuffer_中填入新的日志消息,如此往复。
先来看看发送方的代码,即是前端代码。
// 将日志消息添加到当前缓冲区
void AsyncLogger::Append(const char* logline, int len)
{ // 使用唯一锁保护当前的共享资源,确保线程安全 std::unique_lock<std::mutex> lock(mutex_); // 检查当前缓冲区是否有足够的可用空间来添加新的日志消息 if (currentBuffer_->Available() > len) { // 当前缓冲区有足够的空间,直接将日志消息添加到缓冲区 currentBuffer_->Append(logline, len); } else { // 当前缓冲区没有足够的空间,准备处理已满的缓冲区 // 将已满的当前缓冲区移动到等待写入的缓冲区容器中 buffers_.emplace_back(std::move(currentBuffer_)); // 如果下一个可用的缓冲区不为空,则将其移动给 currentBuffer_ if (nextBuffer_) { currentBuffer_ = std::move(nextBuffer_); } else { // 否则,创建一个新的缓冲区并赋值给 currentBuffer_ currentBuffer_.reset(new Buffer); } // 将日志消息添加到新的当前缓冲区 currentBuffer_->Append(logline, len); // 通知后端日志线程,可以读取新填充的缓冲区 cond_.notify_one(); }
}
该函数中主要有3种情况,逻辑也清晰,要说的也在注释中了哈。
而cond_.notify_one()就会唤醒后端log线程,把日志消息真正写入到磁盘文件中了。
来看看后端log线程函数。
// 后端日志线程函数,负责从缓冲区读取日志并写入磁盘文件
void AsyncLogger::ThreadFunc()
{ // ======================== 步骤1 ============================ // 打开磁盘日志文件,创建 LogFile 对象 LogFile output(fileName_); // 通过文件名初始化日志文件对象 // 准备好后端备用的缓冲区 1 和 2 auto newBuffer1 = std::make_unique<Buffer>(); // 创建第一个备用缓冲区 auto newBuffer2 = std::make_unique<Buffer>(); // 创建第二个备用缓冲区 // 准备读取的缓冲区向量 BufferVector buffersToWrite; buffersToWrite.reserve(16); // 预留 16 个元素空间,以提高效率 // 后端线程的无限循环,处理日志的写入 while (is_running_) { { // ============================= 步骤2 ========================= std::unique_lock<std::mutex> lock(mutex_); // 锁定 mutex_ 以确保线程安全 // 如果当前没有缓存的日志数据,等待条件变量 if (buffers_.empty()) { // 对条件变量进行等待,直到超时或有新的日志写入 cond_.wait_for(lock, std::chrono::seconds(flushInterval_)); } // 将当前缓冲区移动到待写入的缓冲区容器中 buffers_.emplace_back(std::move(currentBuffer_)); // 将后端线程准备好的缓冲区 1 赋值给 currentBuffer_ currentBuffer_ = std::move(newBuffer1); // 交换 buffers_ 和 buffersToWrite,保证后端线程可以写入 buffersToWrite.swap(buffers_); // 如果下一个可用的缓冲区为空,将后端线程的缓冲区 2 赋给 nextBuffer_ if (!nextBuffer_) { nextBuffer_ = std::move(newBuffer2); } } // 离开锁的作用域, mutex_ 解锁 // ======================== 步骤3 ======================= // 将缓冲区内容写入日志文件 for (size_t i = 0; i < buffersToWrite.size(); ++i) { // 将已填满的缓冲区中的数据写入到磁盘文件 output.Append(buffersToWrite[i]->GetData(), buffersToWrite[i]->GetLength()); } // ========================= 步骤4 ===================== // 如果待写入的缓冲区数量过多,丢弃多余的缓冲区 if (buffersToWrite.size() > 2) { buffersToWrite.resize(2); // 仅保留最近的两个缓冲区 } // 恢复后端备用缓冲区 if (!newBuffer1) { // 检查 newBuffer1 是否为空 newBuffer1 = buffersToWrite.back(); // 从待写入的缓冲区中获取一个 buffersToWrite.pop_back(); // 从向量中移除它 newBuffer1->Reset(); // 将缓冲区的数据指针归零,准备重新使用 } // newBuffer2 的操作与 newBuffer1 相似,此处省略具体代码 // 丢弃无用的已写缓冲区 buffersToWrite.clear(); // 清除当前的待写入缓冲区 output.Flush(); // 冲刷输出,确保数据写入磁盘 } output.Flush(); // 在线程结束前最后一次冲刷,确保所有数据都写入
}
步骤1:
先打开保存日志的磁盘文件,准备好后端备用的空闲缓冲区1、2,用来交换已写好日志消息的缓冲区。也备好读取的缓冲区vector,用来交换已写入文件的填完完毕的缓冲vector。
这里都是为了在临界区内交换而不用阻塞的。
步骤2:到了while(1)循环里面了。也到了需要加锁的临界区。
首先等待条件是否触发,等待触发的条件有两个:其一是超时(超过默认的3s),其二是前端写满了一个或多个buffer。注意这里使用的是if(),没有使用while()循环的。
当条件满足时,将当前缓冲(currentBuffer_)移入buffers_,并立刻将空闲的newBuffer1移为当前缓冲。要注意的是,这整段代码是位于临界区内的,所以不会有任何race condition。
接下来将已填写完毕的缓冲vector(buffers_)与备好的缓冲区buffersToWrite交换(和第12节中的函数doPendingFunctors()中也是要进行交换后,才好执行任务回调函数的),后面的代码就可以在临界区外安全地访问buffersToWrite,将其中的日志数据写入到磁盘文件中。
接着用newBuffer2替换nextBuffer_,这样前端就始终有一个预备buffer 可以调配。
nextBuffer_可以减少前端临界区分配内存的概率,缩短前端临界区长度。(因为前端Append()函数中有一种情况会currentBuffer_.reset(new Buffer);)
步骤3:这时已经出了临界区
若缓冲区过多,说明前端产生log的速度远大于后端消费的速度,这里只是简单的将它们丢弃(调用erase()函数)。之后把日志消息写入到磁盘(调用LogFile::Append函数),
步骤4:
调用vector的resize()函数重置该缓冲区vector大小,并将该缓冲区vector内的buffer重新填充给newBuffer1和newBuffer2,这样下一次执行的时候就还有两个空闲buffer可用于替换前端的当前缓冲和预备缓冲。
3.如何使用AsyncLogger类
在AsyncLogger类中我们使用的是LogFile类,不使用AppendFile类。AppendFile类是供LogFile对象使用的,我们基本不会直接去操作AppendFile类。
记住是通过LogFile类去调用AppendFile类。
那关键的异步日志的AsyncLogger类已实现。那该怎么使用该类呢。
对外界可见,可以直接使用的是Logger类,那怎么把这两个类关联起来呢。
AsyncLogger类中是使用AsyncLogger::start()开始后端线程,前端收集日志消息是AsyncLogger::Append()函数。我们就要从这两个函数下手。
在Logger类中添加
public: // 获取日志文件名,返回一个 std::string 类型 static std::string LogFileName() { return logFileName_; } // 定义输出回调函数类型,函数签名为接受一个字符指针和长度 using OutputFunc = void (*)(const char* msg, int len); // 定义冲刷回调函数类型,无参数返回类型 using FlushFunc = void (*)(); private: // 静态成员变量,存储日志文件的名字 static std::string logFileName_;
前一节我们输出到stdout是通过Logger类的析构函数来输出的。
//这是上一节的实现Logger::~Logger(){//其他的省略DefaultOutput(buf.Data(), buf.Length()); //是调用了fwrite()输出到stdout的}
那我们可以修改析构函数中的DefaultOutput()函数就行,我们可以调用AsyncLogger::Append()函数来让消息日志写到磁盘中。AsyncLogger::Append()就是收集消息日志,写到缓冲中,等到缓存满了,条件符合了,就会notify_one(),唤醒后端log线程,把日志消息真正写到磁盘中去。
再来看看AsyncLogger类对象在哪生成呢。
// 声明一个静态唯一指针,指向 AsyncLogger 实例
static std::unique_ptr<AsyncLogger> asyncLogger; // 声明一个静态 once_flag,用于确保某个操作只执行一次
static std::once_flag g_once_flag; // 静态成员变量,用于存储日志文件名
std::string Logger::logFileName_ = "../li22.log"; // 用于初始化 AsyncLogger 的函数
void OnceInit()
{ // 创建 AsyncLogger 的唯一指针并启动后端线程 asyncLogger = std::make_unique<AsyncLogger>(Logger::LogFileName()); asyncLogger->start(); // 开启后端日志处理线程
} // 异步输出日志消息的函数
void AsyncOutput(const char* logline, int len)
{ // 确保 OnceInit 只执行一次,防止多线程环境下的竞态条件 std::call_once(g_once_flag, OnceInit); // 仅初始化一次 // 将日志消息添加到 AsyncLogger 缓冲区 asyncLogger->Append(logline, len);
} // 全局变量:输出函数,指向 AsyncOutput 函数
Logger::OutputFunc g_output = AsyncOutput; // Logger 的析构函数
Logger::~Logger()
{ // 省略其他清理代码... // 调用全局输出函数,将缓冲区内容写入 g_output(buf.Data(), buf.Length()); // 输出当前缓冲区的数据
}
logFileName_ 就是我们的日志文件名。
那我们从该析构函数Logger::~Logger()开始,g_output()是全局输出函数,这里设置成了AsyncOutput()函数,AsyncOutput()函数使用了std::call_once,这是c++11新增的,可以保证不管是开多少个线程,该函数(OnceInit())只会执行一次。这就可以保证了后端log线程只有一个,这就不会有抢占写日志到磁盘的情况发生。asyncLogger的后端log线程就开始了,那就接着调用AsyncLogger::Append()函数就可以了。下面是其主要的流程图。
异步日志的主体就完成了,还有一些功能,小细节没有实现,下一节再讲解了。
这个双缓冲的设计真不错。
这一节我们就可以把日志输出到磁盘文件了。