python3操作Kafka


# -*- coding: utf-8 -*-
import datetime
import json
import time
from kafka import KafkaProducer

# Publish 111 JSON-encoded demo messages to the 'test' topic, pausing 3 seconds
# between sends.
producer = KafkaProducer(bootstrap_servers='192.168.10.10:9092')
try:
    for i in range(111):
        payload = {
            "method": "get",
            "step": i,
            "type": "test",
            "testName": "kafka",
            # Second-resolution timestamp taken when the message is built.
            "cid": datetime.datetime.now().strftime('%Y%m%d%H%M%S'),
            "info": "demo{}".format(1),
        }
        # send() hands back a future; the value must already be bytes because
        # no value_serializer is configured on the producer.
        future = producer.send('test', json.dumps(payload).encode())
        # Wait up to 10 seconds for the send result so each message is
        # confirmed before the next one is produced.
        record_metadata = future.get(timeout=10)
        print(record_metadata, datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
        time.sleep(3)
finally:
    # Always release the producer's connections, including when a send fails
    # or the loop is interrupted part-way through.
    producer.close()
from kafka import KafkaConsumer

# Subscribe to the 'test' topic and print every record received.
# auto_offset_reset='earliest' asks the client to start from the oldest
# available record when it has no previously committed offset.
consumer = KafkaConsumer('test', bootstrap_servers=['192.168.10.10:9092'], auto_offset_reset='earliest')

try:
    # No consumer_timeout_ms is set, so this loop is expected to keep waiting
    # for new records until the process is interrupted or an error is raised.
    for message in consumer:
        print(message)
        print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                             message.offset, message.key,
                                             message.value))
finally:
    # Close the consumer on the way out (e.g. Ctrl-C) so its connections are
    # released instead of being left to interpreter shutdown.
    consumer.close()

  


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM