1.安裝包
pip3 install kafka-python
2.消費者
from kafka import KafkaConsumer import json consumer = KafkaConsumer('sink',group_id='test',bootstrap_servers=['192.168.186.174:9092']) for msg in consumer: recv = "%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value) print(recv) print("接收成功")
3.生產者
from kafka import KafkaProducer producer = KafkaProducer(bootstrap_servers='192.168.186.174:9092') # 連接kafka msg = "Hello World".encode('utf-8') # 發送內容,必須是bytes類型 producer.send('example', msg) # 發送的topic為example producer.close() print("發送成功")