使用Python進行 kafka的生產與消費


kafka的生產與消費

在生產前需要  .需要創建一個topic,和消費的的groupid

比如可以在kafka管理系統中創建,不需要手動敲命令創建

1.創建topic和綁定消費組

 

 

 2.kafka的生產

import json
from kafka import KafkaProducer


topic_name="active_user_simplified"  #生產的topic
kafka_addr = '172.17.9.151:9092,172.17.9.157:9092,172.17.9.155:9092'


def create_kafaka():
    """
    寫入kafka數據,useractive數據,如果用戶活躍,useractiveUpdata服務就會消費
    :return:
    """
    producer = KafkaProducer(bootstrap_servers=kafka_addr,
                             value_serializer=lambda m: json.dumps(m).encode())
    for i in range(1):

        data = {
            "app_key": "07b6ed26c8bcce540204c8f7",
            "uid": 8004540429,
            "platform": "W",
            "type": "active_terminate",
            "itime": 1608977776
        }
        result=producer.send(topic_name, data)

        print(result)


create_kafaka()

3.kafka的消費

from kafka import KafkaConsumer

topic_name="active_user_simplified"
kafka_addr = '172.17.9.151:9092,172.17.9.157:9092,172.17.9.155:9092'



def consumer_kafaka():
    group_id = "group-segment-useractiveUpdate"
    consumer = KafkaConsumer(topic_name,
                             bootstrap_servers=kafka_addr,
                             group_id=group_id,
                             auto_offset_reset='earliest')
    for msg in consumer:
        print(msg.value)



consumer_kafaka()

 

 消費數據的日志

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM