Designing multi-threaded Kafka consumption and manual offset commits [repost]


Reposted from: http://blog.csdn.net/haoyifen/article/details/54692503

Unlike other message queues, Kafka keeps consumer state outside the broker: it is maintained by the consumer itself, or by external storage such as ZooKeeper. This makes consumption much more flexible, but it also introduces problems. A client whose polls take too long may be judged dead, triggering a partition reassignment and duplicate consumption; a client that actually crashes can likewise cause messages to be consumed again.

What this article covers

Kafka consumers support many different usage patterns and models. This article focuses on manual commits and multi-threaded use of the new consumer API introduced in Kafka 0.9. Other schemes, such as storing offsets externally, manually seeking to an offset, and manual partition assignment, will be covered in separate articles.

Using the consumer in a single thread

Below are two single-threaded consumers: one with auto-commit and one with manual commit.

1. Auto-commit: poll in a single thread, then consume

        Properties props = new Properties();
        props.put("bootstrap.servers", servers);
        props.put("group.id", "autoCommitGroup");
        // enable auto-commit
        props.put("enable.auto.commit", "true");
        // auto-commit interval
        props.put("auto.commit.interval.ms", "1000");
        // deserializer classes for keys and values
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(topic));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records)
                System.out.printf("offset = %d, key = %s, value = %s%n",
                        record.offset(), record.key(), record.value());
        }


The term "auto-commit" is easy to misread: offsets are not committed by a background thread; in this version they are committed inside poll() itself.
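
A practical consequence: with auto-commit, a later poll can commit the offsets of records that are still being processed. If the process then crashes mid-processing, those records are never redelivered, so this mode can silently drop messages (at-most-once). The manual-commit variant below avoids this by committing only after processing.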

2. Manual commit: poll in a single thread, commit offsets only after a batch of messages has been read

        Properties props = new Properties();
        props.put("bootstrap.servers", servers);
        props.put("group.id", "manualOffsetControlTest");
        // disable auto-commit; offsets are committed manually
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(topic));
        // commit only after every 200 processed messages
        final int minBatchSize = 200;
        // buffer for accumulating records
        ArrayList<ConsumerRecord<String, String>> buffer = new ArrayList<>();
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                buffer.add(record);
            }
            // once 200 or more records have accumulated, process them
            if (buffer.size() >= minBatchSize) {
                doSomething(buffer);
                // commit the offsets after processing
                consumer.commitAsync();
                // clear the buffer and keep receiving
                buffer.clear();
            }
        }
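
This example commits with commitAsync(), which does not block the poll loop but also does not retry a failed commit; the worst case is that a later rebalance replays an already-processed batch. If a stronger guarantee is wanted at this point, commitSync() blocks until the commit succeeds or fails definitively.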


Version-specific behavior of the new Kafka consumer

Before going further, a few characteristics of the Kafka consumer are worth noting:

  1. Kafka 0.9 rewrote the consumer API.
  2. The consumer maintains its own current consumption state and is not thread-safe.
  3. The new consumer is built on a single-threaded model: offset auto-commit happens inside poll(). From 0.9 up to 0.10.0.1, client heartbeats are also sent inside poll(); starting with 0.10.1.0, heartbeats are sent asynchronously from a background thread.
  4. 0.9 offers no way to cap how much data a single poll returns, so one poll may return everything from the last consumed position up to the latest position, bounded only by the fetch size. From 0.10.0.1 on, setting max.poll.records in the consumer properties limits the number of records returned per poll, as in the snippet below.
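
For illustration, a minimal fragment (the group id and the cap of 100 are placeholder values) that bounds poll batches on 0.10.0.1:

        Properties props = new Properties();
        props.put("bootstrap.servers", servers);
        props.put("group.id", "boundedPollGroup"); // placeholder group id
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // cap the number of records a single poll() may return (0.10.0.1 and later)
        props.put("max.poll.records", 100);
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);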

My design

I am using Kafka 0.10.0.1, so I have the new consumer API and can cap the number of records per poll, but heartbeats and auto-commit both still happen inside poll().

The risk in the single-threaded examples above is that long message processing stretches the interval between polls: heartbeats cannot be sent in time, offsets cannot be committed, the client is judged dead on timeout, and messages whose offsets were never committed are re-consumed by other consumers. My design prevents this.

My design:

  1. Use max.poll.records to cap the number of records each poll returns.
  2. Separate polling from processing, and poll as fast as possible so heartbeats keep getting sent.
  3. Each processing thread handles exactly one partition; once it has processed a certain number of records, or a certain interval has passed since the last commit, the poll thread commits the offset on its behalf.

The code architecture is shown in the figure below. Suppose there are two MsgReceiver consumer threads: one is assigned partitions 1 and 2, the other partitions 3 and 4.

[Figure: consumer model]

  1. Multiple consumer threads poll for messages in a while loop.
  2. Each consumer hands a message to the record_processor thread for that message's partition, so one record_processor thread processes exactly one partition.
  3. Once a record_processor has processed a certain number of records, or a certain interval has passed since its last commit, it puts the current partition offset onto the commit_queue.
  4. Before each poll, the consumer thread (MsgReceiver) reads the commit_queue; if it holds any offsets, it commits them to Kafka, then goes on polling.

Code implementation

1. The consumer task: MsgReceiver

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

public class MsgReceiver implements Runnable {

    private static final Logger logger = LoggerFactory.getLogger(MsgReceiver.class);

    private BlockingQueue<Map<TopicPartition, OffsetAndMetadata>> commitQueue = new LinkedBlockingQueue<>();
    private Map<String, Object> consumerConfig;
    private String alarmTopic;
    private ConcurrentHashMap<TopicPartition, RecordProcessor> recordProcessorTasks;
    private ConcurrentHashMap<TopicPartition, Thread> recordProcessorThreads;

    public MsgReceiver(Map<String, Object> consumerConfig, String alarmTopic,
                       ConcurrentHashMap<TopicPartition, RecordProcessor> recordProcessorTasks,
                       ConcurrentHashMap<TopicPartition, Thread> recordProcessorThreads) {
        this.consumerConfig = consumerConfig;
        this.alarmTopic = alarmTopic;
        this.recordProcessorTasks = recordProcessorTasks;
        this.recordProcessorThreads = recordProcessorThreads;
    }

    @Override
    public void run() {
        // KafkaConsumer is not thread-safe, so each thread creates its own consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerConfig);
        consumer.subscribe(Arrays.asList(alarmTopic));
        // check the thread's interrupt flag; if it is set, the outside wants this task to stop
        try {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    // check whether this consumer has offsets to commit, using a non-blocking read
                    Map<TopicPartition, OffsetAndMetadata> toCommit = commitQueue.poll();
                    if (toCommit != null) {
                        logger.debug("commit TopicPartition offset to kafka: " + toCommit);
                        consumer.commitSync(toCommit);
                    }
                    // poll for at most 100 ms
                    ConsumerRecords<String, String> records = consumer.poll(100);
                    if (records.count() > 0) {
                        logger.debug("poll records size: " + records.count());
                    }
                    for (final ConsumerRecord<String, String> record : records) {
                        String topic = record.topic();
                        int partition = record.partition();
                        TopicPartition topicPartition = new TopicPartition(topic, partition);
                        RecordProcessor processTask = recordProcessorTasks.get(topicPartition);
                        // if this partition has not been consumed yet, there is no task for it in the map
                        if (processTask == null) {
                            // create a new processing task and thread, and save them in the maps
                            processTask = new RecordProcessor(commitQueue);
                            recordProcessorTasks.put(topicPartition, processTask);
                            Thread thread = new Thread(processTask);
                            thread.setName("Thread-for " + topicPartition.toString());
                            logger.info("start Thread: " + thread.getName());
                            thread.start();
                            recordProcessorThreads.put(topicPartition, thread);
                        }
                        // hand the record to the partition's processing task
                        processTask.addRecordToQueue(record);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    logger.warn("MsgReceiver exception " + e + " ignore it");
                }
            }
        } finally {
            consumer.close();
        }
    }
}
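
Two design choices here are worth calling out. The consumer uses commitSync rather than commitAsync: commits are rare (each RecordProcessor submits at most one offset map per 20 records or per 2 seconds, as shown next), so briefly blocking the poll loop is cheap, and the synchronous call guarantees the offset is recorded before more records are dispatched. Also, commitQueue.poll() is non-blocking and drains only one offset map per loop iteration, which is enough given how rarely the processors submit offsets.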


2. The record-processing task: RecordProcessor

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.joda.time.Duration;
import org.joda.time.LocalDateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collections;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class RecordProcessor implements Runnable {

    private static Logger logger = LoggerFactory.getLogger(RecordProcessor.class);

    // holds the records the MsgReceiver thread sends over
    private BlockingQueue<ConsumerRecord<String, String>> queue = new LinkedBlockingQueue<>();
    // queue used to hand consumed offsets back to the consumer thread for committing
    private BlockingQueue<Map<TopicPartition, OffsetAndMetadata>> commitQueue;
    // time of the last commit
    private LocalDateTime lastTime = LocalDateTime.now();
    // commit once every 20 consumed records
    private long commitLength = 20L;
    // commit once this much time has passed since the last commit
    private Duration commitTime = Duration.standardSeconds(2);
    // number of records this thread has consumed so far
    private int completeTask = 0;
    // the most recently consumed record
    private ConsumerRecord<String, String> lastUncommittedRecord;

    // the queue for consumed offsets, supplied by MsgReceiver
    public RecordProcessor(BlockingQueue<Map<TopicPartition, OffsetAndMetadata>> commitQueue) {
        this.commitQueue = commitQueue;
    }

    @Override
    public void run() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                // timed poll on the queue the consumer feeds; each processing thread has its own queue
                ConsumerRecord<String, String> record = queue.poll(100, TimeUnit.MILLISECONDS);
                if (record != null) {
                    // process the record
                    process(record);
                    // increment the completed-task counter
                    this.completeTask++;
                    // remember the last processed record
                    lastUncommittedRecord = record;
                }
                // hand the offset to the consumer for committing
                commitToQueue();
            }
        } catch (InterruptedException e) {
            // the thread was interrupted; exit
            logger.info(Thread.currentThread() + " is interrupted");
        }
    }

    private void process(ConsumerRecord<String, String> record) {
        System.out.println(record);
    }

    // put the current consumed offset onto the queue, to be committed by MsgReceiver
    private void commitToQueue() throws InterruptedException {
        // if nothing has been consumed, or the last consumed record's offset
        // has already been submitted, there is nothing to do
        if (lastUncommittedRecord == null) {
            return;
        }
        // have we consumed another commitLength records?
        boolean arrivedCommitLength = this.completeTask % commitLength == 0;
        // get the current time and check whether the commit interval has elapsed
        LocalDateTime currentTime = LocalDateTime.now();
        boolean arrivedTime = currentTime.isAfter(lastTime.plus(commitTime));
        // if either condition is met, send the offset to the consumer, which polls
        // this queue (non-blocking) and performs the commit
        if (arrivedCommitLength || arrivedTime) {
            lastTime = currentTime;
            long offset = lastUncommittedRecord.offset();
            int partition = lastUncommittedRecord.partition();
            String topic = lastUncommittedRecord.topic();
            TopicPartition topicPartition = new TopicPartition(topic, partition);
            logger.debug("partition: " + topicPartition + " submit offset: " + (offset + 1L) + " to consumer task");
            Map<TopicPartition, OffsetAndMetadata> map =
                    Collections.singletonMap(topicPartition, new OffsetAndMetadata(offset + 1L));
            commitQueue.put(map);
            // reset so the same offset is not submitted twice
            lastUncommittedRecord = null;
        }
    }

    // the consumer thread adds records to this processing thread's queue
    public void addRecordToQueue(ConsumerRecord<String, String> record) {
        try {
            queue.put(record);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
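
Note that the value placed on the queue is offset + 1. In Kafka, the committed offset is defined as the position of the next record to read, so committing lastUncommittedRecord.offset() + 1 means that after a restart or rebalance the group resumes right after the last processed record instead of re-reading it.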


3. The manager object

It starts the MsgReceiver consumer threads, keeps references to them, keeps the RecordProcessor tasks and their threads, and destroys all of these on shutdown.

import com.google.common.collect.ImmutableMap;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class KafkaMultiProcessorTest {

    private static final Logger logger = LoggerFactory.getLogger(KafkaMultiProcessorTest.class);

    // the topic to subscribe to
    private String alarmTopic;
    // broker addresses
    private String servers;
    // the consumer group
    private String group;
    // kafka consumer configuration
    private Map<String, Object> consumerConfig;

    private Thread[] threads;

    // maps that keep the processing tasks and their threads
    private ConcurrentHashMap<TopicPartition, RecordProcessor> recordProcessorTasks = new ConcurrentHashMap<>();
    private ConcurrentHashMap<TopicPartition, Thread> recordProcessorThreads = new ConcurrentHashMap<>();

    public static void main(String[] args) {
        KafkaMultiProcessorTest test = new KafkaMultiProcessorTest();
        // ... code that sets the topic and group is omitted ...
        test.init();
    }

    public void init() {
        consumerConfig = getConsumerConfig();
        logger.debug("get kafka consumerConfig: " + consumerConfig.toString());
        // create threadsNum threads to read kafka messages, all in the same group;
        // the topic has 12 partitions, so at most 12 consumers can read it
        int threadsNum = 3;
        logger.debug("create " + threadsNum + " threads to consume kafka warn msg");
        threads = new Thread[threadsNum];
        for (int i = 0; i < threadsNum; i++) {
            MsgReceiver msgReceiver = new MsgReceiver(consumerConfig, alarmTopic,
                    recordProcessorTasks, recordProcessorThreads);
            Thread thread = new Thread(msgReceiver);
            threads[i] = thread;
            thread.setName("alarm msg consumer " + i);
        }
        // start the threads
        for (int i = 0; i < threadsNum; i++) {
            threads[i].start();
        }
        logger.debug("finish creating " + threadsNum + " threads to consume kafka warn msg");
    }

    // destroy the started threads
    public void destroy() {
        closeRecordProcessThreads();
        closeKafkaConsumer();
    }

    private void closeRecordProcessThreads() {
        logger.debug("start to interrupt record process threads");
        for (Map.Entry<TopicPartition, Thread> entry : recordProcessorThreads.entrySet()) {
            Thread thread = entry.getValue();
            thread.interrupt();
        }
        logger.debug("finish interrupting record process threads");
    }

    private void closeKafkaConsumer() {
        logger.debug("start to interrupt kafka consumer threads");
        // interrupt the threads; their run methods already respond to the interrupt flag
        for (int i = 0; i < threads.length; i++) {
            threads[i].interrupt();
        }
        logger.debug("finish interrupting consumer threads");
    }

    // kafka consumer configuration
    private Map<String, Object> getConsumerConfig() {
        return ImmutableMap.<String, Object>builder()
                .put("bootstrap.servers", servers)
                .put("group.id", group)
                .put("enable.auto.commit", "false")
                .put("session.timeout.ms", "30000")
                .put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
                .put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
                .put("max.poll.records", 1000)
                .build();
    }

    public void setAlarmTopic(String alarmTopic) {
        this.alarmTopic = alarmTopic;
    }

    public void setServers(String servers) {
        this.servers = servers;
    }

    public void setGroup(String group) {
        this.group = group;
    }
}
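
A caveat on destroy(): Thread.interrupt() is only observed between polls, since KafkaConsumer is not guaranteed to abort a poll in flight on this version; the thread-safe way to break out of a blocked poll is KafkaConsumer.wakeup(), which makes poll() throw a WakeupException in the polling thread. With the short 100 ms poll timeout used in MsgReceiver, the interrupt flag is checked frequently enough that plain interruption also works.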


Shortcomings

The code above still has a gap. The processing tasks and threads live in maps, and if a rebalance (say, a new machine joining the group) takes a partition away from a consumer, the corresponding processing task and thread are never destroyed. We can use org.apache.kafka.clients.consumer.ConsumerRebalanceListener to react to partition reassignment, as sketched below.
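
A minimal sketch of what this could look like inside MsgReceiver.run(), replacing the plain subscribe call. The cleanup policy shown here (interrupt and discard the processor of every revoked partition) is one reasonable choice, not code from the original article:

        consumer.subscribe(Arrays.asList(alarmTopic), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                // a revoked partition may be reassigned to another consumer:
                // stop and discard its processing task and thread
                for (TopicPartition partition : partitions) {
                    recordProcessorTasks.remove(partition);
                    Thread thread = recordProcessorThreads.remove(partition);
                    if (thread != null) {
                        thread.interrupt();
                    }
                }
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                // nothing to do: MsgReceiver creates a processor lazily
                // on the first record from a newly assigned partition
            }
        });

Interrupting a processor discards whatever is still buffered in its queue; since those records' offsets were never committed, they are simply redelivered to whichever consumer is assigned the partition after the rebalance.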

