Apache Kafka(九)- Kafka Consumer 消費行為


1. Poll Messages

Kafka Consumer 中消費messages時,使用的是poll模型,也就是主動去Kafka端取數據。其他消息管道也有的是push模型,也就是服務端向consumer推送數據,consumer僅需等待即可。

Kafka Consumerpoll模型使得consumer可以控制從log的指定offset去消費數據、消費數據的速度、以及replay events的能力。

Kafka Consumer poll模型工作如下圖:

 

 

  • ·       Consumer 調用.poll(Duration timeout) 方法,向broker請求數據
  • ·       若是broker端有數據則立即返回;否則在timeout時間后返回empty

 

我們可以通過參數控制 Kafka Consumer 行為,主要有:

  • ·       Fetch.min.bytes(默認值是1

o   控制在每個請求中,至少拉取多少數據

o   增加此參數可以提高吞吐並降低請求的數目,但是代價是增加延時

 

  • ·       Max.poll.records(默認是500

o   控制在每個請求中,接收多少條records

o   如果消息普遍都比較小而consumer端又有較大的內存,則可以考慮增大此參數

o   最好是監控在每個請求中poll了多少條消息

 

  • ·       Max.partitions.fetch.bytes(默認為1MB

o   Broker中每個partition可返回的最多字節

o   如果目標端有100多個partitions,則需要較多內存

 

  • ·       Fetch.max.bytes(默認50MB

o   對每個fetch 請求,可以返回的最大數據量(一個fetch請求可以覆蓋多個partitions

o   Consumer並行執行多個fetch操作

 

默認情況下,一般不建議手動調整以上參數,除非我們的consumer已經達到了默認配置下的最高的吞吐,且需要達到更高的吞吐。

 

2. Consumer Offset Commit 策略

在一個consumer 應用中,有兩種常見的committing offsets的策略,分別為:

  • ·       (較為簡單)enable.auto.commit = true:自動commit offsets,但必須使用同步的方式處理數據
  • ·       (進階)enable.auto.commit = false:手動commit offsets

 

在設置enable.auto.commit = true時,考慮以下代碼:

while(true) {
     List<Records> batch = consumer.poll(Duration.ofMillis(100));
     doSomethingSynchronous(batch);
 }

 

一個Consumer 每隔100ms poll一次消息,然后以同步地方式處理這個batch的數據。此時offsets 會定期自動被commit,此定期時間由 auto.commit.interval.ms 決定,默認為 5000,也就是在每次調用 .poll() 方法 5 秒后,會自動commit offsets

但是如果在處理數據時用的是異步的方式,則會導致“at-most-once”的行為。因為offsets可能會在數據被處理前就被commit

所以對於新手來說,使用 enable.auto.commit = true 可能是有風險的,所以不建議一開始就使用這種方式

 

若設置 enable.auto.commit = false,考慮以下代碼:

while(true) {
     List<Records> batch = consumer.poll(Duration.ofMillis(100));
     if isReady(batch){
         doSomethingSynchronous(batch);
         consumer.commitSync();
     }
 }

  

此例子明確指示了在同步地處理了數據后,再主動commit offsets。這樣我們可以控制在什么條件下,去commit offsets。一個比較典型的場景為:將接收的數據讀入緩存,然后flush 緩存到一個數據庫中,最后再commit offsets

 

3. 手動Commit Offset 示例

首先我們關閉自動commit offsets

// disable auto commit of offsets
properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

指定每個請求最多接收10records,便於測試:
properties.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "10"); 

添加以下代碼邏輯:

public static void main(String[] args) throws IOException {
     Logger logger = LoggerFactory.getLogger(ElasticSearchConsumer.class.getName());
     RestHighLevelClient client = createClient();
 
     // create Kafka consumer
     KafkaConsumer<String, String> consumer = createConsumer("kafka_demo");
 
     // poll for new data
     while(true){
         ConsumerRecords<String, String> records =
                 consumer.poll(Duration.ofMinutes(100));
 
         logger.info("received " + records.count() + "records");
         for(ConsumerRecord record : records) {
 
             // construct a kafka generic ID
             String kafka_generic_id = record.topic() + "_" + record.partition() + "_" + record.offset();
 
             // where we insert data into ElasticSearch
             IndexRequest indexRequest = new IndexRequest(
                     "kafkademo"
             ).id(kafka_generic_id).source(record.value(), XContentType.JSON);
 
             IndexResponse indexResponse = client.index(indexRequest, RequestOptions.DEFAULT);
             String id = indexResponse.getId();
 
             logger.info(id);
 
             try {
                 Thread.sleep(10); // introduce a small delay
             } catch (InterruptedException e) {
                 e.printStackTrace();
             }
         }
 
         logger.info("Committing offsets...");
         consumer.commitSync();                      // commit offsets manually
         logger.info("Offsets have been committed");
 
         }
     }

這里我們在處理每次獲取的10records后(也就是for 循環完整執行一次),手動執行一次offsets commit。打印日志記錄為:

 

手動停止consumer 程序后,可以看到最后的committed offsets165

  

使用consumer-group cli 也可以驗證當前committed offsets165

  

4. Performance Improvement using Batching

在這個例子中,consumer 限制每次poll 10條數據,然后每條依次處理(插入elastic search)。此方法效率較低,我們可以通過使用 batching 的方式增加吞吐。這里實現的方式是使用 elastic search API 提供的BulkRequest,基於之前的代碼,修改如下:

public static void main(String[] args) throws IOException {
     Logger logger = LoggerFactory.getLogger(ElasticSearchConsumer.class.getName());
     RestHighLevelClient client = createClient();
 
     // create Kafka consumer
     KafkaConsumer<String, String> consumer = createConsumer("kafka_demo");
 
     // poll for new data
     while(true){
         ConsumerRecords<String, String> records =
                 consumer.poll(Duration.ofMinutes(100));
 
         // bulk request
         BulkRequest bulkRequest = new BulkRequest();
 
         logger.info("received " + records.count() + "records");
         for(ConsumerRecord record : records) {
 
             // construct a kafka generic ID
             String kafka_generic_id = record.topic() + "_" + record.partition() + "_" + record.offset();
 
             // where we insert data into ElasticSearch
             IndexRequest indexRequest = new IndexRequest(
                     "kafkademo"
             ).id(kafka_generic_id).source(record.value(), XContentType.JSON);
 
             IndexResponse indexResponse = client.index(indexRequest, RequestOptions.DEFAULT);
 
             // add to our bulk request (takes no time)
             bulkRequest.add(indexRequest);
 
 
             //String id = indexResponse.getId();
             //logger.info(id);
 
             try {
                 Thread.sleep(10); // introduce a small delay
             } catch (InterruptedException e) {
                 e.printStackTrace();
             }
         }
 
         // bulk response
         BulkResponse bulkItemResponses = client.bulk(bulkRequest, RequestOptions.DEFAULT);
 
         logger.info("Committing offsets...");
         consumer.commitSync();                      // commit offsets manually
         logger.info("Offsets have been committed");
 
         }
     }

   

可以看到,consumerpoll到記錄后,並不會一條條的向elastic search 發送,而是將它們放入一個BulkRequest,並在for循環結束后發送。在發送完畢后,再手動commit offsets

 

執行結果為:

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM