Kafka 核心 API ==> Consumer 消費者


一、Consumer 自動提交
  在 上文中介紹了Producer API的使用,現在我們已經知道如何將消息通過API發送到 Kafka 中了,那么現在的生產者/消費者模型就還差一位扮演消費者的角色了。因此,本文將介紹Consumer API的使用,使用API從Kafka中消費消息,讓應用成為一個消費者角色。
我們先得創建一個 Consumer實例,並指定相關配置項,有了這個實例對象后我們才能進行其他的操作。代碼示例:
/**
 * 創建Consumer實例
 */
public static KafkaConsumer<String, String> createConsumer() {
    HashMap<String, Object> conf = Maps.newHashMap();
    // 指定Kafka服務的ip地址及端口號
    conf.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.182.128:9092");
    // 指定消息key的序列化器
    conf.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    // 指定消息value的序列化器
    conf.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    // 是否開啟自動提交
    conf.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
    // 自動提交的間隔,單位毫秒
    conf.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
    // 指定 GroupId,Kafka中的消費者需要在消費者組里
    conf.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
    return new KafkaConsumer<>(conf);
}

在以上代碼中,可以看到設置了group.id這個配置項,這是一個Consumer的必要配置項,因為在Kafka中,Consumer需要位於一個Consumer Group里。具體如下圖所示:

在上圖中是一個Consumer消費一個Partition,是一對一的關系。但Consumer Group里可以只有一個Consumer,此時該Consumer可以消費多個Partition,是一對多的關系。如下圖所示:

一個 Consumer可以只消費一個 Partition,也可以消費多個 Partition,但需要注意的是多個 Consumer不能消費同一個 Partition

總結一下Consumer的注意事項:

  • 單個Partition的消息只能由Consumer Group中的某個Consumer來消費
  • Consumer從Partition中消費消息是順序的,默認從頭開始消費
  • 如果Consumer Group中只有一個Consumer,那么這個Consumer會消費所有Partition中的消息

  在Kafka中,當消費者消費數據后,需要提交數據的offset來告知服務端成功消費了哪些數據。然后服務端就會移動數據的offset,下一次消費的時候就是從移動后的offset位置開始消費。

  這樣可以在一定程度上保證數據是被消費成功的,並且由於數據不會被刪除,而只是移動數據的offset,這也保證了數據不易丟失。若消費者處理數據失敗時,只要不提交相應的offset,就可以在下一次重新進行消費。

  和數據庫的事務一樣,Kafka消費者提交offset的方式也有兩種,分別是自動提交和手動提交。在本例中演示的是自動提交,這也是消費數據最簡單的方式。自動提交需要在創建 Consumer 實例的時候增加如下配置:

// 是否開啟自動提交
conf.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");

代碼如下:

/**
 * 消費者消費消息,自動提交
 */
public static void autoCommitOffset(List<String> topicNames) {

    // 創建consumer實例
    KafkaConsumer<String, String> consumer = createConsumer();

    // 訂閱一個或多個Topic
    consumer.subscribe(topicNames);

    // 輪循處理消息
    while (true) {
      // 從Topic中拉取數據,每1000毫秒拉取一次
      ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));

      // 將records轉換為可迭代對象
      Iterator<ConsumerRecord<String, String>> recordIterator = records.iterator();
      // 將數據遍歷出來
      while (recordIterator.hasNext()) {
        ConsumerRecord<String, String> record = recordIterator.next();
        System.out.printf("topic = %s, key = %s, val = %s \n",
                record.topic(), record.key(), record.value());
      }
    }
}

二、Consumer 手動提交

  自動提交的方式是最簡單的,但不建議在實際生產中使用,因為可控性不高。所以更多時候我們使用的是手動提交,但想要使用手動提交,就需要先關閉自動提交,修改配置項如下:
// 關閉自動提交
conf.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

  關閉自動提交后,就得在代碼中調用commit相關的方法來提交offset,主要就是兩個方法:commitAsynccommitSync,看方法名也知道一個是異步提交一個是同步提交。這里以commitAsync為例,實現思路主要是在發生異常的時候不要調用commitAsync方法,而在正常執行完畢后才調用commitAsync方法。代碼示例:

/**
 * 手動提交,適合一些特定的業務場景
 * 比如:數據存入數據庫成功則提交,失敗則重新對這條數據進行消費
 * 這樣不會丟失消息
 */
public static void manualCommitOffset(List<String> topicNames) {

    // 創建consumer實例
    KafkaConsumer<String, String> consumer = createConsumer();

    // 訂閱一個或多個Topic
    consumer.subscribe(topicNames);

    // 輪訓處理消息
    while (true) {
      // 從Topic中拉取數據,每1000毫秒拉取一次
      ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));

      // 將records轉換為可迭代對象
      Iterator<ConsumerRecord<String, String>> recordIterator = records.iterator();
      // 將數據遍歷出來
      while (recordIterator.hasNext()) {
        ConsumerRecord<String, String> record = recordIterator.next();

        try {
          // 模擬將數據寫入數據庫
          Thread.sleep(1000);
          System.out.println("save to db...");
          System.out.printf("topic = %s, key = %s, val = %s \n",
                  record.topic(), record.key(), record.value());
        } catch (Exception e) {
          // 寫入失敗則不要調用commit,這樣就相當於起到回滾的作用,
          // 下次消費還是從之前的offset開始消費
          e.printStackTrace();
        }
      }
      // 寫入成功則調用commit相關方法去手動提交offset
      consumer.commitAsync();
    }
}

三、SpringBoot 集成 Kafka

1、添加依賴
implementation('org.springframework.kafka:spring-kafka:2.4.3.RELEASE')

注意:該包的版本與 SpringBoot 版本有匹配性,此版本匹配 SpringBoot 的 2.2.x 的版本

2、配置 application.yml

server:
  port: 8899

spring:
  kafka:
    bootstrap-servers: 127.0.0.1:9092
    producer:
      # 發生錯誤后,消息重發的次數。
      retries: 0
      #當有多個消息需要被發送到同一個分區時,生產者會把它們放在同一個批次里。該參數指定了一個批次可以使用的內存大小,按照字節數計算。
      batch-size: 16384
      # 設置生產者內存緩沖區的大小。
      buffer-memory: 33554432
      # 鍵的序列化方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # 值的序列化方式
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      # acks=0 : 生產者在成功寫入消息之前不會等待任何來自服務器的響應。
      # acks=1 : 只要集群的首領節點收到消息,生產者就會收到一個來自服務器成功響應。
      # acks=all :只有當所有參與復制的節點全部收到消息時,生產者才會收到一個來自服務器的成功響應。
      acks: 1
    consumer:
      # 自動提交的時間間隔 在spring boot 2.X 版本中這里采用的是值的類型為Duration 需要符合特定的格式,如1S,1M,2H,5D
      auto-commit-interval: 1S
      # 該屬性指定了消費者在讀取一個沒有偏移量的分區或者偏移量無效的情況下該作何處理:
      # latest(默認值)在偏移量無效的情況下,消費者將從最新的記錄開始讀取數據(在消費者啟動之后生成的記錄)
      # earliest :在偏移量無效的情況下,消費者將從起始位置讀取分區的記錄
      auto-offset-reset: earliest
      # 是否自動提交偏移量,默認值是true,為了避免出現重復數據和數據丟失,可以把它設置為false,然后手動提交偏移量
      enable-auto-commit: false
      # 鍵的反序列化方式
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # 值的反序列化方式
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    listener:
      # 在偵聽器容器中運行的線程數。
      concurrency: 5
      #listner負責ack,每調用一次,就立即commit
      ack-mode: manual_immediate
      missing-topics-fatal: false

3、Controller+主啟動類

@SpringBootApplication
public class KafukaSpringBootApplication {

  public static void main(String[] args) {
    SpringApplication.run(KafukaSpringBootApplication.class);
  }
}
主啟動類
@RestController
public class ProducerController {

  @Autowired
  private CommonProducer producer;

  @RequestMapping("/send")
  public String sendMsg(@RequestParam String msg){

    producer.sendMsg(msg);
    return "success";
  }
}
Controller

4、新建 Producer

@Component
@Slf4j
public class CommonProducer {

  @Autowired
  private KafkaTemplate<String, String> kafkaTemplate;

  public void sendMsg(String msg) {

    ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(KafkaConst.TOPIC_NAME, msg);

    future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {

      @Override
      public void onSuccess(SendResult<String, String> result) {

        log.info("producer send success result = {}", result.toString());
      }

      @Override
      public void onFailure(Throwable throwable) {
        log.info("producer send failed. msg={}", throwable.getMessage());
      }
    });
  }
}

5、新建 Consumer

@Slf4j
@Component
public class CommonConsumer {

  @KafkaListener(topics = KafkaConst.TOPIC_NAME, groupId = KafkaConst.GROUP_ONE)
  public void consumeForGroupOne(ConsumerRecord<String, String> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
    Optional<String> msgOptional = Optional.ofNullable(record.value());
    if (msgOptional.isPresent()) {
      Object msg = msgOptional.get();
      log.info("consumeForGroupOne start: topic={}, msg={}", topic, msg);
      ack.acknowledge();
    }
  }

  @KafkaListener(topics = KafkaConst.TOPIC_NAME, groupId = KafkaConst.GROUP_TWO)
  public void consumeForGroupTwo(ConsumerRecord<String, String> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
    Optional<String> msgOptional = Optional.ofNullable(record.value());
    if (msgOptional.isPresent()) {
      Object msg = msgOptional.get();
      log.info("consumeForGroupTwo start: topic={}, msg={}", topic, msg);
      ack.acknowledge();
    }
  }
}

6、設置 Topic名和 GroupId

public interface KafkaConst {

  String TOPIC_NAME = "hello-kafka";

  String GROUP_ONE = "test-group1";

  String GROUP_TWO = "test-group2";
}

通過 HTTP 請求,我們就可以在控制台看到消息生產和發送的信息了,如下圖所示:


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM