kafka消費者基本操作


1.消費消息

消費者以pull的方式獲取消息,

每個消費者屬於某一個消費組,在創建時不指定消費者的groupId,則該消費者屬於默認消費組test-consumer-group ,在配置文件./consumer.properties中設置

同一消費組下的各個消費者在消費消息是是互斥的,也即是說,同一條消息,只能被同一個消費組下的某個消費者消費,但能被其它組的消費者消費

 

kafka-console-consumer.sh腳本模擬終端消費者消費消息

./kafka-console-consumer.sh --zookeeper localhost:2181 --topic kafka-action --consumer-property group.id=old-consumer-test --consumer-property consumer.id=old-consumer-cl --from-beginning --delete-consumer-offsets

consumer-property 參數以鍵值對的形式指定消費者級別的配置。

from-beginning 設置消息起始位置開始消費。默認是從新位置 latest開始消費

delete-consumer-offsets 刪除在zookeeper中記錄已消費的偏移量

 

 

 

 

舊版消費者默認將消費偏移量保存到zookeeper中,可以通過offsets.storage=kafka設置

offsets.storage=kaka,則保存到kafka主題中,

offsets.storage=zookeeper, 則保存到zookeeper中

 

ids 記錄該消費組下正在運行的消費者列表

owners  記錄該消費組消費的主題列表

offsets  記錄該消費組下每個消費者所消費主題的各個分區的偏移量

 

 

2.新版本消費者

新版本消費者去掉了對zookeeper的依賴,當啟動一個消費者時不再向zookeeper注冊,而是

由消費組協調器統一管理,已消費的消息偏移量提交后會保存到名為“__consumer_offsets”

內部主題中

啟動一個新的消費者

./kafka-console-consumer.sh --bootstrap-server localhost:9092 --new-consumer --consumer-property group.id=new-consumer-test --consumer-property client.id=new-consumer-cl --topic kafka-action

 老版本啟動用的參數是zookeeper ,新版本的參數是bootstrap-server

 

通過命令查看消費組名稱信息

./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list --new-consumer

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM