kafka的生產與消費
在生產前需要 .需要創建一個topic,和消費的的groupid
比如可以在kafka管理系統中創建,不需要手動敲命令創建
1.創建topic和綁定消費組
2.kafka的生產
import json from kafka import KafkaProducer topic_name="active_user_simplified" #生產的topic kafka_addr = '172.17.9.151:9092,172.17.9.157:9092,172.17.9.155:9092' def create_kafaka(): """ 寫入kafka數據,useractive數據,如果用戶活躍,useractiveUpdata服務就會消費 :return: """ producer = KafkaProducer(bootstrap_servers=kafka_addr, value_serializer=lambda m: json.dumps(m).encode()) for i in range(1): data = { "app_key": "07b6ed26c8bcce540204c8f7", "uid": 8004540429, "platform": "W", "type": "active_terminate", "itime": 1608977776 } result=producer.send(topic_name, data) print(result) create_kafaka()
3.kafka的消費
from kafka import KafkaConsumer topic_name="active_user_simplified" kafka_addr = '172.17.9.151:9092,172.17.9.157:9092,172.17.9.155:9092' def consumer_kafaka(): group_id = "group-segment-useractiveUpdate" consumer = KafkaConsumer(topic_name, bootstrap_servers=kafka_addr, group_id=group_id, auto_offset_reset='earliest') for msg in consumer: print(msg.value) consumer_kafaka()
消費數據的日志