1、導入模塊
from pykafka import KafkaClient,common
2、實例化
# Create the client; "host:port" is a placeholder for a real broker address.
client = KafkaClient(hosts="host:port")
# Print the topics and then the brokers known to the client, one per line.
print(client.topics, client.brokers, sep="\n")
ps:一個坑
如果 broker 打印出來的 host 是域名形式(kafka 自己設置了只能通過域名連接),而實例化 KafkaClient 時傳入的是 ip,那麼在本機 hosts 文件沒有配置該域名的情況下,連接會報 socket 連接錯誤。
出現 socket 錯誤時,首先檢查是否是這種情況;如果是,在 hosts 文件中配置域名和 ip 的映射即可解決。
3、生產者demo
from pykafka import KafkaClient, common

# Producer demo: connect, pick a topic, and publish a test message.
# "ip:port" is a placeholder for a real broker address.
client = KafkaClient(hosts="ip:port")
print(client.topics)
print(client.brokers)

# Select the topic by name. Per the original note it is created when missing;
# NOTE(review): that presumably depends on the broker's auto-create setting.
topic = client.topics['test_topic']

# Using the producer as a context manager stops it on exit, so it is shut
# down even if produce() raises part-way through.
with topic.get_producer() as producer:
    for i in range(1):
        # Messages are sent as bytes, hence the explicit encode().
        producer.produce(('test message ' + str(i ** 2)).encode())
4、消費者demo
from pykafka import KafkaClient, common
import time
import json


class KafkaTest(object):
    """Small demo wrapper around a pykafka client for consuming a topic."""

    def __init__(self, host):
        """Create the Kafka client.

        :param host: broker address string passed to ``KafkaClient(hosts=...)``.
        """
        self.host = host
        self.client = KafkaClient(hosts=self.host)

    def balance_consumer(self, topic, offset=0):
        """Consume a topic with a balanced consumer and print each message.

        Prints the topic's partitions, its earliest and latest available
        offsets and the consumer's held offsets, then polls in an endless
        loop. The loop has no exit condition, so this method only ends when
        an exception (for example ``KeyboardInterrupt``) propagates; the
        consumer is stopped on the way out.

        :param topic: topic name as ``str``; it is encoded to bytes for lookup.
        :param offset: unused; kept so existing callers keep working.
        :return: nothing (the loop never finishes normally).
        """
        result = []
        topic = self.client.topics[topic.encode()]
        # With managed=True the new-style rebalancing is used and ZooKeeper is
        # not needed. With managed=False rebalancing goes through ZooKeeper, so
        # zookeeper_connect="zookeeperIp" must be given as well as
        # consumer_group='test_group'.
        consumer = topic.get_balanced_consumer(
            consumer_group='test_group',
            auto_commit_enable=True,
            managed=True,
            consumer_timeout_ms=1000,
        )
        try:
            partitions = topic.partitions
            print("分區 {}".format(partitions))
            earliest_offsets = topic.earliest_available_offsets()
            print("最早可用offset {}".format(earliest_offsets))
            last_offsets = topic.latest_available_offsets()
            print("最近可用offset {}".format(last_offsets))
            # Use a separate local so the `offset` parameter is not shadowed.
            held_offsets = consumer.held_offsets
            print("當前消費者分區offset情況{}".format(held_offsets))
            while True:
                msg = consumer.consume()
                if msg:
                    held_offsets = consumer.held_offsets
                    print("當前位移:{}".format(held_offsets))
                    payload = msg.value.decode()
                    # SECURITY(review): eval() executes whatever expression the
                    # message contains and raises on payloads that are not
                    # valid Python expressions. If producers are not fully
                    # trusted, switch to json.loads or ast.literal_eval.
                    result.append(eval(payload))
                    print(payload)
                    # Commit explicitly after each processed message.
                    consumer.commit_offsets()
                else:
                    # A falsy result from consume() is treated as "no data".
                    print("沒有數據")
        finally:
            # Always stop the consumer, even when the loop is interrupted.
            consumer.stop()


if __name__ == '__main__':
    host = 'ip:port'
    kafka_ins = KafkaTest(host)
    topic = 'test_topic'
    kafka_ins.balance_consumer(topic)
