上文主要介紹了Graylog的功能與架構,本篇我們來看看Graylog的源碼
一. 項目啟動(CmdLineTool)
啟動基本做了這幾件事:初始化logger,插件加載(這里用到了Java SPI機制),性能度量Metrics初始化(用的是codahale metrics,這個在開源軟件中用的
還挺多的,Kafka用的也是這個),最后將使用了JMXReporter將性能監控暴露給JMX。
1. 插件加載(CmdLineTools類):
Graylog自定義了一個ClassLoader用於加載指定目錄下的插件(ChainingClassLoader),將插件加載至內存后做了一個簡單的版本校驗。
之前有提到Graylog插件采用的是Java SPI機制,可以在PluginLoader這個類中看到:
這里,終於看到了熟悉的ServiceLoader類,對SPI機制感興趣的朋友,可以搜索相關文章。
2.Rest接口服務(JerseyService類):
Rest方面,Graylog使用的是Jersey提供的web service,Jersey在國內好像一直不溫不火,但是國外的開源項目里用到的還挺多的。
項目啟動就介紹到這兒,Graylog在依賴注入方面,大量用到了Google的Guice框架,不過我對Guice一直是只聞其名,有機會再研究吧 :)。
二.Graylog的Journal機制
通常,在項目中,如果遇到大量日志處理問題,我們很可能會選擇Kafka做消息隊列,但在有些客戶系統資源有限的情況下,消息隊列集群顯然是一個
奢侈的選擇,Graylog的處理方式很有意思,它並沒有完全實現自己的一套消息隊列機制,而是使用了Kafka日志處理底層的API,你可以認為,Graylog
將Kafka做的一些工作(磁盤日志管理,日志緩沖,定時清理等)放到自己進程里進行。
熟悉Kafka的朋友看到這里應該不會陌生了,Graylog同Kafka一樣,將磁盤上的日志分為Segments進行管理。
我們來看下Graylog寫入磁盤的文件,你會發現和Kafka並沒有什么不同
此外Graylog中還有PeriodicalsService定時服務(負責系統所有定時任務),ActivityWriter用戶操作入庫服務.etc,比較簡單,在此不一一列舉.。
三. Graylog中的數據流轉
說了這么多,Graylog既然是日志處理軟件,那么一條來自系統外部的日志進入Graylog server后的處理流程是什么呢?
我將Graylog對日志的處理進行了簡單的分層,數據的處理流程大致是:
系統外部的原始數據->Transport(數據傳輸層) -> Input(數據接入層)-> InputBuffer(接入層緩沖ringBuffer)-> Encoder/Decoder(數據編解碼層)-> 自帶的Kafka(可選)->Process Buffer(業務處理層緩沖ringBuffer)-> ProcessBufferProcessor(日志業務處理器) –> OutputBuffer(日志輸出/入庫/轉發緩沖RingBuffer) –> OutputBufferProcessor(輸出/入庫/轉發處理器)
下面以Kafka日志接入為例,看看數據在graylog的整體處理流程:
- 日志接入層(KafkaTransport):
2.數據進入接入層緩沖(MessageInput)
InputBufferImpl
3. 日志解碼處理器+日志業務處理器+寫入自帶的Kafka,通過Disruptor Handler (InputBufferImpl)
4. 日志直接寫入業務邏輯緩沖RingBuffer(不通過Kafka)
5. 日志寫入kafka,由后續流程消費(JournallingMessageHandler)
6. 一個后台線程,不斷從自帶的Kafka中讀取數據,寫入到下個流程的Buffer里(JournalReader類)
7. 業務處理器 ProcessBufferProcessor(graylog所有對日志進行的業務處理都綁定到了這個類里,如日志過濾,規則,威脅情報富化,地理位置富化,知識庫…)
具體的處理器實現比較復雜,放到最后講吧。
8. 數據輸出/轉發/入庫Buffer
9. 數據輸出(OutputBufferProcessor)
數據可能會有多個output,輸出到output的過程是異步而且有時間限制,不會影響到系統整體吞吐量。
以寫入ES為例(BlockingBatchedESOutput)
此外,系統還有一個線程負責定時將內存數據flush到ES,這里就不貼代碼了。
10. MessageProcessor
系統自帶的數據處理器包括GeoIpProcessor,MessageFilterChainProcessor,PipelineInterpreter
(1) GeoIpProcessor:數據富化(為原始日志添加地理位置,后續可視化時使用)
(2)MessageFilterChainProcessor(包含了所有的日志過濾器MessageFilter)
日志會一一經過排序后的過濾器,如果滿足filter條件,標記為丟棄,並更新kafka offset。下面逐一分析過濾器。