springboot系列八、springboot整合kafka



  • 背景:

      當業務在同一時間出現高並發的時候,這個時候我們不想無限的增加服務器,但是又想提高吞吐量。這時可以考慮使用消息異步處理,進行消峰填谷;同時還可以降低耦合度。常見的消息中間件有kafka,rabbitMQ,activeMQ,rocketMQ。其中性能最好的,吞吐量最高的是以kafka為代表,下面介紹kafka用法。kafka詳細原理介紹,參考kafka系列:https://www.cnblogs.com/wangzhuxing/category/1351802.html。

     

    一、引入依賴

    <!--kafka支持-->
    <dependency>
         <groupId>org.springframework.kafka</groupId>
         <artifactId>spring-kafka</artifactId>
    </dependency>

    二、配置yml

spring:
   kafka:     # 指定kafka 代理地址,可以多個
      bootstrap-servers: 47.52.199.52:9092
      template:    # 指定默認topic id
        default-topic: producer
      listener:   # 指定listener 容器中的線程數,用於提高並發量
        concurrency: 5
      consumer:
        group-id: myGroup # 指定默認消費者group id
        client-id: 200
        max-poll-records: 200
        auto-offset-reset: earliest # 最早未被消費的offset
      producer:
        batch-size: 1000 # 每次批量發送消息的數量
        retries: 3
        client-id: 200

三、生成者使用示例

package com.example.demo.kafka;

import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;

import java.util.concurrent.ExecutionException;

@Component
public class Producer {
    @Autowired
    private KafkaTemplate<String,String> kafkaTemplate;

    /**
     * 發送消息到kafka
     */
    public RecordMetadata sendChannelMess(String topic, String message) {
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic,message);
        RecordMetadata recordMetadata = null;
        try {
            recordMetadata = future.get().getRecordMetadata();
        } catch (InterruptedException|ExecutionException e) {
            e.printStackTrace();
            System.out.println("發送失敗");
        }
        System.out.println("發送成功");
        System.out.println("partition:"+recordMetadata.partition());
        System.out.println("offset:"+recordMetadata.offset());
        System.out.println("topic:"+recordMetadata.topic());

        return recordMetadata;
    }
}

四、消費者使用示例

package com.example.demo.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.util.List;

@Component
public class Consumer {

    /**
     * 有消息就讀取,只讀取消息value
     */
    @KafkaListener(topics = {"test13"})
    public void receiveMessage(String message){
        //收到通道的消息之后執行秒殺操作
        System.out.println(message);
    }

    /**
     * 有消息就讀取,批量讀取消息value
     */
    @KafkaListener(topics = "test12")
    public void onMessage(List<String> crs) {
        for(String str : crs){
            System.out.println("test12:" + str);
        }
    }

    /**
     * 有消息就讀取,讀取消息topic,offset,key,value等信息
     */
    @KafkaListener(topics = "test14")
    public void listenT1(ConsumerRecord<?, ?> cr){
        System.out.println("listenT1收到消息,topic:>>>" + cr.topic() + "  offset:>>" + cr.offset()+ "  key:>>" + cr.key() + "  value:>>" + cr.value());
    }
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM