Spring Kafka(六) 實現消息轉發以及ReplyTemplate


目的

軟件需要使用什么技術都是按照業務邏輯來的嘛,那自動轉發相對應的業務可以是什么呢?

可以使用轉發功能實現業務解耦,系統A從Topic-A中獲取到消息,進行處理后轉發到Topic-B中,系統B監聽Topic-B獲取消息再次進行處理,

這個消息可以是訂單相關數據,系統A處理用戶提交的訂單審核,系統B處理訂單的物流信息等等。

實現方式

Spring-Kafka整合了兩種消息轉發方式:

使用Headers設置回復主題(Reply_Topic),這種方式比較特別,是一種請求響應模式,使用的是ReplyingKafkaTemplate類

手動轉發,使用@SendTo注解將監聽方法返回值轉發到Topic中

@SendTo方式

SendTo的方式可以說是非常簡單的,給我三秒鍾,不不不,男人不可以這么快,三個鍾好了,嘿嘿嘿。

配置ConcurrentKafkaListenerContainerFactory的ReplyTemplate

監聽方法加上@SendTo注解

KafkaConfiguration.class

這里我們為監聽容器工廠(ConcurrentKafkaListenerContainerFactory)配置一個ReplyTemplate,ReplyTemplate是我們用來轉發消息所使用的類。

@SendTo注解本質其實就是利用這個ReplyTemplate轉發監聽方法的返回值到對應的Topic中,

我們也可以是用代碼實現KakfaTemplate.send(),不過使用注解的好處就是減少代碼量,加快開發效率。

 @Bean public ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.setReplyTemplate(kafkaTemplate()); return factory; }
View Code
@Component public class ForwardListener { private static final Logger log= LoggerFactory.getLogger(ForwardListener.class); @KafkaListener(id = "forward", topics = "topic.quick.target") @SendTo("topic.quick.real") public String forward(String data) { log.info("topic.quick.target  forward "+data+" to  topic.quick.real"); return "topic.quick.target send msg : " + data; } }
View Code

順便就寫個測試方法測試一下吧,可以看到運行成功后,

topic.quick.real這個主題會產生一條數據,這條數據就是我們在forward方法返回的值。

 @Autowired private KafkaTemplate kafkaTemplate; @Test public void testForward() { kafkaTemplate.send("topic.quick.target", "test @SendTo"); }
View Code

ReplyTemplate方式

使用ReplyTemplate方式不同於@SendTo方式,@SendTo是直接將監聽方法的返回值轉發對應的Topic中,

而ReplyTemplate也是將監聽方法的返回值轉發Topic中,但轉發Topic成功后,會被請求者消費。

這是怎么回事呢?我們可以回想一下請求響應模式,這種模式其實我們是經常使用的,

就像你調用某個第三方接口,這個接口會把響應報文返回給你,你可以根據業務處理這段響應報文。

而ReplyTemplate方式的這種請求響應模式也是相同的,

首先生成者發送消息到Topic-A中,Topic-A的監聽器則會處理這條消息,緊接着將消息轉發到Topic-B中,

當這條消息轉發到Topic-B成功后則會被ReplyTemplate接收。那最終消費者獲得的是被處理過的數據。

ReplyTemplate實現的代碼也並不復雜,實現的功能確更多。

講一下流程吧

配置ConcurrentKafkaListenerContainerFactory的ReplyTemplate

配置topic.quick.request的監聽器

注冊一個KafkaMessageListenerContainer類型的監聽容器,監聽topic.quick.reply,這個監聽器里面我們不處理任何事情,交由ReplyingKafkaTemplate處理

通過ProducerFactory和KafkaMessageListenerContainer創建一個ReplyingKafkaTemplate類型的Bean,設置回復超時時間為10秒

 @Bean public ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.setReplyTemplate(kafkaTemplate()); return factory; }
View Code
 @KafkaListener(id = "replyConsumer", topics = "topic.quick.request",containerFactory = "kafkaListenerContainerFactory") @SendTo public String replyListen(String msgData){ log.info("topic.quick.request receive : "+msgData); return "topic.quick.reply  reply : "+msgData; } @Bean public KafkaMessageListenerContainer<String, String> replyContainer(@Autowired ConsumerFactory consumerFactory) { ContainerProperties containerProperties = new ContainerProperties("topic.quick.reply"); return new KafkaMessageListenerContainer<>(consumerFactory, containerProperties); } @Bean public ReplyingKafkaTemplate<String, String, String> replyingKafkaTemplate(@Autowired ProducerFactory producerFactory, KafkaMessageListenerContainer replyContainer) { ReplyingKafkaTemplate template = new ReplyingKafkaTemplate<>(producerFactory, replyContainer); template.setReplyTimeout(10000); return template; }
View Code

發送消息就顯得稍微有點麻煩了,不過在項目編碼過程中可以把它封裝成一個工具類調用。

我們需要創建ProducerRecord類,用來發送消息,並添加KafkaHeaders.REPLY_TOPIC到record的headers參數中,這個參數配置我們想要轉發到哪個Topic中。

使用replyingKafkaTemplate.sendAndReceive()方法發送消息,該方法返回一個Future類RequestReplyFuture,

這里類里面包含了獲取發送結果的Future類和獲取返回結果的Future類。使用replyingKafkaTemplate發送及返回都是異步操作。

調用RequestReplyFuture.getSendFutrue().get()方法可以獲取到發送結果

調用RequestReplyFuture.get()方法可以獲取到響應結果

@Autowired private ReplyingKafkaTemplate replyingKafkaTemplate; @Test public void testReplyingKafkaTemplate() throws ExecutionException, InterruptedException, TimeoutException { ProducerRecord<String, String> record = new ProducerRecord<>("topic.quick.request", "this is a message"); record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, "topic.quick.reply".getBytes())); RequestReplyFuture<String, String, String> replyFuture = replyingKafkaTemplate.sendAndReceive(record); SendResult<String, String> sendResult = replyFuture.getSendFuture().get(); System.out.println("Sent ok: " + sendResult.getRecordMetadata()); ConsumerRecord<String, String> consumerRecord = replyFuture.get(); System.out.println("Return value: " + consumerRecord.value()); Thread.sleep(20000); }
View Code

注意:

由於ReplyingKafkaTemplate也是通過監聽容器實現的,所以響應時間可能會較慢,要注意選擇合適的場景使用。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM