Centos7下安裝kafka,並使用python操作kafka的簡單使用


kafka(端口9091-9093)-zookeeper(端口2181-2183)集群配置,使用自帶的


 第一步:安裝jdk


 jdk的安裝參考:https://www.cnblogs.com/smilecindy/p/13736470.html


 第二步:安裝kafka並啟動kafka


 (1)下載kafka

直接從官網下載:wget https://archive.apache.org/dist/kafka/1.0.0/kafka_2.11-1.0.0.tgz,進行解壓安裝

也可以使用命令下載:wget  wget https://archive.apache.org/dist/kafka/1.0.0/kafka_2.11-1.0.0.tgz

(2)解壓kafka: tar -zxvf  kafka_2.11-1.0.0.tgz

 

 (3)運行zookeeper(kafka自帶的zookeeper):

cd kafka_2.11-1.0.0/     # 打開kafka目錄

sh bin/zookeeper-server-start.sh -daemon config/zookeeper.properties   # 后台運行zookeeper

(4)運行kafka:

sh bin/kafka-server-start.sh config/server.properties # 運行kafka服務

出現 “started” 則是啟動成功:

注意: 運行成功之后,該窗口不能關閉,該窗口為【運行服務窗口】


 第三步:新打開一個窗口,進行創建topic



命令:sh bin/kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 1 --topic test5

 


 第四步:創建監聽test5的消息隊列程序----(consumer【消費者窗口】)


 命令:sh bin/kafka-console-consumer.sh --zookeeper 127.0.0.1:2181 --topic test5 --from-beginning

 


 第五步:再先打開一個窗口,創建發送test5消息隊列的生產者程序---(producer【生產者窗口】)


 命令:sh bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test5

在>后輸入信息並【enter】,則代表生產者發送了信息,如下圖:

 

回車之后,打開消費者端能夠監聽到生產者發送的消息,如下圖:

 


 第六步:使用python操作kafka


 1.安裝kafka的模塊:pip install kafka-python;注意python3使用的是kafka-python

2.新建一個kafka_consumer.py文件,並在【消費者窗口】執行該文件:

from kafka import KafkaConsumer
consumer = KafkaConsumer('test1', bootstrap_servers=['127.0.0.1:9092'])
for msg in consumer:
recv = "%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value)
print(recv)

 3.新建一個kafka_producer.py文件,並在【生產者窗口】執行該文件:

import json
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers='127.0.0.1:9092')
msg_dict = {
"status": {
"code": 0,
"message": "ok"
},
"data": [
{
"set": {
"bk_set_name": "空閑機池",
"bk_set_id": 10,
"TopModuleName": "",
"bk_parent_id": 3,
"bk_service_status": "1",
"bk_set_desc": "",
"bk_set_env": "3",
"description": "",
"bk_capacity": 0,
"subnet": "",
"wan_gate": "",
"node_name": "智能邊緣雲##空閑機池"
},
"room": None,
"lake": None,
"lake_list": []
}
],
"paging": {
"total_page": 1,
"page": 1,
"per_page": 10,
"total_record": 1
}
}
msg = json.dumps(msg_dict)
producer.send('test1', bytes(msg,'ascii'), partition=0)
producer.close()

文件執行成功之后,則回到消費者窗口,查看是否進行消費信息,如下圖:

 

 

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM