【python】kafka在與celery和gevent連用時遇到的問題


前提:kafka有同步,多線程,gevent異步和rdkafka異步四種模式。但是在與celery和gevent連用的時候,有的模式會出錯。

下面是我代碼運行的結果。

 

結論:使用多線程方式!

 

使用同步方式可以成功發送數據

def send_data_kafka(data):
    try:
        client = KafkaClient(hosts=broker_list)
        topic = client.topics[topic_name]
        with topic.get_sync_producer() as producer:  
            for d in data:
                print "send data"
                msg = json.dumps(d)
                producer.produce(msg)
            producer.stop()
    except Exception, e:
        LOGGER.exception("error in send_data_kafka")
        print e

 

使用rdkafka異步,只打印了一條send data之后卡住

def send_data_kafka(data):
    try:
        client = KafkaClient(hosts=broker_list)
        topic = client.topics[topic_name]
        with topic.get_producer(use_rdkafka=True) as producer:  
            for d in data:
                print "send data"
                msg = json.dumps(d)
                producer.produce(msg)
            producer.stop()
    except Exception, e:
        LOGGER.exception("error in send_data_kafka")
        print e

 

 

使用多線程,可以正常生產所有數據

def send_data_kafka(data):
    try:
        client = KafkaClient(hosts=broker_list)
        topic = client.topics[topic_name]
        with topic.get_producer() as producer:  
            for d in data:
                print "send data"
                msg = json.dumps(d)
                producer.produce(msg)
            producer.stop()
    except Exception, e:
        LOGGER.exception("error in send_data_kafka")
        print e

 

沒有用with,rdkafka異步,打印了所有的send data,后卡住

client = KafkaClient(hosts=broker_list)
topic = client.topics[topic_name]
producer = topic.get_producer(use_rdkafka=True)  # 異步,使用rdkafka庫,速度最快的方案

def send_data_kafka(data):
    try:
        for d in data:
            print "send data"
            msg = json.dumps(d)
            producer.produce(msg)
        producer.stop()
    except Exception, e:
        LOGGER.exception("error in send_data_kafka")
        print e

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM