本文將會介紹在windows環境下啟動Kafka,並通過nodejs作為客戶端,生產並消費消息。
第一步,Kafka需要java運行時,先安裝配置java環境。通過在命令行中輸入java -version確認java是否成功安裝(可能需要查看windows的環境變量中是否有java)。
第二步,Kafka官網下載最新版本的壓縮包(.tgz格式),並解壓。
分別在兩個命令行里面啟動zookeeper、kafka(解壓縮路徑下)
bin\windows\zookeeper-server-start.bat config\zookeeper.properties
bin\windows\kafka-server-start.bat config\server.properties
說明一下zookeeper和kafka的關系:zookeeper是集群的調度者,kafka才是消息隊列。 zookeeper的默認端口:2181,kafka的默認端口:9092
相關配置可以在config文件下的server.properties和zookeeper.properties中找到
消費者客戶端需要的group.id可以在config->consumer.properties中找到。
第三步,使用命令行測試生產、消費
//創建一個topic:test
bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
//查看topic
bin\windows\kafka-topics.bat --list --zookeeper localhost:2181
//創建生產者
bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic test
//創建消費者
bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning
第四步,發送消息
在生產者窗口,隨意輸入一條消息,可以在消費者窗口看到該消息。
最后,使用nodejs訪問kafka
npm init -y
npm install kafkajs
新建demo.js,輸入以下代碼
const { Kafka } = require('kafkajs')
const kafka = new Kafka({
clientId: 'my-app',
brokers: ['localhost:9092']
})
const producer = kafka.producer()
const consumer = kafka.consumer({ groupId: 'test-consumer-group' })
const run = async () => {
// Producing
await producer.connect()
await producer.send({
topic: 'test-topic',
messages: [
{ value: 'Hello KafkaJS user!' },
],
})
// Consuming
await consumer.connect()
await consumer.subscribe({ topic: 'test-topic', fromBeginning: true })
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
console.log({
partition,
offset: message.offset,
value: message.value.toString(),
})
},
})
}
run().catch(console.error)
最后執行命令
node demo.js
參考:
https://zhuanlan.zhihu.com/p/101162159
https://blog.csdn.net/weixin_28829325/article/details/113414177
https://github.com/tulios/kafkajs