Kafka Producer写入消息的几种方式


直接发送

下面是一种最简单的发送数据的方式

ProducerRecord<String, String> record = new ProducerRecord<>("CustomerCountry", "Precision Products","France"); // 1 try { producer.send(record); //2 } catch (Exception e) { e.printStackTrace(); //3 }
  • 1 Producer接收是ProducerRecord对象,因此最开始的时候需要创建ProducerRecord的对象。ProducerRecord有好几种构造方法,稍后我们会讲到。上面例子中需要填写接收消息的topic(以啊不能都是字符串类型),想要发送的key和value。key和value的类型必须要和序列化保持一致。
  • 2 使用send()方法发送ProducerRecord对象,就像最开始Kafka的架构一样,消息会先缓存在buffer,然后开启独立的线程发送给broker。send()方法返回一个Java Future对象,对象中包含RecordMetadata,在上面的例子中并没有关心返回值,因此也就不知道消息是否发送成功,这种一般适用于允许丢失消息的情况。比如记录一些日志信息或者是不太重要的应用信息。
  • 3 虽然我们忽略了向broker发送数据时出的错或者是Broker自己出的错,在producer发送数据前如果有错误仍然会抛出异常。有可能是在序列化消息的时候,产生了异常。比如说Buffer已经满了或者是发送线程中断产生的中断异常。

同步发送

ProducerRecord<String, String> record = new ProducerRecord<>("CustomerCountry", "Precision Products", "France"); producer.send(record).get();

这里,使用了Future.get()方法,会等待kafka的确认回复。当broker遇到错误或者应用出现问题时,future接口都会抛出异常,然后我们可以捕获到这个异常进行处理。如果没有错误。将会获得RecordMetadata对象,这个对象包含了消息写入的偏移值。

Producer有两种错误类型。一种是可以通过再次发送消息解决的错误,比如连接出现问题,需要重新连接;或者是"no leader"错误,通过等待一会Leader重新选举完就可以继续。producer可以配置自动重试。另一种是通过重试无法处理的错误,比如消息过大,这种情况下,Producer就不会重试,而是直接抛出异常。

异步发送

设想一下,如果应用跟Kafka集群之间传递消息需要10ms,那么发送100个消息,将需1秒钟的时间。另一方面,如果我们仅仅发送消息,而忽略返回的时间,那么100个消息根本花费不了多长时间。在大多数的情况下,都不需要回复——kafka在消息写入broker之后会返回消息所在的offset,这部分的信息对于producer来说,其实没什么用。另一方面,我们还需要知道消息发送失败后,抛出的异常、错误日志或者是把消息写入"errors标记的文件",稍后再统一处理。

为了异步的发送数据,但是还能处理异常,producer支持消息成功写入后回调。下面就是回调的例子:

private class DemoProducerCallback implements Callback { //1 @Override public void onCompletion(RecordMetadata recordMetadata, Exception e) { if (e != null) { e.printStackTrace(); //2 } } } ProducerRecord<String, String> record = new ProducerRecord<>("CustomerCountry", "Biomedical Materials", "USA");//3 producer.send(record, new DemoProducerCallback());//4
  • 1 为了使用回调方法,需要实现org.apache.kafka.clients.producer.Callback接口,实现它的onCompletion方法。
  • 2 当Kafka返回错误的时候,onCompletion方法会收到一个非null的异常。上面的例子直接打印异常消息,但是如果是生产环境,需要做一些处理错误的操作。
  • 3 记录的创建和之前是一样的
  • 4 需要再发送消息的时候,传入回调的对象


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM