我大約是把kafka消費不到數據的特殊情況都經歷了一遍了吧= =、
kafka消費不到數據的原因,首先檢查配置之類的,如是否設置了group.id,對應的topic是否正確等等,這些不多說。
下面是我遇到的幾種kafka消費不到數據的情況:
1.多分區,單例消費者的情況,只消費到一個分區,應多加幾個消費者,不能用單例,直接subscribe的話,rebalance機制啟動,手動的話如下
consumer.Assign(new List<TopicPartitionOffset>(){ new TopicPartitionOffset(new TopicPartition("topic", 1), Offset.Stored) });
2.長時間不消費導致 log.retention.hours或者 log.retention.minutes超時,清除log,Offset.Stored失效
解決辦法一:
consumer.Assign(new List<TopicPartitionOffset>(){ new TopicPartitionOffset(new TopicPartition("topic", 1), new Offset(index)) });
此處的index為該分區當前的offset,要自己做存儲然后手動配置,可測試用。
解決辦法二:見問題三,同樣解決方式 但是會從頭開始消費新進來的數據
3.我一次加數據太多導致磁盤耗盡,kafka管理員幫我改到20G內存,但是仍然有一部分數據超出,分區offset靠前的數據被清除,導致再次消費不到。清除掉的數據無法再次被消費,但是還保存的數據可以消費到
解決辦法:
consumer.Assign(new List<TopicPartitionOffset>(){ new TopicPartitionOffset(new TopicPartition("topic", 1), Offset.Beginning) });
或者在配置中加
auto.offset.reset=smallest //.NET 默認是largest auto.offset.reset=earliest//Java 默認是latest
關於該配置的測試,請看下面的鏈接
http://blog.csdn.net/lishuangzhe7047/article/details/74530417