node.js中kafka的封裝和高並發消費限流優雅降級以及egg-kafka的封裝說明


HI!,你好,我是zane,zanePerfor是一款我開發的一個前端性能監控平台,現在支持web瀏覽器端和微信小程序端。

我定義為一款完整,高性能,高可用的前端性能監控系統,這是未來會達到的目的,現今的架構也基本支持了高可用,高性能的部署。實際上還不夠,在很多地方還有優化的空間,我會持續的優化和升級。

開源不易,如果你也熱愛技術,擁抱開源,希望能小小的支持給個star。

項目的github地址:https://github.com/wangweianger/zanePerfor

項目開發文檔說明:https://blog.seosiwei.com/performance/index.html

 

Kafka是由Apache軟件基金會開發的一個開源流處理平台,由Scala和Java編寫。
Kafka是一種高吞吐量的分布式發布訂閱消息系統,它可以處理消費者規模的網站中的所有動作流數據。 
這種動作(網頁瀏覽,搜索和其他用戶的行動)是在現代網絡上的許多社會功能的一個關鍵因素。 
這些數據通常是由於吞吐量的要求而通過處理日志和日志聚合來解決。 
針對於zanePerfor這樣的用戶訪問行為,頁面性能監控系統,但又要求實時處理的限制,這是一個可行的解決方案。

 

zanePerfor中對於kafka的應用使用了kafka-node包,並在此基礎上封裝了egg-kafka插件。

zanePerfor初步的探索了kafka在node.js中的應用,以下內容主要講解kafka在zanePerfor項目中的使用方式。

如果你對在node.js中使用kafka有更多的建議和心得,也希望能跟我一起分享。

 

zanePerfor項目中kafka應用介紹:

啟用kafka配置說明:

// config/config.default.js
// kafka 配置 (report_data_type=kafka生效)
// 配置參考 https://www.npmjs.com/package/kafka-node
config.kafka = {
    client: {
        kafkaHost: 'localhost:9092',
    },
    producer: {
        web: {
            topic: 'zane_perfor_web',
            partition: 0, // default 0
            attributes: 0, // default: 0
            // timestamp: Date.now(),
        },
        wx: {
            topic: 'zane_perfor_wx',
        },
    },
    // consumer 和 consumerGroup消費任選其一即可
    // 優先選擇consumer消費,兩種消費配置任留一種即可
    consumer: {
        web: {
            topic: 'zane_perfor_web',
            offset: 0, // default 0
            partition: 0, // default 0
            isone: false, // 此參數默認不可更改
            total_limit: 10000, // 消息隊列消費池限制數, 0:不限制 number: 限制條數 高並發時服務優雅降級方案
        },
        wx: {
            topic: 'zane_perfor_wx',
            isone: false,
            total_limit: 10000,
        },
    },
    consumerGroup: {
        web: { // ConsumerGroup(options, topics)
            topic: 'zane_perfor_web',
            groupId: 'WebPerformanceGroup',
            commitOffsetsOnFirstJoin: true,
        },
        wx: {
            topic: 'zane_perfor_wx',
            groupId: 'WxPerformanceGroup',
            commitOffsetsOnFirstJoin: true,
        },
    },
};

  

配置說明:

client參數說明:

client參數即為kafka-node中的KafkaClient參數,參考地址:https://www.npmjs.com/package/kafka-node#kafkaclient

producer生產者參數說明:

producer分web端和wx端配置

producer參數為kafka-node中的send參數,參考地址:https://www.npmjs.com/package/kafka-node#sendpayloads-cb

consumer消費者參數說明:

consumer分web端和wx端配置

consumer參數為kafka-node中的consumer參數, 參考地址:https://www.npmjs.com/package/kafka-node#consumerclient-payloads-options

consumerGroup消費者參數說明:

consumerGroup分web端和wx端配置

consumerGroup參數為kafka-node中的consumerGroup參數,參考地址:https://www.npmjs.com/package/kafka-node#consumergroupoptions-topics

關於消費者說明

config配置中有consumer和consumerGroup配置,規則如下:

  • 如果consumer配置為真有限使用consumer配置
  • 如果想啟用consumerGroup配置,則注釋或者刪除consumer配置即可

 

kafka生產消費邏輯實現:

 

核心代碼實現:

一:生產者

kafka的性能非常強勁,能支持超高並發,因此所有客戶端上報的信息都存儲到消息隊列中,限流策略只使用在消費端,生產端不做限流設置。

// app/controller/api/web/report.js
// 通過kafka 消息隊列消費數據
    async saveWebReportDataForKafka(query) {
        // 生產者
        this.app.kafka.send(
            'web',
            JSON.stringify(query)
        );

        // 消費者
        if (!isKafkaConsumer && !this.app.config.kafka.consumer.web.isone) {
            this.ctx.service.web.reportTask.saveWebReportDatasForKafka();
            isKafkaConsumer = true;
            this.app.config.kafka.consumer.web.isone = true;
        }
    }

  

  • this.app.kafka.send是封裝的插件egg-kafka中的方法,功能就是生產信息

  • if (!isKafkaConsumer && !this.app.config.kafka.consumer.web.isone)是為了保證訂閱消息的方法只執行一次,后面一但有消息產生,會自動觸發訂閱函數進行數據消費消費。

二:消費者

// app/service/web/report_task.js
// kafka 消費信息
    async saveWebReportDatasForKafka() {
        if (this.kafkaConfig.consumer) {
            this.app.kafka.consumer('web', message => {
                this.consumerDatas(message);
            });
        } else if (this.kafkaConfig.consumerGroup) {
            this.app.kafka.consumerGroup('web', message => {
                this.consumerDatas(message);
            });
        }
    }

  

  • this.app.kafka.consumer 單獨消費,egg-kafka中暴露的方法

  • this.app.kafka.consumerGroup 以分組的方式消費消息

  • 優先使用consumer消費,其次使用consumerGroup進行消費

 

egg-kafka插件封裝說明

為了更好、更方便的使用kafka,項目中對node-kafka進行了一層封裝。

詳細請參考:/lib/plugin/egg-kafka/lib/kafka.js

send代碼實現如下:

send(type, data) {
        assert(type, '[egg-kafka] type is must required.');
        if (!data) return;
        let producer = this.app.config.kafka.producer[type] || {};
        let producers = [];
        if (typeof (data) === 'string') {
            producer.messages = data;
            producers = [ producer ];
        } else if (Object.prototype.toString.call(data) === '[object Object]') {
            producer = Object.assign({}, producer, data);
            producers = [ producer ];
        } else if (Object.prototype.toString.call(data) === '[object Array]') {
            for (let i = 0; i < data.length; i++) {
                data[i] = Object.assign({}, producer, data[i]);
            }
            producers = data;
        }
        this.producer.send(producers, (err, data) => {
            if (err) assert(err, '[egg-kafka] err. errmsg ${err}');
            console.log(data);
        });
    }

  

send有兩個參數,第一個參數type為發送類型,有web、wx兩個值可以選擇。

對data做了一定的判斷,send調用可以有以下幾種方式:

// 消息為String
this.app.kafka.send('web','hello world!');
// 消息為Object
this.app.kafka.send('web',{ topic:'test', messages:'hello world!' });
// 消息為Array
this.app.kafka.send('web',[{ topic: 'test', messages: 'hi', partition: 0}]);

  

consumer方法代碼實現:

consumer(type = 'web', fn) {
    assert(type, '[egg-kafka] consumers type argument must be required');
    const kafkaConfig = this.app.config.kafka;
    const consumer = kafkaConfig.consumer[type] || {};
    const consumers = Array.isArray(consumer) ? consumer : [ consumer ];
    const Consumer = kafka.Consumer;
    const _consumer = new Consumer(
        this.client,
        consumers,
        {
            autoCommit: true,
        }
    );
    _consumer.on('error', err => {
        this.app.coreLogger.error(`[egg-kafka] consumer have error ${err}`);
    });
    _consumer.on('message', message => {
        fn && fn(message);
    });
}

  

consumerGroup代碼實現:

consumerGroup(type = 'web', fn) {
    assert(type, '[egg-kafka] consumers type argument must be required');
    const kafkaConfig = this.app.config.kafka;
    const kafkaHost = kafkaConfig.client.kafkaHost;
    const consumerOption = kafkaConfig.consumerGroup[type] || {};
    const topic = consumerOption.topic;
    consumerOption.kafkaHost = kafkaHost;
    const ConsumerGroup = kafka.ConsumerGroup;
    const _consumer = new ConsumerGroup(consumerOption, topic);
    _consumer.on('error', err => {
        this.app.coreLogger.error(`[egg-kafka] consumer have error ${err}`);
    });
    _consumer.on('message', message => {
        fn && fn(message);
    });
}

  

消費限流策略:

由於kafka性能及其強悍,因此zanePerfor只對消費進行限流

 

代碼實現:

設置消費池數量

// config.default.js
{
	topic: 'zane_perfor_web',
	offset: 0, // default 0
	partition: 0, // default 0
	isone: false, // 此參數默認不可更改
	total_limit: 10000, // 消息隊列消費池限制數, 0:不限制 number: 限制條數 高並發時服務優雅降級方案		
}

  

kafka連接池數量判斷

// app/service/web/report_task.js 中 getWebItemDataForKafka 方法
// kafka 連接池限制
const msgtab = query.time + query.ip;
if (this.kafkatotal && this.kafkalist.length >= this.kafkatotal) return;
this.kafkalist.push(msgtab);

  

數據消費完成之后刪除消費標識

// app/service/web/report_task.js 中 getWebItemDataForKafka 方法
this.savePages(item, system.slow_page_time, () => {
    // 釋放消費池
    const index = this.kafkalist.indexOf(msgtab);
    if (index > -1) this.kafkalist.splice(index, 1);
});

  

至此實現了egg.js中對kafka的應用和封裝。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM