kafka-console-consumer.sh 腳本是一個簡易的消費者控制台。該 shell 腳本的功能通過調用 kafka.tools 包下的 ConsoleConsumer
類,並將提供的命令行參數全部傳給該類實現。
1.手動插入數據
kafka-console-producer.sh --broker-list 192.168.104.91:9092,192.168.104.92:9092,192.168.104.93:9092 --topic capture_test
2. 消息消費
bin/kafka-console-consumer.sh --bootstrap-server 192.168.104.91:9092,192.168.104.92:9092,192.168.104.93:9092 --topic topicName bin/kafka-console-consumer.sh --zookeeper 192.168.104.91:2181,192.168.104.92:2181,192.168.104.93:2181 --topic topicName 表示從 latest 位移位置開始消費該主題的所有分區消息,即僅消費正在寫入的消息。
3.從開始位置消費
bin/kafka-console-consumer.sh --bootstrap-server 192.168.104.91:9092,192.168.104.92:9092,192.168.104.93:9092 --from-beginning --topic topicName bin/kafka-console-consumer.sh --zookeeper 192.168.104.91:2181,192.168.104.92:2181,192.168.104.93:2181 --from-beginning --topic topicName
4.顯示key消費
bin/kafka-console-consumer.sh --bootstrap-server 192.168.104.91:9092,192.168.104.92:9092,192.168.104.93:9092 --property print.key=true --topic topicName bin/kafka-console-consumer.sh --zookeeper 192.168.104.91:2181,192.168.104.92:2181,192.168.104.93:2181 --property print.key=true --topic topicName 消費出的消息結果將打印出消息體的 key 和 value。
參數 | 值類型 | 說明 | 有效值 |
---|---|---|---|
--topic | string | 被消費的topic | |
--whitelist | string | 正則表達式,指定要包含以供使用的主題的白名單 | |
--partition | integer | 指定分區 除非指定’–offset’,否則從分區結束(latest)開始消費 |
|
--offset | string | 執行消費的起始offset位置 默認值:latest |
latest earliest <offset> |
--consumer-property | string | 將用戶定義的屬性以key=value的形式傳遞給使用者 | |
--consumer.config | string | 消費者配置屬性文件 請注意,[consumer-property]優先於此配置 |
|
--formatter | string | 用於格式化kafka消息以供顯示的類的名稱 默認值:kafka.tools.DefaultMessageFormatter |
kafka.tools.DefaultMessageFormatter kafka.tools.LoggingMessageFormatter kafka.tools.NoOpMessageFormatter kafka.tools.ChecksumMessageFormatter |
--property | string | 初始化消息格式化程序的屬性 | print.timestamp=true|false print.key=true|false print.value=true|false key.separator=<key.separator> line.separator=<line.separator> key.deserializer=<key.deserializer> value.deserializer=<value.deserializer> |
--from-beginning | 從存在的最早消息開始,而不是從最新消息開始 | ||
--max-messages | integer | 消費的最大數據量,若不指定,則持續消費下去 | |
--timeout-ms | integer | 在指定時間間隔內沒有消息可用時退出 | |
--skip-message-on-error | 如果處理消息時出錯,請跳過它而不是暫停 | ||
--bootstrap-server | string | 必需(除非使用舊版本的消費者),要連接的服務器 | |
--key-deserializer | string | ||
--value-deserializer | string | ||
--enable-systest-events | 除記錄消費的消息外,還記錄消費者的生命周期 (用於系統測試) |
||
--isolation-level | string | 設置為read_committed以過濾掉未提交的事務性消息 設置為read_uncommitted以讀取所有消息 默認值:read_uncommitted |
|
--group | string | 指定消費者所屬組的ID | |
--blacklist | string | 要從消費中排除的主題黑名單 | |
--csv-reporter-enabled | 如果設置,將啟用csv metrics報告器 | ||
--delete-consumer-offsets | 如果指定,則啟動時刪除zookeeper中的消費者信息 | ||
--metrics-dir | string | 輸出csv度量值 需與[csv-reporter-enable]配合使用 |
|
--zookeeper | string | 必需(僅當使用舊的使用者時)連接zookeeper的字符串。 可以給出多個URL以允許故障轉移 |