利用python對kafka進行消息生產和消息消費


轉載

原文地址:https://blog.csdn.net/weixin_41512727/article/details/89249668

消息生產代碼:

#coding:utf-8
#producer.py

from kafka import KafkaProducer
from kafka.errors import KafkaError

producer = KafkaProducer(
    bootstrap_servers = ["127.0.0.1:9092"]
)

future = producer.send("test", b"this is a python to kafka")
try:
    record = future.get(timeout=10)
    print(record)
except KafkaError as e:
    print(e)

消息消費代碼:

#coding:utf-8
#consumer

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "test",
    bootstrap_servers = [
        "127.0.0.1:9092"
    ]
)
for each in consumer:
    print("%s:%d:%d: key=%s value=%s"%(
        each.topic, each.partition,
        each.offset, each.key, each.value
    ))

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM