Python KafkaProducer and KafkaConsumer的開發模塊


1.在python中往kakfa寫數據和讀取數據,使用的是python-kafka庫

2.消費者需持續寫入數據,因groupid存在偏移量,才能看看到數據。

3.安裝庫的命令為pip install python-kafka -i https://pypi.douban.com/simple

4.其中返回的message為一個生成器,其中元素的type為<class 'kafka.consumer.fetcher.ConsumerRecord'>

代碼如下

#!/usr/bin/env python
# -*- coding: utf-8 -*-
from kafka import KafkaProducer
from kafka import KafkaConsumer
from kafka.errors import KafkaError
import json


class Kafka_producer():
    '''
    使用kafka的生產模塊
    '''

    def __init__(self, kafkahost,kafkaport, kafkatopic):
        self.kafkaHost = kafkahost
        self.kafkaPort = kafkaport
        self.kafkatopic = kafkatopic
        self.producer = KafkaProducer(bootstrap_servers = '{kafka_host}:{kafka_port}'.format(
            kafka_host=self.kafkaHost,
            kafka_port=self.kafkaPort
            ))

    def sendjsondata(self, params):
        try:
            parmas_message = json.dumps(params)
            producer = self.producer
            producer.send(self.kafkatopic, parmas_message.encode('utf-8'))
            producer.flush()
        except KafkaError as e:
            print e


class Kafka_consumer():
    '''
    使用Kafka—python的消費模塊
    '''

    def __init__(self, kafkahost, kafkaport, kafkatopic, groupid):
        self.kafkaHost = kafkahost
        self.kafkaPort = kafkaport
        self.kafkatopic = kafkatopic
        self.groupid = groupid
        self.consumer = KafkaConsumer(self.kafkatopic, group_id = self.groupid,
                                      bootstrap_servers = '{kafka_host}:{kafka_port}'.format(
            kafka_host=self.kafkaHost,
            kafka_port=self.kafkaPort ))

    def consume_data(self):
        try:
            for message in self.consumer:
                # print json.loads(message.value)
                yield message
        except KeyboardInterrupt, e:
            print e


def main():
    '''
    測試consumer和producer
    :return:
    '''
    ##測試生產模塊
    #producer = Kafka_producer("127.0.0.1", 9092, "ranktest")
    #for id in range(10):
    #    params = '{abetst}:{null}---'+str(i)
    #    producer.sendjsondata(params)
    ##測試消費模塊
    #消費模塊的返回格式為ConsumerRecord(topic=u'ranktest', partition=0, offset=202, timestamp=None, 
    #\timestamp_type=None, key=None, value='"{abetst}:{null}---0"', checksum=-1868164195, 
    #\serialized_key_size=-1, serialized_value_size=21)
    consumer = Kafka_consumer('127.0.0.1', 9092, "ranktest", 'test-python-ranktest')
    message = consumer.consume_data()
    for i in message:
        print i.value


if __name__ == '__main__':
    main()

消費結果為:

i.value:

i.offset:


作 者:小閃電 

出處:http://www.cnblogs.com/yueyanyu/ 

本文版權歸作者和博客園共有,歡迎轉載、交流,但未經作者同意必須保留此段聲明,且在文章頁面明顯位置給出原文鏈接。如果覺得本文對您有益,歡迎點贊、歡迎探討。本博客來源於互聯網的資源,若侵犯到您的權利,請聯系博主予以刪除。


 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM