普通實現
/**
 * Basic Kafka producer example: sends ten string records to partition 0 of the
 * {@code test} topic without waiting for or inspecting the send results.
 */
public class MyProducer {

    /**
     * Builds the producer configuration, sends ten records and closes the producer.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        // Producer configuration:
        //   1. the Kafka cluster to connect to
        //   2. the acknowledgement level
        //   3. the number of retries after a failed send
        //   4. the batch size (how many bytes are sent at once)
        //   5. the linger (wait) time
        //   6. the RecordAccumulator buffer size
        //   7. the key/value serializer classes
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("acks", "all");
        properties.put("retries", 1);
        properties.put("batch.size", 16384);
        // The key is "linger.ms"; the previous spelling "liner.ms" is not a producer
        // setting, so the intended wait time was never applied.
        properties.put("linger.ms", 1);
        properties.put("buffer.memory", 33554432);
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // try-with-resources closes the producer even if a send throws.
        try (KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties)) {
            for (int i = 0; i < 10; i++) {
                // ProducerRecord(topic, partition, key, value)
                ProducerRecord<String, String> message =
                        new ProducerRecord<>("test", 0, "MyProducer", "hello" + i);
                // send() is asynchronous: it returns immediately, and returning does not
                // mean the record has actually been delivered.
                kafkaProducer.send(message);
            }
        }
    }
}
同步實現
/**
 * Synchronous Kafka producer example: sends ten string records to the {@code test}
 * topic and blocks on each send until its result is available, then prints where
 * the record was written.
 */
public class MyProducerFuture {

    /**
     * Sends ten records one at a time, waiting for each result before sending the next.
     *
     * @param args command-line arguments (unused)
     * @throws ExecutionException   if a send fails; propagated from {@code Future.get()}
     * @throws InterruptedException if the thread is interrupted while waiting for a result
     */
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("acks", "all");
        properties.put("retries", 1);
        properties.put("batch.size", 16384);
        // The key is "linger.ms"; the previous spelling "liner.ms" is not a producer
        // setting, so the intended wait time was never applied.
        properties.put("linger.ms", 1);
        properties.put("buffer.memory", 33554432);
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // try-with-resources guarantees the producer is closed even when get() throws,
        // which previously skipped the trailing close() call.
        try (KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties)) {
            for (int i = 0; i < 10; i++) {
                ProducerRecord<String, String> message = new ProducerRecord<>("test", "hello" + i);
                // Synchronous send: send() returns a Future, and get() blocks until the
                // RecordMetadata (topic, partition, offset) is available.
                RecordMetadata metadata = kafkaProducer.send(message).get();
                String topic = metadata.topic();
                int partition = metadata.partition();
                long offset = metadata.offset();
                System.out.println("Topic=>" + topic + " partition=>" + partition + " offset=>" + offset);
            }
        }
    }
}
回調實現
/**
 * Callback-based Kafka producer example: sends ten string records to the
 * {@code test} topic and reports each result through a callback.
 */
public class MyProducerCallback {

    /**
     * Sends ten records, printing the topic, partition and offset of each one that
     * succeeds and the stack trace of each one that fails.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("acks", "all");
        properties.put("retries", 1);
        properties.put("batch.size", 16384);
        // The key is "linger.ms"; the previous spelling "liner.ms" is not a producer
        // setting, so the intended wait time was never applied.
        properties.put("linger.ms", 1);
        properties.put("buffer.memory", 33554432);
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // try-with-resources closes the producer even if a send throws.
        try (KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties)) {
            for (int i = 0; i < 10; i++) {
                ProducerRecord<String, String> message =
                        new ProducerRecord<>("test", "messuesein--" + i);
                // Send with a callback. The callback receives:
                //   1. RecordMetadata - the metadata record on success
                //   2. Exception      - the failure cause on error
                kafkaProducer.send(message, (metadata, exception) -> {
                    // exception == null means the send succeeded.
                    if (exception == null) {
                        String topic = metadata.topic();
                        int partition = metadata.partition();
                        long offset = metadata.offset();
                        System.out.println("Topic=>" + topic + " partition=>" + partition + " offset=>" + offset);
                    } else {
                        exception.printStackTrace();
                    }
                });
            }
        }
    }
}