Python測試Kafka集群(kafka-python)


生產者代碼:

import time
from kafka import SimpleProducer, KafkaClient
from kafka import KafkaProducer


producer = KafkaProducer(bootstrap_servers = ['10.200.1.X:9092', '10.200.1.X:9092', '10.200.1.X:9092'])
# Assign a topic
topic = 'my-topic'

def test():
    print('begin')
    n = 1
    while (n<=100):
        producer.send(topic, str(n))
        print "send" + str(n)
        n += 1
        time.sleep(0.5)
    print('done')

if __name__ == '__main__':
    test()

 

消費者代碼:

from kafka import KafkaConsumer

#connect to Kafka server and pass the topic we want to consume
consumer = KafkaConsumer('my-topic', bootstrap_servers = ['10.200.1.X:9092', '10.200.1.X:9092', '10.200.1.X:9092'])

for msg in consumer:
    print msg

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM