使用python操作kafka


使用python操作kafka目前比較常用的庫是kafka-python庫

安裝kafka-python

pip3 install kafka-python

生產者

producer_test.py

from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers='192.168.0.121:9092')  # 連接kafka

msg = "Hello World".encode('utf-8')  # 發送內容,必須是bytes類型
producer.send('test', msg)  # 發送的topic為test
producer.close()

執行此程序,它沒有輸出!這個是正常的

消費者

from kafka import KafkaConsumer

consumer = KafkaConsumer('test', bootstrap_servers=['192.168.0.121:9092'])
for msg in consumer:
    recv = "%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value)
    print(recv)

執行此程序,此時會hold住,因為它在等待生產者發送消息!

再次執行生產者,此時會輸出:

test:0:9: key=None value=b'Hello World'


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM