【Kafka】Producer API: Synchronous and Callback Sends


Basic implementation


import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class MyProducer {
    public static void main(String[] args) {
        /**
         * Kafka producer configuration. The ProducerConfig class defines every config key Kafka supports:
         * 1. Kafka cluster to connect to
         * 2. ack acknowledgment level
         * 3. number of retries on send failure
         * 4. batch size (how many bytes are sent per batch)
         * 5. linger time
         * 6. RecordAccumulator buffer size
         * 7. key and value serializer classes
         */
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("acks", "all");
        properties.put("retries", 1);
        properties.put("batch.size", 16384);
        properties.put("linger.ms", 1);
        properties.put("buffer.memory", 33554432);
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        /**
         * Create the producer from the configuration
         */
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
        for (int i = 0; i < 10; i++) {
            // Create the record: ProducerRecord(topic, partition, key, value)
            ProducerRecord<String, String> message =
                    new ProducerRecord<String, String>("test", 0, "MyProducer", "hello" + i);
            // send() is asynchronous: it returns as soon as the record is queued, which does not mean it was actually delivered
            kafkaProducer.send(message);
        }
        // Close the producer: flushes buffered records and releases resources
        kafkaProducer.close();
    }
}
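
Raw string keys are easy to mistype (e.g. liner.ms instead of linger.ms). Since the comment points to the ProducerConfig class, here is a minimal sketch of the same configuration written with ProducerConfig constants; the helper class name ProducerProps is hypothetical and the values simply mirror the ones above.

import java.util.Properties;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

public class ProducerProps {
    // Builds the same configuration as above, using ProducerConfig constants
    // instead of raw strings so a typo fails at compile time.
    public static Properties build() {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.put(ProducerConfig.ACKS_CONFIG, "all");
        properties.put(ProducerConfig.RETRIES_CONFIG, 1);
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        return properties;
    }
}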

Synchronous implementation

import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class MyProducerFuture {
    public static void main(String[] args) throws ExecutionException, InterruptedException {

        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("acks", "all");
        properties.put("retries", 1);
        properties.put("batch.size", 16384);
        properties.put("linger.ms", 1);
        properties.put("buffer.memory", 33554432);
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
        for (int i = 0; i < 10; i++) {
            ProducerRecord<String, String> message = new ProducerRecord<String, String>("test", "hello" + i);
            /**
             * Synchronous send: send() returns a Future.
             * Calling get() blocks until the broker responds and returns a
             * RecordMetadata holding the topic, partition and offset of the record.
             */
            RecordMetadata metadata = kafkaProducer.send(message).get();
            String topic = metadata.topic();
            int partition = metadata.partition();
            long offset = metadata.offset();
            System.out.println("Topic=>"+topic+" partition=>"+partition + " offset=>" +offset);
        }
        // Close the producer: flushes buffered records and releases resources
        kafkaProducer.close();
    }
}
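
get() with no arguments blocks indefinitely while waiting for the broker acknowledgment. Where a bounded wait is preferable, the standard Future.get(timeout, unit) overload can be used. A minimal sketch, reusing the hypothetical ProducerProps.build() helper from the earlier sketch and the same test topic:

import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class MyProducerFutureTimeout {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(ProducerProps.build());
        try {
            ProducerRecord<String, String> message = new ProducerRecord<>("test", "hello-with-timeout");
            Future<RecordMetadata> future = kafkaProducer.send(message);
            // Wait at most 10 seconds for the acknowledgment instead of blocking indefinitely
            RecordMetadata metadata = future.get(10, TimeUnit.SECONDS);
            System.out.println("Topic=>" + metadata.topic() + " partition=>" + metadata.partition() + " offset=>" + metadata.offset());
        } catch (TimeoutException e) {
            // Only the wait timed out; the record may still be delivered later
            e.printStackTrace();
        } finally {
            // Close the producer: flushes buffered records and releases resources
            kafkaProducer.close();
        }
    }
}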


Callback implementation

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class MyProducerCallback {
    public static void main(String[] args) {

        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("acks", "all");
        properties.put("retries", 1);
        properties.put("batch.size", 16384);
        properties.put("linger.ms", 1);
        properties.put("buffer.memory", 33554432);
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
        for (int i = 0; i < 10; i++) {
            ProducerRecord<String, String> message = new ProducerRecord<String, String>("test", "message--" + i);
            /**
             * Send with a callback.
             * The Callback functional interface receives two arguments:
             * 1. RecordMetadata: the record's metadata on success
             * 2. Exception: the exception on failure
             */
            kafkaProducer.send(message, (metadata, exception) -> {
                // exception == null means the send succeeded
                if (exception == null) {
                    // metadata holds the topic, partition and offset of the record
                    String topic = metadata.topic();
                    int partition = metadata.partition();
                    long offset = metadata.offset();
                    System.out.println("Topic=>"+topic+" partition=>"+partition + " offset=>" +offset);
                } else {
                    exception.printStackTrace();
                }
            });
        }
        // Close the producer: flushes buffered records and releases resources
        kafkaProducer.close();
    }
}
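
The lambda above is shorthand for implementing the org.apache.kafka.clients.producer.Callback interface and its single onCompletion method. A minimal sketch of the equivalent explicit form (the class name LoggingCallback is illustrative):

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;

public class LoggingCallback implements Callback {
    // Invoked once per record when the broker acknowledges it, or when the send fails
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception == null) {
            System.out.println("Topic=>" + metadata.topic()
                    + " partition=>" + metadata.partition()
                    + " offset=>" + metadata.offset());
        } else {
            exception.printStackTrace();
        }
    }
}

It would be passed the same way: kafkaProducer.send(message, new LoggingCallback()). Note that callbacks execute on the producer's I/O thread, so they should stay lightweight.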

