nodejs中的kafkajs,消費順序,不重復消費


參考:https://kafka.js.org/docs
確保同一個消息發送到同一個partition,一個topic,一個partition,一個consumer,內部單線程消費
1.封裝kafkaUtil類



const { Kafka, logLevel } = require('kafkajs') //const cache = require('../conn/redis.js'); const kafka = new Kafka({ clientId: 'my-app', brokers: [ "lcoalhost:8092", "localhost:8093", "localhost:8094", "lcoalhost:8095", "localhost:8096", ], retry: { retries: 8 }, logLevel: logLevel.ERROR }) /** * 如果groupId已存在重復的,建立不同的kafka實例會報錯 */ /** * kafka生產者發送消息 * messages: [{ value: 'Hello KafkaJS user!', }, { value: 'Hello KafkaJS user2!', }], */ exports.producer = async (topic, groupId, msg) => { try { const producer = kafka.producer({ groupId: groupId }) await producer.connect() await producer.send({ topic: topic, messages: msg, acks: 1 }) } catch (error) { throw error; } } exports.consumer = async (topic, groupId, callback) => { try { const consumer = kafka.consumer({ groupId: groupId }) await consumer.connect() await consumer.subscribe({ topic: topic }) await consumer.run({ autoCommit: true, eachMessage: async ({ topic, partition, message }) => {
//防止重復消費數據 await consumer.commitOffsets([{ topic: topic, partition: partition, offset: Number(message.offset) + 1 }]) let msg = message.value.toString() console.log(72, '消費者接收到的數據為:', msg); callback(msg); } }) } catch (err) { throw err; } }

2.producer.js

   

const kafka = require('./kafkaUtil');
(async function () {
    const topic = 'MY——TOPIC1'
    const groupId = 'MY——TOPIC1'
    try {
        for (let i = 0; i < 10000; i++) {
            await new Promise((resolve, reject) => {
                setTimeout(async () => {
                    resolve(1)
                }, 1000)
            }).then(async () => {
                console.log('發送的數據為:', i)
                await kafka.producer(topic, groupId, [{
                    key: "a",//key值為了保證消費者按照生產者生產的數據順序,消費數據,key值必須一致;如果不需要消費者按照生產的順序消費,key去掉即可,參考: https://www.zhihu.com/question/266390197
                    value: `${i}`
                }])
            })
        }
    } catch (error) {
        console.log(14, error)
        throw error;
    }

})()

3.consumer.js

 

const kafka = require('./kafkaUtil');
(async function () {
    const fs = require('fs');
    let count = 1;
    const topic = 'MY——TOPIC1'
    const groupId = 'MY——TOPIC1'
    try {
        await kafka.consumer(topic, groupId, async (msg) => {
            let str = `第${count}接收到的數據為:${msg}`;
            count++;
            fs.writeFileSync(`${process.cwd()}/test01.txt`, str, {
                flag: 'a',
            })
            console.log(str)
        })
    } catch (error) {
        console.log(14, error)
        throw error;
    }
})()

經實際測試,沒有發現消費問題。如有發現問題,請多多指教,謝謝。。。  

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM