Working with the Kafka Client in Java and Integrating Kafka with Spring Boot


1. Basic Producer Implementation

1.1 Add the Dependency

  <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <!-- Match the version to your Kafka installation, e.g. kafka_2.12-2.0.0 -->
      <version>2.0.0</version>
  </dependency>

1.2 Code Implementation

Sending messages synchronously

  public class ProducerDemo {
      private final static String TOPIC_NAME = "my-replicated-topic";

      public static void main(String[] args) throws ExecutionException, InterruptedException {
          Properties props = new Properties();
          // 1. Set the configuration
          props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.21.107:9092,192.168.21.108:9092,192.168.21.109:9092");
          // Serialize the message key from a String to a byte array
          props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
          // Serialize the message value from a String to a byte array
          props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

          // The client that sends messages
          Producer<String, String> producer = new KafkaProducer<String, String>(props);

          User user = new User();
          user.setErpId(500000L);
          user.setErpName("張三三");
          user.setRealName("張三三");
          user.setCreateTime(new Date());
          user.setUpdateTime(new Date());

          // Specify the target partition via the record key
          // key: determines which partition the message goes to; value: the message payload
          ProducerRecord<String, String> producerRecord = new ProducerRecord<>(TOPIC_NAME,
                  user.getErpId().toString(), JSON.toJSONString(user));

          // Send the message, then read and print the resulting metadata
          RecordMetadata metadata = producer.send(producerRecord).get();
          System.out.println("Synchronous send result: " + "topic-" +
                  metadata.topic() + "|partition-" + metadata.partition() + "|offset-" + metadata.offset());

          producer.close();
      }
  }
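The User object above is not part of the Kafka API. A minimal sketch of what that POJO might look like, with fields inferred from the setters used in the example (fastjson's JSON.toJSONString reads the getters):

  // Hypothetical POJO matching the setters used above; not the author's original class
  public class User {
      private Long erpId;
      private String erpName;
      private String realName;
      private Date createTime;
      private Date updateTime;

      public Long getErpId() { return erpId; }
      public void setErpId(Long erpId) { this.erpId = erpId; }
      public String getErpName() { return erpName; }
      public void setErpName(String erpName) { this.erpName = erpName; }
      public String getRealName() { return realName; }
      public void setRealName(String realName) { this.realName = realName; }
      public Date getCreateTime() { return createTime; }
      public void setCreateTime(Date createTime) { this.createTime = createTime; }
      public Date getUpdateTime() { return updateTime; }
      public void setUpdateTime(Date updateTime) { this.updateTime = updateTime; }
  }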

When sending synchronously, if the producer does not receive an ack, it blocks for up to 3 s; if the ack still has not arrived by then, it retries the send. The number of retries is 3.
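If you want to bound the blocking wait yourself, the Future returned by send() can be waited on with an explicit timeout. A minimal sketch continuing the example above (the 3-second value is illustrative):

  // Blocks for at most 3 seconds; throws TimeoutException if no ack arrives
  // in time (requires java.util.concurrent.TimeUnit)
  RecordMetadata metadata = producer.send(producerRecord).get(3, TimeUnit.SECONDS);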

Sending messages asynchronously

  // Send the message asynchronously
  producer.send(producerRecord, new Callback() {
      @Override
      public void onCompletion(RecordMetadata metadata, Exception e) {
          if (e != null) {
              System.out.println("Failed to send message: " + e.getMessage());
          }
          if (metadata != null) {
              System.out.println("Asynchronous send result: " + "topic-" +
                      metadata.topic() + "|partition-"
                      + metadata.partition() + "|offset-" + metadata.offset());
          }
      }
  });


  // After an asynchronous send the main thread may exit before onCompletion
  // fires, so block it for a while to observe the callback output
  Thread.sleep(100000L);

1.3 Configuring acks on the Producer

When sending synchronously, the producer blocks until it receives the ack from the cluster. There are three possible acks settings:

  • acks = 0: the producer gets an ack immediately, without any broker having to receive the message. This is the fastest option, but also the one most likely to lose messages.
  • acks = 1 (default): the leader replica has received the message and written it to its local log before the ack is returned. This is the usual balance between performance and safety.
  • acks = -1 / all: works together with the broker/topic setting min.insync.replicas (default 1, recommended >= 2). With min.insync.replicas=2, the leader and at least one follower must both have the message before the ack is returned (i.e. 2 brokers in the cluster hold the data). This is the safest option, but the slowest. A sketch of setting min.insync.replicas follows this list.
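min.insync.replicas is a broker/topic-level configuration, not a producer property. One way to set it per topic, sketched with the AdminClient that ships in kafka-clients (topic name and bootstrap address reused from the examples above; alterConfigs is the API available in the 2.0-era client):

  Properties adminProps = new Properties();
  adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.21.107:9092");
  try (AdminClient admin = AdminClient.create(adminProps)) {
      ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "my-replicated-topic");
      Config config = new Config(Collections.singletonList(
              new ConfigEntry("min.insync.replicas", "2")));
      // Apply the topic-level override and wait for the brokers to confirm
      admin.alterConfigs(Collections.singletonMap(topic, config)).all().get();
  }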

The corresponding acks and retry settings (a retry is triggered when no ack is received):

  // acks and retries
  props.put(ProducerConfig.ACKS_CONFIG, "1");

  /**
   * Failed sends are retried (default retry interval 100 ms). Retries improve
   * delivery reliability but can also cause duplicate sends, e.g. on network
   * jitter, so the consumer side needs to handle messages idempotently.
   */
  props.put(ProducerConfig.RETRIES_CONFIG, 3);
  // Retry interval
  props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 300);

1.4 Configuring the Producer Send Buffer

  • By default Kafka creates a buffer holding messages waiting to be sent; its size is 32 MB:
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
  • A local kafka-clients thread pulls up to 16 KB of data at a time from the buffer and sends it to the broker:
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
  • If a full 16 KB batch has not accumulated within 10 ms, whatever has been collected is sent to the broker anyway:
props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
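If you need everything in the buffer pushed out immediately, for example before shutting the application down, the client also exposes flush(). A one-line sketch:

  // Force all buffered records out to the brokers and block until they are sent
  producer.flush();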

2. Consumer Implementation

2.1 Basic Consumer Implementation

  private static final String TOPIC_NAME = "my-replicated-topic";
  private static final String CONSUMER_GROUP_NAME = "testGroup";

  public static void main(String[] args) {
      Properties props = new Properties();
      props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.21.107:9092,192.168.21.108:9092,192.168.21.109:9092");
      // Consumer group name
      props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_NAME);
      props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
      props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

      // 1. Create the consumer client
      KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
      // 2. Subscribe to a list of topics
      consumer.subscribe(Arrays.asList(TOPIC_NAME));

      while (true) {
          // 3. poll() is a long-polling API for fetching messages
          ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
          for (ConsumerRecord<String, String> record : records) {
              // 4. Print each message
              System.out.printf("Received message: partition = %d, offset = %d, key = %s, value = %s%n",
                      record.partition(), record.offset(), record.key(), record.value());
          }
      }
  }

2.2 Automatic and Manual Offset Commits

Whether offsets are committed automatically or manually, the consumer submits the same information — its consumer group, the topic, the partition, and the consumed offset — to the cluster's internal __consumer_offsets topic, as the sketch below makes explicit.
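A sketch of that payload: commitSync can be handed the exact partition and offset to record (the group comes from the group.id the consumer was created with; the offset value here is illustrative):

  // Explicitly commit offset 100 for partition 0 of the topic; the consumer
  // group is implied by this consumer's group.id
  consumer.commitSync(Collections.singletonMap(
          new TopicPartition(TOPIC_NAME, 0),
          new OffsetAndMetadata(100)));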

Automatic commit

Once the consumer has polled messages, it commits the offset automatically.

  // Whether to auto-commit offsets; the default is true
  props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
  // Interval between automatic offset commits
  props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");

Automatic commits can lose messages: the offset is committed when the messages are polled, before they are processed, so if the consumer crashes after the commit but before processing, those messages are never consumed.

Manual commit

Set the auto-commit configuration to false:

  // Whether to auto-commit offsets; the default is true
  props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

Manual synchronous commit: call the synchronous commit method after processing the messages. The call blocks until the cluster returns the ack; once it returns, the commit is complete and execution continues.

  while (true) {
      // 3. poll() is a long-polling API for fetching messages
      ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
      for (ConsumerRecord<String, String> record : records) {
          // 4. Print each message
          System.out.printf("Received message: partition = %d, offset = %d, key = %s, value = %s%n",
                  record.partition(), record.offset(), record.key(), record.value());
      }
      // All polled messages have been processed
      if (records.count() > 0) {
          // Commit the offset synchronously; the current thread blocks
          // until the commit succeeds
          consumer.commitSync();
      }
  }

Manual asynchronous commit: commit after processing the messages without waiting for the cluster's ack; execution continues immediately, and a callback can be registered for the client to invoke when the commit completes.

  while (true) {
      // 3. poll() is a long-polling API for fetching messages
      ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
      for (ConsumerRecord<String, String> record : records) {
          // 4. Print each message
          System.out.printf("Received message: partition = %d, offset = %d, key = %s, value = %s%n",
                  record.partition(), record.offset(), record.key(), record.value());
      }
      // All polled messages have been processed
      if (records.count() > 0) {
          // Commit the offset asynchronously; the current thread does not
          // block and can continue with the logic that follows
          consumer.commitAsync(new OffsetCommitCallback() {
              @Override
              public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
                  if (exception != null) {
                      System.err.println("Commit failed for " + offsets);
                      exception.printStackTrace();
                  }
              }
          });
      }
  }

2.3 Long-Polling for Messages

  • By default, a single poll fetches up to 500 messages (a related setting is sketched after this snippet):

     // Maximum number of records returned by one poll; tune it to how
     // fast the consumer can process messages
     props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
    
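A related but distinct setting is max.poll.interval.ms: if the gap between two poll() calls exceeds it, the broker assumes the consumer is stuck, evicts it from the group, and triggers a rebalance. A sketch of the two side by side (the 30-second value is illustrative):

  // Cap on records returned by a single poll()
  props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
  // If more than 30 s elapses between two poll() calls (e.g. because
  // processing a batch is slow), the consumer is kicked out of the group
  // and a rebalance is triggered
  props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30 * 1000);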

2.4 Consumer Health Checks

The consumer sends a heartbeat to the Kafka cluster every second. If the cluster sees no heartbeat from a consumer for more than 10 seconds, it evicts that consumer from the consumer group and triggers a rebalance, handing its partitions to the other consumers in the group.

  // Interval at which the consumer sends heartbeats to the broker
  props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000);
  // If the broker receives no heartbeat for more than 10 s, it evicts the
  // consumer from the group and rebalances its partitions to other consumers
  props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10 * 1000);

2.5 Consuming from a Specific Partition, Offset, or Time

  • Consume from a specific partition
// Assign a specific partition to this consumer
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
  • Replay from the beginning of a partition
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
consumer.seekToBeginning(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
Received message: partition = 3, offset = 0, key = 500000, value = {"createTime":1636702968492,"erpId":500000,"erpName":"張三三","realName":"張三三","updateTime":1636702968492}
Received message: partition = 3, offset = 1, key = 500005, value = {"createTime":1636957199355,"erpId":500005,"erpName":"張三三","realName":"張三三","updateTime":1636957199355}
Received message: partition = 3, offset = 2, key = 500005, value = {"createTime":1636957211271,"erpId":500005,"erpName":"張三三","realName":"張三三","updateTime":1636957211271}
Received message: partition = 3, offset = 3, key = 500005, value = {"createTime":1636958064780,"erpId":500005,"erpName":"張三三","realName":"張三三","updateTime":1636958064780}
  • Consume from a specific offset
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
consumer.seek(new TopicPartition(TOPIC_NAME, 0), 10);
Received message: partition = 3, offset = 2, key = 500005, value = {"createTime":1636957211271,"erpId":500005,"erpName":"張三三","realName":"張三三","updateTime":1636957211271}
Received message: partition = 3, offset = 3, key = 500005, value = {"createTime":1636958064780,"erpId":500005,"erpName":"張三三","realName":"張三三","updateTime":1636958064780}
  • Consume from a specific point in time

Given a timestamp, determine the matching offset in every partition, then seek each partition to that offset and consume the messages from there.

  List<PartitionInfo> topicPartitions = consumer.partitionsFor(TOPIC_NAME);
  // Start consuming from 1 hour ago
  long fetchDataTime = new Date().getTime() - 1000 * 60 * 60;
  Map<TopicPartition, Long> map = new HashMap<>();
  for (PartitionInfo par : topicPartitions) {
      map.put(new TopicPartition(TOPIC_NAME, par.partition()), fetchDataTime);
  }
  Map<TopicPartition, OffsetAndTimestamp> parMap = consumer.offsetsForTimes(map);
  // Assign all partitions once, then seek each to the offset matching the timestamp
  consumer.assign(parMap.keySet());
  for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : parMap.entrySet()) {
      TopicPartition key = entry.getKey();
      OffsetAndTimestamp value = entry.getValue();
      // offsetsForTimes maps a partition to null when it has no message at or after the timestamp
      if (key == null || value == null) continue;
      long offset = value.offset();
      System.out.println("partition-" + key.partition() + "|offset-" + offset);
      // Seek to the offset determined from the timestamp
      consumer.seek(key, offset);
  }

2.6 Offset Rules for New Consumer Groups

By default, a consumer in a new consumer group starts from the latest offset of each partition (last message offset + 1), i.e. it only consumes new messages. The following setting makes a new group consume from the beginning the first time; afterwards it continues from its last committed offset + 1:

  • latest: the default; consume only new messages.

  • earliest: consume from the beginning the first time; afterwards continue from the last committed offset + 1.

    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    
Received message: partition = 4, offset = 0, key = 500004, value = {"createTime":1636948165301,"erpId":500004,"erpName":"張三三","realName":"張三三","updateTime":1636948165301}
Received message: partition = 1, offset = 0, key = 500001, value = {"createTime":1636703013717,"erpId":500001,"erpName":"李絲絲","realName":"李絲絲","updateTime":1636703013717}
Received message: partition = 3, offset = 0, key = 500000, value = {"createTime":1636702968492,"erpId":500000,"erpName":"張三三","realName":"張三三","updateTime":1636702968492}
Received message: partition = 3, offset = 1, key = 500005, value = {"createTime":1636957199355,"erpId":500005,"erpName":"張三三","realName":"張三三","updateTime":1636957199355}
Received message: partition = 3, offset = 2, key = 500005, value = {"createTime":1636957211271,"erpId":500005,"erpName":"張三三","realName":"張三三","updateTime":1636957211271}
Received message: partition = 3, offset = 3, key = 500005, value = {"createTime":1636958064780,"erpId":500005,"erpName":"張三三","realName":"張三三","updateTime":1636958064780}

3. Integrating Kafka with Spring Boot

3.1 Add the spring-kafka Dependency

    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>

3.2 Configuration in application.yml

spring:
  kafka:
    bootstrap-servers: 192.168.21.107:9092,192.168.21.108:9092,192.168.21.109:9092
    # Producer
    producer:
      # With a value greater than 0, the client resends any record whose send failed
      retries: 3
      batch-size: 16384
      buffer-memory: 33554432
      acks: 1
      # Serializers for the message key and value
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: default-group
      enable-auto-commit: false
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      max-poll-records: 500

    listener:
      # Commit as soon as Acknowledgment.acknowledge() is called manually;
      # this is the mode most commonly used
      ack-mode: manual_immediate

3.3 Writing the Message Producer

private final static String TOPIC_NAME = "my-replicated-topic";

@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

@RequestMapping(value = "/send")
public String sendMessage() {
    kafkaTemplate.send(TOPIC_NAME, "key", "this is a test kafka message");
    return "send message success";
}
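send() returns immediately with a future, so a controller written like this never learns whether delivery actually succeeded. A sketch of attaching a callback, assuming the ListenableFuture that spring-kafka 2.x returns (not part of the author's original code):

  ListenableFuture<SendResult<String, String>> future =
          kafkaTemplate.send(TOPIC_NAME, "key", "this is a test kafka message");
  future.addCallback(
          // Invoked once the broker has acknowledged the record
          result -> System.out.println("sent to partition " +
                  result.getRecordMetadata().partition()),
          // Invoked when the send ultimately fails
          ex -> System.err.println("send failed: " + ex.getMessage()));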

3.4 Writing the Message Consumer

@KafkaListener(topics = "my-replicated-topic")
public void listenGroup(ConsumerRecord<String, String> record,
                        Acknowledgment ack) {
    String value = record.value();
    System.out.println(value);
    System.out.println(record);
    // Manually commit the offset
    ack.acknowledge();
}

3.5 Configuring Topics, Partitions, and Offsets on the Consumer

@KafkaListener(groupId = "testGroup", topicPartitions = {
        @TopicPartition(topic = "topic1", partitions = {"0", "1"}),
        @TopicPartition(topic = "topic2", partitions = "0",
                partitionOffsets = @PartitionOffset(partition = "1",
                        initialOffset = "100"))
}, concurrency = "3") // concurrency is the number of consumers in this group,
                      // i.e. the degree of concurrent consumption; keep it
                      // less than or equal to the total partition count
public void listenGroupPro(ConsumerRecord<String, String> record,
                           Acknowledgment ack) {
    String value = record.value();
    System.out.println(value);
    System.out.println(record);
    // Manually commit the offset
    ack.acknowledge();
}

