背景:
當業務在同一時間出現高並發的時候,這個時候我們不想無限的增加服務器,但是又想提高吞吐量。這時可以考慮使用消息異步處理,進行消峰填谷;同時還可以降低耦合度。常見的消息中間件有kafka,rabbitMQ,activeMQ,rocketMQ。其中性能最好的,吞吐量最高的是以kafka為代表,下面介紹kafka用法。kafka詳細原理介紹,參考kafka系列:https://www.cnblogs.com/wangzhuxing/category/1351802.html。
一、引入依賴
<!--kafka支持--> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency>
二、配置yml
spring: kafka: # 指定kafka 代理地址,可以多個 bootstrap-servers: 47.52.199.52:9092 template: # 指定默認topic id default-topic: producer listener: # 指定listener 容器中的線程數,用於提高並發量 concurrency: 5 consumer: group-id: myGroup # 指定默認消費者group id client-id: 200 max-poll-records: 200 auto-offset-reset: earliest # 最早未被消費的offset producer: batch-size: 1000 # 每次批量發送消息的數量 retries: 3 client-id: 200
三、生成者使用示例
package com.example.demo.kafka; import org.apache.kafka.clients.producer.RecordMetadata; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.SendResult; import org.springframework.stereotype.Component; import org.springframework.util.concurrent.ListenableFuture; import java.util.concurrent.ExecutionException; @Component public class Producer { @Autowired private KafkaTemplate<String,String> kafkaTemplate; /** * 發送消息到kafka */ public RecordMetadata sendChannelMess(String topic, String message) { ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic,message); RecordMetadata recordMetadata = null; try { recordMetadata = future.get().getRecordMetadata(); } catch (InterruptedException|ExecutionException e) { e.printStackTrace(); System.out.println("發送失敗"); } System.out.println("發送成功"); System.out.println("partition:"+recordMetadata.partition()); System.out.println("offset:"+recordMetadata.offset()); System.out.println("topic:"+recordMetadata.topic()); return recordMetadata; } }
四、消費者使用示例
package com.example.demo.kafka; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; import java.util.List; @Component public class Consumer { /** * 有消息就讀取,只讀取消息value */ @KafkaListener(topics = {"test13"}) public void receiveMessage(String message){ //收到通道的消息之后執行秒殺操作 System.out.println(message); } /** * 有消息就讀取,批量讀取消息value */ @KafkaListener(topics = "test12") public void onMessage(List<String> crs) { for(String str : crs){ System.out.println("test12:" + str); } } /** * 有消息就讀取,讀取消息topic,offset,key,value等信息 */ @KafkaListener(topics = "test14") public void listenT1(ConsumerRecord<?, ?> cr){ System.out.println("listenT1收到消息,topic:>>>" + cr.topic() + " offset:>>" + cr.offset()+ " key:>>" + cr.key() + " value:>>" + cr.value()); } }