基於pykafka簡單實現KAFKA消費者
By: 授客 QQ:1033553122
1.測試環境
python 3.4
zookeeper-3.4.13.tar.gz
下載地址1:
http://zookeeper.apache.org/releases.html#download
https://www.apache.org/dyn/closer.cgi/zookeeper/
https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/
下載地址2:
https://pan.baidu.com/s/1dnBgHvySE9pVRZXJVmezyQ
kafka_2.12-2.1.0.tgz
下載地址1:
http://kafka.apache.org/downloads.html
下載地址2:
https://pan.baidu.com/s/1VnHkJgy4iQ73j5rLbEL0jw
pykafka-2.8.0.tar.gz
下載地址1:
https://pypi.org/project/pykafka/
2.問題描述
使用python-kafka類庫實現kafka消費者時,發現程序有時候會自動停止消費,對一些參數進行配置后無果,換成pykafka類庫實現,搞定
3.代碼簡單實現
#-*- encoding:utf-8 -*-
__author__ = 'shouke'
from pykafka import KafkaClient
client = KafkaClient(hosts="127.0.0.1:9092")
# 獲取主題
print(client.topics)
topic = client.topics['MY_TOPIC1']
# 獲取消費者
consumer = topic.get_balanced_consumer('MY_GROUP1', auto_commit_enable=True, auto_commit_interval_ms=3000)
for message in consumer:
if message is not None:
print(message.offset, message.value)
參考鏈接:
