Graylog源碼分析


上文主要介紹了Graylog的功能與架構,本篇我們來看看Graylog的源碼

一. 項目啟動(CmdLineTool)

 啟動基本做了這幾件事:初始化logger,插件加載(這里用到了Java SPI機制),性能度量Metrics初始化(用的是codahale metrics,這個在開源軟件中用的

還挺多的,Kafka用的也是這個),最后將使用了JMXReporter將性能監控暴露給JMX。

  1. 插件加載(CmdLineTools類):

  Graylog自定義了一個ClassLoader用於加載指定目錄下的插件(ChainingClassLoader),將插件加載至內存后做了一個簡單的版本校驗。

         

  之前有提到Graylog插件采用的是Java SPI機制,可以在PluginLoader這個類中看到:

 

  這里,終於看到了熟悉的ServiceLoader類,對SPI機制感興趣的朋友,可以搜索相關文章。

    2.Rest接口服務(JerseyService類):

      Rest方面,Graylog使用的是Jersey提供的web service,Jersey在國內好像一直不溫不火,但是國外的開源項目里用到的還挺多的。

  

 項目啟動就介紹到這兒,Graylog在依賴注入方面,大量用到了Google的Guice框架,不過我對Guice一直是只聞其名,有機會再研究吧 :)。

二.Graylog的Journal機制

     通常,在項目中,如果遇到大量日志處理問題,我們很可能會選擇Kafka做消息隊列,但在有些客戶系統資源有限的情況下,消息隊列集群顯然是一個

奢侈的選擇,Graylog的處理方式很有意思,它並沒有完全實現自己的一套消息隊列機制,而是使用了Kafka日志處理底層的API,你可以認為,Graylog

將Kafka做的一些工作(磁盤日志管理,日志緩沖,定時清理等)放到自己進程里進行。

熟悉Kafka的朋友看到這里應該不會陌生了,Graylog同Kafka一樣,將磁盤上的日志分為Segments進行管理。

 我們來看下Graylog寫入磁盤的文件,你會發現和Kafka並沒有什么不同

 

 

此外Graylog中還有PeriodicalsService定時服務(負責系統所有定時任務),ActivityWriter用戶操作入庫服務.etc,比較簡單,在此不一一列舉.。

 

三. Graylog中的數據流轉

   說了這么多,Graylog既然是日志處理軟件,那么一條來自系統外部的日志進入Graylog server后的處理流程是什么呢?

   我將Graylog對日志的處理進行了簡單的分層,數據的處理流程大致是:

  系統外部的原始數據->Transport(數據傳輸層) -> Input(數據接入層)-> InputBuffer(接入層緩沖ringBuffer)-> Encoder/Decoder(數據編解碼層)-> 自帶的Kafka(可選)->Process Buffer(業務處理層緩沖ringBuffer)-> ProcessBufferProcessor(日志業務處理器) –>       OutputBuffer(日志輸出/入庫/轉發緩沖RingBuffer) –> OutputBufferProcessor(輸出/入庫/轉發處理器)

 

下面以Kafka日志接入為例,看看數據在graylog的整體處理流程:

 

  1. 日志接入層(KafkaTransport):

2.數據進入接入層緩沖(MessageInput)

   

InputBufferImpl

 

3.  日志解碼處理器+日志業務處理器+寫入自帶的Kafka,通過Disruptor Handler (InputBufferImpl)

 

 4. 日志直接寫入業務邏輯緩沖RingBuffer(不通過Kafka) 

5. 日志寫入kafka,由后續流程消費(JournallingMessageHandler)

 6. 一個后台線程,不斷從自帶的Kafka中讀取數據,寫入到下個流程的Buffer里(JournalReader類)

   

7. 業務處理器  ProcessBufferProcessor(graylog所有對日志進行的業務處理都綁定到了這個類里,如日志過濾,規則,威脅情報富化,地理位置富化,知識庫…)

 

  具體的處理器實現比較復雜,放到最后講吧。

8. 數據輸出/轉發/入庫Buffer

9. 數據輸出(OutputBufferProcessor)

   數據可能會有多個output,輸出到output的過程是異步而且有時間限制,不會影響到系統整體吞吐量。

以寫入ES為例(BlockingBatchedESOutput)

此外,系統還有一個線程負責定時將內存數據flush到ES,這里就不貼代碼了。

 

10. MessageProcessor

系統自帶的數據處理器包括GeoIpProcessor,MessageFilterChainProcessor,PipelineInterpreter

 (1)  GeoIpProcessor:數據富化(為原始日志添加地理位置,后續可視化時使用)

 

(2)MessageFilterChainProcessor(包含了所有的日志過濾器MessageFilter)

   

日志會一一經過排序后的過濾器,如果滿足filter條件,標記為丟棄,並更新kafka offset。下面逐一分析過濾器。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM