Kafka Multi-threaded Consumption


This article briefly introduces how to consume Kafka messages with multiple threads.

Note: the examples below are based on Kafka 2.2.

Consumer Configuration

When reading messages from Kafka, a consumer needs to take the following configuration parameters into account; a configuration sketch follows the list.

  • max.poll.records (default: 500): the maximum number of records returned in a single call to poll().

  • fetch.max.bytes (default: 52428800): the maximum amount of data the server should return for a fetch request. Records are fetched in batches by the consumer, and if the first record batch in the first non-empty partition of the fetch is larger than this value, the batch is still returned to ensure the consumer can make progress, so this is not an absolute maximum. The maximum record batch size accepted by the broker is defined via message.max.bytes (broker config) or max.message.bytes (topic config). Note that the consumer performs multiple fetches in parallel.

  • session.timeout.ms (default: 10000): the consumer sends periodic heartbeats to the broker; if the broker receives no heartbeat before this timeout expires, it removes the consumer from the group and starts a rebalance.

  • max.poll.interval.ms (default: 300000): the maximum delay between two calls to poll(); if it is exceeded, the broker removes the consumer from the group and starts a rebalance.

  • heartbeat.interval.ms (default: 3000): how frequently the consumer sends heartbeats.
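
For reference, a minimal sketch of a consumer configuration that sets these parameters explicitly (here to their 2.2 defaults); the bootstrap server address and group id are placeholder assumptions:

// imports: java.util.Properties, org.apache.kafka.clients.consumer.ConsumerConfig,
//          org.apache.kafka.common.serialization.StringDeserializer
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed local broker
props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");              // hypothetical group id
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// the parameters described above, set to their defaults
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 52428800);
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000);
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000);
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 3000);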

Rebalance

What triggers a rebalance?

When Kafka manages a consumer group, a partition rebalance is triggered whenever the group membership changes or the group's subscription changes. For example (a minimal listener for observing these events is sketched after this list):

  • New partitions are added to the topic

  • A consumer joins the group: a new instance of the application is deployed using the same groupId

  • A consumer leaves the group:

    • max.poll.interval.ms expired because the polled records were not processed in time
    • session.timeout.ms expired because no heartbeat was sent, e.g. the application crashed or there was a network error
    • the consumer was closed and the service shut down
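
A minimal sketch for observing these events: subscribe with a listener that only logs rebalance activity. The topic name is a placeholder, and an already-constructed KafkaConsumer<String, String> named consumer is assumed:

// imports: java.util.Collection, java.util.Collections,
//          org.apache.kafka.clients.consumer.ConsumerRebalanceListener,
//          org.apache.kafka.common.TopicPartition
consumer.subscribe(Collections.singletonList("demo-topic"), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // invoked before the rebalance takes these partitions away from this consumer
        System.out.println("Revoked: " + partitions);
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // invoked after the rebalance has (re)assigned partitions to this consumer
        System.out.println("Assigned: " + partitions);
    }
});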

How should a rebalance be handled?

If automatic offset commit is enabled, you do not have to worry about group rebalances; everything is done automatically by the poll method. But if you disable auto commit and commit offsets manually, you need to commit them before the partitions are revoked. You can do this in two ways:

  1. Call commitSync() after processing a batch of messages

    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            buffer.add(record);
        }
        if (buffer.size() >= minBatchSize) {
            insertIntoDb(buffer);
            // a failure after the DB insert but before the offset commit
            // still causes duplicate consumption
            consumer.commitSync();
            buffer.clear();
        }
    }
    
  2. Implement ConsumerRebalanceListener to be notified when partitions are about to be revoked, and commit the corresponding offsets at that point.

    public class SaveOffsetsOnRebalance implements ConsumerRebalanceListener {
        private Consumer<?, ?> consumer;

        public SaveOffsetsOnRebalance(Consumer<?, ?> consumer) {
            this.consumer = consumer;
        }

        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            // persist the current position of each revoked partition
            for (TopicPartition partition : partitions) {
                saveOffsetInExternalStore(consumer.position(partition));
            }
        }
    }
    

The first method is simpler, but when processing is very fast it may lead to offsets being committed too frequently. The second method is more efficient, and it is necessary when consumption and processing are fully decoupled.

Example Code

The idea: the main thread polls a batch of Kafka messages and creates tasks with the partition as the smallest unit of work, handing them to a thread pool. Each task processes the records of one partition, while the main thread keeps polling, tracks the progress of the tasks, and commits offsets.

Creating the task

package com.mirai.boot.kafka.demo;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Processes the records of a single partition on a worker thread.
 *
 * @author mirai
 * @version 2021/9/23
 * @since 1.8
 */
@Slf4j
public class MyConsumerTask<K, V> implements Runnable {

    private final List<ConsumerRecord<K, V>> records;

    private volatile boolean stopped = false;

    private volatile boolean started = false;

    private volatile boolean finished = false;

    // completed with "last processed offset + 1" when the task finishes or is stopped
    private final CompletableFuture<Long> completion = new CompletableFuture<>();

    // guards started/stopped so that run() and stop() cannot race
    private final ReentrantLock startStopLock = new ReentrantLock();

    private final AtomicLong currentOffset = new AtomicLong();

    public MyConsumerTask(List<ConsumerRecord<K, V>> records) {
        this.records = records;
    }

    @Override
    public void run() {
        startStopLock.lock();
        try {
            if (stopped) {
                return;
            }
            started = true;
        } finally {
            startStopLock.unlock();
        }

        for (ConsumerRecord<K, V> record : records) {
            if (stopped) {
                break;
            }
            // process the record here and make sure you catch all exceptions
            currentOffset.set(record.offset() + 1);
        }
        finished = true;
        completion.complete(currentOffset.get());
    }

    public long getCurrentOffset() {
        return currentOffset.get();
    }

    public void stop() {
        startStopLock.lock();
        try {
            this.stopped = true;
            if (!started) {
                // the task never ran, so mark it finished to avoid blocking callers
                finished = true;
                completion.complete(currentOffset.get());
            }
        } finally {
            startStopLock.unlock();
        }
    }

    public long waitForCompletion() {
        try {
            return completion.get();
        } catch (InterruptedException | ExecutionException e) {
            return -1;
        }
    }

    public boolean isFinished() {
        return finished;
    }
}
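
To see the task's lifecycle in isolation, here is a quick demo sketch; the fabricated records (topic name, keys, values) are purely illustrative:

// imports: java.util.ArrayList, java.util.List, org.apache.kafka.clients.consumer.ConsumerRecord
List<ConsumerRecord<String, String>> records = new ArrayList<>();
for (long i = 0; i < 5; i++) {
    // ConsumerRecord(topic, partition, offset, key, value)
    records.add(new ConsumerRecord<>("demo-topic", 0, i, "k" + i, "v" + i));
}
MyConsumerTask<String, String> task = new MyConsumerTask<>(records);
new Thread(task).start();
// blocks until all five records are processed, then returns the next offset to commit
long nextOffset = task.waitForCompletion(); // 5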

Implementing ConsumerRebalanceListener

package com.mirai.boot.kafka.demo;

import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

/**
 * @author mirai
 */
@Slf4j
@AllArgsConstructor
public class MultiThreadedRebalancedListener implements ConsumerRebalanceListener {

    private final KafkaConsumer<String, String> consumer;
    private final Map<TopicPartition, MyConsumerTask<String, String>> activeTasks;
    private final Map<TopicPartition, OffsetAndMetadata> offsetsToCommit;

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {

        // 1. Stop all tasks handling records from revoked partitions
        Map<TopicPartition, MyConsumerTask<String, String>> stoppedTasks = new HashMap<>();
        for (TopicPartition partition : partitions) {
            MyConsumerTask<String, String> task = activeTasks.remove(partition);
            if (task != null) {
                task.stop();
                stoppedTasks.put(partition, task);
            }
        }

        // 2. Wait for stopped tasks to complete processing of the current record
        stoppedTasks.forEach((partition, task) -> {
            long offset = task.waitForCompletion();
            if (offset > 0) {
                offsetsToCommit.put(partition, new OffsetAndMetadata(offset));
            }
        });

        // 3. Collect offsets for revoked partitions
        Map<TopicPartition, OffsetAndMetadata> revokedPartitionOffsets = new HashMap<>();
        partitions.forEach(partition -> {
            OffsetAndMetadata offset = offsetsToCommit.remove(partition);
            if (offset != null) {
                revokedPartitionOffsets.put(partition, offset);
            }
        });

        // 4. Commit offsets for revoked partitions
        try {
            consumer.commitSync(revokedPartitionOffsets);
        } catch (Exception e) {
            log.warn("Failed to commit offsets for revoked partitions!", e);
        }
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // resume newly assigned partitions: a partition that was paused when it was
        // revoked can remain paused if it is assigned back to the same consumer
        consumer.resume(partitions);
    }
}

Multi-threaded consumption

package com.mirai.boot.kafka.demo;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;

import java.time.Duration;
import java.util.*;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * @author mirai
 * @version 2021/9/23
 * @since 1.8
 */
@Slf4j
public class MyMultiThreadConsumer implements Runnable {

    private final KafkaConsumer<String, String> consumer;
    private final Map<TopicPartition, MyConsumerTask<String, String>> activeTasks = new HashMap<>();
    private final Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
    private final AtomicBoolean stopped = new AtomicBoolean(false);
    private long lastCommitTime = System.currentTimeMillis();

    private static final ThreadPoolExecutor EXECUTOR = new ThreadPoolExecutor(
            Runtime.getRuntime().availableProcessors(),
            Runtime.getRuntime().availableProcessors() * 2,
            0L, TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(10),
            r -> {
                Thread t = new Thread(r);
                t.setDaemon(true);
                return t;
            },
            // do not silently discard rejected tasks: a discarded task would leave its
            // partition paused forever; running it on the caller merely slows the poll loop
            new ThreadPoolExecutor.CallerRunsPolicy()
    );

    public MyMultiThreadConsumer(Properties properties, String topic) {
        consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Collections.singletonList(topic),
                new MultiThreadedRebalancedListener(consumer, activeTasks, offsetsToCommit));
    }

    @Override
    public void run() {
        try {
            while (!stopped.get()) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                handleFetchedRecords(records);
                checkActiveTasks();
                commitOffsets();
            }
        } catch (WakeupException we) {
            if (!stopped.get()) {
                throw we;
            }
        } finally {
            consumer.close();
        }
    }

    /**
     * Stops the poll loop; wakeup() interrupts a poll() that is currently blocking.
     */
    public void stopConsuming() {
        stopped.set(true);
        consumer.wakeup();
    }

    private void handleFetchedRecords(ConsumerRecords<String, String> records) {
        if (records.count() > 0) {
            List<TopicPartition> partitionsToPause = new ArrayList<>();
            records.partitions().forEach(partition -> {
                List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                MyConsumerTask<String, String> task = new MyConsumerTask<>(partitionRecords);
                partitionsToPause.add(partition);
                EXECUTOR.submit(task);
                activeTasks.put(partition, task);
            });
            // pause partitions that now have an in-flight task so the next poll()
            // does not fetch more records for them
            consumer.pause(partitionsToPause);
        }
    }

    private void checkActiveTasks() {
        List<TopicPartition> finishedTasksPartitions = new ArrayList<>();
        activeTasks.forEach((partition, task) -> {
            if (task.isFinished()) {
                finishedTasksPartitions.add(partition);
            }
            long offset = task.getCurrentOffset();
            if (offset > 0) {
                offsetsToCommit.put(partition, new OffsetAndMetadata(offset));
            }
        });
        finishedTasksPartitions.forEach(activeTasks::remove);
        consumer.resume(finishedTasksPartitions);
    }

    private void commitOffsets() {
        try {
            long currentTimeMillis = System.currentTimeMillis();
            // commit at most every 5 seconds
            if (currentTimeMillis - lastCommitTime > 5000) {
                if (!offsetsToCommit.isEmpty()) {
                    consumer.commitSync(offsetsToCommit);
                    offsetsToCommit.clear();
                }
                lastCommitTime = currentTimeMillis;
            }
        } catch (Exception e) {
            log.error("Failed to commit offsets!", e);
        }
    }
}
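
Finally, a minimal sketch of how the pieces might be wired together; the broker address, group id, and topic name are placeholder assumptions. Auto commit is disabled because the poll loop commits offsets itself:

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Properties;

public class Main {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed local broker
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");              // hypothetical group id
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

        MyMultiThreadConsumer consumer = new MyMultiThreadConsumer(props, "demo-topic");
        Thread pollingThread = new Thread(consumer, "kafka-polling-thread");
        pollingThread.start();

        // stop the poll loop gracefully when the JVM shuts down
        Runtime.getRuntime().addShutdownHook(new Thread(consumer::stopConsuming));
    }
}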
