前言
操作Kafka之前,先啟動Kafka:
方式一:進入Kafka安裝目錄,常規模式啟動:
bin/kafka-server-start.sh config/server.properties
方式二:進入Kafka安裝目錄,進程守護模式啟動kafka:
nohup bin/kafka-server-start.sh config/server.properties >/dev/null 2>&1 &
另外,Kafka的關閉命令:
進入Kafka安裝目錄,執行:
bin/kafka-server-stop.sh
Python操作Kafka發送字符串
編寫Kafka生產者Producer:
# kafka_producer.py
from kafka import KafkaProducer
from time import sleep
def start_producer():
producer = KafkaProducer(bootstrap_servers='192.168.0.157:9092')
for i in range(0,100000):
msg = 'msg is ' + str(i)
producer.send('topic_test', msg.encode('utf-8'))
sleep(3)
if __name__ == '__main__':
start_producer()
編寫Kafka消費者KafkaConsumer:
# kafka_consumer.py
from kafka import KafkaConsumer
import time
def start_consumer():
consumer = KafkaConsumer('topic_test', bootstrap_servers = '192.168.0.157:9092')
for msg in consumer:
print(msg)
print("topic = %s" % msg.topic) # topic default is string
print("partition = %d" % msg.offset)
print("value = %s" % msg.value.decode()) # bytes to string
print("timestamp = %d" % msg.timestamp)
print("time = ", time.strftime("%Y-%m-%d %H:%M:%S", time.localtime( msg.timestamp/1000 )) )
if __name__ == '__main__':
start_consumer()
以上兩個代碼,先運行消費者,后運行生產者,就看到消費者在監聽生產者發信息。
發送結構化json待續。。
以上。