Spring Boot2.0 整合 Kafka


Kafka 概述

Apache Kafka 是一個分布式流處理平台,用於構建實時的數據管道和流式的應用.它可以讓你發布和訂閱流式的記錄,可以儲存流式的記錄,並且有較好的容錯性,可以在流式記錄產生時就進行處理。

Apache Kafka是分布式發布-訂閱消息系統,在 kafka官網上對 Kafka 的定義:一個分布式發布-訂閱消息傳遞系統。

Kafka 特性

  1. 高吞吐量、低延遲:kafka每秒可以處理幾十萬條消息,它的延遲最低只有幾毫秒,每個topic可以分多個partition, consumer group 對partition進行consume操作;
  2. 可擴展性:kafka集群支持熱擴展;
  3. 持久性、可靠性:消息被持久化到本地磁盤,並且支持數據備份防止數據丟失;
  4. 容錯性:允許集群中節點失敗(若副本數量為n,則允許n-1個節點失敗);
  5. 高並發:支持數千個客戶端同時讀寫;
  6. 支持實時在線處理和離線處理:可以使用Storm這種實時流處理系統對消息進行實時進行處理,同時還可以使用Hadoop這種批處理系統進行離線處理;

Kafka 使用場景

  1. 日志收集:一個公司可以用Kafka可以收集各種服務的log,通過kafka以統一接口服務的方式開放給各種consumer,例如Hadoop、Hbase、Solr等;
  2. 消息系統:解耦和生產者和消費者、緩存消息等;
  3. 用戶活動跟蹤:Kafka經常被用來記錄web用戶或者app用戶的各種活動,如瀏覽網頁、搜索、點擊等活動,這些活動信息被各個服務器發布到kafka的topic中,然后訂閱者通過訂閱這些topic來做實時的監控分析,或者裝載到Hadoop、數據倉庫中做離線分析和挖掘;
  4. 運營指標:Kafka也經常用來記錄運營監控數據。包括收集各種分布式應用的數據,生產各種操作的集中反饋,比如報警和報告;
  5. 流式處理:比如spark streaming和storm;
  6. 事件源;

Spring Boot2.0 + Kafka

1,安裝配置Kafka ,Zookeeper

安裝和配置過程很簡單,就不詳細說了,參考官網:http://kafka.apache.org/quickstart

使用命令啟動Kafka: bin``/kafka-server-start``.sh config``/server``.properties

下面給出我的環境:

Centos 7.5,  Kafka 2.11, Zookeeper-3.4.13,  JDK1.8+ 	

2,創建 Spring Boot 項目

注意版本:該項目使用Spring Boot 2.0 +,低版本可能不對

  1. pom.xml引用
               <dependency>
                   <groupId>org.springframework.boot</groupId>
                   <artifactId>spring-boot-starter</artifactId>
               </dependency>
               <dependency>
                   <groupId>org.springframework.kafka</groupId>
                   <artifactId>spring-kafka</artifactId>
               </dependency>
               <dependency>
                   <groupId>com.alibaba</groupId>
                   <artifactId>fastjson</artifactId>
                   <version>1.2.47</version>
               </dependency>
  1. 定義消息生產者
    直接使用 KafkaTemplate 發送消息 ,Spring Boot自動裝配,不需要自己定義一個Kafka配置類,吐槽一下網站的文章,全都是互相抄,全都寫一個 ProduceConfig Consumerconfig 類, Kafka 的參數配置 硬編碼在代碼中,簡直無法直視。。
    定義一個泛型類 KafkaSender<T> T 就是你需要發送的消息 對象,序列化使用阿里的 fastjson

消息發送后,可以在回調類里面處理自己的業務,ListenableFutureCallback 類有兩個方法,分別是 onFailureononSuccess ,實際場景可以在這兩個方法,處理自己的具體業務,這里不做實現。


       /**
        * 消息生產者
        *
        * @author Jarvis
        * @date 2018/8/3
        */
       @Component
       public class KafkaSender<T> {
       
           private Logger logger = LoggerFactory.getLogger(KafkaSender.class);
       
           @Autowired
           private KafkaTemplate<String, Object> kafkaTemplate;
       
           /**
            * kafka 發送消息
            *
            * @param obj 消息對象
            */
           public void send(T obj) {
               String jsonObj = JSON.toJSONString(obj);
               logger.info("------------ message = {}", jsonObj);
       
               //發送消息
               ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send("kafka.tut", jsonObj);
               future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
                   @Override
                   public void onFailure(Throwable throwable) {
                       logger.info("Produce: The message failed to be sent:" + throwable.getMessage());
                   }
       
                   @Override
                   public void onSuccess(SendResult<String, Object> stringObjectSendResult) {
                       //TODO 業務處理
                       logger.info("Produce: The message was sent successfully:");
                       logger.info("Produce: _+_+_+_+_+_+_+ result: " + stringObjectSendResult.toString());
                   }
               });
           }
       }

  1. 定義消息消費者
    使用 @KafkaListener 注解監聽 topics 消息,此處的topics 必須和 send 函數中的 一致

@Header(KafkaHeaders.RECEIVED_TOPI 直接獲取 topic


    /**
     * 監聽kafka.tut 的 topic
     *
     * @param record
     * @param topic  topic
     */
    @KafkaListener(id = "tut", topics = "kafka.tut")
    public void listen(ConsumerRecord<?, ?> record, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
        //判斷是否NULL
        Optional<?> kafkaMessage = Optional.ofNullable(record.value());

        if (kafkaMessage.isPresent()) {
            //獲取消息
            Object message = kafkaMessage.get();

            logger.info("Receive: +++++++++++++++ Topic:" + topic);
            logger.info("Receive: +++++++++++++++ Record:" + record);
            logger.info("Receive: +++++++++++++++ Message:" + message);
        }
    }

  1. 配置文件 application.yml

       spring:
         application:
           name: kafka-tutorial
         kafka:
           # 指定kafka 代理地址,可以多個
           bootstrap-servers: 192.168.10.100:9092
           producer:
             retries: 0
             # 每次批量發送消息的數量
             batch-size: 16384
             # 緩存容量
             buffer-memory: 33554432
             # 指定消息key和消息體的編解碼方式
             key-serializer: org.apache.kafka.common.serialization.StringSerializer
             value-serializer: org.apache.kafka.common.serialization.StringSerializer
           consumer:
             # 指定默認消費者group id
             group-id: consumer-tutorial
             auto-commit-interval: 100
             auto-offset-reset: earliest
             enable-auto-commit: true
             # 指定消息key和消息體的編解碼方式
             key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
             value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
           # 指定listener 容器中的線程數,用於提高並發量
           listener:
             concurrency: 3
  1. 直接使用 @Autowired 對類 KafkaSender 自動裝配,然后調用 send 方法發送消息即可,下面給出代碼:
           @Autowired
           private KafkaSender<User> kafkaSender;
       
           @Test
           public void kafkaSend() throws InterruptedException {
               //模擬發消息
               for (int i = 0; i < 5; i++) {
       
                   User user = new User();
                   user.setId(System.currentTimeMillis());
                   user.setMsg(UUID.randomUUID().toString());
                   user.setSendTime(new Date());
       
                   kafkaSender.send(message);
                   Thread.sleep(3000);
       
               }
           }

控制台可以看到執行成功:

在服務器執行 bin/kafka-topics.sh --list --zookeeper localhost:2181 可以看到topic

Kafka如何保證數據的不丟失

1.生產者數據的不丟失

  • 新版本的producer采用異步發送機制。KafkaProducer.send(ProducerRecord)方法僅僅是把這條消息放入一個緩存中(即RecordAccumulator,本質上使用了隊列來緩存記錄),同時后台的IO線程會不斷掃描該緩存區,將滿足條件的消息封裝到某個batch中然后發送出去。顯然,這個過程中就有一個數據丟失的窗口:若IO線程發送之前client端掛掉了,累積在accumulator中的數據的確有可能會丟失。 kafka的ack機制:在kafka發送數據的時候,每次發送消息都會有一個確認反饋機制,確保消息正常的能夠被收到。
  • 如果是同步模式:ack機制能夠保證數據的不丟失,如果ack設置為0,風險很大,一般不建議設置為0
    producer.type=sync
    request.required.acks=1
  • 如果是異步模式:通過buffer來進行控制數據的發送,有兩個值來進行控制,時間閾值與消息的數量閾值,如果buffer滿了數據還沒有發送出去,如果設置的是立即清理模式,風險很大,一定要設置為阻塞模式
    producer.type=async
    request.required.acks=1
    queue.buffering.max.ms=5000
    queue.buffering.max.messages=10000
    queue.enqueue.timeout.ms = -1
    batch.num.messages=200
  • 結論:producer有丟數據的可能,但是可以通過配置保證消息的不丟失
    2.消費者數據的不丟失
  • 如果在消息處理完成前就提交了offset,那么就有可能造成數據的丟失。由於Kafka consumer默認是自動提交位移的,所以在后台提交位移前一定要保證消息被正常處理了,因此不建議采用很重的處理邏輯,如果處理耗時很長,則建議把邏輯放到另一個線程中去做。為了避免數據丟失,現給出兩點建議:
    enable.auto.commit=false 關閉自動提交位移
    在消息被完整處理之后再手動提交位移
  • 如果使用了storm,要開啟storm的ackfail機制;
  • 如果沒有使用storm,確認數據被完成處理之后,再更新offset值。低級API中需要手動控制offset值。通過offset commit 來保證數據的不丟失,kafka自己記錄了每次消費的offset數值,下次繼續消費的時候,接着上次的offset進行消費即可。

源碼 github:https://github.com/jarvisqi/java-tutorial/tree/master/kafka-tutorial

參考:

  1. http://kafka.apache.org/quickstart
  2. https://docs.spring.io/spring-kafka/reference/htmlsingle/#kafka
  3. https://blog.csdn.net/tzs_1041218129/article/details/78988439


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM