最近工作中需要一個日志收集系統,使用了kafka來實現。日志收集系統主要功能是,producer將接收到的logs存儲到kafka里,然后consumer從kafka里邊取數據進行消費處理。由於沒有接觸過kafka,在使用中遇到了一些問題,在此做個記錄。
問題 1. consumer有時候可以消費topic,有時候卻不能消費
基本情況:
- 配置兩個consumer group,這兩個consumer group用於消費同一個topic,但做不同的處理任務。每個consumer group中都只有一個consumer實例進行消費。
- 一個topic,此topic只配置了一個partition。
問題:
兩個consumer group都用來消費同一個topic,測試時發現,在有時候,consumer1(屬於consumer group1)能消費,consumer2(屬於consumer group2)卻不能消費。Topic有一個partition,兩個consumer group都只有一個consumer實例,每個consumer實例都應該能夠被分配到這唯一的partition進行消費,為什么會出現有的consumer不能消費的情況?
原因:
查看程序log發現,在存在問題的時候,consumer1分配到了partition,consumer2卻沒有分配到partition,從而導致只有consumer1有消費,consumer2沒有消費。為什么consumer2沒有分配到partition,這實在令人費解!
原來,之前在服務器上測試時已經運行了此程序,在本地主機進行測試時又運行了一個,即服務器和本地主機同時運行了此日志收集程序,雖然在此程序中每個consumer group只有一個consumer實例,但是服務器上的程序和本地主機上的程序各創建了一個consumer實例,這兩個consumer實例擁有相同的group ID,實質上導致了每個consumer group有兩個consumer實例。Topic只有一個partition,每個consumer group有兩個consumer實例,這兩個consumer實例只能有一個被分配到partition並消費。
在kafka中,每個consumer實例都有一個group ID用於標識它是屬於哪一個group,group ID相同的各個consumer實例都屬於同一個consumer group。本地主機上運行的程序創建的consumer2(屬於consumer group2)與服務器上運行的程序創建的consumer2擁有同一個group ID,因此屬於同一個group,而此topic只有一個partition,因此這兩個consumer只能有一個能夠分配到這個partition進行消費。暫時關閉服務器上的程序后,一切就恢復了正常。
問題 2. 兩台設備上只有一個上存在logs
基本情況:
- 一個topic,此topic配置了四個partition。
- 兩個consumer group,這兩個consumer group用於消費同一個topic,但做不同的處理任務。每個consumer group中都只有一個consumer實例進行消費。
- 兩台服務器,都運行此日志收集程序。
問題:
兩個consumer group用於消費同一個topic並做不同的處理,其中一個consumer group(稱作 group2)是將消費到的日志寫入服務器磁盤文件中。有兩台服務器都在運行此日志收集程序,每個服務器上的程序都創建了一個group2的consumer實例,此consumer實例會分配到兩個partition進行處理,因此每個服務器都只存儲了一部分日志文件。但是在測試時發現,所有日志都寫入了server1,server2上沒有日志,即便使用測試工具發送了大量數據,server2仍然沒有日志。
原因:
查看log發現,server1上的consumer實例分配的partition為partition_0 partition_1,server2上的consumer實例分配的partition為partition_3、partition_4,兩個server上的consumer實例都被分配了partition,partition分配正常,消費應該沒有問題。server2上沒有日志數據,說明沒有數據供其消費,也就是說,所有數據都被producer發送到了partition_1或partition_2上,這是生產的問題,應該是與生產者的分區路由有關,因此有必要了解下生產者的分區路由策略。
Kafka中的每個Topic分配了4個Partition,生產者(Producer)在將消息記錄(ProducerRecord)發送到某個Topic時是要選擇對應的Partition的,選擇Partition的策略如下:
- 判斷消息中的partition字段是否有值,有值的話就是指定了partition,直接將該消息發送到指定的partition就行。
- 如果沒有指定分區(partition),則使用分區器進行分區路由,首先判斷消息中是否指定了key。
- 如果指定了key,則使用該key進行hash操作,並轉為正數,然后將其對topic相應的分區數進行取余操作,得到一個分區。
- 如果沒有指定key,則在一個隨機數上以自增的方式產生一個數(第一次時生成隨機數,之后在其基礎上進行自增),轉為正數之后對分區數量進行取余操作,得到一個分區。
由於在程序中Producer發送記錄的時候指定了固定的key,根據這個key進行分區路由總是會選擇同一個分區,所有日志都被發送給了同一個分區,因此只有關聯這個分區的consumer實例才能消費,只有此consumer實例所在的server上才有日志。
參考鏈接:
https://blog.csdn.net/abinge317/article/details/84542073
