kafka 生產者基本操作


kafka自帶了一個在終端演示生產者發布消息的腳本--kafka-console-producer.sh

運行該腳本會啟動一個進程,在運行該腳本時可以傳遞相應配置以覆蓋默認配置。

參數--

  -- producer.config,用於加載一個生產者級別相關的配置文件

  -- producer-property 直接在啟動命令中設置參數,覆蓋默認配置的參數

  -- property 通過該命令可以設置消費者相關的配置

 

1. 啟動生產者

必傳參數 broker-list(指定kafka的代理地址列表)  和  topic(消息被發送的目標主題)

執行下列命令,啟動一個向主題 kafka-action 發送消息的生產者,同時指定每條消息包含有key

./kafka-console-producer.sh --broker-list localhost:9092 --topic kafka-action --property parse.key=true

 由於以上命令沒有指定消息key與消息凈荷(payload)之間的分隔符,默認為制表符,如果希望修改分隔符

通過key.separator指定。

./kafka-console-producer.sh --broker-list localhost:9092 --topic kafka-action --property parse.key=true --property key.separator=' '

 然后在控制台輸入一批消息,輸入一下命令驗證是否發送成功

./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic kafka-action --time -1

輸出如下

kafka-action:2:4
kafka-action:1:4
kafka-action:0:0

 分別表示:主題名,分區,消息偏移量,共8條消息

 

開啟一個消費者:

[root@haha ~]# kafka-console-consumer.sh -zookeeper localhost:2181 --from-beginning --topic
kafka-action
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper]. aaaabbb bbbbb 
98230841023
123123
3333

 沒法送一條會依次存儲在 三個不同的分區  

例如:分別發送了 1111 2222  33333  444 555 6666 六條消息

那么

partition-1 log日志文件存儲的是  1111  444 

partition-2 log日志文件存儲的是 2222 555

partition-3 log日志文件存儲的是 33333 6666

 

 

 

2.創建主題,如果開啟了 auto.create.topics.enable=true,當生產者向一個不存在的主題發送消息時,kafka會自動創建主題,

./kafka-console-producer.sh --broker-list localhost:9092 --topic producer-create-topic
Producer sends message to a topic that doesn't exist yet
[2018-10-25 09:56:16,888] WARN Error while fetching metadata with correlation id 1 : {producer-create-topic=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)

 producer-create-topic 並不存在,控制台拋出警告

 

3. 查看消息內容,使用kafka.tools.DumpLogSegments工具

[root@haha ~]# kafka-run-class.sh kafka.tools.DumpLogSegments --files /data/appData/kafka/test3-0/00000000000000000000.log
Dumping /data/appData/kafka/test3-0/00000000000000000000.log
Starting offset: 0
offset: 0 position: 0 CreateTime: 1541061414817 isvalid: true payloadsize: 13 magic: 1 compresscodec: NONE crc: 686614980
offset: 1 position: 47 CreateTime: 1541061439270 isvalid: true payloadsize: 0 magic: 1 compresscodec: NONE crc: 988414825
offset: 2 position: 81 CreateTime: 1541061441260 isvalid: true payloadsize: 11 magic: 1 compresscodec: NONE crc: 2421154020
offset: 3 position: 126 CreateTime: 1541061575221 isvalid: true payloadsize: 6 magic: 1 compresscodec: NONE crc: 905626548
offset: 4 position: 166 CreateTime: 1541061703621 isvalid: true payloadsize: 4 magic: 1 compresscodec: NONE crc: 2986172870
offset: 5 position: 204 CreateTime: 1541061741306 isvalid: true payloadsize: 3 magic: 1 compresscodec: NONE crc: 2132028322
offset: 6 position: 241 CreateTime: 1541061762979 isvalid: true payloadsize: 16 magic: 1 compresscodec: NONE crc: 1956799962

 

 

4.生產者測試工具

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM