背景
在基於elk的日志系統中,filebeat幾乎是其中必不可少的一個組件,例外是使用性能較差的logstash file input插件或自己造個功能類似的輪子:)。
在使用和了解filebeat的過程中,筆者對其一些功能上的實現產生了疑問,諸如:
- 為什么libbeat能如此容易的進行擴展,衍生出多個應用廣泛的beat運輸程序?
- 為什么它的性能比logstash好? (https://logz.io/blog/filebeat-vs-logstash/)
- 是如何實現‘保證至少發送一次’這個feature的呢?
- 代碼模塊是如何划分、如何組織、如何運行的呢?
- ...
為了找到答案,筆者閱讀了filebeat和部分libbeat的源碼(read the fucking source code),本文即是對此過程的一次總結。一方面是方便日后回顧,另一方面也希望能解答大家對filebeat的一些疑惑。
本文主要內容包括filebeat基本介紹、源碼解析兩個部分,主要面向的是:想要了解filebeat實現、想改造或擴展filebeat功能或想參考filebeat開發自定義beats
的讀者。
filebeat基本介紹
filebeat是一個開源的日志運輸程序,屬於beats家族中的一員,和其他beats一樣都基於libbeat庫實現。其中,libbeat是一個提供公共功能的庫,功能包括: 配置解析、日志打印、事件處理和發送等。
對於任一種beats來說,主要邏輯都包含兩個部分[2]
:
- 收集數據並轉換成事件
- 發送事件到指定的輸出
其中第二點已由libbeat實現,因此各個beats實際只需要關心如何收集數據並生成事件后發送給libbeat的Publisher。beats和libeat的交互如下圖所示:

具體到filebeat,它能采集數據的類型包括: log文件、標准輸入、redis、udp和tcp包、容器日志和syslog,其中最常見的是使用log類型采集文件日志發送到Elasticsearch或Logstash。而后續的源碼解析,也主要基於這種使用場景。
基於libbeat實現的filebeat,主要擁有以下幾個特性[3]
:
- 在運輸日志內容方面它擁有健壯性:正常情況下,filebeat讀取並運輸日志行,但如果期間程序因某些原因被中斷了,它會記住中斷前已處理成功的讀取位置,在程序再次啟動時恢復。
- 可以解析多種常見的日志格式,簡化用戶操作: filebeta內置多個模塊(module):auditd、Apache、NGINX、System、MySQL等,它們將常見日志格式的收集、解析和可視化簡化成了一個單獨命令,模塊的實現方式:基於操作系統定義各個模塊對應日志的默認路徑、使用ingest node的pipeline解析特定的日志格式、結合kibana dashboard可視化解析后特定格式的日志。
- 支持容器應用的日志收集,並且能通過libbeat的autodiscover特性檢測新加入的容器並使用對應的模塊(module)或輸入
- 不會使pipeline超過負載:使用backpressure-sensitive 協議感知后端(比如logstash、elasticsesarch等)壓力,如果后端忙於處理數據,則降低讀日志的速度;一旦阻塞被解決,則恢復。
- 可以將運輸日志到elasticsearch或logstash中,在kibana進行可視化
filebeat源碼解析
模塊結構
下圖是filebeat及使用libbeat的一些主要模塊,為筆者根據源碼的理解所作。

1. filebeat主要模塊
- Crawler: 管理所有Input收集數據並發送事件到libbeat的Publisher
- Input: 對應可配置的一種輸入類型,每種類型都有具體的Input和Harvester實現。 配置項
- Harvester: 對應一個輸入源,是收集數據的實際工作者。配置中,一個具體的Input可以包含多個輸入源(Harvester)
- module: 簡化了一些常見程序日志(比如nginx日志)收集、解析、可視化(kibana dashboard)配置項
- fileset: module下具體的一種Input定義(比如nginx包括access和error log),包含:1)輸入配置;2)es ingest node pipeline定義;3)事件字段定義;4)示例kibana dashboard
- Registrar:用於在事件發送成功后記錄文件狀態
2. libbeat主要模塊
- Publisher:
- autodiscover:用於自動發現容器並將其作為輸入源
filebeat目錄組織
├── autodiscover # 包含filebeat的autodiscover適配器(adapter),當autodiscover發現新容器時創建對應類型的輸入
├── beater # 包含與libbeat庫交互相關的文件
├── channel # 包含filebeat輸出到pipeline相關的文件
├── config # 包含filebeat配置結構和解析函數
├── crawler # 包含Crawler結構和相關函數
├── fileset # 包含module和fileset相關的結構
├── harvester # 包含Harvester接口定義、Reader接口及實現等
├── input # 包含所有輸入類型的實現(比如: log, stdin, syslog) ├── inputsource # 在syslog輸入類型中用於讀取tcp或udp syslog ├── module # 包含各module和fileset配置 ├── modules.d # 包含各module對應的日志路徑配置文件,用於修改默認路徑 ├── processor # 用於從容器日志的事件字段source中提取容器id ├── prospector # 包含舊版本的輸入結構Prospector,現已被Input取代 ├── registrar # 包含Registrar結構和方法 └── util # 包含beat事件和文件狀態的通用結構Data └── ...
除了以上目錄注釋外,以下將介紹一些個人認為比較重要的文件的詳細內容,讀者可作為閱讀源碼時的一個參考。
/beater
包含與libbeat庫交互相關的文件:
- acker.go: 包含在libbeat設置的ack回調函數,事件成功發送后被調用
- channels.go: 包含在ack回調函數中被調用的記錄者(logger),包括:
- registrarLogger: 將已確認事件寫入registrar運行隊列
- finishedLogger: 統計已確認事件數量
- filebeat.go: 包含實現了beater接口的filebeat結構,接口函數包括:
- New:創建了filebeat實例
- Run:運行filebeat
- Stop: 停止filebeat運行
- signalwait.go:基於channel實現的等待函數,在filebeat中用於:
- 等待fileebat結束
- 等待確認事件被寫入registry文件
/channel
filebeat輸出(到pipeline)相關的文件
- factory.go: 包含OutletFactory,用於創建輸出器Outleter對象
- interface.go: 定義輸出接口Outleter
- outlet.go: 實現Outleter,封裝了libbeat的pipeline client,其在harvester中被調用用於將事件發送給pipeline
- util.go: 定義ack回調的參數結構data,包含beat事件和文件狀態
/input
包含Input接口及各種輸入類型的Input和Harvester實現
- Input:對應配置中的一個Input項,同個Input下可包含多個輸入源(比如文件)
- Harvester:每個輸入源對應一個Harvester,負責實際收集數據、並發送事件到pipeline
/harvester
包含Harvester接口定義、Reader接口及實現等
- forwarder.go: Forwarder結構(包含outlet)定義,用於轉發事件
- harvester.go: Harvester接口定義,具體實現則在/input目錄下
- registry.go: Registry結構,用於在Input中管理多個Harvester(輸入源)的啟動和停止
- source.go: Source接口定義,表示輸入源。目前僅有Pipe一種實現(包含os.File),用在log、stdin和docker輸入類型中。btw,這三種輸入類型都是用的log input的實現。
- /reader目錄: Reader接口定義和各種Reader實現
重要數據結構
beats通用事件結構(libbeat/beat/event.go
):
type Event struct {
Timestamp time.Time // 收集日志時記錄的時間戳,對應es文檔中的@timestamp字段 Meta common.MapStr // meta信息,outpus可選的將其作為事件字段輸出。比如輸出為es且指定了pipeline時,其pipeline id就被包含在此字段中 Fields common.MapStr // 默認輸出字段定義在field.yml,其他字段可以在通過fields配置項指定 Private interface{} // for beats private use }
Crawler(filebeat/crawler/crawler.go
):
// Crawler 負責抓取日志並發送到libbeat pipeline
type Crawler struct { inputs map[uint64]*input.Runner // 包含所有輸入的runner inputConfigs []*common.Config out channel.Factory wg sync.WaitGroup InputsFactory cfgfile.RunnerFactory ModulesFactory cfgfile.RunnerFactory modulesReloader *cfgfile.Reloader inputReloader *cfgfile.Reloader once bool beatVersion string beatDone chan struct{} }
log類型Input(filebeat/input/log/input.go
)
// Input contains the input and its config
type Input struct { cfg *common.Config config config states *file.States harvesters *harvester.Registry // 包含Input所有Harvester outlet channel.Outleter // Input共享的Publisher client stateOutlet channel.Outleter done chan struct{} numHarvesters atomic.Uint32 meta map[string]string }
log類型Harvester(filebeat/input/log/harvester.go
):
type Harvester struct {
id uuid.UUID config config source harvester.Source // the source being watched // shutdown handling done chan struct{} stopOnce sync.Once stopWg *sync.WaitGroup stopLock sync.Mutex // internal harvester state state file.State states *file.States log *Log // file reader pipeline reader reader.Reader encodingFactory encoding.EncodingFactory encoding encoding.Encoding // event/state publishing outletFactory OutletFactory publishState func(*util.Data) bool onTerminate func() }
Registrar(filebeat/registrar/registrar.go
):
type Registrar struct {
Channel chan []file.State out successLogger done chan struct{} registryFile string // Path to the Registry File fileMode os.FileMode // Permissions to apply on the Registry File wg sync.WaitGroup states *file.States // Map with all file paths inside and the corresponding state gcRequired bool // gcRequired is set if registry state needs to be gc'ed before the next write gcEnabled bool // gcEnabled indictes the registry contains some state that can be gc'ed in the future flushTimeout time.Duration bufferedStateUpdates int }
libbeat Pipeline(libbeat/publisher/pipeline/pipeline.go
)
type Pipeline struct {
beatInfo beat.Info logger *logp.Logger queue queue.Queue output *outputController observer observer eventer pipelineEventer // wait close support waitCloseMode WaitCloseMode waitCloseTimeout time.Duration waitCloser *waitCloser // pipeline ack ackMode pipelineACKMode ackActive atomic.Bool ackDone chan struct{} ackBuilder ackBuilder // pipelineEventsACK eventSema *sema processors pipelineProcessors }
執行邏輯
filebeat啟動
filebeat啟動流程如下圖所示:

1. 執行root命令
在filebeat/main.go
文件中,main
函數調用了cmd.RootCmd.Execute()
,而RootCmd
則是在cmd/root.go
中被init
函數初始化,其中就注冊了filebeat.go:New
函數以創建實現了beater
接口的filebeat實例
對於任意一個beats來說,都需要有:1) 實現Beater
接口的具體Beater(如Filebeat); 2) 創建該具體Beater的(New)函數[4]
。
beater接口定義(beat/beat.go):
type Beater interface { // The main event loop. This method should block until signalled to stop by an // invocation of the Stop() method. Run(b *Beat) error // Stop is invoked to signal that the Run method should finish its execution. // It will be invoked at most once. Stop() }
2. 初始化和運行Filebeat
- 創建
libbeat/cmd/instance/beat.go:Beat
結構 - 執行
(*Beat).launch
方法(*Beat).Init()
初始化Beat:加載beats公共config(*Beat).createBeater
registerTemplateLoading
: 當輸出為es時,注冊加載es模板的回調函數pipeline.Load
: 創建Pipeline:包含隊列、事件處理器、輸出等setupMetrics
: 安裝監控filebeat.New
: 解析配置(其中輸入配置包括配置文件中的Input和module Input)等
loadDashboards
加載kibana dashboard(*Filebeat).Run
: 運行filebeat
3. Filebeat運行
- 設置加載es pipeline的回調函數
- 初始化registrar和crawler
- 設置事件完成的回調函數
- 啟動Registrar、啟動Crawler、啟動Autodiscover
- 等待filebeat運行結束
日志收集
從收集日志、到發送事件到publisher,其數據流如下圖所示:

- Crawler根據Input配置創建並啟動具體Input對象
以log類型為例
- Log input對象創建時會從registry讀取文件狀態(主要是offset),然后為input配置中的文件路徑創建harvester並運行
- harvester啟動時會通過
Setup
方法創建一系列reader形成讀處理鏈
- harvester啟動時會通過
- harvester從registry記錄的文件位置開始讀取,組裝成事件(beat.Event)后發給Publisher
reader
關於log類型的reader處理鏈,如下圖所示:

opt表示根據配置決定是否創建該reader
Reader包括:
- Line: 包含
os.File
,用於從指定offset開始讀取日志行。雖然位於處理鏈的最內部,但其Next函數中實際的處理邏輯(讀文件行)卻是最新被執行的。 - Encode: 包含Line Reader,將其讀取到的行生成Message結構后返回
- JSON, DockerJSON: 將json形式的日志內容decode成字段
- StripNewLine:去除日志行尾部的空白符
- Multiline: 用於讀取多行日志
- Limit: 限制單行日志字節數
除了Line Reader外,這些reader都實現了Reader
接口:
type Reader interface { Next() (Message, error) }
Reader通過內部包含Reader
對象的方式,使Reader形成一個處理鏈,其實這就是設計模式中的責任鏈模式。
各Reader的Next方法的通用形式像是這樣:Next
方法調用內部Reader
對象的Next
方法獲取Message
,然后處理后返回。
func (r *SomeReader) Next() (Message, error) { message, err := r.reader.Next() if err != nil { return message, err } // do some processing... return message, nil }
事件處理和隊列
在Crawler收集日志並轉換成事件后,其就會通過調用Publisher對應client的Publish接口將事件送到Publisher,后續的處理流程也都將由libbeat完成,事件的流轉如下圖所示:

事件處理器processor
在harvester調用client.Publish
接口時,其內部會使用配置中定義的processors對事件進行處理,然后才將事件發送到Publisher隊列。
通過官方文檔了解到,processor包含兩種:在Input內定義作為局部(Input獨享)的processor,其只對該Input產生的事件生效;在頂層配置中定義作為全局processor,其對全部事件生效。
其對應的代碼實現方式是: filebeat在使用libbeat pipeline的ConnectWith
接口創建client時(factory.go
中(*OutletFactory)Create
函數),會將Input內部的定義processor作為參數傳遞給ConnectWith
接口。而在ConnectWith
實現中,會將參數中的processor和全局processor(在創建pipeline時生成)合並。從這里讀者也可以發現,實際上每個Input都獨享一個client,其包含一些Input自身的配置定義邏輯。
任一Processor都實現了Processor接口:Run函數包含處理邏輯,String返回Processor名。
type Processor interface { Run(event *beat.Event) (*beat.Event, error) String() string }
關於支持的processors及其使用,讀者可以參考官方文檔Filter and enhance the exported data這一小節
隊列queue
在事件經過處理器處理后,下一步將被發往Publisher的隊列。在client.go
在(*client) publish
方法中我們可以看到,事件是通過調用c.producer.Publish(pubEvent)
被實際發送的,而producer則通過具體Queue的Producer
方法生成。
隊列對象被包含在pipeline.go:Pipeline
結構中,其接口的定義如下:
type Queue interface { io.Closer BufferConfig() BufferConfig Producer(cfg ProducerConfig) Producer Consumer() Consumer }
主要的,Producer方法生成Producer對象,用於向隊列中push事件;Consumer方法生成Consumer對象,用於從隊列中取出事件。Producer
和Consumer
接口定義如下:
type Producer interface { Publish(event publisher.Event) bool TryPublish(event publisher.Event) bool Cancel() int } type Consumer interface { Get(sz int) (Batch, error) Close() error }
在配置中沒有指定隊列配置時,默認使用了memqueue
作為隊列實現,下面我們來看看memqueue及其對應producer和consumer定義:
Broker結構(memqueue在代碼中實際對應的結構名是Broker):
type Broker struct {
done chan struct{} logger logger bufSize int // buf brokerBuffer // minEvents int // idleTimeout time.Duration // api channels events chan pushRequest requests chan getRequest pubCancel chan producerCancelRequest // internal channels acks chan int scheduledACKs chan chanList eventer queue.Eventer // wait group for worker shutdown wg sync.WaitGroup waitOnClose bool }
根據是否需要ack分為forgetfullProducer和ackProducer兩種producer:
type forgetfullProducer struct {
broker *Broker openState openState } type ackProducer struct { broker *Broker cancel bool seq uint32 state produceState openState openState }
consumer結構:
type consumer struct {
broker *Broker resp chan getResponse done chan struct{} closed atomic.Bool }
三者的運作方式如下圖所示:

Producer
通過Publish
或TryPublish
事件放入Broker
的隊列,即結構中的channel對象evetns
Broker
的主事件循環EventLoop將(請求)事件從events channel取出,放入自身結構體對象ringBuffer中。- 主事件循環有兩種類型:1)直接(不帶buffer)事件循環結構
directEventLoop
:收到事件后盡可能快的轉發;2)帶buffer事件循環結構bufferingEventLoop
:當buffer滿或刷新超時時轉發。具體使用哪一種取決於memqueue配置項flush.min_events,大於1時使用后者,否則使用前者。
- 主事件循環有兩種類型:1)直接(不帶buffer)事件循環結構
eventConsumer
調用Consumer的Get
方法獲取事件:1)首先將獲取事件請求(包括請求事件數和用於存放其響應事件的channelresp
)放入Broker的請求隊列requests中,等待主事件循環EventLoop處理后將事件放入resp;2)獲取resp的事件,組裝成batch結構后返回eventConsumer
將事件放入output對應隊列中
這部分關於事件在隊列中各種channel間的流轉,筆者認為是比較消耗性能的,但不清楚設計者這樣設計的考量是什么。 另外值得思考的是,在多個go routine使用隊列交互的場景下,libbeat中都使用了go語言channel作為其底層的隊列,它是否可以完全替代加鎖隊列的使用呢?
事件發送
在隊列消費者將事件放入output工作隊列后,事件將在pipeline/output.go:netClientWorker
的run()
方法中被取出,然后使用具體output client將事件發送到指定輸出(比如:es、logstash等)。
其中,netClientWorker
的數目取決於具體輸出client的數目(比如es作為輸出時,client數目為host數目),它們共享相同的output工作隊列。
此時如果發送失敗會發生什么呢? 在outputs/elasticsearch/client.go:Client
的Publish
方法可以看到:發送失敗會重試失敗的事件,直到全部事件都發送成功后才調用ACK確認。
ack機制和registrar記錄文件狀態
在事件發送成功后, 其ack的數據流如下圖所示:

- 在事件發送成功后,其被放入
pipeline_ack.go:pipelineEventsACK
的事件隊列events
中 pipelineEventsACK
在worker中將事件取出,調用acker.go:(*eventACKer).ackEvents
,將ack(文件狀態)放入registrar的隊列Channel中。此回調函數在filebeat.go:(*Filebeat)Run
方法中通過Publisher.SetACKHandler
設置。- 在Registrar的
Run()
方法中取出隊列中的文件狀態,刷新registry文件
通過ack機制和registrar模塊,filebeat實現了對已發送成功事件對應文件狀態的記錄,這使它即使在程序crash后重啟的情況下也能從之前的文件位置恢復並繼續處理,保證了日志數據(事件)被至少發送一次。
總結
至此,本篇文章關於filebeat源碼解析的內容已經結束。
從整體看,filebeat的代碼沒有包含復雜的算法邏輯或底層實現,但其整體代碼結構還是比較清晰的,即使對於不需要參考filebeat特性實現去開發自定義beats的讀者來說,仍屬於值得一讀的源碼。
參考
- filebeat官方文檔: https://www.elastic.co/guide/en/beats/filebeat/current/filebeat-getting-started.html
- Creating a New Beat: https://www.elastic.co/guide/en/beats/devguide/6.5/newbeat-overview.html
- filebeat主頁: https://www.elastic.co/products/beats/filebeat
- The Beater Interface: https://www.elastic.co/guide/en/beats/devguide/current/beater-interface.html#beater-interface
- filebeat源碼分析: https://segmentfault.com/a/1190000006124064#articleHeader2