1.在python中往kakfa寫數據和讀取數據,使用的是python-kafka庫
2.消費者需持續寫入數據,因groupid存在偏移量,才能看看到數據。
3.安裝庫的命令為pip install python-kafka -i https://pypi.douban.com/simple
4.其中返回的message為一個生成器,其中元素的type為<class 'kafka.consumer.fetcher.ConsumerRecord'>
代碼如下
#!/usr/bin/env python # -*- coding: utf-8 -*- from kafka import KafkaProducer from kafka import KafkaConsumer from kafka.errors import KafkaError import json class Kafka_producer(): ''' 使用kafka的生產模塊 ''' def __init__(self, kafkahost,kafkaport, kafkatopic): self.kafkaHost = kafkahost self.kafkaPort = kafkaport self.kafkatopic = kafkatopic self.producer = KafkaProducer(bootstrap_servers = '{kafka_host}:{kafka_port}'.format( kafka_host=self.kafkaHost, kafka_port=self.kafkaPort )) def sendjsondata(self, params): try: parmas_message = json.dumps(params) producer = self.producer producer.send(self.kafkatopic, parmas_message.encode('utf-8')) producer.flush() except KafkaError as e: print e class Kafka_consumer(): ''' 使用Kafka—python的消費模塊 ''' def __init__(self, kafkahost, kafkaport, kafkatopic, groupid): self.kafkaHost = kafkahost self.kafkaPort = kafkaport self.kafkatopic = kafkatopic self.groupid = groupid self.consumer = KafkaConsumer(self.kafkatopic, group_id = self.groupid, bootstrap_servers = '{kafka_host}:{kafka_port}'.format( kafka_host=self.kafkaHost, kafka_port=self.kafkaPort )) def consume_data(self): try: for message in self.consumer: # print json.loads(message.value) yield message except KeyboardInterrupt, e: print e def main(): ''' 測試consumer和producer :return: ''' ##測試生產模塊 #producer = Kafka_producer("127.0.0.1", 9092, "ranktest") #for id in range(10): # params = '{abetst}:{null}---'+str(i) # producer.sendjsondata(params) ##測試消費模塊 #消費模塊的返回格式為ConsumerRecord(topic=u'ranktest', partition=0, offset=202, timestamp=None, #\timestamp_type=None, key=None, value='"{abetst}:{null}---0"', checksum=-1868164195, #\serialized_key_size=-1, serialized_value_size=21) consumer = Kafka_consumer('127.0.0.1', 9092, "ranktest", 'test-python-ranktest') message = consumer.consume_data() for i in message: print i.value if __name__ == '__main__': main()
消費結果為:
i.value:
i.offset:
作 者:小閃電
出處:http://www.cnblogs.com/yueyanyu/
本文版權歸作者和博客園共有,歡迎轉載、交流,但未經作者同意必須保留此段聲明,且在文章頁面明顯位置給出原文鏈接。如果覺得本文對您有益,歡迎點贊、歡迎探討。本博客來源於互聯網的資源,若侵犯到您的權利,請聯系博主予以刪除。