利用python对kafka进行消息生产和消息消费


转载

原文地址:https://blog.csdn.net/weixin_41512727/article/details/89249668

消息生产代码:

#coding:utf-8
#producer.py

from kafka import KafkaProducer
from kafka.errors import KafkaError

producer = KafkaProducer(
    bootstrap_servers = ["127.0.0.1:9092"]
)

future = producer.send("test", b"this is a python to kafka")
try:
    record = future.get(timeout=10)
    print(record)
except KafkaError as e:
    print(e)

消息消费代码:

#coding:utf-8
#consumer

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "test",
    bootstrap_servers = [
        "127.0.0.1:9092"
    ]
)
for each in consumer:
    print("%s:%d:%d: key=%s value=%s"%(
        each.topic, each.partition,
        each.offset, each.key, each.value
    ))

 


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM