Spring Kafka(三)KafkaTemplate發送消息及結果回調


 

我們使用KafkaTemplate.send(String data)這個方法發送消息到Kafka中,顯然這個方法並不能滿足我們系統的需求,

那我們需要查看一下KafkaTemplate所實現的接口,看看還提供了什么方法。

當我們發送消息到Kafka后,我們又怎么去確認消息是否發送成功呢?這就涉及到KafkaTemplate的發送回調方法了。接下來我們開始正式講解。

1. 查看發送接口

首先我們Ctrl+鼠標左鍵進入KafkaTemplate的源代碼中查看一下,可以看到有關發送的接口如下。

這里的參數還是比較簡單的,值得一提的事,方法中有個Long類型的時間戳(timestamp)參數,這是Kafka0.10版本提供的新功能,

主要用來使用時間索引進行查詢數據以及日志切分清除策略。還有一個ProducerRecord參數,這個類其實就是整合了topic、partition、data等數據的消費實體類。

topic:這里填寫的是Topic的名字

partition:這里填寫的是分區的id,其實也是就第幾個分區,id從0開始。表示指定發送到該分區中

timestamp:時間戳,一般默認當前時間戳

key:消息的鍵

data:消息的數據

ProducerRecord:消息對應的封裝類,包含上述字段

Message<?>:Spring自帶的Message封裝類,包含消息及消息頭

ListenableFuture<SendResult<K, V>> sendDefault(V data); ListenableFuture<SendResult<K, V>> sendDefault(K key, V data); ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, V data); ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, V data); ListenableFuture<SendResult<K, V>> send(String topic, V data); ListenableFuture<SendResult<K, V>> send(String topic, K key, V data); ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data); ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data); ListenableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record); ListenableFuture<SendResult<K, V>> send(Message<?> message);
View Code

2. 使用sendDefault發送消息

首先在KafkaConfiguration編寫一個帶有默認Topic參數的KafkaTemplate,同時為另外一個KafkaTemplate加上@Primary注解,

@Primary注解的意思是在擁有多個同類型的Bean時優先使用該Bean,到時候方便我們使用@Autowired注解自動注入。

   //這個是我們之前編寫的KafkaTemplate代碼,加入@Primary注解
 @Bean @Primary public KafkaTemplate<Integer, String> kafkaTemplate() { KafkaTemplate template = new KafkaTemplate<Integer, String>(producerFactory()); return template; } @Bean("defaultKafkaTemplate") public KafkaTemplate<Integer, String> defaultKafkaTemplate() { KafkaTemplate template = new KafkaTemplate<Integer, String>(producerFactory()); template.setDefaultTopic("topic.quick.default"); return template; }
View Code

接着編寫測試方法,可以看到我們這里調用的是sendDefault方法,而且並沒有在方法參數上添加topicName,

這是因為我們在聲明defaultKafkaTemplate這個Bean的時候添加了這行代碼 template.setDefaultTopic("topic.quick.default"),

只要調用sendDefault方法,kafkaTemplate會自動把消息發送到名為"topic.quick.default"的Topic中。

 @Resource private KafkaTemplate defaultKafkaTemplate; @Test public void testDefaultKafkaTemplate() { defaultKafkaTemplate.sendDefault("I`m send msg to default topic"); }
View Code

 

 這里也順便測試一下其他幾個吧。

 @Test public void testTemplateSend() { //發送帶有時間戳的消息
        kafkaTemplate.send("topic.quick.demo", 0, System.currentTimeMillis(), 0, "send message with timestamp"); //使用ProducerRecord發送消息
        ProducerRecord record = new ProducerRecord("topic.quick.demo", "use ProducerRecord to send message"); kafkaTemplate.send(record); //使用Message發送消息
        Map map = new HashMap(); map.put(KafkaHeaders.TOPIC, "topic.quick.demo"); map.put(KafkaHeaders.PARTITION_ID, 0); map.put(KafkaHeaders.MESSAGE_KEY, 0); GenericMessage message = new GenericMessage("use Message to send message",new MessageHeaders(map)); kafkaTemplate.send(message); }
View Code

3. KafkaTemplate異步發送消息

發送時間較長的時候會導致進程提前關閉導致無法調用回調時間。

主要是因為KafkaTemplate發送消息是采取異步方式發送的,我們可以看下KafkaTemplate的源代碼

這是我們剛才調用的發送消息方法,可以看到KafkaTemplate會使用ProducerRecord把我們傳遞進來的參數再一次封裝,最后調用doSend方法發送消息到Kafka中

send(String topic, V data)

send(String topic, V data) public ListenableFuture<SendResult<K, V>> send(String topic, V data) { ProducerRecord<K, V> producerRecord = new ProducerRecord(topic, data); return this.doSend(producerRecord); }
View Code

ListenableFuture<SendResult<K, V>> doSend(final ProducerRecord<K, V> producerRecord)

doSend方法先是檢測是否開啟事務,緊接着使用SettableListenableFuture發送消息,然后判斷是否啟動自動沖洗數據到Kafka中,

我們再接着看看SettableListenableFuture實現了什么接口

 protected ListenableFuture<SendResult<K, V>> doSend(final ProducerRecord<K, V> producerRecord) { if (this.transactional) { Assert.state(this.inTransaction(), "No transaction is in process; possible solutions: run the template operation within the scope of a template.executeInTransaction() operation, start a transaction with @Transactional before invoking the template method, run in a transaction started by a listener container when consuming a record"); } final Producer<K, V> producer = this.getTheProducer(); if (this.logger.isTraceEnabled()) { this.logger.trace("Sending: " + producerRecord); } final SettableListenableFuture<SendResult<K, V>> future = new SettableListenableFuture(); producer.send(producerRecord, new Callback() { public void onCompletion(RecordMetadata metadata, Exception exception) { try { if (exception == null) { future.set(new SendResult(producerRecord, metadata)); if (KafkaTemplate.this.producerListener != null) { KafkaTemplate.this.producerListener.onSuccess(producerRecord, metadata); } if (KafkaTemplate.this.logger.isTraceEnabled()) { KafkaTemplate.this.logger.trace("Sent ok: " + producerRecord + ", metadata: " + metadata); } } else { future.setException(new KafkaProducerException(producerRecord, "Failed to send", exception)); if (KafkaTemplate.this.producerListener != null) { KafkaTemplate.this.producerListener.onError(producerRecord, exception); } if (KafkaTemplate.this.logger.isDebugEnabled()) { KafkaTemplate.this.logger.debug("Failed to send: " + producerRecord, exception); } } } finally { if (!KafkaTemplate.this.transactional) { KafkaTemplate.this.closeProducer(producer, false); } } } }); if (this.autoFlush) { this.flush(); } if (this.logger.isTraceEnabled()) { this.logger.trace("Sent: " + producerRecord); } return future; }
View Code

可以看到SettableListenableFuture實現了ListenableFuture接口,ListenableFuture則實現了Future接口,

Future是Java自帶的實現異步編程的接口,支持返回值的異步,而我們使用Thread或者Runnable都是不帶返回值的。

public class SettableListenableFuture<T> implements ListenableFuture<T>
public interface ListenableFuture<T> extends Future<T> 
View Code

4. KafkaTemplate同步發送消息

KafkaTemplate異步發送消息大大的提升了生產者的並發能力,但某些場景下我們並不需要異步發送消息,

這個時候我們可以采取同步發送方式,實現也是非常簡單的,我們只需要在send方法后面調用get方法即可。

Future模式中,我們采取異步執行事件,等到需要返回值得時候我們再調用get方法獲取future的返回值

 @Test public void testSyncSend() throws ExecutionException, InterruptedException { kafkaTemplate.send("topic.quick.demo", "test sync send message").get(); }
View Code

get方法還有一個比較有意思的重載方法,get(long timeout, TimeUnit unit),當send方法耗時大於get方法所設定的參數時會拋出一個超時異常,

但需要注意,這里僅拋出異常,消息還是會發送成功的。

這里的測試方法設置send耗時必須小於 一微秒(那必須得失敗呀,嘿嘿嘿),運行后我們可以看到拋出的異常,但也發現消息能發送成功並被監聽器接收了。

那這功能有什么作用呢,如果還沒有接觸過SQL慢查詢可以去了解一下,使用該方法作為SQL慢查詢記錄的條件。

 @Test public void testTimeOut() throws ExecutionException, InterruptedException, TimeoutException { kafkaTemplate.send("topic.quick.demo", "test send message timeout").get(1,TimeUnit.MICROSECONDS); }
View Code
2018-09-08 16:36:09.110  INFO 7724 --- [     demo-0-C-1] com.viu.kafka.listen.DemoListener : demo receive : test send message timeout java.util.concurrent.TimeoutException
View Code

5. 消息結果回調

一般來說我們都會去獲取KafkaTemplate發送消息的結果去判斷消息是否發送成功,如果消息發送失敗,則會重新發送或者執行對應的業務邏輯。

所以這里我們去實現這個功能。

KafkaSendResultHandler

第一步還是編寫一個消息結果回調類KafkaSendResultHandler。

當我們使用KafkaTemplate發送消息成功的時候回調用OnSuccess方法,發送失敗則會調用onError方法。

@Component public class KafkaSendResultHandler implements ProducerListener { private static final Logger log = LoggerFactory.getLogger(KafkaSendResultHandler.class); @Override public void onSuccess(ProducerRecord producerRecord, RecordMetadata recordMetadata) { log.info("Message send success : " + producerRecord.toString()); } @Override public void onError(ProducerRecord producerRecord, Exception exception) { log.info("Message send error : " + producerRecord.toString()); } }
View Code

接下來就使用KafkaSendResultHandler實現消息發送結果回調,這里需要休眠,發送時間較長的時候會導致進程提前關閉導致無法調用回調時間。

主要是因為KafkaTemplate發送消息是采取異步方式發送的

 @Autowired private KafkaSendResultHandler producerListener; @Test public void testProducerListen() throws InterruptedException { kafkaTemplate.setProducerListener(producerListener); kafkaTemplate.send("topic.quick.demo", "test producer listen"); Thread.sleep(1000); }
View Code

運行測試方法,我們可以看到控制台輸出的日志如下

2018-09-08 15:51:39.975  INFO 10268 --- [ad | producer-1] c.v.k.handler.KafkaSendResultHandler     : Message send success : ProducerRecord(topic=topic.quick.demo, partition=null, headers=RecordHeaders(headers = [], isReadOnly = true), key=null, value=test producer listen, timestamp=null)
View Code

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM