日志架構
所有日志由Rsyslog或者Filebeat收集,然后傳輸給Kafka,Logstash作為Consumer消費Kafka里邊的數據,分別寫入Elasticsearch和Hadoop,最后使用Kibana輸出到web端供相關人員查看,或者是由Spark接手進入更深層次的分析。
在以上整個架構中,核心的幾個組件Kafka、Elasticsearch、Hadoop天生支持高可用,唯獨Logstash是不支持的,用單個Logstash去處理日志,不僅存在處理瓶頸更重要的是在整個系統中存在單點的問題,
如果Logstash宕機則將會導致整個集群的不可用,后果可想而知。
如何解決Logstash的單點問題呢?我們可以借助Kafka的Consumer Group來實現。
Kafka Consumer Group
Consumer Group: 是個邏輯上的概念,為一組consumer的集合,同一個topic的數據會廣播給不同的group,同一個group中只有一個consumer能拿到這個數據。
也就是說對於同一個topic,每個group都可以拿到同樣的所有數據,但是數據進入group后只能被其中的一個consumer消費,
基於這一點我們只需要啟動多個logstsh,並將這些logstash分配在同一個組里邊就可以實現logstash的高可用了。
配置
input { kafka { bootstrap_servers => "172.x.x.91:9092,172.x.x.92:9092,172.x.x.93:9092" #kafka集群地址 group_id => "groupLog" #logstash集群消費kafka集群的身份標識,必須集群相同且唯一 topics => ["logstash-log"] #要消費的kafka主題,logstash集群相同 consumer_threads => 6 #消費線程數,集群中所有logstash相加最好等於 topic 分區數 auto_offset_reset => "latest" decorate_events => true type => "app_log" codec => json } }
以上為logstash消費kafka集群的配置,其中加入了group_id
參數,group_id
是一個的字符串,唯一標識一個group,具有相同group_id
的consumer構成了一個consumer group,
這樣啟動多個logstash進程,只需要保證group_id
一致就能達到logstash高可用的目的,一個logstash掛掉同一Group內的logstash可以繼續消費。
除了高可用外同一Group內的多個Logstash可以同時消費kafka內topic的數據,從而提高logstash的處理能力,但需要注意的是消費kafka數據時,
每個consumer最多只能使用一個partition,當一個Group內consumer的數量大於partition的數量時,只有等於partition個數的consumer能同時消費,其他的consumer處於等待狀態。
例如一個topic下有3個partition,那么在一個有5個consumer的group中只有3個consumer在同時消費topic的數據,而另外兩個consumer處於等待狀態,
所以想要增加logstash的消費性能,可以適當的增加topic的partition數量,但kafka中partition數量過多也會導致kafka集群故障恢復時間過長,
消耗更多的文件句柄與客戶端內存等問題,也並不是partition配置越多越好,需要在使用中找到一個平衡。
配置說明
1、codec (反序列化JSON)
es是按照json格式存儲數據的,上面的例子中,我們輸入到kafka的數據是json格式的,但是經Logstash寫入到es之后,整條數據變成一個字符串存儲到message
字段里面了。
如果我們想要保持原來的json格式寫入到es,只需要在input里面再加一條配置項:codec => "json"
.
2、consumer_threads(並行傳輸)
Logstash的input讀取數的時候可以多線程並行讀取,logstash-input-kafka
插件中對應的配置項是consumer_threads
,默認值為1。一般這個默認值不是最佳選擇,那這個值該配置多少呢?這個需要對kafka的模型有一定了解:
- kafka的topic是分區的,數據存儲在每個分區內;
- kafka的consumer是分組的,任何一個consumer屬於某一個組,一個組可以包含多個consumer,同一個組內的consumer不會重復消費的同一份數據。
所以,對於kafka的consumer,一般最佳配置是同一個組內consumer個數(或線程數)等於topic的分區數,這樣consumer就會均分topic的分區,達到比較好的均衡效果。
舉個例子,比如一個topic有n個分區,consumer有m個線程。那最佳場景就是n=m,此時一個線程消費一個分區。如果n小於m,即線程數多於分區數,那多出來的線程就會空閑。
如果n大於m,那就會存在一些線程同時消費多個分區的數據,造成線程間負載不均衡。
所以,一般consumer_threads
配置為你消費的topic的所包含的partition個數即可。如果有多個Logstash實例,那就讓實例個數 * consumer_threads
等於分區數即可。
沒有配置consumer_threads
,使用默認值1,可以在Logstash中看到如下日志:
[2019-09-19T22:54:48,207][INFO ][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] [Consumer clientId=logstash-0, groupId=logstash] Setting newly assigned partitions [nyc-test-1, nyc-test-0]
因為只有一個consumer,所以兩個分區都分給了它。這次我們將consumer_threads
設置成了2,看下效果:
3、如何避免重復數據
有些業務場景可能不能忍受重復數據,有一些配置項可以幫我們在一定程度上解決問題。這里需要先梳理一下可能造成重復數據的場景:
- 數據產生的時候就有重復,業務想對重復數據去重(注意是去重,不是merge)。
- 數據寫入到Kafka時沒有重復,但后續流程可能因為網絡抖動、傳輸失敗等導致重試造成數據重復。
對於第1種場景,只要原始數據中有唯一字段就可以去重;對於第2種場景,不需要依賴業務數據就可以去重。去重的原理也很簡單,利用es document id即可。
對於es,如果寫入數據時沒有指定document id,就會隨機生成一個uuid,如果指定了,就使用指定的值。對於需要去重的場景,我們指定document id即可。
在output elasticsearch中可以通過document_id
字段指定document id。對於場景1非常簡單,指定業務中的惟一字段為document id即可。主要看下場景2。
對於場景2,我們需要構造出一個“uuid”能惟一標識kafka中的一條數據,這個也非常簡單:<topic>+<partition>+<offset>
,這三個值的組合就可以惟一標識kafka集群中的一條數據。
input kafka插件也已經幫我們把消息對應的元數據信息記錄到了@metadata
(Logstash的元數據字段,不會輸出到output里面去)字段里面:
[@metadata][kafka][topic]
:索引信息[@metadata][kafka][consumer_group]
:消費者組信息[@metadata][kafka][partition]
:分區信息[@metadata][kafka][offset]
:offset信息[@metadata][kafka][key]
:消息的key(如果有的話)[@metadata][kafka][timestamp]
:時間戳信息(消息創建的時間或者broker收到的時間)
所以,就可以這樣配置document id了:
當然,如果每條kafka消息都有一個唯一的uuid的話,也可以在寫入kafka的時候,將其寫為key,然后這里就可以使用[@metadata][kafka][key]
作為document id了。
最后一定要注意,只有當decorate_events
選項配置為true的時候,上面的@metadata才會記錄那些元數據,否則不會記錄。而該配置項的默認值是false,即不記錄。
4、auto_offset_reset
Kafka中沒有初始偏移量或偏移量超出范圍時該怎么辦:
- earliest:將偏移量自動重置為最早的偏移量
- latest:自動將偏移量重置為最新偏移量
- none:如果未找到消費者組的先前偏移量,則向消費者拋出異常
引用: