Kafka2.4發布——新特性介紹(附Java Api Demo代碼)


file

新功能

  • 允許消費者從最近的副本進行獲取

  • 為 Consumer Rebalance Protocol 增加對增量協同重新均衡(incremental cooperative rebalancing)的支持

  • 新增 MirrorMaker 2.0 (MM2),新的多集群跨數據中心復制引擎

  • 引入新的 Java 授權程序接口

  • 支持 KTable 中的非密鑰連接

  • 用於重新分配副本的 Administrative API

  • 保護內部連接的 REST 端點

  • 新增刪除消費者偏移並通過 AdminClient 公開的 API

改進

  • [KAFKA-5609] - 連接 log4j 會默認記錄到文件

  • [KAFKA-6263] - 為群組的元數據加載持續時間暴露指標(Metric)

  • [KAFKA-6883] - KafkaShortnamer 允許將 Kerberos 主體名稱轉換為大寫用戶名

  • [KAFKA-6958] - 允許使用 KStreams DSL 定義自定義處理器名稱

  • [KAFKA-7018] - 持久使用 memberId 以重新啟動消費者

  • [KAFKA-7149] - 減少分配數據大小以提高 kafka 流的可伸縮性

  • [KAFKA-7190] - 在數據傳輸擁擠的情況下,清除分區 topic 會引起關於 UNKNOWN_PRODUCER_ID 的 WARN 語句

  • [KAFKA-7197] - 升級至 Scala 2.13.0

2.4 Java Api Demo

這里使用官網推薦的,kafka-client 方便 靈活

引入依賴:

<dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.4.0</version>
</dependency>

生產者示例:

public class SimpleProvider {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "kafka01:9092,kafka02:9092");
        properties.put("acks", "all");
        properties.put("retries", 0);
        properties.put("batch.size", 16384);
        properties.put("linger.ms", 1);
        properties.put("buffer.memory", 33554432);
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
        for (int i = 1; i <= 600; i++) {
            kafkaProducer.send(new ProducerRecord<String, String>("topic", "message"+i));
            System.out.println("message"+i);
        }
        kafkaProducer.close();
    }

}

消費者示例:

public class SingleApplication {
    public static void main(String[] args) {


        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka01:9092,kafka02:9092");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        props.put("auto.offset.reset","earliest");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("foo", "bar"));
        try{
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                }
            }
        }finally{
            consumer.close();
        }
    }
}

其他多線程等示例,詳見Github地址:

https://github.com/tree1123/Kafka-Demo-2.4

大數據流動 專注於大數據實時計算,數據治理,數據可視化等技術分享與實踐。
請在后台回復關鍵字下載相關資料。相關學習交流群已經成立,歡迎加入~


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM