python訂閱kafka遇到無法連接的問題


最近需要做一個訂閱kafka中的消息並存入Phoenix中的項目,首先使用了pykakfa模塊,代碼如下:

 1 """創建kafka連接"""
 2     kafka_client = KafkaClient(zookeeper_hosts="10.10.10.36:2181,10.10.10.37:2181,10.10.10.38:2181")
 3     topic = kafka_client.topics['base_count']
 4     balanced_consumer = topic.get_balanced_consumer(
 5         consumer_group='testgroup',
 6         auto_commit_enable=True  # 設置為Flase的時候不需要添加 consumer_group
 7     )
 8 
 9     for message in balanced_consumer:
10         if message is not None:
11             # print dir(message)
12             # print "compression_type",message.compression_type
13             # print "decode",message.decode
14             # print "delivery_report_q",message.delivery_report_q
15             # print "offset",message.offset
16             # print "pack_into",message.pack_into
17             # print "partition",message.partition
18             # print "partition_id",message.partition_id
19             # print "partition_key",message.partition_key
20             # print "produce_attempt",message.produce_attempt
21             # print "value",message.value
22             # exit(1)
23             """將消息切分"""
24             a,b,c,d = message.value.split("|")

在windows中運行正常,但是放到ubuntu14.04下,出現錯誤:

No handlers could be found for logger "pykafka.connection"

在官網https://github.com/Parsely/pykafka查看介紹,發現這句話:We currently test against librdkafka 0.9.1 only. Note that use on pypy is not recommended at this time; the producer is certainly expected to crash.

之后查看用apt-get install librdkafka-dev安裝的版本為0.8.3,想着可能是這個的原因,就去https://github.com/edenhill/librdkafka下載了源碼進行編譯,編譯倒是很順利,之后也用源碼的方式進行了Pykafka的編譯安裝,可測試后還是出現以上錯誤。

之后換了kafka的模塊,代碼如下:

 

1 kafka_hosts = ["10.10.10.36:6667","10.10.10.37:6667","10.10.10.38:6667"]
2     kafka_topic = "base_count"
3     consumer = KafkaConsumer(kafka_topic,group_id="test_aaa",bootstrap_servers=kafka_hosts)
4     for message in consumer:
5         if message is not None:
6             """將消息切分"""
7             minute, base, people, carrieroperator = message.value.split("|")

 

這次在windows下還是正常運行,在Ubuntu14.04下報錯:

Traceback (most recent call last):
  File "new_main.py", line 16, in <module>
    consumer = KafkaConsumer(kafka_topic,group_id="test_aaa",bootstrap_servers=kafka_hosts)
  File "/usr/local/lib/python2.7/dist-packages/kafka/consumer/group.py", line 288, in __init__
    self._client = KafkaClient(metrics=self._metrics, **self.config)
  File "/usr/local/lib/python2.7/dist-packages/kafka/client_async.py", line 204, in __init__
    self.config['api_version'] = self.check_version(timeout=check_timeout)
  File "/usr/local/lib/python2.7/dist-packages/kafka/client_async.py", line 815, in check_version
    raise Errors.NoBrokersAvailable()
kafka.errors.NoBrokersAvailable: NoBrokersAvailable

百度了一下,發現問題出在kafka_hosts的配置上,我雖然使用了IP,但是據說kafka返回的是域名,我在本地windows段將整個大數據平台都配置到了hosts,所以本地調試的時候一切正常,這台ubuntu是一台新裝的機器,還沒有配置hosts,所以無法訪問,在將hosts配置完成后,程序正常,問題解決。

 

 

pykafka


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM