Kafka的接口回調 +自定義分區、攔截器


一、接口回調+自定義分區

  1.接口回調:在使用消費者的send方法時添加Callback回調

 

producer.send(new ProducerRecord<String, String>("xinnian", "20" + i + "年新年好!"), new Callback() {
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (recordMetadata!=null){
System.out.println(recordMetadata.topic()+"-----"+recordMetadata.offset()+"-----"+recordMetadata.partition());
}
}
 2.自定義分區:定義類實現Patitioner接口,實現接口的方法:
   設置configure、分區邏輯partition(return 1;)、釋放資源close、在生產者的配置過程中添加入分區屬性。
 在定義生產者屬性時添加分區的屬性即可
/**
 * @author: PrincessHug
 * @date: 2019/2/28, 16:24
 * @Blog: https://www.cnblogs.com/HelloBigTable/
 */
public class PartitionDemo implements Partitioner {
    public int partition(String s, Object o, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) {
        return 1;
    }

    public void close() {

    }

    public void configure(Map<String, ?> map) {

    }
}

public class ProducerDemo {
    public static void main(String[] args) {
        Properties prop = new Properties();

        //參數配置
        //kafka節點的地址
        prop.put("bootstrap.servers", "192.168.126.128:9092");
        //發送消息是否等待應答
        prop.put("acks", "all");
        //配置發送消息失敗重試
        prop.put("retries", "0");
        //配置批量處理消息大小
        prop.put("batch.size", "10241");
        //配置批量處理數據延遲
        prop.put("linger.ms","5");
        //配置內存緩沖大小
        prop.put("buffer.memory", "12341235");
        //消息在發送前必須序列化
        prop.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        prop.put("partitioner.class", "PartitionDemo");

        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(prop);

        for (int i=10;i<100;i++){
            producer.send(new ProducerRecord<String, String>("xinnian", "20" + i + "年新年好!"), new Callback() {
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (recordMetadata!=null){
                        System.out.println(recordMetadata.topic()+"-----"+recordMetadata.offset()+"-----"+recordMetadata.partition());
                    }
                }
            });
        }
        producer.close();
    }
}

  注意:在自定義分區后,你的消費者會收不到消息,因為消費者默認接收的分區為0。

 

二、攔截器

  1)創建生產者類;
     2)創建自定義攔截器類實現ProducerInterceptor接口,重寫抽象方法;
     3)在業務邏輯方法ProducerRecord方法中,修改返回值,
        return new ProducerRecord<String,String>(
        record.topic(),
        record.partiiton(),
        record.key(),
        System.currentTimeMillis() + "-" + record.value() + "-" + record.topic());
     4)在生產者類中將自定義攔截器生效
       prop.put(ProducerConfig.INTERCEPTOR_CLASSEA_CONFIG,"com.wyh.com.wyh.kafka.interceptor.TimeInterceptor");
     5)運行生產者main方法,或者在linux端用shell測試。

/**
 * @author: PrincessHug
 * @date: 2019/2/28, 20:59
 * @Blog: https://www.cnblogs.com/HelloBigTable/
 */
public class TimeInterceptor implements ProducerInterceptor<String, String> {

    //業務邏輯
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> producerRecord) {
        return new ProducerRecord<String,String>(
                producerRecord.topic(),
                producerRecord.partition(),
                producerRecord.key(),
                System.currentTimeMillis()+"--"+producerRecord.value()
        );
    }

    //發送失敗調用
    public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {

    }

    //釋放資源
    public void close() {

    }

    //獲取配置信息
    public void configure(Map<String, ?> map) {

    }
}

public class ItctorProducer {
    public static void main(String[] args) {
        //配置生產者屬性
        Properties prop = new Properties();
        //kafka節點的地址
        prop.put("bootstrap.servers", "192.168.126.128:9092");
        //發送消息是否等待應答
        prop.put("acks", "all");
        //配置發送消息失敗重試
        prop.put("retries", "0");
        //配置批量處理消息大小
        prop.put("batch.size", "1024");
        //配置批量處理數據延遲
        prop.put("linger.ms","5");
        //配置內存緩沖大小
        prop.put("buffer.memory", "12341235");
        //消息在發送前必須序列化
        prop.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        //添加攔截器
        ArrayList<String> inList = new ArrayList<String>();
        inList.add("interceptor.TimeInterceptor");
        prop.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,inList);

        //實例化producer
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(prop);

        //發送消息
        for (int i=0;i<99;i++){
            producer.send(new ProducerRecord<String, String>("xinnian","You are genius!"+i));
        }

        //釋放資源
        producer.close();
        
    }
}

 


  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM