實現一個簡單的Kafka多線程消費模型


最近項目上用到了Kafka(作為數據源接入),這里將自己的實踐分享出來,供大家參考或針砭。

 

從網上查閱資料發現,基本上有2中與Kafka對接的方式:

1.Spring-Kafka
2.調用Kafka API自己實現ConsumerClient

Spring-Kafka的基本原理就是Spring自動輪詢Poll數據,通過監聽器MessageListener.onMessage()向用戶自定義的消費入口(@KafkaListener)推送數據。因此對於用戶來說,僅需要關注自己的業務實現即可,Kafka數據對於業務來說就是一個方法的入參而已。這種設計很有意思,因為Kafka是不支持主動Push的,但是Spring-Kafka自己實現了這種角色反轉。Spring-Kafka本身就是一個很好的實現,而且上手相對簡單,推薦大家使用這種方式。

溫馨提示:Spring-Kafka和kafka-clients之間有版本的兼容性問題需特別注意,另外如果你使用SpringBoot開發的話也需要匹配特定的版本。

 

 

#Spring-Kafka KafkaConsumer消費模型(來源於網絡)

 

不過抱着學習研究的目的,本篇選擇第2中實現方式,其實和Spring-Kafka殊途同歸。

直奔主題,本篇就不闡述太多理論性的東西,僅介紹一些基本的Kafka API對象和概念:

1.KafkaConsumer,顧名思義就是Kafka的數據消費者,其主要作用是連接Kafka訂閱(subscribe)相關主題(topic)並拉取(poll)數據並提交消費偏移(offset)。

2.ConsumerRecord,Kafka數據接收記錄,其中有些重要的屬性:topic(主題),patition(分區),offset(偏移),key(主鍵),value(數據值)。

PS:KafkaConsumer是非線程安全的

 

對於一個Kafka消費客戶端有些基本的配置:

1.bootstrap.servers
--連接Kafka集群的地址,多個地址以逗號分隔

2.key.deserializer
--消息中key反序列化類,需要和Producer中key序列化類相對應

3.value.deserializer
--消息中value的反序列化類,需要和Producer中Value序列化類相對應

4.group.id
--消費者所屬消費組的唯一標識

 

為了提高單線程消費Kafka數據的效率,我們要在此基礎上創建一個專門用於處理數據的線程池。簡單來說,就是一個線程只用來Poll數據,然后丟給線程池去處理。

消費線程:KafkaConsumerClient.java 

/**
 * 消費線程
 * 
 * @author lichmama
 *
 */
public class KafkaConsumerClient extends Thread {
    /** 讀取超時 **/
    private static final int timeout = 5000;
    /** 核心線程數 **/
    private static final int corePoolSize = 5;
    /** 最大線程數 **/
    private static final int maximumPoolSize = 20;
    /** 空閑存活時間 **/
    private static final long keepAliveTime = 30L;
    /** 隊列容量 **/
    private static final int capacity = 10000;
    /** 告警數據處理線程池 **/
    private ExecutorService executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime,
            TimeUnit.SECONDS, new LinkedBlockingQueue<>(capacity));
    /** kafka配置 **/
    private Properties props;
    /** kafka主題 **/
    private String topics;
    private KafkaConsumer<String, String> consumer = null;

    private KafkaConsumerClient(Properties props, String topics) {
        super("KafkaConsumerClient");
        this.props = props;
        this.topics = topics;
    }

    @Override
    public void run() {
        System.out.println("KafkaConsumerClient is running...");
        try {
            consumer = new KafkaConsumer<String, String>(props);
            consumer.subscribe(Collections.singletonList(topics));
        } catch (Exception e) {
            e.printStackTrace();
            return;
        }
        try {
            while (true) { // 循環讀取
                consumer.poll(timeout).forEach(record -> {
                    process(record);
                });
                commitOffset();
            }
        } finally {
            System.out.println("KafkaConsumerClient terminated for some unexpected exception!");
            consumer.close();
        }
    }

    /**
     * 提交偏移
     */
    private void commitOffset() {
        try {
            consumer.commitAsync();
        } catch (Exception e) {
            System.out.println("異步提交失敗,嘗試主動提交。。。");
            consumer.commitSync();
        }
    }

    /**
     * 處理數據
     * 
     * @param record
     */
    private void process(ConsumerRecord<String, String> record) {
        try {
            executor.submit(new KafkaDataProcessor(record));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void doStart(Properties props, String topics) {
        KafkaConsumerClient client = new KafkaConsumerClient(props, topics);
        // 異常退出后自動重啟
        client.setUncaughtExceptionHandler(new UncaughtExceptionHandler() {

            @Override
            public void uncaughtException(Thread t, Throwable throwable) {
                System.out.println("KafkaConsumerClient異常退出,重啟中。。。");
                try {
                    Thread.sleep(1000 * 5); // 等待5秒鍾
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                KafkaConsumerClient.doStart(props, topics);
            }
        });
        client.start();
    }
}
View Code

 

數據處理線程:KafkaDataProcessor.java

/**
 * 處理線程
 * @author lichmama
 *
 */
public class KafkaDataProcessor extends Thread {
    private ConsumerRecord<String, String> record;

    public KafkaDataProcessor(ConsumerRecord<String, String> record) {
        this.record = record;
    }

    @Override
    public void run() {
        System.out.println(String.format("topic:%s, partition:%d, offset:%d, message:%s", 
                record.topic(), record.partition(), record.offset(), record.value()));
    }

}

 

啟動程序:StartKafkaConsumer.java

public class StartKafkaConsumer {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.load(new FileInputStream("consumer-config.properties"));
        String topics = (String) props.remove("topics");
        // 啟動消費線程
        KafkaConsumerClient.doStart(props, topics);
    }
}

 

kafka消費者配置:consumer-config.properties

##-*- 消費者配置 -*-##
#kafka集群地址
bootstrap.servers=localhost:9092
#消費者歸屬組ID
group.id=test_group
#單次最大拉取記錄數
max.poll.records=20
#關閉自動提交
enable.auto.commit=false
#key反序列化類名
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
#value反序列化類名
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

#訂閱主題 
topics=test

 #本例使用JDK8開發,kafka-clients版本為0.10.2.0

根據實際情況來調整線程池的隊列長度、線程數以及單次最大拉取記錄數,才能確保程序運行良好,否則可能會造成告警丟失。

因為Kafka有分區的概念,每個topic可能有N個partition,為了更進一步提高消費速度可以同時啟動M(M<=N)個KafkaConsumerClient。需要特別說明的是,如果M>N的話,那么多余的Client是無法Poll到數據的,這是Kafka的特性所決定的

 

本篇到這基本就結束了,快把程序跑起來看看效果吧。😄


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM