Python往kafka生產消費數據


安裝 kafka:  pip install kafka-python

生產數據

 1 from kafka import KafkaProducer
 2 import json
 3  
 4 '''
 5     生產者demo
 6     向test_lyl2主題中循環寫入10條json數據
 7     注意事項:要寫入json數據需加上value_serializer參數,如下代碼
 8 '''
 9 producer = KafkaProducer(
10                             value_serializer=lambda v: json.dumps(v).encode('utf-8'),
11                             bootstrap_servers=['192.168.12.101:6667','192.168.12.102:6667','192.168.12.103:6667']
12                          )
13 for i in range(10):
14     data={
15         "name":"李四",
16         "age":23,
17         "gender":"",
18         "id":i
19     }
20     producer.send('test_lyl2', data)
21 producer.close()

消費數據

 1 from kafka import KafkaConsumer
 2 import json
 3  
 4 '''
 5     消費者demo
 6     消費test_lyl2主題中的數據
 7     注意事項:如需以json格式讀取數據需加上value_deserializer參數
 8 '''
 9  
10  
11 consumer = KafkaConsumer('test_lyl2',group_id="lyl-gid1",
12                          bootstrap_servers=['192.168.12.101:6667','192.168.12.102:6667','192.168.12.103:6667'],
13                          auto_offset_reset='earliest',value_deserializer=json.loads
14                          )
15 for message in consumer:
16     print(message.value)

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM