Logstash——利用Kafka Group實現高可用


 

日志架構

所有日志由Rsyslog或者Filebeat收集,然后傳輸給Kafka,Logstash作為Consumer消費Kafka里邊的數據,分別寫入Elasticsearch和Hadoop,最后使用Kibana輸出到web端供相關人員查看,或者是由Spark接手進入更深層次的分析。

在以上整個架構中,核心的幾個組件Kafka、Elasticsearch、Hadoop天生支持高可用,唯獨Logstash是不支持的,用單個Logstash去處理日志,不僅存在處理瓶頸更重要的是在整個系統中存在單點的問題

如果Logstash宕機則將會導致整個集群的不可用,后果可想而知。

如何解決Logstash的單點問題呢?我們可以借助Kafka的Consumer Group來實現。

 

Kafka Consumer Group

 

Consumer Group: 是個邏輯上的概念,為一組consumer的集合,同一個topic的數據會廣播給不同的group,同一個group中只有一個consumer能拿到這個數據。

也就是說對於同一個topic,每個group都可以拿到同樣的所有數據,但是數據進入group后只能被其中的一個consumer消費

基於這一點我們只需要啟動多個logstsh,並將這些logstash分配在同一個組里邊就可以實現logstash的高可用了。

 

配置

input {
        kafka {
                bootstrap_servers => "172.x.x.91:9092,172.x.x.92:9092,172.x.x.93:9092"    #kafka集群地址
                group_id => "groupLog"                                  #logstash集群消費kafka集群的身份標識,必須集群相同且唯一
                topics => ["logstash-log"]                              #要消費的kafka主題,logstash集群相同
                consumer_threads => 6                                    #消費線程數,集群中所有logstash相加最好等於 topic 分區數
                auto_offset_reset => "latest"
                decorate_events => true
                type => "app_log"
                codec => json
        }
}

以上為logstash消費kafka集群的配置,其中加入了group_id參數,group_id是一個的字符串,唯一標識一個group,具有相同group_id的consumer構成了一個consumer group,

這樣啟動多個logstash進程,只需要保證group_id一致就能達到logstash高可用的目的,一個logstash掛掉同一Group內的logstash可以繼續消費。

除了高可用外同一Group內的多個Logstash可以同時消費kafka內topic的數據,從而提高logstash的處理能力,但需要注意的是消費kafka數據時,

每個consumer最多只能使用一個partition,當一個Group內consumer的數量大於partition的數量時,只有等於partition個數的consumer能同時消費,其他的consumer處於等待狀態。

例如一個topic下有3個partition,那么在一個有5個consumer的group中只有3個consumer在同時消費topic的數據,而另外兩個consumer處於等待狀態,

所以想要增加logstash的消費性能,可以適當的增加topic的partition數量,但kafka中partition數量過多也會導致kafka集群故障恢復時間過長,

消耗更多的文件句柄與客戶端內存等問題,也並不是partition配置越多越好,需要在使用中找到一個平衡。

 

配置說明

1、codec (反序列化JSON)

es是按照json格式存儲數據的,上面的例子中,我們輸入到kafka的數據是json格式的,但是經Logstash寫入到es之后,整條數據變成一個字符串存儲到message字段里面了。

如果我們想要保持原來的json格式寫入到es,只需要在input里面再加一條配置項:codec => "json".

 

2、consumer_threads(並行傳輸)

Logstash的input讀取數的時候可以多線程並行讀取,logstash-input-kafka插件中對應的配置項是consumer_threads,默認值為1。一般這個默認值不是最佳選擇,那這個值該配置多少呢?這個需要對kafka的模型有一定了解:

  • kafka的topic是分區的,數據存儲在每個分區內;
  • kafka的consumer是分組的,任何一個consumer屬於某一個組,一個組可以包含多個consumer,同一個組內的consumer不會重復消費的同一份數據。

所以,對於kafka的consumer,一般最佳配置是同一個組內consumer個數(或線程數)等於topic的分區數,這樣consumer就會均分topic的分區,達到比較好的均衡效果。

舉個例子,比如一個topic有n個分區,consumer有m個線程。那最佳場景就是n=m,此時一個線程消費一個分區。如果n小於m,即線程數多於分區數,那多出來的線程就會空閑。

如果n大於m,那就會存在一些線程同時消費多個分區的數據,造成線程間負載不均衡。

所以,一般consumer_threads配置為你消費的topic的所包含的partition個數即可。如果有多個Logstash實例,那就讓實例個數 * consumer_threads等於分區數即可。

沒有配置consumer_threads,使用默認值1,可以在Logstash中看到如下日志:

[2019-09-19T22:54:48,207][INFO ][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] [Consumer clientId=logstash-0, groupId=logstash] Setting newly assigned partitions [nyc-test-1, nyc-test-0]

因為只有一個consumer,所以兩個分區都分給了它。這次我們將consumer_threads設置成了2,看下效果:

[2019-09-19T23:23:52,981][INFO ][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] [Consumer clientId=logstash-0, groupId=logstash] Setting newly assigned partitions [nyc-test-0]
[2019-09-19T23:23:52,982][INFO ][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] [Consumer clientId=logstash-1, groupId=logstash] Setting newly assigned partitions [nyc-test-1]

有兩個線程,即兩個consumer,所以各分到一個partition。

 

3、如何避免重復數據

有些業務場景可能不能忍受重復數據,有一些配置項可以幫我們在一定程度上解決問題。這里需要先梳理一下可能造成重復數據的場景:

  • 數據產生的時候就有重復,業務想對重復數據去重(注意是去重,不是merge)。
  • 數據寫入到Kafka時沒有重復,但后續流程可能因為網絡抖動、傳輸失敗等導致重試造成數據重復。

對於第1種場景,只要原始數據中有唯一字段就可以去重;對於第2種場景,不需要依賴業務數據就可以去重。去重的原理也很簡單,利用es document id即可。

對於es,如果寫入數據時沒有指定document id,就會隨機生成一個uuid,如果指定了,就使用指定的值。對於需要去重的場景,我們指定document id即可。

在output elasticsearch中可以通過document_id字段指定document id。對於場景1非常簡單,指定業務中的惟一字段為document id即可。主要看下場景2。

對於場景2,我們需要構造出一個“uuid”能惟一標識kafka中的一條數據,這個也非常簡單:<topic>+<partition>+<offset>,這三個值的組合就可以惟一標識kafka集群中的一條數據。

input kafka插件也已經幫我們把消息對應的元數據信息記錄到了@metadata(Logstash的元數據字段,不會輸出到output里面去)字段里面:

  • [@metadata][kafka][topic]:索引信息
  • [@metadata][kafka][consumer_group]:消費者組信息
  • [@metadata][kafka][partition]:分區信息
  • [@metadata][kafka][offset]:offset信息
  • [@metadata][kafka][key]:消息的key(如果有的話)
  • [@metadata][kafka][timestamp]:時間戳信息(消息創建的時間或者broker收到的時間)

所以,就可以這樣配置document id了:

document_id => "%{[@metadata][kafka][topic]}-%{[@metadata][kafka][partition]}-%{[@metadata][kafka][offset]}"

當然,如果每條kafka消息都有一個唯一的uuid的話,也可以在寫入kafka的時候,將其寫為key,然后這里就可以使用[@metadata][kafka][key]作為document id了。

最后一定要注意,只有當decorate_events選項配置為true的時候,上面的@metadata才會記錄那些元數據,否則不會記錄。而該配置項的默認值是false,即不記錄。

 

4、auto_offset_reset

Kafka中沒有初始偏移量或偏移量超出范圍時該怎么辦:

  • earliest:將偏移量自動重置為最早的偏移量
  • latest:自動將偏移量重置為最新偏移量
  • none:如果未找到消費者組的先前偏移量,則向消費者拋出異常

 

引用:


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM