kafka發送消息的三種方式


1.第一種(發送並忘記)

ProducerRecord<String,String> record = new ProducerRecord<String,String>("topic",1,"TestProducer"); // 主題,key,value

Propertis properties = new Properties();
properties.put("bootstrap.servers","127.0.0.1:9092");;
properties .put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties .put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

KafkaProducer kafkaProducer = new KafkaProducer<>(properties );

kafkaProducer .send(record ) //發送並忘記

2.第二種(同步阻塞)

ProducerRecord<String,String> record = new ProducerRecord<String,String>("topic",1,"TestProducer");

Propertis properties = new Properties();
properties.put("bootstrap.servers","127.0.0.1:9092");;
properties .put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties .put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 發送配置(重要)
properties.put("ack","1"); // 集群狀態下的復制機制,默認使用1,還有0,all
properties.put("batch.size",16384); // 一個批次可以使用的內存大小,缺省16384(16k)
prperties.put("linger.ms",0L); // 指定生產者在發送批次前等待更多消息加入批次的時間 缺省0
properties.put("max.request.size",1*1024*1024); // 控制生產者發送請求最大大小,默認為1M(這個參數和kafka主機的message.max.bytes 參數有關)
// 發送配置(非重要)
properties.put("buffer.memory",32*1024*1024); // 生產者內存緩沖區大小
properties.put("retries",0); // 重發消息次數
properties.put("request.timeout.ms",30*1000); // 客戶端將等待請求的響應的最大時間 默認30s
properties.put("max.block.ms",60 * 1000); // 最大阻塞時間,超過則拋出異常 缺省60000ms
properties.put("compression.type","none"); // 於壓縮數據的壓縮類型。默認無壓縮,none,gzip,snapy
KafkaProducer kafkaProducer
= new KafkaProducer<>(properties ); Future<RecordMetadata> recordMetadata= kafkaProducer.send(record); // 阻塞在這個未知
if (null != recordMetadata){
  System.out.println("offset:" + recordMetadata.offset() + "-" + "partition:" + recordMetadata.partition());
}

 3.第三種(異步發送)

Propertis properties = new Properties(); properties.put("bootstrap.servers","127.0.0.1:9092");; properties .put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); properties .put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
ProducerRecord<String,String> record;
KafkaProducer kafkaProducer = new KafkaProducer<>(properties );try  record = new ProducerRecord<String,String>("topic",1,"TestProducer");  kafkaProducer.send(record, new Callback(){
    public vlid onCompletion(RecordMetadata metadata,Exception exception){
      if (null != exception) {
        exception.printStrackTrace();
      }
      if (null != metadata) {
             System.out.println("offset:" + recordMetadata.offset() + "-" + "partition:" + recordMetadata.partition());
      }
    }
  });
} finally {
  kafkaProducer.close();
}
Future<RecordMetadata> recordMetadata= kafkaProducer.send(record); // 阻塞在這個未知 if (null != recordMetadata){   System.out.println("offset:" + recordMetadata.offset() + "-" + "partition:" + recordMetadata.partition()); }

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM