一個輕量快速的C++日志庫


limlog

作一篇文章記錄實現,驅動優化迭代。 代碼倉庫

對日志庫的 特點期望

  • 正確性,這個是最重要也是最基本的,包括
    • 全部寫入.
    • 多個線程間的日志不穿插干擾.
    • 日志線程不能干擾主程序的運行邏輯.
  • 易讀性
    • 每條日志記錄占用一行空間,便於 awk 等工具的時候方便查找, 查看教程.
    • 日志信息包含必要的信息,包括日期時間、線程id、日志等級、日志發生的文件和行數.
  • 易用性
    • 在類 printf 和 std::cout 的用法中選擇類 std::cout 用法.
    • 根據時間和日志文件大小自動滾動文件.
    • 代碼行數不超過 2000 行
  • 速度
    • 速度的重要性放在最后,線程增加(不超過CPU數量)的情況下,對速度的影響在30%內,每條日志達到耗時在 1us 左右基本可以滿足要求.

日志格式和日志文件格式期望如下:

  • 日志文件的格式為:文件名.日期.時間.日志文件滾動的索引.log
    test_log_file.20191224.160354.7.log
  • 日志行的格式為 時間(精確到微秒)線程id 日志等級 日志信息 - 文件名:調用函數:行
    20191224 16:04:05.125049 2970 ERROR 5266030 chello, world - LogTest.cpp:thr_1():20

之前看的兩個日志庫,恰好都叫做 NanoLog:

  • Iyengar111/NanoLog, 跨平台、C++11、實現簡單(不超過1000行),每個日志行對應一個流對象、速度較快。
  • PlatformLab/NanoLog, 斯坦福一個實驗室的項目,Linux 平台、C++17、日志行異步寫入,每個線程有一個局部的循環字節緩沖區,速度恐怖,是見過最快的日志庫了,實現也非常細膩,優化的地方非常多,部分固定的日志信息直接編譯期確定下來。

在前一個日志庫學到了用戶接口宏的定義方式,后一個 nanolog 則是吸收了其后端的設計部分,尤其是 thread_local 的使用。

用法

通過宏定義為用戶提供調用接口。

/// Create a logline with log level \p level .
/// Custom macro \p __REL_FILE__ is relative file path as filename.
#ifndef __REL_FILE__
#define __REL_FILE__ __FILE__
#endif
#define LOG(level)                                                             \
    if (limlog::getLogLevel() <= level)                                        \
    limlog::LogLine(level, __REL_FILE__, __FUNCTION__, __LINE__)

#define LOG_TRACE LOG(limlog::LogLevel::TRACE)
#define LOG_DEBUG LOG(limlog::LogLevel::DEBUG)
#define LOG_INFO LOG(limlog::LogLevel::INFO)
#define LOG_WARN LOG(limlog::LogLevel::WARN)
#define LOG_ERROR LOG(limlog::LogLevel::ERROR)
#define LOG_FATAL LOG(limlog::LogLevel::FATAL)

每一條日志都會創建一個 LogLine 對象,這個對象非常小,用完就銷毀,幾乎沒有開銷。用法同 std::cout

#include "Log.h"
#include <string>

int main() {
    setLogFile("./test_log_file");
    setLogLevel(limlog::LogLevel::DEBUG);
    setRollSize(64); // 64MB

    std::string str("std::string");
    LOG_DEBUG << 'c' << 65535 << -9223372036854775808 << 3.14159 << "c@string" << str;

    return 0;
}

實現

前后端分離實現

  • 后端負責日志信息的落地處理,是整個日志系統的重心。
  • 前端負責組織日志信息的格式。

后端實現

采用單例模式,全局只有一個 LimLog 對象,在這個對象構造時也創建一個后台線程,通過無限循環掃描是否有數據可以寫入至文件中。通過條件變量控制該循環的終止及 LimLog 對象的銷毀。

使用 C++ 11 的關鍵字 thread_local 為每個線程都創建一個線程局部緩沖區,這樣我們的前端的日志寫入就可以不用加鎖(雖然不用加鎖,但是有另外的處理,后說明),每次都寫入到各自線程的緩沖區中。然后在上面的循環中掃描這些緩沖區,將其中的日志數據讀取至內部的輸出緩沖區中,再將其寫入到文件中。

每個緩沖區內存只創建一次使用,避免每次使用創建帶來不必要的申請消耗。每個線程局部緩沖區的大小為 1M(1 << 20 bytes), LimLog 對象內的輸出緩沖區大小為 16M(1 << 24 bytes), 這個大小並不是像看起來那么大就可以高枕無憂了,見 benchmark 性能分析

后端日志類的成員變量如下,


class LimLog{
    ···
  private:
    LogSink logSink_;
    bool threadSync_; // 前后端同步的標示
    bool threadExit_; // 后台線程標示
    bool outputFull_; // 輸出緩沖區滿的標示
    LogLevel level_;
    uint32_t bufferSize_;        // 輸出緩沖區的大小
    uint32_t sinkCount_;         // 寫入文件的次數
    uint32_t perConsumeBytes_;   // 每輪循環讀取的字節數
    uint64_t totalConsumeBytes_; // 總讀取的字節數
    char *outputBuffer_;         // first internal buffer.
    char *doubleBuffer_;         // second internal buffer, unused.
    std::vector<BlockingBuffer *> threadBuffers_;
    std::thread sinkThread_;
    std::mutex bufferMutex_; // internel buffer mutex.
    std::mutex condMutex_;
    std::condition_variable proceedCond_;  // 后台線程是否繼續運行的標條件變量
    std::condition_variable hitEmptyCond_; // 前端緩沖區為空的條件變量
    static thread_local BlockingBuffer *blockingBuffer_; // 線程局部緩沖區
};

LimLog 對象的析構和后台線程的退出需要十分的謹慎,這里程序出問題的重災區,前一個日志庫就是這個地方沒有處理好,導致有的時候日志還沒有寫完程序就退出了。為了保證這個邏輯的正確運行,采用了兩個條件變量 proceedCond_hitEmptyCond_。在執行對象的析構函數時,需要等待無可讀數據的條件變量 hitEmptyCond_, 保證這個時候線程局部緩沖區的數據都已經讀取完成,然后在析構函數中設置后台線程循環退出的條件,並且使用 proceedCond_ 通知后台線程運行。

以上邏輯在析構函數中的邏輯如下:

LimLog::~LimLog() {
    {
        // notify background thread befor the object detoryed.
        std::unique_lock<std::mutex> lock(condMutex_);
        threadSync_ = true;
        proceedCond_.notify_all();
        hitEmptyCond_.wait(lock);
    }

    {
        // stop sink thread.
        std::lock_guard<std::mutex> lock(condMutex_);
        threadExit_ = true;
        proceedCond_.notify_all();
    }
    ···
}

之后就轉入到后台線程的循環處理中,以下是所有后台線程處理的邏輯:

// Sink log info to file with async.
void LimLog::sinkThreadFunc() {
    while (!threadExit_) {
        // move front-end data to internal buffer.
        {
            std::lock_guard<std::mutex> lock(bufferMutex_);
            uint32_t bbufferIdx = 0;
            // while (!threadExit_ && !outputFull_ && !threadBuffers_.empty()) {
            while (!threadExit_ && !outputFull_ &&
                   (bbufferIdx < threadBuffers_.size())) {
                BlockingBuffer *bbuffer = threadBuffers_[bbufferIdx];
                uint32_t consumableBytes = bbuffer->consumable(); // 如果這個地方使用 used() 代替,就會出現日志行截斷的現象

                if (bufferSize_ - perConsumeBytes_ < consumableBytes) {
                    outputFull_ = true;
                    break;
                }

                if (consumableBytes > 0) {
                    uint32_t n = bbuffer->consume(
                        outputBuffer_ + perConsumeBytes_, consumableBytes);
                    perConsumeBytes_ += n;
                } else {
                    // threadBuffers_.erase(threadBuffers_.begin() +
                    // bbufferIdx);
                }
                bbufferIdx++;
            }
        }

        // not data to sink, go to sleep, 50us.
        if (perConsumeBytes_ == 0) {
            std::unique_lock<std::mutex> lock(condMutex_);

            // if front-end generated sync operation, consume again.
            if (threadSync_) {
                threadSync_ = false;
                continue;
            }

            hitEmptyCond_.notify_one();
            proceedCond_.wait_for(lock, std::chrono::microseconds(50));
        } else {
            logSink_.sink(outputBuffer_, perConsumeBytes_);
            sinkCount_++;
            totalConsumeBytes_ += perConsumeBytes_;
            perConsumeBytes_ = 0;
            outputFull_ = false;
        }
    }
}

當掃描發現無數據可取時,這個時候我們再循環讀取一下,防止析構函數在掃描數據和判斷 threadSync_ 執行的時間差之內又有日志產生,在又一輪循環后,確保所有的數據都已經讀取,通知析構函數繼續執行,然后退出循環,該后台線程函數退出,線程銷毀。 wait_for 是節約系統資源,出現無數據時當前線程就直接休眠 50us 避免不必要的循環消耗.

以上是對后台線程的處理,對數據的處理其實還是比較簡單的,后端提供了一個寫入數據的接口,該接口將前端產生的數據統一寫入至線程局部緩沖區中。

void LimLog::produce(const char *data, size_t n) {
    if (!blockingBuffer_) {
        std::lock_guard<std::mutex> lock(bufferMutex_);
        blockingBuffer_ =
            static_cast<BlockingBuffer *>(malloc(sizeof(BlockingBuffer)));
        threadBuffers_.push_back(blockingBuffer_);  // 添加到后端的緩沖區數組中
    }

    blockingBuffer_->produce(data, static_cast<uint32_t>(n)); // 將數據添加到線程局部緩沖區中
}

后台寫入線程這個時候,遍歷緩沖數組,讀取數據至內部的輸出緩沖區中,當出現可讀取的數據大於緩沖區時,直接寫入文件,剩余的數據下一路再讀取;當無可讀取的數據時,判斷是對象析構了還只是簡單的沒有數據讀取,並且休眠 50us;有數據可讀則寫入至文件,重置相關數據。

線程局部緩沖區為一個阻塞環形生產者消費者隊列,設計為阻塞態的原因為緩沖區的大小足夠大了,如果改為非阻塞態,當緩沖區的滿了的時候重新分配內存,還要管理一次內存的釋放操作。雖然線程內的操作不需要使用鎖來同步,在后台線程的循環范圍內,每次都要獲取數據的可讀大小,這里可能就涉及多個線程對數據的訪問了。最初的版本測試在沒有同步並且開啟優化的情況下,有一定概率在 consume() 操作陷入死循環,原因是該線程的 cache 的變量未及時得到更新。現在使用內存屏障來解決這個問題,避免編譯器優化。

對於后台線程讀取各線程內的數據有一個數據截斷的問題,如果使用 BlockingBuffer::unsed()獲取已讀數據長度,可能會出現日志行只被讀取了一半,然后立刻被另外的一個線程的日志截斷的情況,這不是我們期望的。為了解決這個問題,這里又提供了一個接口 BlockingBuffer::incConsumablePos() 來移動可以讀取的位置,該接口每次遞增的長度為一條日志行的長度,在一日志行數據寫入到局部緩沖區后調用該接口來移動可以讀取到的位置。而又提供接口 BlockingBuffer::consumable() 來獲取可以讀取的長度,這樣就避免了一條日志行被截斷的情況。

另外關於線程內無鎖的處理,使用 fence 指令來保持緩存一致性,因為局部線程內緩沖區中的 BlockingBuffer::consumablePos_BlockingBuffer::consumePos_ 會由於 BlockingBuffer::consumable() 暴露給其他線程。如果不使用同步措施,在多核的情況下,一個core cache 中的數據發生改變時,其他core感知不到變化,仍然對老的數據做處理。在這里一個 core 中的 producePos_ 等於 consumePos_ 的時候獲取不到新的數據變化,就會陷入無限循環。加上 fence 指令,保證 fence 前操作先於在 fence 后的操作完成,且變化被fence之后的操作感知。

uint32_t BlockingBuffer::used() const {
    std::atomic_thread_fence(std::memory_order_acquire);
    return producePos_ - consumePos_;
}

void BlockingBuffer::produce(const char *from, uint32_t n) {
    while (unused() < n)
        /* blocking */;
    ···
}

前端實現

一條日志信息在內存中的布局如下:
+------+-----------+-------+------+------+----------+------+
| time | thread id | level | logs | file | function | line |
+------+-----------+-------+------+------+----------+------+

20191224 16:04:05.125044 2970 ERROR  5266025chello, world - LogTest.cpp:thr_1():20
20191224 16:04:05.125044 2970 ERROR  5266026chello, world - LogTest.cpp:thr_1():20
20191224 16:04:05.125045 2970 ERROR  5266027chello, world - LogTest.cpp:thr_1():20

前端實現雖然簡單,但是優化的空間很大,使用 perf 做性能分析,性能熱點集中在參數格式化為字符串和時間的處理上面。

我們用 LogLine 來表示一個日志行,這個類非常簡單,重載了多個參數類型的操作符 <<, 及保存一些日志行相關信息,包括 文件名稱,函數,行和最重要的寫入字節數。
LogLine 不包含緩沖區,直接調用后端提供的接口 LimLog::produce() 將數據寫入到局部緩沖區內。

limlog 中,多次使用到了 thread_local 關鍵字,在前端實現部分也是如此。

time 的處理

在 Linux 中使用 gettimeofday(2) 來獲取當前的時間戳,其他平台使用 std::chrono 來獲取時間戳,用 localtime()strftime() 獲取本地的格式化時間 %Y%m%d %H:%M:%S, 合並微秒 timestamp % 1000000 組成 time.

這里利用 thread_local 做一個優化,格式化時間的函數調用次數。定義一個線程局部變量來存儲時間戳對應的秒數 t_second 和 字符數組來存儲格式化時間 t_datetime[24],當秒數未發生改變時,直接取線程局部字符數組而不用再調用格式化函數。

在 x86-64 體系下,gettimeofday(2) 在 vdso 機制下已經不是系統調用了,經測試發現直接調用 gettimeofday(2)std::chrono 的高精度時鍾快 15% 左右,雖然不是系統調用但是耗時還是大頭,一個調用大概需要 600ns 左右的時間。

thread id 的獲取

在 Linux 下,使用 gettid() 而不是 pthread_self() 來獲取線程id。
Windows 現在也支持了,我們要獲取當前線程id需要調用 std::this_thread::get_id(),但是返回的是 thread::id 對象,不是期望的整形數字,stl 實現如下(thread文件):

    /// get_id
    inline thread::id
    get_id() noexcept
    {
#ifdef __GLIBC__
      if (!__gthread_active_p())
	return thread::id(1);
#endif
      return thread::id(__gthread_self());
    }

使用 pthread_self() 構造一個 id 對象返回,觀察這個id對象是有一個 _M_thread 的私有變量的,這個值是 pthread_self()返回得來的。雖然不能直接獲取 _M_thread 變量,但是 thread::id 重載了 << 操作符,根據這個特點借助 std::stringstream 就可以取出這個變量了,實現如下(Log.cpp):

pid_t tid;

std::stringstream ss;
ss << std::this_thread::get_id();
ss >> tid;

這里繼續使用 thread_local 來對線程id優化,每個線程的id調用一次線程id獲取函數。創建一個線程局部線程id變量,判斷線程id存在時,就不再調用 gettid() 了。

日志行的其他項

日志等級、文件、函數和行數是簡單的字符串寫入操作了。

優化

使用一些先進的算法來實現加速處理。

整形轉換至字符串優化

實現參考了 葉勁鋒 大佬的 itoa-benchmark

老的算法是使用 std::to_string(vsnprintf) 來進行字符串格式化的操作,效率一般,這里 使用查表的方式加速字符串轉換處理,極大加快了處理的速度。

測試

TDD 是一種比較好的個人開發習慣,完整的測試用例有助於提高程序的正確性。

  • Timestamp, 對時間戳的測試,這里是日期的測試,見 test_timestamp().
  • BlockingBuffer, 對循環生產者消費者隊列的測試,見 test_blocking_buffer(), 對隊列的相關屬性(大小,生產消費索引,可消費索引,已/未使用空間)進行測試;隊列的相關索引都是自然增長的,到了 UINT32_MAX 自然的歸零,所以這里也測試了索引不超過 BlockingBuffer::size() 和超過這個大小的情況。
  • NumToString, 目前只有整形轉字符串,對 INT*_MAX, INT*_MIN, UINT*_MAX 進行測試,目前程序刻意留出了三個錯誤的用例,見 test_itoa().

benchmark

即使速度在日志庫中不是最重要的,但是速度肯定是要測的。這里使用的測試用例比較多,也是測試的一個難點了,因為磁盤的的容量比較有限。測試的機器為 i7 9700K,普通硬盤,OS 為 Linux 4.4(WSL).

測試用例 如下:

注: - 表示一個該日志包含的元素,-4 表示4個,-+4096 表示除了寫入一個該元素外,再追加 4096 字節長度的該元素。99(596/6) 則表示該用例包括對六種日志的測試,每種日志分別打印 4 次相同元素,六種日志平均長度為 99.

序號/類型 長度(byte) bool char int16_t uint16_t int32_t uint32_t int64_t uint64_t double c@string std::string
1 82(495/6) - - - - - -
2 99(596/6) -4 -4 -4 -4 -4 -4
3 167(1006/6) -16 -16 -16 -16 -16 -16
4 180 - - - - - - - - - -
5 243 - - - - - - - - - + 64 -
6 243 - - - - - - - - - - + 64
7 435 - - - - - - - - - + 256 -
8 435 - - - - - - - - - - + 256
9 1202 - - - - - - - - - + 1024 -
10 1202 - - - - - - - - - - + 1024
11 4275 - - - - - - - - - + 4096 -
12 4275 - - - - - - - - - - + 4096

單位:微秒/條。

線程/序號 單個用例條數 (1.2.3條數x6) 1 2 3 4 5 6 7 8 9 10 11 12
1 10w 1.23 1.50 2.55 2.66 3.45 3.44 5.93 5.66 1.30 1.09 1.45 1.32
2 10w 1.20 1.46 2.45 2.51 3.29 3.27 5.14 5.15 1.19 1.16 12.79 65.54
4 10w 1.13 1.36 1.70 2.00 1.42 1.12 1.12 1.12 1.21 33.56 127.66 131.84
8 10w 1.22 1.34 1.96 1.74 16.41 6.67 26.08 28.47 68.16 75.51 260.09 262.99
1 100w 1.24 1.51 2.59 2.58 3.45 3.45 5.88 5.87 2.16 8.79 31.82 31.40
2 100w 1.21 1.52 2.55 2.86 3.27 3.26 5.03 5.17 17.17 17.65 63.49 63.20
4 100w 1.14 2.99 5.51 5.36 6.92 7.31 13.21 12.88 35.16 36.24 128.66 130.22
8 100w 3.17 6.37 11.03 10.89 14.73 14.73 28.66 23.08 71.57 72.51 263.82 273.63

由於是在WSL下跑的測試用例,這個性能比我老i3還慢,在vs2019下編譯,速度大概一條80字節的速度大概在0.55us左右,后面再在原生linux上和用vs編譯一下進行壓測。測試的用例數沒有繼續調高,是因為在單個用例在 100w 的時候,8線程壓測的情況下生成的日志就有 100G 以上了,這里可以優化一下,寫一個腳本單批次運行這些用例。

觀察這個表格,用例1是比較簡單的寫入了,100w 單個用例條數在8線程的情況下明顯耗時升高,CPU是8核心,程序此時是 9 個線程滿負荷的運行了,猜測這個情況是后端日志明顯變多,這個時候后台線程不可能進入睡眠狀態了,只能前端生產線程進行切換了,切換有開銷,且另外一個線程此時得不到處理器。

性能分析

在數據量小的時候,增加線程對速度的影響並不大,一旦增加單條日志的長度,速度線性降低,產生這個現象的原因是:
后端是一個消費線程對應多個生產線程,消費線程讀取數據的能力固定(磁盤寫入的速度180M/s),當生產線程數量增加時,均攤到各生產線程的數據讀取延遲增加,而前端寫入是阻塞模式的,線程局部緩沖區大小只有 1M(1<<20 bytes), 在單條日志數據長度很大的時候,線程局部緩沖區很容易達到飽並且而發生阻塞,進而整個日志系統的速度下降,就產生了耗時線性增加的情況。

在有些日志庫中提供了 非阻塞 寫入方式,如果達到磁盤的寫入瓶頸,就丟棄最老的日志數據,見 spdlog 的處理。

TODO

  • 速度
    • LogLine::operator<<() 浮點數轉字符串做優化
    • 日志行的時間戳獲取優化,這里的時間占去60%左右的開銷

Change Log

  1. 跨平台實現.
    1. windows平台線程id獲取. commit.
    2. 使用 std::atomic_thread_fence 代替gcc內聯匯編 fence 指令. commit
  2. 優化整形數字轉換到字符串的算法. commit.
  3. 增加測試用例和性能壓測. commit.
  4. 增加每天日志文件滾動的功能. commit

參考

  1. Iyengar111/NanoLog, Low Latency C++11 Logging Library.
  2. PlatformLab/NanoLog, Nanolog is an extremely performant nanosecond scale logging system for C++ that exposes a simple printf-like API.
  3. kfifo, 內核環形生產者消費者隊列.
  4. memory barries, 內核的內存屏障.
  5. itoa-benchmark, 幾種對整形數字轉換成字符串的實現和性能比較,limlog 選擇的是相對簡單的查表算法。
  6. spdlog, Fast C++ logging library. 易用性和性能非常好的日志庫,工業使用值得推薦。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM