Java實現Kafka Producer方法


import org.apache.kafka.clients.producer.*;

import java.util.Properties;

public class MyProducer {
  private static KafkaProducer<String,String> producer;
  static{
    Properties kfkProperties = new Properties();
    kfkProperties.put("bootstrap.servers","slave1:9092");
    kfkProperties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
    kfkProperties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
    producer = new KafkaProducer<>(kfkProperties);
  }

  /**
  * producer1 發送消息不考慮返回信息
  */
  private static void sendMessageForgotResult(){
    ProducerRecord<String,String> record = new ProducerRecord<>("kafkatest","name","ForgetResult");
    producer.send(record);
    producer.close();
  }

  /**
  * producer2 發送消息同步等到發送成功
  */

  private static void sendMessageSync() throws Exception{
    ProducerRecord<String,String> record = new ProducerRecord<>("kafkatest","name","Sync");
    RecordMetadata result = producer.send(record).get();

    System.out.println("時間戳,主題,分區,位移: " + result.timestamp() + "," + record.topic() + "," + result.partition() + "," + result.offset());

    producer.close();
  }

  /**
  * producer3 發送消息異步回調返回消息
  */
  private static void sendMessageCallBack(){
    ProducerRecord<String,String> record ;
    while(true){
      record= new ProducerRecord<>("kafkatest","name","CallBack");
      producer.send(record,new MyProducerCallBack());
      try{
        Thread.sleep(1000);
        }catch (Exception e){
        e.printStackTrace();
      }
    }


    /producer.close();
  }
  private static class MyProducerCallBack implements Callback{

  @Override
  public void onCompletion(RecordMetadata recordMetadata, Exception e) {
    if(null != e){
      e.printStackTrace();
      return;
    }
    System.out.println("時間戳,主題,分區,位移: " + recordMetadata.timestamp() + ", " + recordMetadata.topic() + "," + recordMetadata.partition() + " " + recordMetadata.offset());
  }
}

  public static void main(String args[]) throws Exception{
    // MyProducer.sendMessageForgotResult();
    // MyProducer.sendMessageSync();
    MyProducer.sendMessageCallBack();
  }
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM