python模擬發送、消費kafka消息


參考文章: https://zhuanlan.zhihu.com/p/279784873

生產者代碼:

import traceback

from kafka import KafkaProducer,KafkaConsumer
from faker import Faker

fake=Faker()
# 生產者
kafka_topic = "test_kafka_demo"
kafka_bootstrap_servers = ['xx:9092','xx:9092','xxx:9092']
# 消費者
kafka_topic_group = "test-group-zeze" #消費者群組
def producer(num:int):
    producer = KafkaProducer(bootstrap_servers=kafka_bootstrap_servers)
    phones = [fake.name()+"-"+str(i) for i in range(num)]
    for p in phones:
        msg = bytes(p, encoding='utf-8')
        # print("生成消息",msg)

      #同一個key的消息會被自動分配到同一個分區 future=producer.send(kafka_topic, key=b"test",value=msg)
      #加了監聽事件是否成功發送后,執行速度很慢,所以這里去掉了
# try: # future.get(timeout=2) # except Exception as e: # traceback.print_stack() print("成功生產{}條消息".format(num)) producer.close()

消費者代碼:

from kafka import KafkaConsumer


# 生產者
kafka_topic = "test_kafka_demo"
kafka_bootstrap_servers = ['xx:9092','xx:9092','xx:9092'] #xx為對應的ip地址
# 消費者
kafka_topic_group = "test-group-zeze" #消費者群組


def consumer():
    consumer = KafkaConsumer(kafka_topic,group_id=kafka_topic_group,bootstrap_servers=kafka_bootstrap_servers)

    for message in consumer:
        print ("開始消費:","%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                              message.offset, message.key,
                                              message.value))

    # consumer.close()

親測單機生產10w消息耗時20秒內,單線程

 

消費者沒記錄耗時,但是也非常快,kafka性能確實牛 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM