一個輕巧高效的多線程c++stream風格異步日志(二)
前言
本文緊接上一篇文章: 介紹上文中的一條條日志是如何異步導入本地文件的.
首先會簡單介紹下LogFile類,之后會具體講解下AsyncLogging中的雙緩沖機制.
整個日志模塊的結構圖,

LogFile類
LogFile日志文件類 完成日志文件的管理工作.
rollFile() :滾動文件 當日志超過m_rollSize大小時會滾動一個新的日志文件出來.
getLogFileName() :用與滾動日志時,給日志文件取名,以滾動時間作為后綴.
m_mutex :用於append()數據時,給文件上鎖.
append() :黏入日志.
flush() :沖刷緩沖.
LogFile 有一個AppendFIle類,它是最終用於操作本地文件的類.
append() : 里面會調用系統函數fwrite()寫入本地文件.
flush() : 沖刷緩沖.
writtenBytes() : 獲取已寫字節數.

AsyncLogging類
AsyncLogging異步日志類, 完成日志的異步寫入工作.
介紹它的接口前,先描述下它的工作邏輯.
AsyncLogging 有以下述幾類緩存.
m_currentBuffer : 指向當前接收其他線程append過來的日志的緩存.
m_buffers : 用於存放當前已寫滿或過了沖刷周期的日志緩存的指針容器.
m_nextBuffer : 指向當m_currentBuffer滿后用於替代m_currentBuffer的緩存.
backupBuffer1 : 備用緩存.
backupBuffer2 : 備用緩存.
buffersToWrite : 和m_buffers通過交換swap()后append()到LogFile的指針容器.
AsyncLogging 使用的雙緩沖機制 有兩個緩存容器 : m_buffers 、buffersToWrite 交替使用 . 一下我們簡稱為 A 和 B .
A 用於接收 其他線程 append() 進來的日志.
B 用於將目前已接受的緩存 寫入 日志文件. 當B寫完時 , clean() B , 交換A,B,如此往復.
優點 : 新建的日志不必等待磁盤操作,也避免了每條新日志都觸發日志線程,而是將多條日志拼程一個大的buffer 傳送給日志線程寫入文件. 相當於批處理, 減少線程喚醒頻率 ,降低開銷。
另外 ,為了及時將 日志消息寫入文件, 即是 buffer A 中還沒有push進來日志 也會每三秒 執行一次上述的寫入操作.

AsyncLogging使用一個更大的LogBuffer來保存一條條Logger傳送過來的日志.
Mutex :用來控制多線程的寫入.
Condition : 用來等待緩沖區中的數據.
Thread : 使用一個線程處理緩存的交換,以及日志的寫入.

AsyncLogging實現
下面會給出AsyncLogging的簡單實現.
實際上還有幾個備用緩存,這里沒有加上去,以便於理解程序; 備用緩存主要是為了減少反復new 操作帶來的系統開銷,
#ifndef _ASYNC_LOGGING_HH
#define _ASYNC_LOGGING_HH
#include "MutexLock.hh"
#include "Thread.hh"
#include "LogStream.hh"
#include "ptr_vector.hh"
#include "Condition.hh"
#include <string>
class AsyncLogging
{
public:
AsyncLogging(const std::string filePath, off_t rollSize, int flushInterval = 3);
~AsyncLogging();
void start(){
m_isRunning = true;
m_thread.start();
}
void stop(){
m_isRunning = false;
m_cond.notify();
}
void append(const char *logline, int len);
private:
AsyncLogging(const AsyncLogging&);
AsyncLogging& operator=(const AsyncLogging&);
void threadRoutine();
typedef LogBuffer<kLargeBuffer> Buffer;
typedef oneself::ptr_vector<Buffer> BufferVector;
typedef oneself::auto_ptr<Buffer> BufferPtr;
const int m_flushInterval;
bool m_isRunning;
off_t m_rollSize;
std::string m_filePath;
Thread m_thread;
MutexLock m_mutex;
Condition m_cond;
BufferPtr m_currentBuffer;
BufferVector m_buffers;
};
#endif
//AsyncLogging.cpp
#include "AsyncLogging.hh"
#include "LogFile.hh"
#include <assert.h>
#include <stdio.h>
AsyncLogging::AsyncLogging(const std::string filePath, off_t rollSize, int flushInterval)
:m_filePath(filePath),
m_rollSize(2048),
m_flushInterval(flushInterval),
m_isRunning(false),
m_thread(std::bind(&AsyncLogging::threadRoutine, this)),
m_mutex(),
m_cond(m_mutex),
m_currentBuffer(new Buffer),
m_buffers()
{
}
AsyncLogging::~AsyncLogging(){
if(m_isRunning) stop();
}
void AsyncLogging::append(const char* logline, int len){
MutexLockGuard lock(m_mutex);
if(m_currentBuffer->avail() > len){
m_currentBuffer->append(logline, len);
}
else{
m_buffers.push_back(m_currentBuffer.release());
m_currentBuffer.reset(new Buffer);
m_currentBuffer->append(logline, len);
m_cond.notify();
}
}
void AsyncLogging::threadRoutine(){
assert(m_isRunning == true);
LogFile output(m_filePath, m_rollSize, false);
BufferVector buffersToWrite;
buffersToWrite.reserve(8);
while(m_isRunning){
assert(buffersToWrite.empty());
{
MutexLockGuard lock(m_mutex);
if(m_buffers.empty()){
m_cond.waitForSeconds(m_flushInterval);
}
m_buffers.push_back(m_currentBuffer.release());
m_currentBuffer.reset(new Buffer);
m_buffers.swap(buffersToWrite);
}
assert(!buffersToWrite.empty());
for(size_t i = 0; i < buffersToWrite.size(); ++i){
output.append(buffersToWrite[i]->data(), buffersToWrite[i]->length());
}
buffersToWrite.clear();
output.flush();
}
output.flush();
}
增加備用緩存
增加備用緩存優化上面程序,上面程序一共在兩個地方執行了new操作.
1.m_currentBuffer 填滿時,需要把它填進容器的時候.
2.到時間了需要把m_currentBuffer里面的內容寫入本地文件時,會把它當前的內容移出來,這時候需要new一個新緩存來給m_currentBuffer.
於是我們准備一個m_nextBuffer來做m_currentBuffer的備用緩存.同時在線程中增加兩個backupBuffer 給m_nextBuffer 當備用緩存;當日志量大到不夠用的時候, 再考慮用new 操作來動態添加緩存。

#ifndef _ASYNC_LOGGING_HH
#define _ASYNC_LOGGING_HH
#include "MutexLock.hh"
#include "Thread.hh"
#include "LogStream.hh"
#include "ptr_vector.hh"
#include "Condition.hh"
#include <memory>
#include <string>
class AsyncLogging
{
public:
AsyncLogging(const std::string filePath, off_t rollSize, int flushInterval = 3);
~AsyncLogging();
void start(){
m_isRunning = true;
m_thread.start();
}
void stop(){
m_isRunning = false;
m_cond.notify();
}
void append(const char *logline, int len);
private:
AsyncLogging(const AsyncLogging&);
AsyncLogging& operator=(const AsyncLogging&);
void threadRoutine();
typedef LogBuffer<kLargeBuffer> Buffer;
typedef myself::ptr_vector<Buffer> BufferVector;
typedef std::unique_ptr<Buffer> BufferPtr;
const int m_flushInterval;
bool m_isRunning;
off_t m_rollSize;
std::string m_filePath;
Thread m_thread;
MutexLock m_mutex;
Condition m_cond;
BufferPtr m_currentBuffer;
BufferPtr m_nextBuffer;
BufferVector m_buffers;
};
#endif
//AsynvLogging.cpp
#include "AsyncLogging.hh"
#include "LogFile.hh"
#include <assert.h>
#include <stdio.h>
AsyncLogging::AsyncLogging(const std::string filePath, off_t rollSize, int flushInterval)
:m_filePath(filePath),
m_rollSize(rollSize),
m_flushInterval(flushInterval),
m_isRunning(false),
m_thread(std::bind(&AsyncLogging::threadRoutine, this)),
m_mutex(),
m_cond(m_mutex),
m_currentBuffer(new Buffer),
m_nextBuffer(new Buffer),
m_buffers()
{
}
AsyncLogging::~AsyncLogging(){
if(m_isRunning) stop();
}
void AsyncLogging::append(const char* logline, int len){
MutexLockGuard lock(m_mutex);
if(m_currentBuffer->avail() > len){
m_currentBuffer->append(logline, len);
}
else{
m_buffers.push_back(m_currentBuffer.release());
if(m_nextBuffer){
m_currentBuffer = std::move(m_nextBuffer);
}
else{
m_currentBuffer.reset(new Buffer);
}
m_currentBuffer->append(logline, len);
m_cond.notify();
}
}
void AsyncLogging::threadRoutine(){
assert(m_isRunning == true);
LogFile output(m_filePath, m_rollSize, false);
BufferPtr backupBuffer1(new Buffer);
BufferPtr backupBuffer2(new Buffer);
BufferVector buffersToWrite;
buffersToWrite.reserve(8);
while(m_isRunning){
assert(buffersToWrite.empty());
{
MutexLockGuard lock(m_mutex);
if(m_buffers.empty()){
m_cond.waitForSeconds(m_flushInterval);
}
m_buffers.push_back(m_currentBuffer.release());
m_currentBuffer = std::move(backupBuffer1);
m_buffers.swap(buffersToWrite);
if(!m_nextBuffer)
m_nextBuffer = std::move(backupBuffer2);
}
assert(!buffersToWrite.empty());
for(size_t i = 0; i < buffersToWrite.size(); ++i){
output.append(buffersToWrite[i]->data(), buffersToWrite[i]->length());
}
if(buffersToWrite.size() > 2)
{
// drop non-bzero-ed buffers, avoid trashing
buffersToWrite.resize(2);
}
if(!backupBuffer1)
{
assert(!buffersToWrite.empty());
backupBuffer1 = std::move(buffersToWrite.pop_back());
backupBuffer1->reset();
}
if(!backupBuffer2)
{
assert(!buffersToWrite.empty());
backupBuffer2 = std::move(buffersToWrite.pop_back());
backupBuffer2->reset();
}
buffersToWrite.clear();
output.flush();
}
output.flush();
}
結語
本文主要介紹了muduo中AsyncLogging類的實現,其中的雙緩存機制.
LogFile類及AppendFIle類 分別是日志文件管理類和本地文件的基本操作類. 不難理解,感興趣的話可以看看muduo的源碼,本文不再往下寫了,如果想要全部源碼可以留言。
最新源碼:
https://github.com/BethlyRoseDaisley/SimpleMuduo/tree/master/AsyncLogging
