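Recently I needed to build a project that subscribes to messages in Kafka and stores them in Phoenix. I first used the pykafka module; the code is as follows: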
from pykafka import KafkaClient

# Create the Kafka connection
kafka_client = KafkaClient(zookeeper_hosts="10.10.10.36:2181,10.10.10.37:2181,10.10.10.38:2181")
topic = kafka_client.topics['base_count']
balanced_consumer = topic.get_balanced_consumer(
    consumer_group='testgroup',
    auto_commit_enable=True  # when set to False, consumer_group does not need to be supplied
)

for message in balanced_consumer:
    if message is not None:
        # Debugging aids: uncomment to inspect the attributes of the message object
        # print dir(message)
        # print "compression_type", message.compression_type
        # print "decode", message.decode
        # print "delivery_report_q", message.delivery_report_q
        # print "offset", message.offset
        # print "pack_into", message.pack_into
        # print "partition", message.partition
        # print "partition_id", message.partition_id
        # print "partition_key", message.partition_key
        # print "produce_attempt", message.produce_attempt
        # print "value", message.value
        # exit(1)
        # Split the message into its fields
        a, b, c, d = message.value.split("|")
This ran fine on Windows, but on Ubuntu 14.04 it produced the following error:
No handlers could be found for logger "pykafka.connection"
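Note that this line is not the underlying failure itself. In Python 2 it is the notice the logging module prints when a logger (here "pykafka.connection") emits a message but no handler has been configured, so the real warning is swallowed. A minimal sketch for making that message visible, assuming it is placed at the top of the script before the client is created (the format string is only an illustrative choice):

```python
import logging

# Attach a handler to the root logger so that messages from
# "pykafka.connection" (and every other logger) are actually printed.
logging.basicConfig(
    level=logging.DEBUG,
    format="%(asctime)s %(name)s %(levelname)s %(message)s",
)

# Demonstration with the same logger name that appears in the message above.
log = logging.getLogger("pykafka.connection")
log.warning("example warning that would otherwise have been swallowed")
```

With a handler in place, the connection warning that triggered the notice is printed in full, which usually points at the host or port that could not be reached.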
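On the project page, https://github.com/Parsely/pykafka, I found this note: We currently test against librdkafka 0.9.1 only. Note that use on pypy is not recommended at this time; the producer is certainly expected to crash.
I then checked the version installed by apt-get install librdkafka-dev and found it was 0.8.3. Suspecting this was the cause, I downloaded the source from https://github.com/edenhill/librdkafka and compiled it. The build went smoothly, and I then built and installed pykafka from source as well, but testing still produced the same error.
After that I switched to the kafka module (kafka-python); the code is as follows: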
from kafka import KafkaConsumer

kafka_hosts = ["10.10.10.36:6667", "10.10.10.37:6667", "10.10.10.38:6667"]
kafka_topic = "base_count"
consumer = KafkaConsumer(kafka_topic, group_id="test_aaa", bootstrap_servers=kafka_hosts)
for message in consumer:
    if message is not None:
        # Split the message into its fields
        minute, base, people, carrieroperator = message.value.split("|")
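The bare split("|") above works on Python 2, where message.value is a str. On Python 3 the value arrives as bytes unless a deserializer is configured, and a record with the wrong number of fields fails with an unhelpful unpacking error. A small sketch of a more defensive parser follows; the helper name parse_record, the UTF-8 encoding, and the sample payload are all assumptions for illustration, not part of the original project:

```python
def parse_record(value):
    """Split one 'minute|base|people|carrieroperator' payload into four fields."""
    # Kafka payloads are raw bytes; decode them first (UTF-8 is an assumption).
    if isinstance(value, bytes):
        value = value.decode("utf-8")
    fields = value.strip().split("|")
    if len(fields) != 4:
        raise ValueError("expected 4 fields, got %d: %r" % (len(fields), value))
    return tuple(fields)


# Hypothetical sample payload, only to show the call.
minute, base, people, carrieroperator = parse_record(b"201701011200|base_a|35|operator_x")
print(minute, base, people, carrieroperator)
```

Inside the consumer loop the call would then be parse_record(message.value), with malformed records caught via ValueError instead of stopping the consumer.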
This version also ran fine on Windows, but on Ubuntu 14.04 it failed with:
Traceback (most recent call last):
  File "new_main.py", line 16, in <module>
    consumer = KafkaConsumer(kafka_topic,group_id="test_aaa",bootstrap_servers=kafka_hosts)
  File "/usr/local/lib/python2.7/dist-packages/kafka/consumer/group.py", line 288, in __init__
    self._client = KafkaClient(metrics=self._metrics, **self.config)
  File "/usr/local/lib/python2.7/dist-packages/kafka/client_async.py", line 204, in __init__
    self.config['api_version'] = self.check_version(timeout=check_timeout)
  File "/usr/local/lib/python2.7/dist-packages/kafka/client_async.py", line 815, in check_version
    raise Errors.NoBrokersAvailable()
kafka.errors.NoBrokersAvailable: NoBrokersAvailable
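After searching online (Baidu), I found that the problem lay in the kafka_hosts configuration. Although I had specified IP addresses, Kafka reportedly returns the brokers' hostnames, and the client then has to resolve those names itself. On my local Windows machine I had already added the entire big data platform to the hosts file, so everything worked during local debugging. The Ubuntu box was a freshly installed machine with no hosts entries yet, so it could not reach the brokers. Once the hosts file was configured, the program ran normally and the problem was solved.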
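A quick way to confirm this kind of problem on a new machine is to check whether the broker hostnames resolve at all before starting the consumer. The sketch below uses only the standard library; the function name check_resolvable is made up for illustration, and the real hostnames to test are whatever your brokers advertise (they are not given in this post, so "localhost" stands in as a placeholder):

```python
import socket


def check_resolvable(hostnames):
    """Return {hostname: ip or None} using the local resolver (hosts file, then DNS)."""
    results = {}
    for name in hostnames:
        try:
            results[name] = socket.gethostbyname(name)
        except socket.error:
            # Resolution failed: the name is in neither the hosts file nor DNS.
            results[name] = None
    return results


# Replace "localhost" with the hostnames your brokers advertise.
for name, ip in check_resolvable(["localhost"]).items():
    print("%s -> %s" % (name, ip if ip else "NOT RESOLVABLE"))
```

Any name reported as not resolvable needs an entry in /etc/hosts (or in DNS) on the machine running the consumer.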