1.概述
Kafka是一個分布表示實時數據流平台,可獨立部署在單台服務器上,也可部署在多台服務器上構成集群。它提供了發布與訂閱的功能,用戶可以發送數據到Kafka集群中,也可以從Kafka集群中讀取數據。之前在Kafka 2.8.0版本時,Kafka社區提出了KRaft協議的概念,現在社區發布了Kafka 3.0,里面涉及優化和新增了很多功能,其中就包含KRaft協議的改機。今天,筆者就給大家介紹一下Kafka 3.0新增了哪些特性以及優化了哪些功能。
2.內容
在 Kafka 3.0 中包含了許多重要的新功能,其中比較顯著的變化如下所示:
- 棄用對Java 8 和Scala 2.12 的支持;
- Kafka Raft 支持元數據主題的快照以及自動管理仲裁中的其他改進;
- 默認情況下為Kafka 生產者提供更加強大的交付保證;
- 棄用消息格式 v0 和 v1;
- OffsetFetch 和 FindCoordinator 請求中的優化;
- 更靈活的 Mirror Maker 2 配置和 Mirror Maker 1 的棄用;
- 能夠在 Kafka Connect 中的單個調用中重新其中連接器的任務;
- 現在默認啟用連接器日志上下文和連接器客戶單覆蓋;
- Kafka Streams 中時間戳同步的增強語義;
- 改進了 Stream 和 TaskId 的公共 API;
- Kafka 中的默認 serde 變為 null。
2.1 關於升級到 Kafka 3.0
在Kafka 3.0中,社區對於Zookeeper的版本已經升級到3.6.3了,其中我們可以預覽 KRaft 模式,但是無法從 2.8 或者更早的版本升級到該模式。許多實現依賴 jar 現在在運行時類路勁中可用,而不是在編譯和運行時類路勁中。升級后的編譯錯誤可以通過顯示添加缺少的依賴 jar 或更新應用程序以不使用內部類來修復。
消費者配置的默認值 session.timeout.ms 從10 秒增加到了45 秒,而Broker配置 log.message.format.version 和 Topic 配置 message.format.version 已經被啟用。兩種配置的值始終假定為 3.0 或者更高,通過 inter.broker.protocol.version 來配置。如果設置了 log.message.format.version 或者 message.format.version 建議在升級到 3.0的同時清理掉這兩個屬性,同時設置 inter.broker.protocol.version 值為 3.0 。
Streams API 刪除了在 2.5.0 或者更早版本中棄用的所有棄用 API,Kafka Streams 不再對“connect:json”模塊有編譯時的依賴,依賴此傳遞依賴項的項目必須明確聲明它。
現在,通過指定的自定義主體構建起實現 principal.builder.class 現在必須實現 KafkaPrincipalSerde 接口以允許Broker 之間的轉發。另外,一些過時的類,方法和工具以及從clients、connect、core、和tools模塊進行了刪除。
該Producer#sendOffsetsToTransaction(Map offsets, String consumerGroupId)方法已被棄用。請使用 Producer#sendOffsetsToTransaction(Map offsets, ConsumerGroupMetadata metadata)來替換,ConsumerGroupMetadata 可以通過檢索KafkaConsumer#groupMetadata()更強大的語義。需要注意的是,完整的消費者組元數據集只有 Brokers 或 2.5 或更高版本才能支持,因此你必須升級你的 Kafka 集群以獲得更強的語義。否則,你可以通過new ConsumerGroupMetadata(consumerGroupId)與較老版本的Broker進行交互。
連接器中 internal.key.converter 和 internal.value.converter 屬性已被完全刪除。自版本 2.0.0 起,不推薦使用這些 Connect 工作器屬性。現在被硬編碼為使用 schemas.enable 設置為的 JSON 轉換器false。如果你的集群一直在使用不同的內部鍵或值轉換器,你可以按照官網文檔中概述的遷移步驟,將你的 Connect 集群安全地升級到 3.0。 基於 Connect 的 MirrorMaker (MM2) 包括對支持的更改IdentityReplicationPolicy,無需重命名 Topic 即可啟用復制。DefaultReplicationPolicy默認情況下仍然使用現有的,但可以通過 replication.policy 配置屬性啟用身份復制 。這對於從舊版 MirrorMaker (MM1) 遷移的用戶,或者對於不希望 Topic 重命名的具有簡單單向復制拓撲的用例特別有用。請注意IdentityReplicationPolicy與 DefaultReplicationPolicy 不同,無法根據 Topic 名稱阻止復制循環,因此在構建復制拓撲時要注意避免循環。
2.1.1 目的
雖然 internal.key.converter 和 internal.value.converter 中 Connect 工作器屬性,以及以這些名稱為前綴的所有屬性都已棄用,但是有時候用戶仍會嘗試使用這些屬性進行調試,在與未棄用的Key 和 Value轉化器相關的屬性意外混淆后,或者只是對其進行盲目的配置后,進行調試。這些實驗的結果可能會產生不好的后果,配置了新內保轉換器卻無法讀取具有較舊內部轉換器的內保 Topic 數據,這最多會導致偏移量和連機器配置的丟失。
以下連接屬性會將被刪除:
- internal.key.converter
- internal.value.converter
- internal.key.converter. # 以工作器內部密鑰轉換器為前綴的屬性
- internal.value.converter. # 以工作線程的內部值轉換器為前綴的屬性
Connect 的行為就好像上面沒有提供一樣。具體來說,對於它的鍵和值轉換器,它將使用開箱即用的 JsonConverter,配置為 schemas.enable 屬性值為 false 。
2.1.2 升級步驟
運行未使用JsonConverter 並對 schemas.enable 設置 false 的 Connect 集群用戶,可以按照以下步驟將其 Connect 集群升級到 3.0:
- 停止集群上的所有工作線程
- 對於每個內部主題(配置、偏移量和狀態):
- 創建一個新主題來代替現有主題
- 對於現有主題中的每條消息:
- 使用 Connect 集群的舊內部鍵和值轉換器反序列化消息的鍵和值
- 使用 禁用模式的JSON 轉換器序列化消息的鍵和值(通過將schemas.enable屬性設置為false)
- 用新的鍵和值向新的內部主題寫一條消息
- 重新配置每個 Connect worker 以使用步驟 2 中新創建的內部主題
- 啟動集群上的所有worker
2.2 新功能
在本次 Kafka 3.0 版本中新增了以下功能:
- 添加了InsertHeader 和 DropHeader 連接轉換
- 在 KRaft 模式中實現 createPartitions
- 如果分區從 fetcher 中刪除,副本 fetcher 不應在發散時期更新分區狀態
2.2.1 添加 InsertHeader 和 DropHeader
之前在核心 Kafka 產品中引入了 Headers,在 Kafka Connect Framework 中公開它們將是有利的。Kafka 的 Header 是帶有二進制值的簡單名稱,而 Connect API 已經有一個非常有用的層來處理不同類型的數據。Connect 的 Header 支持應該使用像 Kafka 這樣的字符串名稱,但使用與 Connect 記錄鍵和值相同的類型來表示值。這將提供與 Connect 框架的其余部分的一致性,並使連接器和轉換能夠輕松地訪問、修改和創建記錄上的 Header。
Kafka 將 Header 定義為具有字符串名稱和二進制值,但 Connect 將使用用於記錄鍵和值的相同機制來表示 Header 值。每個 Header 值可能有一個對應的 Schema,允許連接器和轉換以一致的方式處理 Header 值、記錄鍵和記錄值。Connect 將定義一種 HeaderConverter 機制以類似於Converter框架的方式序列化和反序列化標頭值 ,這樣現有的 Converter實現也可以實現 HeaderConverter. 由於來自不同供應商的連接器和轉換可能被組合到單個管道中,因此不同的連接器和轉換可以輕松地將 Header 值從原始形式轉換為連接器和/或轉換期望的類型,這一點很重要。
注意:
為了簡潔和清晰,顯示的代碼不包括 JavaDoc,但提議的更改確實包括所有公共 API 和方法的 JavaDoc。
1.Connect Header 和 Header API
org.apache.kafka.connect.Header 將添加一個新接口並用作記錄上單個標頭的公共 API。該接口為鍵、值和值的模式定義了簡單的 getter。這些是不可變對象,還有一些方法可以創建Header具有不同名稱或值的新對象。代碼片段如下所示:
package org.apache.kafka.connect.header; public interface Header { // Access the key and value String key(); // never null Schema schema(); // may be null Object value(); // may be null // Methods to create a copy Header with(Schema schema, Object value); Header rename(String key); }
org.apache.kafka.connect.Headers 還將添加一個新接口並用作記錄標題有序列表的公共 API。這是在 Kafka 客戶端的 org.apache.kafka.common.header.Headers接口之后作為標題的有序列表進行模式化的,其中允許多個具有相同名稱的標題。Connect Headers接口定義了Header按順序和/或按名稱訪問各個 對象以及獲取有關Header對象數量的信息的方法 。它還定義了Header使用各種簽名來添加、刪除和保留 對象的方法,這些簽名將易於連接器和轉換使用。由於多個Header對象可以具有相同的名稱,因此轉換需要一種簡單的方法來修改和/或刪除現有Header對象, apply(HeaderTransform) 並且apply(String, HeaderTransform) 方法可以輕松使用自定義 lambda 函數來執行此操作。代碼片段如下所示:
package org.apache.kafka.connect.header; public interface Headers extends Iterable<Header> { // Information about the Header instances int size(); boolean isEmpty(); Iterator<Header> allWithName(String key); Header lastWithName(String key); // Add Header instances to this object Headers add(Header header); Headers add(String key, SchemaAndValue schemaAndValue); Headers add(String key, Object value, Schema schema); Headers addString(String key, String value); Headers addBoolean(String key, boolean value); Headers addByte(String key, byte value); Headers addShort(String key, short value); Headers addInt(String key, int value); Headers addLong(String key, long value); Headers addFloat(String key, float value); Headers addDouble(String key, double value); Headers addBytes(String key, byte[] value); Headers addList(String key, List<?> value, Schema schema); Headers addMap(String key, Map<?, ?> value, Schema schema); Headers addStruct(String key, Struct value); Headers addDecimal(String key, BigDecimal value); Headers addDate(String key, java.util.Date value); Headers addTime(String key, java.util.Date value); Headers addTimestamp(String key, java.util.Date value); // Remove and/or retain the latest Header Headers clear(); Headers remove(String key); Headers retainLatest(String key); Headers retainLatest(); // Create a copy of this Headers object Headers duplicate(); // Apply transformations to named or all Header objects Headers apply(HeaderTransform transform); Headers apply(String key, HeaderTransform transform); interface HeaderTransform { Header apply(Header header); } }
2.Connect Records
每條 Kafka 消息都包含零個或多個標頭名稱-值對,因此 Connect 記錄類將被修改為具有Headers可以就地修改的非空對象。現有的 ConnectRecord 抽象類是兩個基類 SourceRecord和 SinkRecord,並且將被改變為具有新的 headers填充字段 ConnectHeaders對象。所有現有構造函數和方法的簽名都將保持不變以保持后向兼容性,但現有構造函數將headers使用ConnectHeaders對象填充新字段。而且, toString(), hashCode()和 equalTo(Object)方法將改為使用新的 headers領域。
一個新的構造函數和幾個新方法將被添加到這個現有的類中,代碼片段如下所示:
package org.apache.kafka.connect.connector; public abstract class ConnectRecord<R extends ConnectRecord<R>> { /* The following will be added to this class */ private final Headers headers; public ConnectRecord(String topic, Integer kafkaPartition, Schema keySchema, Object key, Schema valueSchema, Object value, Long timestamp, Iterable<Header> headers) { this(topic, kafkaPartition, keySchema, key, valueSchema, value, timestamp); if (headers == null) { this.headers = new ConnectHeaders(); } else if (headers instanceof ConnectHeaders) { this.headers = (ConnectHeaders)headers; } else { this.headers = new ConnectHeaders(headers); } } public Headers headers() { return headers; } public abstract R newRecord(String topic, Integer kafkaPartition, Schema keySchema, Object key, Schema valueSchema, Object value, Long timestamp, Iterable<Header> headers); }
現有的 SourceRecord類將被修改以添加一個新的構造函數並實現附加 newRecord(...)方法。同樣,所有現有構造函數和方法的簽名將保持不變以保持向后兼容性。代碼片段如下所示:
package org.apache.kafka.connect.source; public class SourceRecord extends ConnectRecord<SourceRecord> { /* The following will be added to this class */ public SourceRecord(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset, String topic, Integer partition, Schema keySchema, Object key, Schema valueSchema, Object value, Long timestamp, Iterable<Header> headers) { super(topic, partition, keySchema, key, valueSchema, value, timestamp, headers); this.sourcePartition = sourcePartition; this.sourceOffset = sourceOffset; } @Override public SourceRecord newRecord(String topic, Integer kafkaPartition, Schema keySchema, Object key, Schema valueSchema, Object value, Long timestamp, Iterable<Header> headers) { return new SourceRecord(sourcePartition, sourceOffset, topic, kafkaPartition, keySchema, key, valueSchema, value, timestamp, headers); } }
同樣,SinkRecord 將修改現有 類以添加新的構造函數並實現附加 newRecord(...) 方法。同樣,所有現有構造函數和方法的簽名將保持不變以保持向后兼容性。代碼片段如下所示:
package org.apache.kafka.connect.sink; public class SinkRecord extends ConnectRecord<SinkRecord> { /* The following will be added to this class */ public SinkRecord(String topic, int partition, Schema keySchema, Object key, Schema valueSchema, Object value, long kafkaOffset, Long timestamp, TimestampType timestampType, Iterable<Header> headers) { super(topic, partition, keySchema, key, valueSchema, value, timestamp, headers); this.kafkaOffset = kafkaOffset; this.timestampType = timestampType; } @Override public SinkRecord newRecord(String topic, Integer kafkaPartition, Schema keySchema, Object key, Schema valueSchema, Object value, Long timestamp, Iterable<Header> headers) { return new SinkRecord(topic, kafkaPartition, keySchema, key, valueSchema, value, kafkaOffset(), timestamp, timestampType, headers); } }
3.序列化與反序列化
本次更新中添加了一個新 org.apache.kafka.connect.storage.HeaderConverter 接口,該org.apache.kafka.connect.storage.Converter接口在現有接口的基礎上進行了模式化, 但具有特定於 Header 的方法名稱和簽名。代碼片段如下所示:
package org.apache.kafka.connect.storage; public interface HeaderConverter extends Configurable, Closeable { /** * Convert the header name and byte array value into a {@link Header} object. * @param topic the name of the topic for the record containing the header * @param headerKey the header's key; may not be null * @param value the header's raw value; may be null * @return the {@link SchemaAndValue}; may not be null */ SchemaAndValue toConnectHeader(String topic, String headerKey, byte[] value); /** * Convert the {@link Header}'s {@link Header#valueAsBytes() value} into its byte array representation. * @param topic the name of the topic for the record containing the header * @param headerKey the header's key; may not be null * @param schema the schema for the header's value; may be null * @param value the header's value to convert; may be null * @return the byte array form of the Header's value; may be null if the value is null */ byte[] fromConnectHeader(String topic, String headerKey, Schema schema, Object value); /** * Configuration specification for this set of header converters. * @return the configuration specification; may not be null */ ConfigDef config(); }
需要注意的是,不同的是 Converter,新 HeaderConverter接口擴展了 Configurable 現在對於可能具有附加配置屬性的 Connect 接口通用的接口。
現有實現 Converter 也可能實現 HeaderConverter,並且ConverterConnect 中的所有三個現有 實現都將相應地更改以通過序列化/反序列化 Header 值來實現這個新接口,類似於它們序列化/反序列化鍵和值的方式:
- StringConverter
- ByteArrayConverter
- JsonConverter
HeaderConverter 將添加一個新實現來將所有內置原語、數組、映射和結構與字符串表示形式相互轉換。與StringConverter使用 toString()方法的不同 SimpleHeaderConverter,除了不帶引號的簡單字符串值之外, 使用類似 JSON 的表示形式表示基本類型、數組、映射和結構。這種形式直接對應於許多開發人員認為將值序列化為字符串的方式,並且可以 SimpleHeaderConverter解析這些任何和所有這樣的值,並且大部分時間來推斷正確的模式。因此,這將用於HeaderConverterConnect 工作程序中使用的默認值 。
下表描述了SimpleHeaderConverter將如何持久化這些值,表格如下:
類型 | 描述 | 例子 |
BOOLEAN | true或者false | |
BYTE_ARRAY | 字節數組的Base64編碼字符串 | |
INT8 | Java字節的字符串表示形式 | |
INT16 | Java Short的字符串表示形式 | |
INT32 | Java Int的字符串表示形式 | |
INT64 | Java Long的字符串表示形式 | |
FLOAT32 | Java 浮點數的字符串表示形式 | |
FLOAT64 | Java Double的字符串表示形式 | |
STRING | 字符串的UTF-8表示 | |
ARRAY | 數組的類似 JSON 的表示形式。數組值可以是任何類型,包括基本類型和非基本類型。 | |
MAP | 類似 JSON 的表示形式。盡管大多數正確創建的映射都具有相同類型的鍵和值,但也支持具有任何鍵和值的映射。映射值可以是任何類型,包括基本類型和非基本類型。 | { "foo": "value", "bar": "strValue", "baz": "other" } |
STRUCT | 類似 JSON 的表示形式。Struct 對象可以序列化,但反序列化時將始終解析為映射,因為模式不包含在序列化形式中。 | { "foo": true, "bar": "strValue", "baz": 1234 } |
DECIMAL | 對應的字符串表示java.math.BigDecimal。 | |
TIME | IOS-8601 時間表示,格式為“HH:mm:ss.SSS'Z'”。 | 16:31:05.387UTC |
DATE | 日期的 ISO-8601 表示,格式為“YYYY-MM-DD”。 | 2021-09-25 |
TIMESTAMP | 時間戳的 ISO-8601 表示,格式為“YYYY-MM-DD'T'HH:mm:ss.SSS'Z'”。 | 2021-09-25T 16:31:05.387UTC |
4.屬性配置
Connect 工作器需要配置為使用 HeaderConverter 實現,因此header.converter 將定義一個名為的附加工作器配置 ,默認為 SimpleHeaderConverter. 具有相同名稱和默認值的類似配置屬性將添加到連接器配置中,允許連接器覆蓋工作程序的 Header 轉換器。請注意,每個連接器任務都有自己的標頭轉換器實例,就像鍵和值轉換器一樣。
5.轉換 Header 值
每個 Header 都有一個可由接收器連接器和簡單消息轉換使用的值。但是,標頭值的類型首先取決於標頭的創建方式以及它們的序列化和反序列化方式。將添加一組新的轉換實用程序方法,使 SMT 和接收器連接器可以輕松地將標頭值轉換為易於使用的類型。這些轉換可能需要原始架構和值。與字符串之間的轉換使用與上述相同的機制SimpleHeaderConverter。
例如,SMT 或接收器連接器可能期望標頭值為 long,並且可以使用這些實用方法來轉換任何數值(例如,int、short、String、BigDecimal 等)。或者,接收器連接器可能需要 Timestamp 邏輯數據類型,因此它可以使用該 Values.convertToTimestamp(s,v) 方法從時間戳或日期的任何 ISO-8601 格式字符串表示轉換,或表示為 long 或字符串的過去紀元的毫秒數。
這些實用方法可用於 Header 值或鍵、值或結構、數組和映射中的任何值。代碼片段如下所示:
package org.apache.kafka.connect.data; public class Values { // All methods return null when value is null, and throw a DataException // if the value cannot be converted to the desired type. // If the value is already the desired type, these methods simply return it. public static Boolean convertToBoolean(Schema schema, Object value) throws DataException {...} public static Byte convertToByte(Schema schema, Object value) throws DataException {...} public static Short convertToShort(Schema schema, Object value) throws DataException {...} public static Integer convertToInteger(Schema schema, Object value) throws DataException {...} public static Long convertToLong(Schema schema, Object value) throws DataException {...} public static Float convertToFloat(Schema schema, Object value) throws DataException {...} public static Double convertToDouble(Schema schema, Object value) throws DataException {...} public static String convertToString(Schema schema, Object value) {...} public static java.util.Date convertToTime(Schema schema, Object value) throws DataException {...} public static java.util.Date convertToDate(Schema schema, Object value) throws DataException {...} public static java.util.Date convertToTimestamp(Schema schema, Object value) throws DataException {...} public static BigDecimal convertToDecimal(Schema schema, Object value, int scale) throws DataException {...} // These only support converting from a compatible string form, which is the same // format used in the SimpleHeaderConverter described above public static List<?> convertToList(Object value) {...} public static Map<?, ?> convertToMap(Object value) {...} // Only supports returning the value if it already is a Struct. public static Struct convertToStruct(Object value) {...} }
3.優化與調整
在 Kafka 3.0 中優化和調整了以下內容:
- [ KAFKA-3745 ] - 考慮向 ValueJoiner 接口添加連接鍵
- [ KAFKA-4793 ] - Kafka Connect: POST /connectors/(string: name)/restart 不會啟動失敗的任務
- [ KAFKA-5235 ] - GetOffsetShell:支持多個主題和消費者配置覆蓋
- [ KAFKA-6987 ] - 用 CompletableFuture 重新實現 KafkaFuture
- [ KAFKA-7458 ] - 在引導階段避免強制處理
- [ KAFKA-8326 ] - 添加 Serde> 支持
- [ KAFKA-8372 ] - 刪除不推薦使用的 RocksDB#compactRange API
- [ KAFKA-8478 ] - 在強制處理之前輪詢更多記錄
- [ KAFKA-8531 ] - 更改默認復制因子配置
- [ KAFKA-8613 ] -對流中的窗口操作強制使用寬限期
- [ KAFKA-8897 ] - RocksDB 增加版本
- [ KAFKA-9559 ] - 將默認的“默認 serde”從 ByteArraySerde 更改為 null
- [ KAFKA-9726 ] - MM2 模仿 MM1 的 IdentityReplicationPolicy
- [ KAFKA-10062 ] - 添加一種方法來檢索 Streams 應用程序已知的當前時間戳
- [ KAFKA-10201 ] - 更新代碼庫以使用更具包容性的術語
- [ KAFKA-10449 ] - Connect-distributed 示例配置文件沒有針對偵聽器的說明
- [ KAFKA-10585 ] - Kafka Streams 應該從清理中清理狀態存儲目錄
- [ KAFKA-10619 ] - Producer 將默認啟用 EOS
- [ KAFKA-10675 ] - 來自 ConnectSchema.validateValue() 的錯誤消息應包括架構的名稱。
- [ KAFKA-10697 ] - 刪除 ProduceResponse.responses
- [ KAFKA-10746 ] - 消費者輪詢超時到期應記錄為警告而不是信息。
- [ KAFKA-10767 ] - 為 ThreadCacheTest 中缺少的方法添加單元測試用例
- [ KAFKA-10769 ] - 刪除 JoinGroupRequest#containsValidPattern 因為它與 Topic#containsValidPattern 重復
- [ KAFKA-10885 ] - 重構 MemoryRecordsBuilderTest/MemoryRecordsTest 以避免大量(不必要的)被忽略的測試用例
- [ KAFKA-12177 ] - 保留不是冪等的
- [ KAFKA-12234 ] - 擴展 OffsetFetch 請求以接受多個組 ID。
- [ KAFKA-12287 ] - 當按時間戳或持續時間重置偏移量找不到偏移量並默認為最新時,在消費者組上添加警告日志記錄。
- [ KAFKA-12288 ] - 刪除任務級文件系統鎖
- [ KAFKA-12294 ] - 考慮使用轉發機制來創建元數據自動主題
- [ KAFKA-12313 ] - 考慮棄用 default.windowed.serde.inner.class 配置
- [ KAFKA-12329 ] - 當主題不存在時,kafka-reassign-partitions 命令應該給出更好的錯誤信息
- [ KAFKA-12335 ] - 將 junit 從 5.7.0 升級到 5.7.1
- [ KAFKA-12344 ] - 在 Scala API 中支持 SlidingWindows
- [ KAFKA-12347 ] - 提高 Kafka Streams 跟蹤進度的能力
- [ KAFKA-12349 ] - 跟進 KIP-500 中的 PartitionEpoch
- [ KAFKA-12362 ] - 確定任務是否空閑
- [ KAFKA-12379 ] - KIP-716:允許使用 MirrorMaker2 配置 offsetsync 主題的位置
- [ KAFKA-12396 ] - 收到空密鑰時kstreams 的專用異常
- [ KAFKA-12398 ] - 修復脆弱的測試 `ConsumerBounceTest.testClose`
- [ KAFKA-12408 ] - 文檔省略了 ReplicaManager 指標
- [ KAFKA-12409 ] - ReplicaManager 中的計量器泄漏
- [ KAFKA-12415 ] - 為 Gradle 7.0 做准備並限制非 api 依賴項的傳遞范圍
- [ KAFKA-12419 ] - 刪除 3.0 中棄用的 Kafka Streams API
- [ KAFKA-12436 ] - 棄用 MirrorMaker v1
- [ KAFKA-12439 ] - 在 KIP-500 模式下,我們應該能夠為被圍欄的節點分配新的分區
- [ KAFKA-12442 ] - 將 ZSTD JNI 從 1.4.8-4 升級到 1.4.9-1
- [ KAFKA-12454 ] - 當當前 kafka 集群中不存在給定的 brokerIds 時,在 kafka-log-dirs 上添加錯誤日志記錄
- [ KAFKA-12464 ] - 增強約束粘性分配算法
- [ KAFKA-12479 ] - 在 ConsumerGroupCommand中將分區偏移請求合並為單個請求
- [ KAFKA-12483 ] - 默認情況下在連接器配置中啟用客戶端覆蓋
- [ KAFKA-12484 ] - 默認情況下啟用 Connect 的連接器日志上下文
- [ KAFKA-12499 ] - 根據 Streams EOS 上的提交間隔調整事務超時
- [ KAFKA-12509 ] - 加強 StateDirectory 線程鎖定
- [ KAFKA-12541 ] - 擴展 ListOffset 以獲取具有最大時間戳的偏移量 (KIP-734)
- [ KAFKA-12573 ] - 刪除了不推薦使用的`Metric#value`
- [ KAFKA-12574 ] - 棄用 eos-alpha
- [ KAFKA-12577 ] - 刪除不推薦使用的 `ConfigEntry` 構造函數
- [ KAFKA-12584 ] - 刪除不推薦使用的 `Sum` 和 `Total` 類
- [ KAFKA-12591 ] - 刪除不推薦使用的 `quota.producer.default` 和 `quota.consumer.default` 配置
- [ KAFKA-12612 ] - 從 3.0 中的 ConsumerRecord/RecordMetadata 中刪除校驗和
- [ KAFKA-12614 ] - 使用 Jenkinsfile 進行主干和發布分支構建
- [ KAFKA-12620 ] - 控制器生成的生產者 ID
- [ KAFKA-12637 ] - 刪除不推薦使用的 PartitionAssignor 接口
- [ KAFKA-12662 ] - 為 ProducerPerformance 添加單元測試
- [ KAFKA-12663 ] - 更新 FindCoordinator 以一次解析多個 Coordinator
- [ KAFKA-12675 ] - 提高粘性通用分配器的可擴展性和性能
- [ KAFKA-12779 ] - TaskMetadata 應該返回實際的 TaskId 而不是純字符串
- [ KAFKA-12788 ] - 改進 KRaft 副本放置
- [ KAFKA-12803 ] - 支持在 KRaft 模式下重新分配分區
- [ KAFKA-12819 ] - 測試的生活質量改進
- [ KAFKA-12849 ] - 考慮將 TaskMetadata 遷移到與內部實現的接口
- [ KAFKA-12874 ] - 將默認消費者會話超時增加到 45 秒 (KIP-735)
- [ KAFKA-12906 ] - 消費者應在反序列化異常中包含分區和偏移量
- [ KAFKA-12909 ] - 允許用戶選擇加入虛假的左/外流流加入改進
- [ KAFKA-12921 ] - 將 ZSTD JNI 從 1.4.9-1 升級到 1.5.0-1
- [ KAFKA-12922 ] - MirrorCheckpointTask 應該關閉主題過濾器
- [ KAFKA-12931 ] - KIP-746:修改 KRaft 元數據記錄
- [ KAFKA-12934 ] - 將一些控制器類移動到元數據包
- [ KAFKA-12981 ] - 確保同步讀取/更新 LogSegment.maxTimestampSoFar 和 LogSegment.offsetOfMaxTimestampSoFar
- [ KAFKA-13000 ] - 改進 MockClient 中 UnsupportedVersionException 的處理
- [ KAFKA-13021 ] - 從 KIP-633 改進 API 更改和地址跟進的 Javadocs
- [ KAFKA-13026 ] - 冪等生產者 (KAFKA-10619) 后續測試
- [ KAFKA-13041 ] - 支持使用 ducker-ak 調試系統測試
- [ KAFKA-13209 ] - 升級碼頭服務器以修復 CVE-2021-34429
- [ KAFKA-13258 ] - AlterClientQuotas 響應失敗時不包含錯誤
- [ KAFKA-13259 ] - DescribeProducers 響應在失敗時不包含錯誤
- [ KAFKA-13260 ] - FindCoordinator errorCounts 不處理 v4
4.BUG修復
在 Kafka 3.0 中修復了如下BUG:
- [ KAFKA-3968 ] - 將新 FileMessageSet 刷新到磁盤時,不會在父目錄上調用 fsync()
- [ KAFKA-5146 ] - Kafka Streams:刪除對 connect-json 的編譯依賴
- [ KAFKA-6435 ] - 應用程序重置工具可能會刪除不正確的內部主題
- [ KAFKA-7421 ] - 類加載期間 Kafka Connect 中的死鎖
- [ KAFKA-8315 ] - 歷史連接問題
- [ KAFKA-8562 ] - 盡管 KAFKA-5051,SASL_SSL 仍然執行反向 DNS 查找
- [ KAFKA-8784 ] - 刪除 RocksDBConfigSetter#close 的默認實現
- [ KAFKA-8940 ] - 片狀測試 SmokeTestDriverIntegrationTest.shouldWorkWithRebalance
- [ KAFKA-9186 ] - Kafka Connect 用可能來自 DelegatingClassLoader 的錯誤消息淹沒日志
- [ KAFKA-9189 ] - 如果與 Zookeeper 的連接丟失,則會阻止關閉
- [ KAFKA-9295 ] - KTableKTableForeignKeyInnerJoinMultiIntegrationTest#shouldInnerJoinMultiPartitionQueryable
- [ KAFKA-9527 ] - 當 --to-datetime 或 --by-duration 在具有空分區的 --input-topics 上運行時,應用程序重置工具返回 NPE
- [ KAFKA-9672 ] - ISR 中的死代理導致 isr-expiration 失敗並出現異常
- [ KAFKA-9858 ] - bzip2 1.0.6 中 bzip2recover 中的 CVE-2016-3189 釋放后使用漏洞允許遠程攻擊者通過精心制作的 bzip2 文件導致拒絕服務(崩潰),與設置為之前的塊端相關塊的開始。
- [ KAFKA-10046 ] - 棄用的 PartitionGrouper 配置被忽略
- [ KAFKA-10192 ] - 片狀測試 BlockingConnectorTest#testBlockInConnectorStop
- [ KAFKA-10340 ] - 在嘗試為不存在的主題生成記錄而不是永遠掛起時,源連接器應該報告錯誤
- [ KAFKA-10614 ] - 選舉/辭職的組協調員應防范領導時代
- [ KAFKA-12170 ] - Connect Cast 無法正確處理“字節”類型的字段
- [ KAFKA-12252 ] - 當工人失去領導權時,分布式牧人滴答線程快速循環
- [ KAFKA-12262 ] - 當擁有密鑰的追隨者成為領導者時,永遠不會分發新的會話密鑰
- [ KAFKA-12297 ] - MockProducer 的實現與異步發送回調的文檔相矛盾
- [ KAFKA-12303 ] - 當存在空值時,Flatten SMT 會刪除一些字段
- [ KAFKA-12308 ] - ConfigDef.parseType 死鎖
- [ KAFKA-12330 ] - 當 FetchResponse 已滿時,FetchSessionCache 可能會導致分區飢餓
- [ KAFKA-12336 ] - 使用命名的 Consumed 參數調用 stream[K, V](topicPattern: Pattern) API 時自定義流命名不起作用
- [ KAFKA-12350 ] - 關於refresh.topics.interval.seconds默認值不正確的文檔
- [ KAFKA-12393 ] - 記錄多租戶注意事項
- [ KAFKA-12426 ] - 缺少在 RaftReplicaManager 中創建 partition.metadata 文件的邏輯
- [ KAFKA-12427 ] - Broker 不會關閉帶有緩沖數據的靜音空閑連接
- [ KAFKA-12474 ] - 如果無法寫入新的會話密鑰,Worker 可能會死
- [ KAFKA-12492 ] - 示例 RocksDBConfigSetter 的格式混亂
- [ KAFKA-12514 ] - SubscriptionState 中的 NPE
- [ KAFKA-12520 ] - 在啟動時不必要地重建生產者狀態
- [ KAFKA-12522 ] - Cast SMT 應該允許空值記錄通過
- [ KAFKA-12548 ] - 無效的記錄錯誤消息未發送到應用程序
- [ KAFKA-12557 ] - org.apache.kafka.clients.admin.KafkaAdminClientTest#testClientSideTimeoutAfterFailureToReceiveResponse 間歇性地無限期掛起
- [ KAFKA-12611 ] - 修復了在 ProducerPerformance 中錯誤地使用隨機負載的問題
- [ KAFKA-12619 ] - 確保在初始化高水印之前提交 LeaderChange 消息
- [ KAFKA-12650 ] - InternalTopicManager#cleanUpCreatedTopics 中的 NPE
- [ KAFKA-12655 ] - CVE-2021-28165 - 將碼頭升級到 9.4.39
- [ KAFKA-12660 ] - 追加失敗后不更新偏移提交傳感器
- [ KAFKA-12661 ] - 當值不為空時,ConfigEntry#equal 不比較其他字段
- [ KAFKA-12667 ] - StateDirectory 關閉時錯誤日志不正確
- [ KAFKA-12672 ] - 運行 test -kraft -server-start 導致錯誤
- [ KAFKA-12677 ] - raftCluster 總是發送到錯誤的活動控制器並且從不更新
- [ KAFKA-12684 ] - 有效的分區列表被成功選擇的分區列表錯誤地替換
- [ KAFKA-12686 ] - AlterIsr 響應處理中的競爭條件
- [ KAFKA-12691 ] - TaskMetadata timeSinceIdlingStarted 未正確報告
- [ KAFKA-12700 ] - admin.listeners 配置在文檔中有不穩定的有效值
- [ KAFKA-12702 ] - InterBrokerSendThread 中捕獲的未處理異常
- [ KAFKA-12718 ] - SessionWindows 過早關閉
- [ KAFKA-12730 ] - 單個 Kerberos 登錄失敗會導致 Java 9 以后的所有連接失敗
- [ KAFKA-12747 ] - 片狀測試 RocksDBStoreTest.shouldReturnUUIDsWithStringPrefix
- [ KAFKA-12749 ] - 被抑制的 KTable 上的更新日志主題配置丟失
- [ KAFKA-12752 ] - CVE-2021-28168 將球衣升級到 2.34 或 3.02
- [ KAFKA-12754 ] - 讀取偏移量時,TaskMetadata endOffsets 不會更新
- [ KAFKA-12777 ] - AutoTopicCreationManager 不處理響應錯誤
- [ KAFKA-12782 ] - Javadocs 搜索將您發送到一個不存在的 URL
- [ KAFKA-12792 ] - 修復指標錯誤並引入 TimelineInteger
- [ KAFKA-12815 ] - KTable.transformValue 可能有不正確的記錄元數據
- [ KAFKA-12835 ] - 代理上的主題 ID 可能不匹配(代理間協議版本更新后)
- [ KAFKA-12851 ] - 片狀測試 RaftEventSimulationTest.canMakeProgressIfMajorityIsReachable
- [ KAFKA-12856 ] - 將 Jackson 升級到 2.12.3
- [ KAFKA-12865 ] - 描述 ACL 中管理客戶端 API 的文檔錯誤
- [ KAFKA-12866 ] - 即使使用 chroot Kafka 也需要 ZK root 訪問權限
- [ KAFKA-12867 ] - Trogdor ConsumeBenchWorker 使用 maxMessages 配置提前退出
- [ KAFKA-12870 ] - RecordAccumulator 卡在刷新狀態
- [ KAFKA-12880 ] - 在 3.0 中刪除不推薦使用的 Count 和 SampledTotal
- [ KAFKA-12889 ] - 日志清理組考慮空日志段以避免留下空日志
- [ KAFKA-12890 ] - 消費者組陷入“CompletingRebalance”
- [ KAFKA-12896 ] - 由重復的組長 JoinGroups 引起的組重新平衡循環
- [ KAFKA-12897 ] - KRaft 控制器無法在單個代理集群上創建具有多個分區的主題
- [ KAFKA-12898 ] - 訂閱中擁有的分區必須排序
- [ KAFKA-12904 ] - Connect 的驗證 REST 端點使用不正確的超時
- [ KAFKA-12914 ] - StreamSourceNode.toString() 拋出 StreamsBuilder.stream(Pattern) ctor
- [ KAFKA-12925 ] - 中間接口缺少前綴掃描
- [ KAFKA-12926 ] - 運行 kafka-consumer-groups.sh 時,ConsumerGroupCommand 的 java.lang.NullPointerException 出現負偏移
- [ KAFKA-12945 ] - 刪除 3.0 中的端口、主機名和相關配置
- [ KAFKA-12948 ] - 節點處於連接狀態的 NetworkClient.close(node) 使 NetworkClient 無法使用
- [ KAFKA-12949 ] - TestRaftServer 的 scala.MatchError:test-kraft-server-start.sh 上的 null
- [ KAFKA-12951 ] - 恢復 GlobalKTable 時的無限循環
- [ KAFKA-12964 ] - 損壞的段恢復可以刪除新的生產者狀態快照
- [ KAFKA-12983 ] - 在加入組之前並不總是調用 onJoinPrepare
- [ KAFKA-12984 ] - 合作粘性分配器可能會因無效的 SubscriptionState 輸入元數據而卡住
- [ KAFKA-12991 ] - 修復對 `AbstractCoordinator.state` 的不安全訪問
- [ KAFKA-12993 ] - Streams“內存管理”文檔的格式混亂
- [ KAFKA-12996 ] - 當獲取偏移量小於領導者起始偏移量時,未正確處理 OffsetOutOfRange 以用於發散時期
- [ KAFKA-13002 ] - 對於非 MAX_TIMESTAMP 規范,listOffsets 必須立即降級
- [ KAFKA-13003 ] - KafkaBroker 通告套接字端口而不是配置的通告端口
- [ KAFKA-13007 ] - KafkaAdminClient getListOffsetsCalls 為每個主題分區構建集群快照
- [ KAFKA-13008 ] - 流將在等待分區延遲時長時間停止處理數據
- [ KAFKA-13010 ] - 片狀測試 org.apache.kafka.streams.integration.TaskMetadataIntegrationTest.shouldReportCorrectCommittedOffsetInformation()
- [ KAFKA-13029 ] - FindCoordinators 批處理可能會在滾動升級期間破壞消費者
- [ KAFKA-13033 ] - 協調器不可用錯誤應該導致添加到取消映射列表中進行新的查找
- [ KAFKA-13037 ] - “線程狀態已經是 PENDING_SHUTDOWN” 日志垃圾郵件
- [ KAFKA-13053 ] - KRaft 記錄的凹凸框架版本
- [ KAFKA-13056 ] - 當控制器共同駐留時,代理不應生成快照
- [ KAFKA-13057 ] - 許多代理 RPC 在 KRaft 模式下未啟用
- [ KAFKA-13058 ] - `AlterConsumerGroupOffsetsHandler` 不能正確處理分區錯誤。
- [ KAFKA-13073 ] - 由於 MockLog 的實現不一致,模擬測試失敗
- [ KAFKA-13078 ] - 過早關閉 FileRawSnapshotWriter
- [ KAFKA-13080 ] - 獲取快照請求未定向到控制器中的kraft
- [ KAFKA-13092 ] - LISR 請求中的性能回歸
- [ KAFKA-13096 ] - 添加/刪除/替換線程時不會更新 QueryableStoreProvider 呈現 IQ 不可能
- [ KAFKA-13098 ] - 在元數據日志目錄中恢復快照時沒有此類文件異常
- [ KAFKA-13099 ] - 使 transactionalIds 過期時消息太大錯誤
- [ KAFKA-13100 ] - 控制器無法恢復到內存快照
- [ KAFKA-13104 ] - 控制器應在 RaftClient 辭職時通知它
- [ KAFKA-13112 ] - 控制器提交的偏移量與 raft 客戶端偵聽器上下文不同步
- [ KAFKA-13119 ] - 在啟動時驗證 KRaft controllerListener 配置
- [ KAFKA-13127 ] - 修復雜散分區查找邏輯
- [ KAFKA-13129 ] - 修復與 ConfigCommand 更改相關的損壞系統測試
- [ KAFKA-13132 ] - 在 LISR 請求中升級到主題 ID 在 3.0 中引入了差距
- [ KAFKA-13137 ] - KRaft 控制器指標 MBean 名稱被錯誤引用
- [ KAFKA-13139 ] - 在沒有任務的情況下請求重新啟動連接器后的空響應導致 NPE
- [ KAFKA-13141 ] - 如果存在分歧時期,領導者不應更新追隨者獲取偏移量
- [ KAFKA-13143 ] - 禁用 KRaft 控制器的元數據端點
- [ KAFKA-13160 ] - 修復了在使用 KRaft 時調用代理的配置處理程序以傳遞預期默認資源名稱的代碼。
- [ KAFKA-13161 ] - 在 KRaft 中分區更改后未更新跟隨者領導者和 ISR 狀態
- [ KAFKA-13167 ] - KRaft 代理應在受控關閉期間立即心跳
- [ KAFKA-13168 ] - KRaft 觀察者不應該有副本 ID
- [ KAFKA-13173 ] - KRaft 控制器不能正確處理同時代理到期
- [ KAFKA-13198 ] - TopicsDelta 在處理 PartitionChangeRecord 時不會更新已刪除的主題
- [ KAFKA-13214 ] - 消費者在斷開連接后不應重置組狀態
- [ KAFKA-13215 ] - 片狀測試 org.apache.kafka.streams.integration.TaskMetadataIntegrationTest.shouldReportCorrectEndOffsetInformation
- [ KAFKA-13219 ] - BrokerState 指標不適用於 KRaft 集群
- [ KAFKA-13262 ] - 模擬客戶端現在有最終的 close() 方法
- [ KAFKA-13266 ] - 從提取器中刪除分區后應創建“InitialFetchState”
- [ KAFKA-13270 ] - Kafka 可能無法連接到 ZooKeeper,永遠重試,永遠不會啟動
- [ KAFKA-13276 ] - 公共 DescribeConsumerGroupsResult 構造函數指的是 KafkaFutureImpl
- [ KAFKA-13277 ] - 請求/響應中長標記字符串的序列化拋出 BufferOverflowException
5.任務
在 Kafka 3.0 中的開發任務如下:
- [ KAFKA-8405 ] - 刪除不推薦使用的 `kafka-preferred-replica-election` 命令
- [ KAFKA-8734 ] - 刪除 PartitionAssignorAdapter 和不推薦使用的 PartitionAssignor 接口
- [ KAFKA-10070 ] - 參數化連接單元測試以刪除代碼重復
- [ KAFKA-10091 ] - 改善任務空閑
- [ KAFKA-12482 ] - 刪除不推薦使用的 rest.host.name 和 rest.port Connect worker 配置
- [ KAFKA-12519 ] - 考慮刪除舊的內置指標版本的流
- [ KAFKA-12578 ] - 刪除不推薦使用的安全類/方法
- [ KAFKA-12579 ] - 從 3.0 的客戶端中刪除各種不推薦使用的方法
- [ KAFKA-12581 ] - 刪除不推薦使用的 Admin.electPreferredLeaders
- [ KAFKA-12588 ] - 在 shell 命令中刪除不推薦使用的 --zookeeper
- [ KAFKA-12590 ] - 刪除不推薦使用的 SimpleAclAuthorizer
- [ KAFKA-12592 ] - 刪除不推薦使用的 LogConfig.Compact
- [ KAFKA-12600 ] - 刪除客戶端配置`client.dns.lookup`的棄用配置值`default`
- [ KAFKA-12625 ] - 修復通知文件
- [ KAFKA-12717 ] - 刪除內部轉換器配置屬性
- [ KAFKA-12724 ] - 將 2.8.0 添加到系統測試和流升級測試
- [ KAFKA-12794 ] - DescribeProducersRequest.json 中的尾隨 JSON 令牌可能會導致某些 JSON 解析器中的解析錯誤
- [ KAFKA-12800 ] - 配置 jackson 以拒絕生成器中的尾隨輸入
- [ KAFKA-12820 ] - 升級 maven-artifact 依賴以解決 CVE-2021-26291
- [ KAFKA-12976 ] - 從刪除主題調用中刪除UNSUPPORTED_VERSION錯誤
- [ KAFKA-12985 ] - CVE-2021-28169 - 將碼頭升級到 9.4.42
- [ KAFKA-13035 ] - Kafka Connect:更新 POST /connectors/(string: name)/restart 文檔以包含任務重啟行為
- [ KAFKA-13051 ] - 需要為 3.0 定義 Principal Serde
- [ KAFKA-13151 ] - 在 KRaft 中禁止策略配置
6.總結
Kafka 3.0 的發布標志着社區對 Kafka 項目邁向了一個新的里程牌。另外,感謝Kafka PMC對Kafka Eagle監控系統的認可,為了維護Apache社區的商標權益,現在對Kafka Eagle正式改名為EFAK(Eagle For Apache Kafka),EFAK會持續更新迭代優化,為大家管理Kafka集群和使用Kafka應用提供便利,歡迎大家使用EFAK,也可以到Github或者EAFK官網上關注 EFAK 的最新動態。
7.結束語
這篇博客就和大家分享到這里,如果大家在研究學習的過程當中有什么問題,可以加群進行討論或發送郵件給我,我會盡我所能為您解答,與君共勉!
另外,博主出書了《Kafka並不難學》和《Hadoop大數據挖掘從入門到進階實戰》,喜歡的朋友或同學, 可以在公告欄那里點擊購買鏈接購買博主的書進行學習,在此感謝大家的支持。關注下面公眾號,根據提示,可免費獲取書籍的教學視頻。