1.消費消息
消費者以pull的方式獲取消息,
每個消費者屬於某一個消費組,在創建時不指定消費者的groupId,則該消費者屬於默認消費組test-consumer-group ,在配置文件./consumer.properties中設置
同一消費組下的各個消費者在消費消息是是互斥的,也即是說,同一條消息,只能被同一個消費組下的某個消費者消費,但能被其它組的消費者消費
kafka-console-consumer.sh腳本模擬終端消費者消費消息
./kafka-console-consumer.sh --zookeeper localhost:2181 --topic kafka-action --consumer-property group.id=old-consumer-test --consumer-property consumer.id=old-consumer-cl --from-beginning --delete-consumer-offsets
consumer-property 參數以鍵值對的形式指定消費者級別的配置。
from-beginning 設置消息起始位置開始消費。默認是從新位置 latest開始消費
delete-consumer-offsets 刪除在zookeeper中記錄已消費的偏移量
舊版消費者默認將消費偏移量保存到zookeeper中,可以通過offsets.storage=kafka設置
offsets.storage=kaka,則保存到kafka主題中,
offsets.storage=zookeeper, 則保存到zookeeper中
ids 記錄該消費組下正在運行的消費者列表
owners 記錄該消費組消費的主題列表
offsets 記錄該消費組下每個消費者所消費主題的各個分區的偏移量
2.新版本消費者
新版本消費者去掉了對zookeeper的依賴,當啟動一個消費者時不再向zookeeper注冊,而是
由消費組協調器統一管理,已消費的消息偏移量提交后會保存到名為“__consumer_offsets”
內部主題中
啟動一個新的消費者
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --new-consumer --consumer-property group.id=new-consumer-test --consumer-property client.id=new-consumer-cl --topic kafka-action
老版本啟動用的參數是zookeeper ,新版本的參數是bootstrap-server
通過命令查看消費組名稱信息
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list --new-consumer