Python3操作Kafka


前言

操作Kafka之前,先啟動Kafka:

方式一:進入Kafka安裝目錄,常規模式啟動:
bin/kafka-server-start.sh config/server.properties

方式二:進入Kafka安裝目錄,進程守護模式啟動kafka:
nohup bin/kafka-server-start.sh config/server.properties >/dev/null 2>&1 & 

另外,Kafka的關閉命令:

進入Kafka安裝目錄,執行:
bin/kafka-server-stop.sh

Python操作Kafka發送字符串

編寫Kafka生產者Producer:

# kafka_producer.py

from kafka import KafkaProducer
from time import sleep

def start_producer():
    producer = KafkaProducer(bootstrap_servers='192.168.0.157:9092')
    for i in range(0,100000):
        msg = 'msg is ' + str(i)
        producer.send('topic_test', msg.encode('utf-8'))
        sleep(3)

if __name__ == '__main__':
    start_producer()

編寫Kafka消費者KafkaConsumer:

# kafka_consumer.py

from kafka import KafkaConsumer
import time

def start_consumer():
    consumer = KafkaConsumer('topic_test', bootstrap_servers = '192.168.0.157:9092')
    for msg in consumer:
        print(msg)
        print("topic = %s" % msg.topic) # topic default is string
        print("partition = %d" % msg.offset)
        print("value = %s" % msg.value.decode()) # bytes to string
        print("timestamp = %d" % msg.timestamp)
        print("time = ", time.strftime("%Y-%m-%d %H:%M:%S", time.localtime( msg.timestamp/1000 )) )

if __name__ == '__main__':
    start_consumer()

以上兩個代碼,先運行消費者,后運行生產者,就看到消費者在監聽生產者發信息。

發送結構化json待續。。

以上。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM