Python:使用 pykafka 模塊連接 Kafka


1、導入模塊

from pykafka import KafkaClient,common

2、實例化 KafkaClient

# Connect to the cluster ("host:port" is a placeholder for a real broker
# address), then print the topics and brokers the client reports.
client = KafkaClient(hosts="host:port")
for attr_name in ("topics", "brokers"):
    print(getattr(client, attr_name))

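PS:一個坑
如果打印出來的 broker host 是域名形式(即 Kafka 配置為只通過域名對外提供連接地址),而實例化 KafkaClient 時傳入的是 IP,那麼在本機沒有配置 hosts 的情況下,連接會報 socket 連接錯誤。原因是:客戶端先用傳入的 IP 連上 broker 獲取元數據,之後會改用 broker 返回的域名去連接,而本機無法解析該域名。
出現 socket 錯誤時,首先檢查是否屬於這種情況;如果是,在 hosts 文件(如 /etc/hosts)中配置域名與 IP 的映射即可解決。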

3、生產者demo

 

from pykafka import KafkaClient,common
client = KafkaClient(hosts="ip:port")  # create the client; "ip:port" is a placeholder
print(client.topics)
print(client.brokers)
# Look up the topic by name.
# NOTE(review): the original comment claimed a missing topic is created
# automatically; whether that happens presumably depends on the broker's
# topic auto-creation setting — confirm before relying on it.
topic = client.topics['test_topic']
producer = topic.get_producer()
try:
    for i in range(1):
        producer.produce(('test message ' + str(i ** 2)).encode())
finally:
    # Always stop the producer, even if produce() raises, so it is not
    # left running.
    producer.stop()

 

 

 

4、消費者demo

from pykafka import KafkaClient,common
import time
import json
class KafkaTest(object):
    """Small demo wrapper around a pykafka ``KafkaClient``."""

    def __init__(self, host):
        """Create a client for the given broker address.

        :param host: value passed to ``KafkaClient(hosts=...)``, e.g. ``"ip:port"``.
        """
        self.host = host
        self.client = KafkaClient(hosts=self.host)

    @staticmethod
    def _decode_value(raw):
        """Decode a message payload and parse it as a Python literal if possible."""
        import ast

        text = raw.decode()
        try:
            # NOTE(review): the original used eval(), which executes arbitrary
            # code read from the topic and raises on plain-text payloads such
            # as the producer demo's "test message 0". literal_eval accepts
            # only literals (dicts, lists, strings, numbers, ...).
            return ast.literal_eval(text)
        except (ValueError, SyntaxError, TypeError):
            # Not a Python literal: keep the raw text instead of crashing.
            return text

    def balance_consumer(self, topic, offset=0, max_idle_polls=None):
        """Consume a Kafka topic with a balanced consumer and collect the payloads.

        :param topic: topic name as ``str`` (encoded for the lookup) or ``bytes``.
        :param offset: unused; kept only so existing callers keep working.
        :param max_idle_polls: stop after this many consecutive polls in which
            ``consume()`` returned nothing. ``None`` (default) polls forever,
            exactly like the original loop.
        :return: list of decoded payloads; only returned when ``max_idle_polls``
            is set, because with ``None`` the loop never ends.
        """
        result = []

        topic_key = topic if isinstance(topic, bytes) else topic.encode()
        kafka_topic = self.client.topics[topic_key]
        # managed=True uses the newer rebalance mechanism and needs no
        # ZooKeeper. managed=False rebalances through ZooKeeper and then
        # requires zookeeper_connect="zookeeperIp" plus consumer_group.
        consumer = kafka_topic.get_balanced_consumer(
            consumer_group='test_group',
            auto_commit_enable=True,
            managed=True,
            consumer_timeout_ms=1000,
        )
        try:
            partitions = kafka_topic.partitions
            print("分區 {}".format(partitions))
            earliest_offsets = kafka_topic.earliest_available_offsets()
            print("最早可用offset {}".format(earliest_offsets))
            last_offsets = kafka_topic.latest_available_offsets()
            print("最近可用offset {}".format(last_offsets))
            held = consumer.held_offsets
            print("當前消費者分區offset情況{}".format(held))

            idle_polls = 0
            while max_idle_polls is None or idle_polls < max_idle_polls:
                msg = consumer.consume()
                if msg is None:
                    idle_polls += 1
                    print("沒有數據")
                    continue
                idle_polls = 0
                held = consumer.held_offsets
                print("當前位移:{}".format(held))
                result.append(self._decode_value(msg.value))
                print(msg.value.decode())
                consumer.commit_offsets()  # commit after each message
        finally:
            # Release the consumer even if the loop is interrupted.
            consumer.stop()
        return result



if __name__ == '__main__':
    # 'ip:port' and 'test_topic' are placeholders: point them at a real
    # broker and topic before running the demo.
    KafkaTest('ip:port').balance_consumer('test_topic')


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯繫本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM