import org.apache.kafka.clients.producer.*;
import java.util.Properties;
public class MyProducer {
private static KafkaProducer<String,String> producer;
static{
Properties kfkProperties = new Properties();
kfkProperties.put("bootstrap.servers","slave1:9092");
kfkProperties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
kfkProperties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
producer = new KafkaProducer<>(kfkProperties);
}
/**
* producer1 發送消息不考慮返回信息
*/
private static void sendMessageForgotResult(){
ProducerRecord<String,String> record = new ProducerRecord<>("kafkatest","name","ForgetResult");
producer.send(record);
producer.close();
}
/**
* producer2 發送消息同步等到發送成功
*/
private static void sendMessageSync() throws Exception{
ProducerRecord<String,String> record = new ProducerRecord<>("kafkatest","name","Sync");
RecordMetadata result = producer.send(record).get();
System.out.println("時間戳,主題,分區,位移: " + result.timestamp() + "," + record.topic() + "," + result.partition() + "," + result.offset());
producer.close();
}
/**
* producer3 發送消息異步回調返回消息
*/
private static void sendMessageCallBack(){
ProducerRecord<String,String> record ;
while(true){
record= new ProducerRecord<>("kafkatest","name","CallBack");
producer.send(record,new MyProducerCallBack());
try{
Thread.sleep(1000);
}catch (Exception e){
e.printStackTrace();
}
}
/producer.close();
}
private static class MyProducerCallBack implements Callback{
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if(null != e){
e.printStackTrace();
return;
}
System.out.println("時間戳,主題,分區,位移: " + recordMetadata.timestamp() + ", " + recordMetadata.topic() + "," + recordMetadata.partition() + " " + recordMetadata.offset());
}
}
public static void main(String args[]) throws Exception{
// MyProducer.sendMessageForgotResult();
// MyProducer.sendMessageSync();
MyProducer.sendMessageCallBack();
}
}