轉-filebeat 源碼分析


背景

在基於elk的日志系統中,filebeat幾乎是其中必不可少的一個組件,例外是使用性能較差的logstash file input插件或自己造個功能類似的輪子:)。

在使用和了解filebeat的過程中,筆者對其一些功能上的實現產生了疑問,諸如:

  1. 為什么libbeat能如此容易的進行擴展,衍生出多個應用廣泛的beat運輸程序?
  2. 為什么它的性能比logstash好? (https://logz.io/blog/filebeat-vs-logstash/
  3. 是如何實現‘保證至少發送一次’這個feature的呢?
  4. 代碼模塊是如何划分、如何組織、如何運行的呢?
  5. ...

為了找到答案,筆者閱讀了filebeat和部分libbeat的源碼(read the fucking source code),本文即是對此過程的一次總結。一方面是方便日后回顧,另一方面也希望能解答大家對filebeat的一些疑惑。

本文主要內容包括filebeat基本介紹、源碼解析兩個部分,主要面向的是:想要了解filebeat實現、想改造或擴展filebeat功能或想參考filebeat開發自定義beats的讀者。

filebeat基本介紹

filebeat是一個開源的日志運輸程序,屬於beats家族中的一員,和其他beats一樣都基於libbeat庫實現。其中,libbeat是一個提供公共功能的庫,功能包括: 配置解析、日志打印、事件處理和發送等。

對於任一種beats來說,主要邏輯都包含兩個部分[2]

  1. 收集數據並轉換成事件
  2. 發送事件到指定的輸出

其中第二點已由libbeat實現,因此各個beats實際只需要關心如何收集數據並生成事件后發送給libbeat的Publisher。beats和libeat的交互如下圖所示:

beats和libeat的交互

具體到filebeat,它能采集數據的類型包括: log文件、標准輸入、redis、udp和tcp包、容器日志和syslog,其中最常見的是使用log類型采集文件日志發送到Elasticsearch或Logstash。而后續的源碼解析,也主要基於這種使用場景。

基於libbeat實現的filebeat,主要擁有以下幾個特性[3]

  1. 在運輸日志內容方面它擁有健壯性:正常情況下,filebeat讀取並運輸日志行,但如果期間程序因某些原因被中斷了,它會記住中斷前已處理成功的讀取位置,在程序再次啟動時恢復。
  2. 可以解析多種常見的日志格式,簡化用戶操作: filebeta內置多個模塊(module):auditd、Apache、NGINX、System、MySQL等,它們將常見日志格式的收集、解析和可視化簡化成了一個單獨命令,模塊的實現方式:基於操作系統定義各個模塊對應日志的默認路徑、使用ingest node的pipeline解析特定的日志格式、結合kibana dashboard可視化解析后特定格式的日志。
  3. 支持容器應用的日志收集,並且能通過libbeat的autodiscover特性檢測新加入的容器並使用對應的模塊(module)或輸入
  4. 不會使pipeline超過負載:使用backpressure-sensitive 協議感知后端(比如logstash、elasticsesarch等)壓力,如果后端忙於處理數據,則降低讀日志的速度;一旦阻塞被解決,則恢復。
  5. 可以將運輸日志到elasticsearch或logstash中,在kibana進行可視化

filebeat源碼解析

模塊結構

下圖是filebeat及使用libbeat的一些主要模塊,為筆者根據源碼的理解所作。

filebeat模塊結構

1. filebeat主要模塊

  • Crawler: 管理所有Input收集數據並發送事件到libbeat的Publisher
  • Input: 對應可配置的一種輸入類型,每種類型都有具體的Input和Harvester實現。 配置項
    • Harvester: 對應一個輸入源,是收集數據的實際工作者。配置中,一個具體的Input可以包含多個輸入源(Harvester)
  • module: 簡化了一些常見程序日志(比如nginx日志)收集、解析、可視化(kibana dashboard)配置項
    • fileset: module下具體的一種Input定義(比如nginx包括access和error log),包含:1)輸入配置;2)es ingest node pipeline定義;3)事件字段定義;4)示例kibana dashboard
  • Registrar:用於在事件發送成功后記錄文件狀態

2. libbeat主要模塊

  • Publisher:
    • client: 提供Publish接口讓filebeat將事件發送到Publisher。在發送到隊列之前,內部會先調用processors(包括input 內部的processors和全局processors)進行處理。
    • processor: 事件處理器,可對事件按照配置中的條件進行各種處理(比如刪除事件、保留指定字段等)。配置項
    • queue: 事件隊列,有memqueue(基於內存)和spool(基於磁盤文件)兩種實現。配置項
    • outputs: 事件的輸出端,比如ES、Logstash、kafka等。配置項
    • acker: 事件確認回調,在事件發送成功后進行回調
  • autodiscover:用於自動發現容器並將其作為輸入源

filebeat目錄組織

├── autodiscover        # 包含filebeat的autodiscover適配器(adapter),當autodiscover發現新容器時創建對應類型的輸入
├── beater              # 包含與libbeat庫交互相關的文件
├── channel             # 包含filebeat輸出到pipeline相關的文件
├── config              # 包含filebeat配置結構和解析函數
├── crawler             # 包含Crawler結構和相關函數
├── fileset             # 包含module和fileset相關的結構
├── harvester           # 包含Harvester接口定義、Reader接口及實現等
├── input               # 包含所有輸入類型的實現(比如: log, stdin, syslog) ├── inputsource # 在syslog輸入類型中用於讀取tcp或udp syslog ├── module # 包含各module和fileset配置 ├── modules.d # 包含各module對應的日志路徑配置文件,用於修改默認路徑 ├── processor # 用於從容器日志的事件字段source中提取容器id ├── prospector # 包含舊版本的輸入結構Prospector,現已被Input取代 ├── registrar # 包含Registrar結構和方法 └── util # 包含beat事件和文件狀態的通用結構Data └── ...

除了以上目錄注釋外,以下將介紹一些個人認為比較重要的文件的詳細內容,讀者可作為閱讀源碼時的一個參考。

/beater

包含與libbeat庫交互相關的文件:

  • acker.go: 包含在libbeat設置的ack回調函數,事件成功發送后被調用
  • channels.go: 包含在ack回調函數中被調用的記錄者(logger),包括:
    1. registrarLogger: 將已確認事件寫入registrar運行隊列
    2. finishedLogger: 統計已確認事件數量
  • filebeat.go: 包含實現了beater接口的filebeat結構,接口函數包括:
    1. New:創建了filebeat實例
    2. Run:運行filebeat
    3. Stop: 停止filebeat運行
  • signalwait.go:基於channel實現的等待函數,在filebeat中用於:
    1. 等待fileebat結束
    2. 等待確認事件被寫入registry文件

/channel

filebeat輸出(到pipeline)相關的文件

  • factory.go: 包含OutletFactory,用於創建輸出器Outleter對象
  • interface.go: 定義輸出接口Outleter
  • outlet.go: 實現Outleter,封裝了libbeat的pipeline client,其在harvester中被調用用於將事件發送給pipeline
  • util.go: 定義ack回調的參數結構data,包含beat事件和文件狀態

/input

包含Input接口及各種輸入類型的Input和Harvester實現

  • Input:對應配置中的一個Input項,同個Input下可包含多個輸入源(比如文件)
  • Harvester:每個輸入源對應一個Harvester,負責實際收集數據、並發送事件到pipeline

/harvester

包含Harvester接口定義、Reader接口及實現等

  • forwarder.go: Forwarder結構(包含outlet)定義,用於轉發事件
  • harvester.go: Harvester接口定義,具體實現則在/input目錄下
  • registry.go: Registry結構,用於在Input中管理多個Harvester(輸入源)的啟動和停止
  • source.go: Source接口定義,表示輸入源。目前僅有Pipe一種實現(包含os.File),用在log、stdin和docker輸入類型中。btw,這三種輸入類型都是用的log input的實現。
  • /reader目錄: Reader接口定義和各種Reader實現

重要數據結構

beats通用事件結構(libbeat/beat/event.go):

type Event struct {
	Timestamp time.Time // 收集日志時記錄的時間戳,對應es文檔中的@timestamp字段 Meta common.MapStr // meta信息,outpus可選的將其作為事件字段輸出。比如輸出為es且指定了pipeline時,其pipeline id就被包含在此字段中 Fields common.MapStr // 默認輸出字段定義在field.yml,其他字段可以在通過fields配置項指定 Private interface{} // for beats private use }

Crawler(filebeat/crawler/crawler.go):

// Crawler 負責抓取日志並發送到libbeat pipeline
type Crawler struct { inputs map[uint64]*input.Runner // 包含所有輸入的runner inputConfigs []*common.Config out channel.Factory wg sync.WaitGroup InputsFactory cfgfile.RunnerFactory ModulesFactory cfgfile.RunnerFactory modulesReloader *cfgfile.Reloader inputReloader *cfgfile.Reloader once bool beatVersion string beatDone chan struct{} }

log類型Input(filebeat/input/log/input.go)

// Input contains the input and its config
type Input struct { cfg *common.Config config config states *file.States harvesters *harvester.Registry // 包含Input所有Harvester outlet channel.Outleter // Input共享的Publisher client stateOutlet channel.Outleter done chan struct{} numHarvesters atomic.Uint32 meta map[string]string }

log類型Harvester(filebeat/input/log/harvester.go):

type Harvester struct {
	id     uuid.UUID config config source harvester.Source // the source being watched // shutdown handling done chan struct{} stopOnce sync.Once stopWg *sync.WaitGroup stopLock sync.Mutex // internal harvester state state file.State states *file.States log *Log // file reader pipeline reader reader.Reader encodingFactory encoding.EncodingFactory encoding encoding.Encoding // event/state publishing outletFactory OutletFactory publishState func(*util.Data) bool onTerminate func() }

Registrar(filebeat/registrar/registrar.go):

type Registrar struct {
	Channel      chan []file.State out successLogger done chan struct{} registryFile string // Path to the Registry File fileMode os.FileMode // Permissions to apply on the Registry File wg sync.WaitGroup states *file.States // Map with all file paths inside and the corresponding state gcRequired bool // gcRequired is set if registry state needs to be gc'ed before the next write gcEnabled bool // gcEnabled indictes the registry contains some state that can be gc'ed in the future flushTimeout time.Duration bufferedStateUpdates int }

libbeat Pipeline(libbeat/publisher/pipeline/pipeline.go)

type Pipeline struct {
	beatInfo beat.Info logger *logp.Logger queue queue.Queue output *outputController observer observer eventer pipelineEventer // wait close support waitCloseMode WaitCloseMode waitCloseTimeout time.Duration waitCloser *waitCloser // pipeline ack ackMode pipelineACKMode ackActive atomic.Bool ackDone chan struct{} ackBuilder ackBuilder // pipelineEventsACK eventSema *sema processors pipelineProcessors }

執行邏輯

filebeat啟動

filebeat啟動流程如下圖所示:

filebeat啟動流程

1. 執行root命令

filebeat/main.go文件中,main函數調用了cmd.RootCmd.Execute(),而RootCmd則是在cmd/root.go中被init函數初始化,其中就注冊了filebeat.go:New函數以創建實現了beater接口的filebeat實例

對於任意一個beats來說,都需要有:1) 實現Beater接口的具體Beater(如Filebeat); 2) 創建該具體Beater的(New)函數[4]

beater接口定義(beat/beat.go):

type Beater interface { // The main event loop. This method should block until signalled to stop by an // invocation of the Stop() method. Run(b *Beat) error // Stop is invoked to signal that the Run method should finish its execution. // It will be invoked at most once. Stop() }

2. 初始化和運行Filebeat

  • 創建libbeat/cmd/instance/beat.go:Beat結構
  • 執行(*Beat).launch方法
    • (*Beat).Init() 初始化Beat:加載beats公共config
    • (*Beat).createBeater
      • registerTemplateLoading: 當輸出為es時,注冊加載es模板的回調函數
      • pipeline.Load: 創建Pipeline:包含隊列、事件處理器、輸出等
      • setupMetrics: 安裝監控
      • filebeat.New: 解析配置(其中輸入配置包括配置文件中的Input和module Input)等
    • loadDashboards 加載kibana dashboard
    • (*Filebeat).Run: 運行filebeat

3. Filebeat運行

  • 設置加載es pipeline的回調函數
  • 初始化registrar和crawler
  • 設置事件完成的回調函數
  • 啟動Registrar、啟動Crawler、啟動Autodiscover
  • 等待filebeat運行結束

日志收集

從收集日志、到發送事件到publisher,其數據流如下圖所示:

日志收集數據流
  • Crawler根據Input配置創建並啟動具體Input對象

以log類型為例

  • Log input對象創建時會從registry讀取文件狀態(主要是offset),然后為input配置中的文件路徑創建harvester並運行
    • harvester啟動時會通過Setup方法創建一系列reader形成讀處理鏈
  • harvester從registry記錄的文件位置開始讀取,組裝成事件(beat.Event)后發給Publisher

reader

關於log類型的reader處理鏈,如下圖所示:

reader處理鏈

opt表示根據配置決定是否創建該reader

Reader包括:

  • Line: 包含os.File,用於從指定offset開始讀取日志行。雖然位於處理鏈的最內部,但其Next函數中實際的處理邏輯(讀文件行)卻是最新被執行的。
  • Encode: 包含Line Reader,將其讀取到的行生成Message結構后返回
  • JSON, DockerJSON: 將json形式的日志內容decode成字段
  • StripNewLine:去除日志行尾部的空白符
  • Multiline: 用於讀取多行日志
  • Limit: 限制單行日志字節數

除了Line Reader外,這些reader都實現了Reader接口:

type Reader interface { Next() (Message, error) }

Reader通過內部包含Reader對象的方式,使Reader形成一個處理鏈,其實這就是設計模式中的責任鏈模式。

各Reader的Next方法的通用形式像是這樣:Next方法調用內部Reader對象的Next方法獲取Message,然后處理后返回。

func (r *SomeReader) Next() (Message, error) { message, err := r.reader.Next() if err != nil { return message, err } // do some processing... return message, nil }

事件處理和隊列

在Crawler收集日志並轉換成事件后,其就會通過調用Publisher對應client的Publish接口將事件送到Publisher,后續的處理流程也都將由libbeat完成,事件的流轉如下圖所示:

事件處理、進入隊列及輸出過程

事件處理器processor

在harvester調用client.Publish接口時,其內部會使用配置中定義的processors對事件進行處理,然后才將事件發送到Publisher隊列。

通過官方文檔了解到,processor包含兩種:在Input內定義作為局部(Input獨享)的processor,其只對該Input產生的事件生效;在頂層配置中定義作為全局processor,其對全部事件生效。

其對應的代碼實現方式是: filebeat在使用libbeat pipeline的ConnectWith接口創建client時(factory.go(*OutletFactory)Create函數),會將Input內部的定義processor作為參數傳遞給ConnectWith接口。而在ConnectWith實現中,會將參數中的processor和全局processor(在創建pipeline時生成)合並。從這里讀者也可以發現,實際上每個Input都獨享一個client,其包含一些Input自身的配置定義邏輯。

任一Processor都實現了Processor接口:Run函數包含處理邏輯,String返回Processor名。

type Processor interface { Run(event *beat.Event) (*beat.Event, error) String() string }

關於支持的processors及其使用,讀者可以參考官方文檔Filter and enhance the exported data這一小節

隊列queue

在事件經過處理器處理后,下一步將被發往Publisher的隊列。在client.go(*client) publish方法中我們可以看到,事件是通過調用c.producer.Publish(pubEvent)被實際發送的,而producer則通過具體Queue的Producer方法生成。

隊列對象被包含在pipeline.go:Pipeline結構中,其接口的定義如下:

type Queue interface { io.Closer BufferConfig() BufferConfig Producer(cfg ProducerConfig) Producer Consumer() Consumer }

主要的,Producer方法生成Producer對象,用於向隊列中push事件;Consumer方法生成Consumer對象,用於從隊列中取出事件。ProducerConsumer接口定義如下:

type Producer interface { Publish(event publisher.Event) bool TryPublish(event publisher.Event) bool Cancel() int } type Consumer interface { Get(sz int) (Batch, error) Close() error }

在配置中沒有指定隊列配置時,默認使用了memqueue作為隊列實現,下面我們來看看memqueue及其對應producer和consumer定義:

Broker結構(memqueue在代碼中實際對應的結構名是Broker):

type Broker struct {
	done chan struct{} logger logger bufSize int // buf brokerBuffer // minEvents int // idleTimeout time.Duration // api channels events chan pushRequest requests chan getRequest pubCancel chan producerCancelRequest // internal channels acks chan int scheduledACKs chan chanList eventer queue.Eventer // wait group for worker shutdown wg sync.WaitGroup waitOnClose bool }

根據是否需要ack分為forgetfullProducer和ackProducer兩種producer:

type forgetfullProducer struct {
	broker    *Broker openState openState } type ackProducer struct { broker *Broker cancel bool seq uint32 state produceState openState openState }

consumer結構:

type consumer struct {
	broker *Broker resp chan getResponse done chan struct{} closed atomic.Bool }

三者的運作方式如下圖所示:

queue、producer、consumer關系
  • Producer通過PublishTryPublish事件放入Broker的隊列,即結構中的channel對象evetns
  • Broker的主事件循環EventLoop將(請求)事件從events channel取出,放入自身結構體對象ringBuffer中。
    • 主事件循環有兩種類型:1)直接(不帶buffer)事件循環結構directEventLoop:收到事件后盡可能快的轉發;2)帶buffer事件循環結構bufferingEventLoop:當buffer滿或刷新超時時轉發。具體使用哪一種取決於memqueue配置項flush.min_events,大於1時使用后者,否則使用前者。
  • eventConsumer調用Consumer的Get方法獲取事件:1)首先將獲取事件請求(包括請求事件數和用於存放其響應事件的channel resp)放入Broker的請求隊列requests中,等待主事件循環EventLoop處理后將事件放入resp;2)獲取resp的事件,組裝成batch結構后返回
  • eventConsumer將事件放入output對應隊列中

這部分關於事件在隊列中各種channel間的流轉,筆者認為是比較消耗性能的,但不清楚設計者這樣設計的考量是什么。 另外值得思考的是,在多個go routine使用隊列交互的場景下,libbeat中都使用了go語言channel作為其底層的隊列,它是否可以完全替代加鎖隊列的使用呢?

事件發送

在隊列消費者將事件放入output工作隊列后,事件將在pipeline/output.go:netClientWorkerrun()方法中被取出,然后使用具體output client將事件發送到指定輸出(比如:es、logstash等)。

其中,netClientWorker的數目取決於具體輸出client的數目(比如es作為輸出時,client數目為host數目),它們共享相同的output工作隊列。

此時如果發送失敗會發生什么呢? 在outputs/elasticsearch/client.go:ClientPublish方法可以看到:發送失敗會重試失敗的事件,直到全部事件都發送成功后才調用ACK確認。

ack機制和registrar記錄文件狀態

在事件發送成功后, 其ack的數據流如下圖所示:

registrar記錄文件狀態過程
  • 在事件發送成功后,其被放入pipeline_ack.go:pipelineEventsACK的事件隊列events
  • pipelineEventsACK在worker中將事件取出,調用 acker.go:(*eventACKer).ackEvents,將ack(文件狀態)放入registrar的隊列Channel中。此回調函數在filebeat.go:(*Filebeat)Run方法中通過Publisher.SetACKHandler設置。
  • 在Registrar的Run()方法中取出隊列中的文件狀態,刷新registry文件

通過ack機制和registrar模塊,filebeat實現了對已發送成功事件對應文件狀態的記錄,這使它即使在程序crash后重啟的情況下也能從之前的文件位置恢復並繼續處理,保證了日志數據(事件)被至少發送一次。

總結

至此,本篇文章關於filebeat源碼解析的內容已經結束。

從整體看,filebeat的代碼沒有包含復雜的算法邏輯或底層實現,但其整體代碼結構還是比較清晰的,即使對於不需要參考filebeat特性實現去開發自定義beats的讀者來說,仍屬於值得一讀的源碼。

參考

  1. filebeat官方文檔: https://www.elastic.co/guide/en/beats/filebeat/current/filebeat-getting-started.html
  2. Creating a New Beat: https://www.elastic.co/guide/en/beats/devguide/6.5/newbeat-overview.html
  3. filebeat主頁: https://www.elastic.co/products/beats/filebeat
  4. The Beater Interface: https://www.elastic.co/guide/en/beats/devguide/current/beater-interface.html#beater-interface
  5. filebeat源碼分析: https://segmentfault.com/a/1190000006124064#articleHeader2


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM