安裝包 pykafka
代碼如下:
from pykafka import KafkaClient client = KafkaClient(hosts="test43:9092") print(client.topics) topic = client.topics[b'rokid'] #topic名稱 consumer = topic.get_simple_consumer() for record in consumer: if record is not None: valuestr = record.value.decode() #從bytes轉為string類型 valuedict = eval(valuestr) message = valuedict["message"] fields = message.split("\u0001") for field in fields: kv = field.split("\u0002") if len(kv) == 2: print(kv[0],'----',kv[1]) print('-'*100)
以上僅供開發測試使用,真正發布到線上需要多地方加固。。。
mark