Kafka Producer寫入消息的幾種方式


直接發送

下面是一種最簡單的發送數據的方式

ProducerRecord<String, String> record = new ProducerRecord<>("CustomerCountry", "Precision Products","France"); // 1 try { producer.send(record); //2 } catch (Exception e) { e.printStackTrace(); //3 }
  • 1 Producer接收是ProducerRecord對象,因此最開始的時候需要創建ProducerRecord的對象。ProducerRecord有好幾種構造方法,稍后我們會講到。上面例子中需要填寫接收消息的topic(以啊不能都是字符串類型),想要發送的key和value。key和value的類型必須要和序列化保持一致。
  • 2 使用send()方法發送ProducerRecord對象,就像最開始Kafka的架構一樣,消息會先緩存在buffer,然后開啟獨立的線程發送給broker。send()方法返回一個Java Future對象,對象中包含RecordMetadata,在上面的例子中並沒有關心返回值,因此也就不知道消息是否發送成功,這種一般適用於允許丟失消息的情況。比如記錄一些日志信息或者是不太重要的應用信息。
  • 3 雖然我們忽略了向broker發送數據時出的錯或者是Broker自己出的錯,在producer發送數據前如果有錯誤仍然會拋出異常。有可能是在序列化消息的時候,產生了異常。比如說Buffer已經滿了或者是發送線程中斷產生的中斷異常。

同步發送

ProducerRecord<String, String> record = new ProducerRecord<>("CustomerCountry", "Precision Products", "France"); producer.send(record).get();

這里,使用了Future.get()方法,會等待kafka的確認回復。當broker遇到錯誤或者應用出現問題時,future接口都會拋出異常,然后我們可以捕獲到這個異常進行處理。如果沒有錯誤。將會獲得RecordMetadata對象,這個對象包含了消息寫入的偏移值。

Producer有兩種錯誤類型。一種是可以通過再次發送消息解決的錯誤,比如連接出現問題,需要重新連接;或者是"no leader"錯誤,通過等待一會Leader重新選舉完就可以繼續。producer可以配置自動重試。另一種是通過重試無法處理的錯誤,比如消息過大,這種情況下,Producer就不會重試,而是直接拋出異常。

異步發送

設想一下,如果應用跟Kafka集群之間傳遞消息需要10ms,那么發送100個消息,將需1秒鍾的時間。另一方面,如果我們僅僅發送消息,而忽略返回的時間,那么100個消息根本花費不了多長時間。在大多數的情況下,都不需要回復——kafka在消息寫入broker之后會返回消息所在的offset,這部分的信息對於producer來說,其實沒什么用。另一方面,我們還需要知道消息發送失敗后,拋出的異常、錯誤日志或者是把消息寫入"errors標記的文件",稍后再統一處理。

為了異步的發送數據,但是還能處理異常,producer支持消息成功寫入后回調。下面就是回調的例子:

private class DemoProducerCallback implements Callback { //1 @Override public void onCompletion(RecordMetadata recordMetadata, Exception e) { if (e != null) { e.printStackTrace(); //2 } } } ProducerRecord<String, String> record = new ProducerRecord<>("CustomerCountry", "Biomedical Materials", "USA");//3 producer.send(record, new DemoProducerCallback());//4
  • 1 為了使用回調方法,需要實現org.apache.kafka.clients.producer.Callback接口,實現它的onCompletion方法。
  • 2 當Kafka返回錯誤的時候,onCompletion方法會收到一個非null的異常。上面的例子直接打印異常消息,但是如果是生產環境,需要做一些處理錯誤的操作。
  • 3 記錄的創建和之前是一樣的
  • 4 需要再發送消息的時候,傳入回調的對象


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM