Background: a small application that uses Celery to dispatch tasks; each task produces some data to Kafka.
Problem: with the confluent_kafka module, producing messages works fine when Kafka is driven on its own, but as soon as the same code runs inside a Celery task, no new messages ever reach the topic.
Workaround: I switched to the pykafka module and the problem went away.
I'm confused: am I calling confluent_kafka incorrectly? Why does it break the moment Celery is involved?
The pykafka code that works:
tasks.py
from celery import Celery
from pykafka import KafkaClient
import json

app = Celery('tasks', backend='amqp', broker='amqp://xxx:xxxxxx@localhost/xxxhost')

@app.task
def produce():
    client = KafkaClient(hosts="localhost:9092")
    print client.topics
    topic = client.topics['test_result']
    # The sync producer blocks until each message is delivered.
    with topic.get_sync_producer() as producer:
        for i in range(3):
            data = {"info": {"ip": "1.2.3.4", "port": i}, "type": "test", "task_id": "test_celery_kafka"}
            print('Producing message: %s' % data)
            producer.produce(json.dumps(data))
        print "finish produce"
    # Note: the with block already stops the producer on exit,
    # so this extra stop() is redundant.
    producer.stop()
    print "stop"
run_worker.py
from tasks import produce

for i in range(1000):
    result = produce.delay()
    print result.status
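For reference, in both tests the worker is started with the stock command (I haven't changed Celery's default prefork pool):

celery -A tasks worker --loglevel=info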
The confluent_kafka code that fails to produce data:
tasks.py
from celery import Celery
from kafka_producer import p  # module-level confluent_kafka Producer
import json

app = Celery('tasks', backend='amqp', broker='amqp://xxx:xxxxxx@localhost/xxxhost')

@app.task
def produce():
    for i in range(3000):
        data = {"info": {"ip": "1.2.3.4"}, "type": "test", "task_id": "test_celery_kafka"}
        print('Producing message: %s' % data)
        p.produce('test_result3', json.dumps(data))
    print "finish produce"
    # flush() should block until all queued messages are delivered.
    p.flush()
    print "finish flush"
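kafka_producer.py isn't shown above; it's essentially just a module-level Producer created at import time, along these lines (simplified sketch, real config details omitted):

from confluent_kafka import Producer

# Created once when the module is first imported,
# i.e. in whichever process imports tasks.py.
p = Producer({'bootstrap.servers': 'localhost:9092'})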
run_worker.py
from tasks import produce

result = produce.delay()
print result.status
print result.ready()
print result.get()
print result.status
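One thing I'm wondering about: confluent_kafka wraps librdkafka, which delivers messages from a background thread, while Celery's prefork pool forks worker processes after tasks.py has been imported. If p is created in the parent process and then inherited across fork(), the child would get the producer's state but not its delivery thread. Would constructing the Producer inside the task itself, as in this sketch, rule that out?

from celery import Celery
from confluent_kafka import Producer
import json

app = Celery('tasks', backend='amqp', broker='amqp://xxx:xxxxxx@localhost/xxxhost')

@app.task
def produce():
    # Build the producer inside the task so it is created in the
    # forked worker process rather than inherited from the parent.
    p = Producer({'bootstrap.servers': 'localhost:9092'})
    for i in range(3000):
        data = {"info": {"ip": "1.2.3.4"}, "type": "test", "task_id": "test_celery_kafka"}
        p.produce('test_result3', json.dumps(data))
    p.flush()
    print "finish flush"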