python之kafka消費


使用python3第三方工具,實現kafka消費

 1 # -*- coding: utf-8 -*-
 2 
 3 import uuid
 4 import json
 5 from kafka import KafkaConsumer
 6 from xxxxxx import MessageToDict
 7 from xxx import ObjectInfo
 8 
 9 import sys
10 import codecs
11 
12 sys.stdout = codecs.getwriter("utf-8")(sys.stdout.detach())
13 
14 
15 class ReadKafkaContent(object):
16     @staticmethod
17     def deserialize(msg):
18         """
19         反序列化
20         :param msg:
21         :return:
22         """
23         pb_obj = ObjectInfo()
24         pb_obj.Clear()
25         pb_obj.ParseFromString(msg.value)
26         return MessageToDict(pb_obj, including_default_value_fields=True, preserving_proto_field_name=True)
27 
28     def consume_msg(self, consumer_obj):
29         """
30         逐條消費,返回反序列化后的內容
31         :param consumer_obj:
32         :return:
33         """
34         try:
35             while True:
36                 msg = next(consumer_obj, None)
37                 if not msg:
38                     continue
39                 content = self.deserialize(msg)
40                 return content
41         except Exception as ex:
42             print(u"消費kafka錯誤,退出測試")
43             return None
44 
45     def entry(self, topic, ip, count=10, log="log_read_kafka_content.json"):
46         """
47 
48         :param topic:topic
49         :param ip:ip
50         :param count:查詢kafka數據數量,默認10條
51         :param log:內容保存地址,默認
52         :return:
53         """
54         print(u"開始......")
55         try:
56             # 創建kafka消費對象
57             print(u"創建kafka消費對象...")
58             consumer = KafkaConsumer(topic, group_id=uuid.uuid4().hex,
59                                      bootstrap_servers=[ip],
60                                      auto_offset_reset="latest", consumer_timeout_ms=3 * 1000)
61         except Exception as ex:
62             print(u"連接kafka失敗!")
63             return False
64         print(u"kafka消費對象創建成功.")
65 
66         with open(log, "w") as f:
67             for i in range(count):
68                 print(u"開始消費第%s條數據..." % str(i + 1))
69                 content = self.consume_msg(consumer)
70                 if not content:
71                     return False
72 
73                 # dict轉json保存數據內容
74                 content_json = json.dumps(content, ensure_ascii=False, indent=4)
75                 f.write(content_json)
76                 f.write("\n\n")
77                 print(u"第%s條數據寫入完成." % str(i + 1))
78 
79         print(u"完成.")

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM