使用python3第三方工具,實現kafka消費
1 # -*- coding: utf-8 -*- 2 3 import uuid 4 import json 5 from kafka import KafkaConsumer 6 from xxxxxx import MessageToDict 7 from xxx import ObjectInfo 8 9 import sys 10 import codecs 11 12 sys.stdout = codecs.getwriter("utf-8")(sys.stdout.detach()) 13 14 15 class ReadKafkaContent(object): 16 @staticmethod 17 def deserialize(msg): 18 """ 19 反序列化 20 :param msg: 21 :return: 22 """ 23 pb_obj = ObjectInfo() 24 pb_obj.Clear() 25 pb_obj.ParseFromString(msg.value) 26 return MessageToDict(pb_obj, including_default_value_fields=True, preserving_proto_field_name=True) 27 28 def consume_msg(self, consumer_obj): 29 """ 30 逐條消費,返回反序列化后的內容 31 :param consumer_obj: 32 :return: 33 """ 34 try: 35 while True: 36 msg = next(consumer_obj, None) 37 if not msg: 38 continue 39 content = self.deserialize(msg) 40 return content 41 except Exception as ex: 42 print(u"消費kafka錯誤,退出測試") 43 return None 44 45 def entry(self, topic, ip, count=10, log="log_read_kafka_content.json"): 46 """ 47 48 :param topic:topic 49 :param ip:ip 50 :param count:查詢kafka數據數量,默認10條 51 :param log:內容保存地址,默認 52 :return: 53 """ 54 print(u"開始......") 55 try: 56 # 創建kafka消費對象 57 print(u"創建kafka消費對象...") 58 consumer = KafkaConsumer(topic, group_id=uuid.uuid4().hex, 59 bootstrap_servers=[ip], 60 auto_offset_reset="latest", consumer_timeout_ms=3 * 1000) 61 except Exception as ex: 62 print(u"連接kafka失敗!") 63 return False 64 print(u"kafka消費對象創建成功.") 65 66 with open(log, "w") as f: 67 for i in range(count): 68 print(u"開始消費第%s條數據..." % str(i + 1)) 69 content = self.consume_msg(consumer) 70 if not content: 71 return False 72 73 # dict轉json保存數據內容 74 content_json = json.dumps(content, ensure_ascii=False, indent=4) 75 f.write(content_json) 76 f.write("\n\n") 77 print(u"第%s條數據寫入完成." % str(i + 1)) 78 79 print(u"完成.")