1. First approach (fire-and-forget)
import java.util.Properties;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;   // imports shared by all three examples

Properties properties = new Properties();
properties.put("bootstrap.servers", "127.0.0.1:9092");
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
ProducerRecord<String, String> record = new ProducerRecord<>("topic", "1", "TestProducer"); // topic, key, value
kafkaProducer.send(record); // fire and forget: the returned Future is simply ignored
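The record above only sits in the producer's in-memory buffer until the client sends it in the background, so a short-lived program should flush or close the producer before exiting. A minimal sketch of one way to do that, reusing the same properties and "topic" as above (KafkaProducer implements Closeable, so try-with-resources works):

try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
    producer.send(new ProducerRecord<>("topic", "1", "TestProducer")); // still fire-and-forget
} // close() waits for in-flight sends to finish, then releases the producer's resources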
2. Second approach (synchronous, blocking)
ProducerRecord<String, String> record = new ProducerRecord<>("topic", "1", "TestProducer");
Properties properties = new Properties();
properties.put("bootstrap.servers", "127.0.0.1:9092");
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// Send configuration (important)
properties.put("acks", "1");                         // replication acknowledgement in a cluster; default 1, other values: 0, all
properties.put("batch.size", 16384);                 // memory (bytes) a single batch may use; default 16384 (16 KB)
properties.put("linger.ms", 0);                      // how long the producer waits for more records to join a batch before sending; default 0
properties.put("max.request.size", 1 * 1024 * 1024); // maximum size of a produce request; default 1 MB (interacts with the broker's message.max.bytes)
// Send configuration (less important)
properties.put("buffer.memory", 32 * 1024 * 1024);   // total memory the producer may use to buffer records
properties.put("retries", 0);                        // number of times a failed send is retried
properties.put("request.timeout.ms", 30 * 1000);     // maximum time the client waits for a response to a request; default 30 s
properties.put("max.block.ms", 60 * 1000);           // maximum time send() may block before throwing an exception; default 60000 ms
properties.put("compression.type", "none");          // compression type for message data; default none, other values: gzip, snappy
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
RecordMetadata recordMetadata = kafkaProducer.send(record).get(); // get() blocks here until the broker acknowledges the record (checked exceptions: see the sketch below)
if (null != recordMetadata) {
    System.out.println("offset:" + recordMetadata.offset() + "-" + "partition:" + recordMetadata.partition());
}
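Because Future.get() throws the checked InterruptedException and ExecutionException, a synchronous send normally wraps the call. A sketch of one common way to handle them, reusing the record and kafkaProducer defined above (java.util.concurrent.ExecutionException must also be imported):

try {
    RecordMetadata metadata = kafkaProducer.send(record).get(); // blocks until the send is acknowledged or fails
    System.out.println("offset:" + metadata.offset() + "-" + "partition:" + metadata.partition());
} catch (InterruptedException e) {
    Thread.currentThread().interrupt();     // restore the interrupt flag
} catch (ExecutionException e) {
    e.getCause().printStackTrace();         // the underlying Kafka exception, e.g. a timeout
}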
3. Third approach (asynchronous send with a callback)
Properties properties = new Properties();
properties.put("bootstrap.servers", "127.0.0.1:9092");
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
ProducerRecord<String,String> record;
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
try {
    record = new ProducerRecord<>("topic", "1", "TestProducer");
    kafkaProducer.send(record, new Callback() {
        @Override
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            if (null != exception) {
                exception.printStackTrace();
            }
            if (null != metadata) {
                System.out.println("offset:" + metadata.offset() + "-" + "partition:" + metadata.partition());
            }
        }
    });
} finally {
    kafkaProducer.close();
}
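Since Callback has a single abstract method, on Java 8+ the same asynchronous send can be written more compactly with a lambda; a sketch against the same topic and record as above:

kafkaProducer.send(record, (metadata, exception) -> {
    if (exception != null) {
        exception.printStackTrace();   // the send failed
    } else {
        System.out.println("offset:" + metadata.offset() + "-" + "partition:" + metadata.partition());
    }
});

Either way, the callback runs on the producer's I/O thread, so it should stay lightweight and never block.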