I recently needed to build a project that subscribes to messages in Kafka and stores them in Phoenix. I started with the pykafka module; the code was:
    from pykafka import KafkaClient

    # Create the Kafka connection
    kafka_client = KafkaClient(zookeeper_hosts="10.10.10.36:2181,10.10.10.37:2181,10.10.10.38:2181")
    topic = kafka_client.topics['base_count']
    balanced_consumer = topic.get_balanced_consumer(
        consumer_group='testgroup',
        auto_commit_enable=True  # committing offsets requires consumer_group; a simple consumer with False can omit it
    )

    for message in balanced_consumer:
        if message is not None:
            # print dir(message)
            # print "compression_type",message.compression_type
            # print "decode",message.decode
            # print "delivery_report_q",message.delivery_report_q
            # print "offset",message.offset
            # print "pack_into",message.pack_into
            # print "partition",message.partition
            # print "partition_id",message.partition_id
            # print "partition_key",message.partition_key
            # print "produce_attempt",message.produce_attempt
            # print "value",message.value
            # exit(1)
            # Split the message into its fields
            a, b, c, d = message.value.split("|")
It ran fine on Windows, but on Ubuntu 14.04 it failed with:
    No handlers could be found for logger "pykafka.connection"
The project page at https://github.com/Parsely/pykafka contains this note: "We currently test against librdkafka 0.9.1 only. Note that use on pypy is not recommended at this time; the producer is certainly expected to crash."
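I then checked the version of librdkafka installed with apt-get install librdkafka-dev and found it was 0.8.3. Suspecting that was the cause, I downloaded the source from https://github.com/edenhill/librdkafka and compiled it, which went smoothly, and then built and installed pykafka from source as well. Testing again, the same error still appeared.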
So I switched to the kafka module (kafka-python); the code was:
    from kafka import KafkaConsumer

    kafka_hosts = ["10.10.10.36:6667", "10.10.10.37:6667", "10.10.10.38:6667"]
    kafka_topic = "base_count"
    consumer = KafkaConsumer(kafka_topic, group_id="test_aaa", bootstrap_servers=kafka_hosts)
    for message in consumer:
        if message is not None:
            # Split the message into its fields
            minute, base, people, carrieroperator = message.value.split("|")
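The tuple unpacking above raises ValueError on any message that does not have exactly four "|"-separated fields, and under Python 3 kafka-python delivers message.value as bytes, so str.split("|") would fail. A minimal sketch of a more defensive parser, assuming UTF-8 payloads in the minute|base|people|carrieroperator layout implied by the variable names; the function name parse_base_count and the sample message are hypothetical.

```python
def parse_base_count(raw, sep="|"):
    """Split one 'minute|base|people|carrieroperator' message into a dict.

    Accepts bytes (kafka-python's message.value under Python 3) or text.
    Returns None when the field count is wrong instead of raising.
    """
    if isinstance(raw, bytes):
        raw = raw.decode("utf-8")  # assumption: producers write UTF-8
    parts = raw.split(sep)
    if len(parts) != 4:
        return None  # malformed message: let the caller skip or log it
    minute, base, people, carrieroperator = parts
    return {
        "minute": minute,
        "base": base,
        "people": people,
        "carrieroperator": carrieroperator,
    }
```

Inside the loop one would call record = parse_base_count(message.value) and skip the message when record is None, rather than letting one bad message stop the consumer.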
Again it ran fine on Windows, and on Ubuntu 14.04 it raised:
    Traceback (most recent call last):
      File "new_main.py", line 16, in <module>
        consumer = KafkaConsumer(kafka_topic,group_id="test_aaa",bootstrap_servers=kafka_hosts)
      File "/usr/local/lib/python2.7/dist-packages/kafka/consumer/group.py", line 288, in __init__
        self._client = KafkaClient(metrics=self._metrics, **self.config)
      File "/usr/local/lib/python2.7/dist-packages/kafka/client_async.py", line 204, in __init__
        self.config['api_version'] = self.check_version(timeout=check_timeout)
      File "/usr/local/lib/python2.7/dist-packages/kafka/client_async.py", line 815, in check_version
        raise Errors.NoBrokersAvailable()
    kafka.errors.NoBrokersAvailable: NoBrokersAvailable
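A quick search on Baidu showed that the problem lay in the kafka_hosts configuration. Although I had used IP addresses, Kafka reportedly returns hostnames to the client, which then has to resolve them. On my local Windows machine I had added the whole big-data platform to the hosts file, so local debugging worked fine. This Ubuntu machine was freshly installed and had no hosts entries yet, so it could not reach the brokers. After configuring hosts, the program ran normally and the problem was solved.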
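Before editing /etc/hosts on a new machine (adding lines of the form "10.10.10.36  <hostname of that broker>"), it can help to check which broker hostnames the machine cannot resolve. A minimal sketch using only the standard socket module; the helper name unresolvable_hosts is hypothetical, and you would pass in the hostnames your brokers actually advertise, which the original post does not list.

```python
import socket


def unresolvable_hosts(hostnames):
    """Return the hostnames this machine cannot resolve to an IPv4 address."""
    missing = []
    for name in hostnames:
        try:
            socket.gethostbyname(name)  # consults /etc/hosts, then DNS
        except socket.gaierror:
            missing.append(name)
    return missing
```

Any name it returns needs a hosts entry (or a DNS record) before the Kafka client can connect to that broker.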