新功能
-
允許消費者從最近的副本進行獲取
-
為 Consumer Rebalance Protocol 增加對增量協同重新均衡(incremental cooperative rebalancing)的支持
-
新增 MirrorMaker 2.0 (MM2),新的多集群跨數據中心復制引擎
-
引入新的 Java 授權程序接口
-
支持 KTable 中的非密鑰連接
-
用於重新分配副本的 Administrative API
-
保護內部連接的 REST 端點
-
新增刪除消費者偏移並通過 AdminClient 公開的 API
改進
-
[KAFKA-5609] - 連接 log4j 會默認記錄到文件
-
[KAFKA-6263] - 為群組的元數據加載持續時間暴露指標(Metric)
-
[KAFKA-6883] - KafkaShortnamer 允許將 Kerberos 主體名稱轉換為大寫用戶名
-
[KAFKA-6958] - 允許使用 KStreams DSL 定義自定義處理器名稱
-
[KAFKA-7018] - 持久使用 memberId 以重新啟動消費者
-
[KAFKA-7149] - 減少分配數據大小以提高 kafka 流的可伸縮性
-
[KAFKA-7190] - 在數據傳輸擁擠的情況下,清除分區 topic 會引起關於 UNKNOWN_PRODUCER_ID 的 WARN 語句
-
[KAFKA-7197] - 升級至 Scala 2.13.0
2.4 Java Api Demo
這里使用官網推薦的,kafka-client 方便 靈活
引入依賴:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.4.0</version>
</dependency>
生產者示例:
public class SimpleProvider {
public static void main(String[] args) {
Properties properties = new Properties();
properties.put("bootstrap.servers", "kafka01:9092,kafka02:9092");
properties.put("acks", "all");
properties.put("retries", 0);
properties.put("batch.size", 16384);
properties.put("linger.ms", 1);
properties.put("buffer.memory", 33554432);
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
for (int i = 1; i <= 600; i++) {
kafkaProducer.send(new ProducerRecord<String, String>("topic", "message"+i));
System.out.println("message"+i);
}
kafkaProducer.close();
}
}
消費者示例:
public class SingleApplication {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "kafka01:9092,kafka02:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("auto.offset.reset","earliest");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
try{
while (true) {
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
}finally{
consumer.close();
}
}
}
其他多線程等示例,詳見Github地址:
https://github.com/tree1123/Kafka-Demo-2.4
大數據流動 專注於大數據實時計算,數據治理,數據可視化等技術分享與實踐。
請在后台回復關鍵字下載相關資料。相關學習交流群已經成立,歡迎加入~