一 .前言
最進再看官方flink提供的視頻教程,發現入門版本因為時間關系都是基於1.7.x講解的. 在實際操作中跟1.12.x版本還是有差距的, 所以整理一下從1.7 版本到1.12版本之間的相對大的變動. 做到在學習的過程中可以做到心里有數.
二 .Flink 1.7 版本
在 Flink 1.7.0,我們更關注實現快速數據處理以及以無縫方式為 Flink 社區構建數據密集型應用程序。我們最新版本包括一些令人興奮的新功能和改進,例如對 Scala 2.12 的支持,Exactly-Once 語義的 S3 文件接收器,復雜事件處理與流SQL的集成.
2.1. Flink中的Scala 2.12支持
Flink 1.7.0 是第一個完全支持 Scala 2.12 的版本。這可以讓用戶使用新的 Scala 版本編寫 Flink 應用程序以及利用 Scala 2.12 的生態系統。
2.2. 狀態變化
在許多情況下,由於需求的變化,長期運行的 Flink 應用程序會在其生命周期內發生變化。在不丟失當前應用程序進度狀態的情況下更改用戶狀態是應用程序變化的關鍵要求。Flink 1.7.0 版本中社區添加了狀態變化,允許我們靈活地調整長時間運行的應用程序的用戶狀態模式,同時保持與先前保存點的兼容。通過狀態變化,我們可以在狀態模式中添加或刪除列。當使用 Avro 生成類作為用戶狀態時,狀態模式變化可以開箱即用,這意味着狀態模式可以根據 Avro 的規范進行變化。雖然 Avro 類型是 Flink 1.7 中唯一支持模式變化的內置類型,但社區仍在繼續致力於在未來的 Flink 版本中進一步擴展對其他類型的支持。
2.3. Exactly-once語義的S3 StreamingFileSink
Flink 1.6.0 中引入的 StreamingFileSink 現在已經擴展到 S3 文件系統,並保證 Exactly-once 語義。使用此功能允許所有 S3 用戶構建寫入 S3 的 Exactly-once 語義端到端管道。
2.4. Streaming SQL中支持MATCH_RECOGNIZE
這是 Apache Flink 1.7.0 的一個重要補充,它為 Flink SQL 提供了 MATCH_RECOGNIZE 標准的初始支持。此功能融合了復雜事件處理(CEP)和SQL,可以輕松地對數據流進行模式匹配,從而實現一整套新的用例。此功能目前處於測試階段。
2.5. Streaming SQL中的 Temporal Tables 和 Temporal Joins
Temporal Tables 是 Apache Flink 中的一個新概念,它為表的更改歷史記錄提供(參數化)視圖,可以返回表在任何時間點的內容。例如,我們可以使用具有歷史貨幣匯率的表。隨着時間的推移,表會不斷發生變化,並增加更新的匯率。Temporal Table 是一種視圖,可以返回匯率在任何時間點的實際狀態。通過這樣的表,可以使用正確的匯率將不同貨幣的訂單流轉換為通用貨幣。
Temporal Joins 允許 Streaming 數據與不斷變化/更新的表的內存和計算效率的連接,使用處理時間或事件時間,同時符合ANSI SQL。
流式 SQL 的其他功能除了上面提到的主要功能外,Flink 的 Table&SQL API 已經擴展到更多用例。以下內置函數被添加到API:TO_BASE64,LOG2,LTRIM,REPEAT,REPLACE,COSH,SINH,TANH。SQL Client 現在支持在環境文件和 CLI 會話中自定義視圖。此外,CLI 中還添加了基本的 SQL 語句自動完成功能。社區添加了一個 Elasticsearch 6 table sink,允許存儲動態表的更新結果。
2.6. 版本化REST API
從 Flink 1.7.0 開始,REST API 已經版本化。這保證了 Flink REST API 的穩定性,因此可以在 Flink 中針對穩定的 API開發第三方應用程序。因此,未來的 Flink 升級不需要更改現有的第三方集成。
2.7. Kafka 2.0 Connector
Apache Flink 1.7.0 繼續添加更多的連接器,使其更容易與更多外部系統進行交互。在此版本中,社區添加了 Kafka 2.0 連接器,可以從 Kafka 2.0 讀寫數據時保證 Exactly-Once 語義。
2.8. 本地恢復
Apache Flink 1.7.0 通過擴展 Flink 的調度來完成本地恢復功能,以便在恢復時考慮之前的部署位置。如果啟用了本地恢復,Flink 將在運行任務的機器上保留一份最新檢查點的本地副本。將任務調度到之前的位置,Flink 可以通過從本地磁盤讀取檢查點狀態來最小化恢復狀態的網絡流量。此功能大大提高了恢復速度。
2.9. 刪除Flink的傳統模式
Apache Flink 1.7.0 標志着 Flip-6 工作已經完全完成並且與傳統模式達到功能奇偶校驗。因此,此版本刪除了對傳統模式的支持。
三 .Flink 1.8 版本
新特性和改進:
- Schema Evolution Story 最終版
- 基於 TTL 持續清除舊狀態
- 使用用戶定義的函數和聚合進行 SQL 模式檢測
- 符合 RFC 的 CSV 格式
- 新的 KafkaDeserializationSchema,可以直接訪問 ConsumerRecord
- FlinkKinesisConsumer 中的分片水印選項
- DynamoDB Streams 的新用戶捕獲表更改
- 支持用於子任務協調的全局聚合
重要變化:
- 使用 Flink 捆綁 Hadoop 庫的更改:不再發布包含 hadoop 的便捷二進制文件
- FlinkKafkaConsumer 現在將根據主題規范過濾已恢復的分區
- 表 API 的 Maven 依賴更改:之前具有flink-table依賴關系的用戶需要將依賴關系從flink-table-planner更新為正確的依賴關系 flink-table-api-,具體取決於是使用 Java 還是 Scala:flink-table-api-java-bridge或者flink-table-api-scala-bridge
3.1. 使用TTL(生存時間)連續增量清除舊的Key狀態
我們在Flink 1.6(FLINK-9510)中為Key狀態引入了TTL(生存時間)。此功能允許在訪問時清理並使Key狀態條目無法訪問。另外,在編寫保存點/檢查點時,現在也將清理狀態。Flink 1.8引入了對RocksDB狀態后端(FLINK-10471)和堆狀態后端(FLINK-10473)的舊條數的連續清理。這意味着舊的條數將(根據TTL設置)不斷被清理掉。
3.2. 恢復保存點時對模式遷移的新支持
使用Flink 1.7.0,我們在使用AvroSerializer時添加了對更改狀態模式的支持。使用Flink 1.8.0,我們在TypeSerializers將所有內置遷移到新的序列化器快照抽象方面取得了很大進展,該抽象理論上允許模式遷移。在Flink附帶的序列化程序中,我們現在支持PojoSerializer (FLINK-11485)和Java EnumSerializer (FLINK-11334)以及有限情況下的Kryo(FLINK-11323)的模式遷移格式。
3.3. 保存點兼容性
TraversableSerializer 此序列化程序(FLINK-11539)中的更新,包含Scala的Flink 1.2中的保存點將不再與Flink 1.8兼容。可以通過升級到Flink 1.3和Flink 1.7之間的版本,然后再更新至Flink 1.8來解決此限制。
3.4. RocksDB版本沖突並切換到FRocksDB(FLINK-10471)
需要切換到名為FRocksDB的RocksDB的自定義構建,因為需要RocksDB中的某些更改來支持使用TTL進行連續狀態清理。FRocksDB的已使用版本基於RocksDB的升級版本5.17.2。對於Mac OS X,僅支持OS X版本> =10.13的RocksDB版本5.17.2。
另見:https://github.com/facebook/rocksdb/issues/4862
3.5. Maven 依賴
使用Flink捆綁Hadoop庫的更改(FLINK-11266)
包含hadoop的便捷二進制文件不再發布。
如果部署依賴於flink-shaded-hadoop2包含 flink-dist,則必須從下載頁面的可選組件部分手動下載並打包Hadoop jar並將其復制到/lib目錄中。另外一種方法,可以通過打包flink-dist和激活 include-hadoopmaven配置文件來構建包含hadoop的Flink分發。
由於hadoop flink-dist默認不再包含在內,因此指定-DwithoutHadoop何時打包flink-dist將不再影響構建。
3.6. TaskManager配置(FLINK-11716)
TaskManagers現在默認綁定到主機IP地址而不是主機名。可以通過配置選項控制此行為taskmanager.network.bind-policy。如果你的Flink集群在升級后遇到莫名其妙的連接問題,嘗試設置taskmanager.network.bind-policy: name在flink-conf.yaml 返回前的1.8的設置行為。
3.7. Table API 的變動
- 直接表構造函數使用的取消預測(FLINK-11447) Flink 1.8不贊成Table在Table API中直接使用該類的構造函數。此構造函數以前將用於執行與橫向表的連接。你現在應該使用table.joinLateral()或 table.leftOuterJoinLateral()代替。這種更改對於將Table類轉換為接口是必要的,這將使Table API在未來更易於維護和更清潔。
- 引入新的CSV格式符(FLINK-9964)
此版本為符合RFC4180的CSV文件引入了新的格式符。新描述符可用作 org.apache.flink.table.descriptors.Csv。
目前,這只能與Kafka一起使用。舊描述符可org.apache.flink.table.descriptors.OldCsv用於文件系統連接器。
靜態生成器方法在TableEnvironment(FLINK-11445)上的棄用,為了將API與實際實現分開:TableEnvironment.getTableEnvironment()。
不推薦使用靜態方法。你現在應該使用Batch/StreamTableEnvironment.create()。
- 表API Maven模塊中的更改(FLINK-11064)
之前具有flink-table依賴關系的用戶需要更新其依賴關系flink-table-planner以及正確的依賴關系flink-table-api-?,具體取決於是使用Java還是Scala:flink-table-api-java-bridge或者flink-table-api-scala-bridge。
- 更改為外部目錄表構建器(FLINK-11522)
ExternalCatalogTable.builder()不贊成使用ExternalCatalogTableBuilder()。
- 更改為表API連接器jar的命名(FLINK-11026)
Kafka/elasticsearch6 sql-jars的命名方案已經更改。在maven術語中,它們不再具有sql-jar限定符,而artifactId現在以前綴為例,flink-sql而不是flink例如flink-sql-connector-kafka。
更改為指定Null的方式(FLINK-11785)
現在Table API中的Null需要定義nullof(type)而不是Null(type)。舊方法已被棄用。
3.8. 連接器變動
- 引入可直接訪問ConsumerRecord的新KafkaDeserializationSchema(FLINK-8354)
對於FlinkKafkaConsumers,我們推出了一個新的KafkaDeserializationSchema ,可以直接訪問KafkaConsumerRecord。這包含了該 KeyedSerializationSchema功能,該功能已棄用但目前仍可以使用。
- FlinkKafkaConsumer現在將根據主題規范過濾恢復的分區(FLINK-10342)
從Flink 1.8.0開始,現在FlinkKafkaConsumer總是過濾掉已恢復的分區,這些分區不再與要在還原的執行中訂閱的指定主題相關聯。此行為在以前的版本中不存在FlinkKafkaConsumer。
如果您想保留以前的行為。請使用上面的
disableFilterRestoredPartitionsWithSubscribedTopics()
配置方法FlinkKafkaConsumer。
考慮這個例子:如果你有一個正在消耗topic的Kafka Consumer A,你做了一個保存點,然后改變你的Kafka消費者而不是從topic消費B,然后從保存點重新啟動你的工作。在此更改之前,您的消費者現在將使用這兩個主題A,B因為它存儲在消費者正在使用topic消費的狀態A。通過此更改,您的使用者將僅B在還原后使用topic,因為我們使用配置的topic過濾狀態中存儲的topic。
其它接口改變:
1、從TypeSerializer接口(FLINK-9803)中刪除了canEqual()方法
這些canEqual()方法通常用於跨類型層次結構進行適當的相等性檢查。在TypeSerializer實際上並不需要這個屬性,因此該方法現已刪除。
2、刪除CompositeSerializerSnapshot實用程序類(FLINK-11073)
該CompositeSerializerSnapshot實用工具類已被刪除。
現在CompositeTypeSerializerSnapshot,你應該使用復合序列化程序的快照,該序列化程序將序列化委派給多個嵌套的序列化程序。有關使用的說明,請參閱此處CompositeTypeSerializerSnapshot。
四 .Flink 1.9 版本
2019年 8月22日,Apache Flink 1.9.0 版本正式發布,這也是阿里內部版本 Blink 合並入 Flink 后的首次版本發布。
此次版本更新帶來的重大功能包括批處理作業的批式恢復,以及 Table API 和 SQL 的基於 Blink 的新查詢引擎(預覽版)。同時,這一版本還推出了 State Processor API,這是社區最迫切需求的功能之一,該 API 使用戶能夠用 Flink DataSet 作業靈活地讀寫保存點。此外,Flink 1.9 還包括一個重新設計的 WebUI 和新的 Python Table API (預覽版)以及與 Apache Hive 生態系統的集成(預覽版)。
新功能和改進
- 細粒度批作業恢復 (FLIP-1)
- State Processor API (FLIP-43)
- Stop-with-Savepoint (FLIP-34)
- 重構 Flink WebUI
- 預覽新的 Blink SQL 查詢處理器
- Table API / SQL 的其他改進
- 預覽 Hive 集成 (FLINK-10556)
- 預覽新的 Python Table API (FLIP-38)
4.1. 細粒度批作業恢復 (FLIP-1)
批作業(DataSet、Table API 和 SQL)從 task 失敗中恢復的時間被顯著縮短了。在 Flink 1.9 之前,批處理作業中的 task 失敗是通過取消所有 task 並重新啟動整個作業來恢復的,即作業從頭開始,所有進度都會廢棄。在此版本中,Flink 將中間結果保留在網絡 shuffle 的邊緣,並使用此數據去恢復那些僅受故障影響的 task。所謂 task 的 “failover regions” (故障區)是指通過 pipelined 方式連接的數據交換方式,定義了 task 受故障影響的邊界。
要使用這個新的故障策略,需要確保 flink-conf.yaml 中有 jobmanager.execution.failover-strategy: region 的配置。
注意:1.9 發布包中默認就已經包含了該配置項,不過當從之前版本升級上來時,如果要復用之前的配置的話,需要手動加上該配置。
“Region” 的故障策略也能同時提升 “embarrassingly parallel” 類型的流作業的恢復速度,也就是沒有任何像 keyBy() 和 rebalance 的 shuffle 的作業。當這種作業在恢復時,只有受影響的故障區的 task 需要重啟。對於其他類型的流作業,故障恢復行為與之前的版本一樣。
4.2. State Processor API (FLIP-43)
直到 Flink 1.9,從外部訪問作業的狀態僅局限於:Queryable State(可查詢狀態)實驗性功能。此版本中引入了一種新的、強大的類庫,基於 DataSet 支持讀取、寫入、和修改狀態快照。在實踐上,這意味着:
- Flink 作業的狀態可以自主構建了,可以通過讀取外部系統的數據(例如外部數據庫),然后轉換成 savepoint。
- Savepoint 中的狀態可以使用任意的 Flink 批處理 API 查詢(DataSet、Table、SQL)。例如,分析相關的狀態模式或檢查狀態差異以支持應用程序審核或故障排查。
- Savepoint 中的狀態 schema 可以離線遷移了,而之前的方案只能在訪問狀態時進行,是一種在線遷移。
- Savepoint 中的無效數據可以被識別出來並糾正。
新的 State Processor API 覆蓋了所有類型的快照:savepoint,full checkpoint 和 incremental checkpoint。
4.3. Stop-with-Savepoint (FLIP-34)
“Cancel-with-savepoint” 是停止、重啟、fork、或升級 Flink 作業的一個常用操作。然而,當前的實現並沒有保證輸出到 exactly-once sink 的外部存儲的數據持久化。為了改進停止作業時的端到端語義,Flink 1.9 引入了一種新的 SUSPEND 模式,可以帶 savepoint 停止作業,保證了輸出數據的一致性。你可以使用 Flink CLI 來 suspend 一個作業:
bin/flink stop -p [:targetSavepointDirectory] :jobId
4.4. 重構 Flink WebUI
社區討論了現代化 Flink WebUI 的提案,決定采用 Angular 的最新穩定版來重構這個組件。從 Angular 1.x 躍升到了 7.x 。重新設計的 UI 是 1.9.0 的默認版本,不過有一個按鈕可以切換到舊版的 WebUI。
4.5. 新 Blink SQL 查詢處理器預覽
在 Blink 捐贈給 Apache Flink 之后,社區就致力於為 Table API 和 SQL 集成 Blink 的查詢優化器和 runtime。第一步,我們將 flink-table 單模塊重構成了多個小模塊(FLIP-32)。這對於 Java 和 Scala API 模塊、優化器、以及 runtime 模塊來說,有了一個更清晰的分層和定義明確的接口。
緊接着,我們擴展了 Blink 的 planner 以實現新的優化器接口,所以現在有兩個插件化的查詢處理器來執行 Table API 和 SQL:1.9 以前的 Flink 處理器和新的基於 Blink 的處理器。基於 Blink 的查詢處理器提供了更好地 SQL 覆蓋率(1.9 完整支持 TPC-H,TPC-DS 的支持在下一個版本的計划中)並通過更廣泛的查詢優化(基於成本的執行計划選擇和更多的優化規則)、改進的代碼生成機制、和調優過的算子實現來提升批處理查詢的性能。除此之外,基於 Blink 的查詢處理器還提供了更強大的流處理能力,包括一些社區期待已久的新功能(如維表 Join,TopN,去重)和聚合場景緩解數據傾斜的優化,以及內置更多常用的函數。
注:兩個查詢處理器之間的語義和功能大部分是一致的,但並未完全對齊。具體請查看發布日志。
不過, Blink 的查詢處理器的集成還沒有完全完成。因此,1.9 之前的 Flink 處理器仍然是1.9 版本的默認處理器,建議用於生產設置。你可以在創建 TableEnvironment 時通過 EnvironmentSettings 配置啟用 Blink 處理器。被選擇的處理器必須要在正在執行的 Java 進程的類路徑中。對於集群設置,默認兩個查詢處理器都會自動地加載到類路徑中。當從 IDE 中運行一個查詢時,需要在項目中顯式地增加一個處理器的依賴。
4.6. Table API / SQL 的其他改進
除了圍繞 Blink Planner 令人興奮的進展外,社區還做了一系列的改進,包括:
- 為 Table API / SQL 的 Java 用戶去除 Scala 依賴 (FLIP-32) 作為重構和拆分 flink-table 模塊工作的一部分,我們為 Java 和 Scala 創建了兩個單獨的 API 模塊。對於 Scala 用戶來說,沒有什么改變。不過現在 Java 用戶在使用 Table API 和 SQL 時,可以不用引入一堆 Scala 依賴了。
- 重構 Table API / SQL 的類型系統(FLIP-37) 我們實現了一個新的數據類型系統,以便從 Table API 中移除對 Flink TypeInformation 的依賴,並提高其對 SQL 標准的遵從性。不過還在進行中,預計將在下一版本完工,在 Flink 1.9 中,UDF 尚未移植到新的類型系統上。
- Table API 的多行多列轉換(FLIP-29) Table API 擴展了一組支持多行和多列、輸入和輸出的轉換的功能。這些轉換顯著簡化了處理邏輯的實現,同樣的邏輯使用關系運算符來實現是比較麻煩的。
- 嶄新的統一的 Catalog API Catalog 已有的一些接口被重構和(某些)被替換了,從而統一了內部和外部 catalog 的處理。這項工作主要是為了 Hive 集成(見下文)而啟動的,不過也改進了 Flink 在管理 catalog 元數據的整體便利性。
- SQL API 中的 DDL 支持 (FLINK-10232) 到目前為止,Flink SQL 已經支持 DML 語句(如 SELECT,INSERT)。但是外部表(table source 和 table sink)必須通過 Java/Scala 代碼的方式或配置文件的方式注冊。1.9 版本中,我們支持 SQL DDL 語句的方式注冊和刪除表(CREATE TABLE,DROP TABLE)。然而,我們還沒有增加流特定的語法擴展來定義時間戳抽取和 watermark 生成策略等。流式的需求將會在下一版本完整支持。
五 .Flink 1.10 版本 [重要版本 : Blink 整合完成]
作為 Flink 社區迄今為止規模最大的一次版本升級,Flink 1.10 容納了超過 200 位貢獻者對超過 1200 個 issue 的開發實現,包含對 Flink 作業的整體性能及穩定性的顯著優化、對原生 Kubernetes 的初步集成(beta 版本)以及對 Python 支持(PyFlink)的重大優化。
Flink 1.10 同時還標志着對 Blink[1] 的整合宣告完成,隨着對 Hive 的生產級別集成及對 TPC-DS 的全面覆蓋,Flink 在增強流式 SQL 處理能力的同時也具備了成熟的批處理能力。
5.1. 內存管理及配置優化
Flink 目前的 TaskExecutor 內存模型存在着一些缺陷,導致優化資源利用率比較困難,例如:
流和批處理內存占用的配置模型不同;流處理中的 RocksDB state backend 需要依賴用戶進行復雜的配置。為了讓內存配置變的對於用戶更加清晰、直觀,Flink 1.10 對 TaskExecutor 的內存模型和配置邏輯進行了較大的改動 (FLIP-49 [7])。這些改動使得 Flink 能夠更好地適配所有部署環境(例如 Kubernetes, Yarn, Mesos),讓用戶能夠更加嚴格的控制其內存開銷。
- Managed 內存擴展
Managed 內存的范圍有所擴展,還涵蓋了 RocksDB state backend 使用的內存。盡管批處理作業既可以使用堆內內存也可以使用堆外內存,使用 RocksDB state backend 的流處理作業卻只能利用堆外內存。因此為了讓用戶執行流和批處理作業時無需更改集群的配置,我們規定從現在起 managed 內存只能在堆外。
- 簡化 RocksDB 配置
此前,配置像 RocksDB 這樣的堆外 state backend 需要進行大量的手動調試,例如減小 JVM 堆空間、設置 Flink 使用堆外內存等。現在,Flink 的開箱配置即可支持這一切,且只需要簡單地改變 managed 內存的大小即可調整 RocksDB state backend 的內存預算。
另一個重要的優化是,Flink 現在可以限制 RocksDB 的 native 內存占用(FLINK-7289 [8]),以避免超過總的內存預算——這對於 Kubernetes 等容器化部署環境尤為重要。關於如何開啟、調試該特性,請參考 RocksDB 調試[9]。
注:FLIP-49 改變了集群的資源配置過程,因此從以前的 Flink 版本升級時可能需要對集群配置進行調整。詳細的變更日志及調試指南請參考文檔[10]。
5.2. 統一的作業提交邏輯
在此之前,提交作業是由執行環境負責的,且與不同的部署目標(例如 Yarn, Kubernetes, Mesos)緊密相關。這導致用戶需要針對不同環境保留多套配置,增加了管理的成本。
在 Flink 1.10 中,作業提交邏輯被抽象到了通用的 Executor 接口(FLIP-73 [11])。新增加的 ExecutorCLI (FLIP-81 [12])引入了為任意執行目標[13]指定配置參數的統一方法。此外,隨着引入 JobClient(FLINK-74 [14])負責獲取 JobExecutionResult,獲取作業執行結果的邏輯也得以與作業提交解耦。
5.3. 原生 Kubernetes 集成(Beta)
對於想要在容器化環境中嘗試 Flink 的用戶來說,想要在 Kubernetes 上部署和管理一個 Flink standalone 集群,首先需要對容器、算子及像 kubectl 這樣的環境工具有所了解。
在 Flink 1.10 中,我們推出了初步的支持 session 模式的主動 Kubernetes 集成(FLINK-9953 [15])。其中,“主動”指 Flink ResourceManager (K8sResMngr) 原生地與 Kubernetes 通信,像 Flink 在 Yarn 和 Mesos 上一樣按需申請 pod。用戶可以利用 namespace,在多租戶環境中以較少的資源開銷啟動 Flink。這需要用戶提前配置好 RBAC 角色和有足夠權限的服務賬號。
正如在統一的作業提交邏輯一節中提到的,Flink 1.10 將命令行參數映射到了統一的配置。因此,用戶可以參閱 Kubernetes 配置選項,在命令行中使用以下命令向 Kubernetes 提交 Flink 作業。
./bin/flink run -d -e kubernetes-session -Dkubernetes.cluster-id= examples/streaming/WindowJoin.jar
5.4. Table API/SQL: 生產可用的 Hive 集成
Flink 1.9 推出了預覽版的 Hive 集成。該版本允許用戶使用 SQL DDL 將 Flink 特有的元數據持久化到 Hive Metastore、調用 Hive 中定義的 UDF 以及讀、寫 Hive 中的表。Flink 1.10 進一步開發和完善了這一特性,帶來了全面兼容 Hive 主要版本[17]的生產可用的 Hive 集成。
- Batch SQL 原生分區支持
此前,Flink 只支持寫入未分區的 Hive 表。在 Flink 1.10 中,Flink SQL 擴展支持了 INSERT OVERWRITE 和 PARTITION 的語法(FLIP-63 [18]),允許用戶寫入 Hive 中的靜態和動態分區。
寫入靜態分區
INSERT { INTO | OVERWRITE } TABLE tablename1 [PARTITION (partcol1=val1, partcol2=val2 …)] select_statement1 FROM from_statement;
寫入動態分區
INSERT { INTO | OVERWRITE } TABLE tablename1 select_statement1 FROM from_statement;
對分區表的全面支持,使得用戶在讀取數據時能夠受益於分區剪枝,減少了需要掃描的數據量,從而大幅提升了這些操作的性能。
- 其他優化
除了分區剪枝,Flink 1.10 的 Hive 集成還引入了許多數據讀取[19]方面的優化,例如:
投影下推:Flink 采用了投影下推技術,通過在掃描表時忽略不必要的域,最小化 Flink 和 Hive 表之間的數據傳輸量。這一優化在表的列數較多時尤為有效。
LIMIT 下推:對於包含 LIMIT 語句的查詢,Flink 在所有可能的地方限制返回的數據條數,以降低通過網絡傳輸的數據量。讀取數據時的 ORC 向量化:為了提高讀取 ORC 文件的性能,對於 Hive 2.0.0 及以上版本以及非復合數據類型的列,Flink 現在默認使用原生的 ORC 向量化讀取器。
- 將可插拔模塊作為 Flink 內置對象(Beta)
Flink 1.10 在 Flink table 核心引入了通用的可插拔模塊機制,目前主要應用於系統內置函數(FLIP-68 [20])。通過模塊,用戶可以擴展 Flink 的系統對象,例如像使用 Flink 系統函數一樣使用 Hive 內置函數。新版本中包含一個預先實現好的 HiveModule,能夠支持多個 Hive 版本,當然用戶也可以選擇編寫自己的可插拔模塊。
5.5. 其他 Table API/SQL 優化
- SQL DDL 中的 watermark 和計算列
Flink 1.10 在 SQL DDL 中增加了針對流處理定義時間屬性及產生 watermark 的語法擴展(FLIP-66 [22])。這使得用戶可以在用 DDL 語句創建的表上進行基於時間的操作(例如窗口)以及定義 watermark 策略。
CREATE TABLE table_name (
WATERMARK FOR columnName AS <watermark_strategy_expression>
) WITH (
...
)
- 其他 SQL DDL 擴展
Flink 現在嚴格區分臨時/持久、系統/目錄函數(FLIP-57 [24])。這不僅消除了函數引用中的歧義,還帶來了確定的函數解析順序(例如,當存在命名沖突時,比起目錄函數、持久函數 Flink 會優先使用系統函數、臨時函數)。
在 FLIP-57 的基礎上,我們擴展了 SQL DDL 的語法,支持創建目錄函數、臨時函數以及臨時系統函數(FLIP-79):
CREATE [TEMPORARY|TEMPORARY SYSTEM] FUNCTION
[IF NOT EXISTS] [catalog_name.][db_name.]function_name
AS identifier [LANGUAGE JAVA|SCALA]
關於目前完整的 Flink SQL DDL 支持,請參考最新的文檔[26]。
注:為了今后正確地處理和保證元對象(表、視圖、函數)上的行為一致性,Flink 廢棄了 Table API 中的部分對象申明方法,以使留下的方法更加接近標准的 SQL DDL(FLIP-64)。
- 批處理完整的 TPC-DS 覆蓋
TPC-DS 是廣泛使用的業界標准決策支持 benchmark,用於衡量基於 SQL 的數據處理引擎性能。Flink 1.10 端到端地支持所有 TPC-DS 查詢(FLINK-11491 [28]),標志着 Flink SQL 引擎已經具備滿足現代數據倉庫及其他類似的處理需求的能力。
5.6. PyFlink: 支持原生用戶自定義函數(UDF)
作為 Flink 全面支持 Python 的第一步,在之前版本中我們發布了預覽版的 PyFlink。在新版本中,我們專注於讓用戶在 Table API/SQL 中注冊並使用自定義函數(UDF,另 UDTF / UDAF 規划中)(FLIP-58)。
如果你對這一特性的底層實現(基於 Apache Beam 的可移植框架)感興趣,請參考 FLIP-58 的 Architecture 章節以及 FLIP-78。這些數據結構為支持 Pandas 以及今后將 PyFlink 引入到 DataStream API 奠定了基礎。
從 Flink 1.10 開始,用戶只要執行以下命令就可以輕松地通過 pip 安裝 PyFlink:
pip install apache-flink
5.7. 重要變更
- FLINK-10725[34]:Flink 現在可以使用 Java 11 編譯和運行。
- FLINK-15495[35]:SQL 客戶端現在默認使用 Blink planner,向用戶提供最新的特性及優化。Table API 同樣計划在下個版本中從舊的 planner 切換到 Blink planner,我們建議用戶現在就開始嘗試和熟悉 Blink planner。
- FLINK-13025[36]:新的 Elasticsearch sink connector[37] 全面支持 Elasticsearch 7.x 版本。
- FLINK-15115[38]:Kafka 0.8 和 0.9 的 connector 已被標記為廢棄並不再主動支持。如果你還在使用這些版本或有其他相關問題,請通過 @dev 郵件列表聯系我們。
- FLINK-14516[39]:非基於信用的網絡流控制已被移除,同時移除的還有配置項“taskmanager.network.credit.model”。今后,Flink 將總是使用基於信用的網絡流控制。
- FLINK-12122[40]:在 Flink 1.5.0 中,FLIP-6[41] 改變了 slot 在 TaskManager 之間的分布方式。要想使用此前的調度策略,既盡可能將負載分散到所有當前可用的 TaskManager,用戶可以在 flink-conf.yaml 中設置 “cluster.evenly-spread-out-slots: true”。
- FLINK-11956[42]:s3-hadoop 和 s3-presto 文件系統不再使用類重定位加載方式,而是使用插件方式加載,同時無縫集成所有認證提供者。我們強烈建議其他文件系統也只使用插件加載方式,並將陸續移除重定位加載方式。
Flink 1.9 推出了新的 Web UI,同時保留了原來的 Web UI 以備不時之需。截至目前,我們沒有收到關於新的 UI 存在問題的反饋,因此社區投票決定在 Flink 1.10 中移除舊的 Web UI。
原文: https://developer.aliyun.com/article/744734
官方地址: https://flink.apache.org/news/2020/02/11/release-1.10.0.html
5.7. 重要變更 FLINK-10725[34]:Flink 現在可以使用 Java 11 編譯和運行。FLINK-15495[35]:SQL 客戶端現在默認使用 Blink planner,向用戶提供最新的特性及優化。Table API 同樣計划在下個版本中從舊的 planner 切換到 Blink planner,我們建議用戶現在就開始嘗試和熟悉 Blink planner。FLINK-13025[36]:新的 Elasticsearch sink connector[37] 全面支持 Elasticsearch 7.x 版本。FLINK-15115[38]:Kafka 0.8 和 0.9 的 connector 已被標記為廢棄並不再主動支持。如果你還在使用這些版本或有其他相關問題,請通過 @dev 郵件列表聯系我們。FLINK-14516[39]:非基於信用的網絡流控制已被移除,同時移除的還有配置項“taskmanager.network.credit.model”。今后,Flink 將總是使用基於信用的網絡流控制。FLINK-12122[40]:在 Flink 1.5.0 中,FLIP-6[41] 改變了 slot 在 TaskManager 之間的分布方式。要想使用此前的調度策略,既盡可能將負載分散到所有當前可用的 TaskManager,用戶可以在 flink-conf.yaml 中設置 “cluster.evenly-spread-out-slots: true”。FLINK-11956[42]:s3-hadoop 和 s3-presto 文件系統不再使用類重定位加載方式,而是使用插件方式加載,同時無縫集成所有認證提供者。我們強烈建議其他文件系統也只使用插件加載方式,並將陸續移除重定位加載方式。Flink 1.9 推出了新的 Web UI,同時保留了原來的 Web UI 以備不時之需。截至目前,我們沒有收到關於新的 UI 存在問題的反饋,因此社區投票決定[43]在 Flink 1.10 中移除舊的 Web UI。
原文: https://developer.aliyun.com/article/744734 官方地址: https://flink.apache.org/news/2020/02/11/release-1.10.0.html?spm=a2c6h.12873639.0.0.749e4fc0c1D14m
六 .Flink 1.11 版本 [重要版本]
Flink 1.11.0 正式發布。歷時近 4 個月,Flink 在生態、易用性、生產可用性、穩定性等方面都進行了增強和改善。
core engine 引入了 unaligned checkpoints,這是對 Flink 的容錯機制的重大更改,該機制可改善在高背壓下的檢查點性能。
一個新的 Source API 通過統一批處理和 streaming 執行以及將內部組件(例如事件時間處理、水印生成或空閑檢測)卸載到 Flink 來簡化(自定義)sources 的實現。
Flink SQL 引入了對變更數據捕獲(CDC)的支持,以輕松使用和解釋來自 Debezium 之類的工具的數據庫變更日志。更新的 FileSystem 連接器還擴展了 Table API/SQL 支持的用例和格式集,從而實現了直接啟用從 Kafka 到 Hive 的 streaming 數據傳輸等方案。
PyFlink 的多項性能優化,包括對矢量化用戶定義函數(Pandas UDF)的支持。這改善了與 Pandas 和 NumPy 之類庫的互操作性,使 Flink 在數據科學和 ML 工作負載方面更強大。
重要變化
- [FLINK-17339] 從 Flink 1.11 開始,Blink planner 是 Table API/SQL中的默認設置。自 Flink 1.10 起,SQL 客戶端已經存在這種情況。仍支持舊的 Flink 規划器,但未積極開發。
- [FLINK-5763] Savepoints 現在將其所有狀態包含在一個目錄中(元數據和程序狀態)。這樣可以很容易地找出組成 savepoint 狀態的文件,並允許用戶通過簡單地移動目錄來重新定位 savepoint。
- [FLINK-16408] 為了減輕對 JVM metaspace 的壓力,只要任務分配了至少一個插槽,TaskExecutor就會重用用戶代碼類加載器。這會稍微改變 Flink 的恢復行為,從而不會重新加載靜態字段。
- [FLINK-11086] Flink 現在支持 Hadoop 3.0.0 以上的 Hadoop 版本。請注意,Flink 項目不提供任何更新的flink-shaded-hadoop-x jars。用戶需要通過HADOOP_CLASSPATH環境變量(推薦)或 lib/ folder 提供 Hadoop 依賴項。
- [FLINK-16963] Flink 隨附的所有MetricReporters均已轉換為插件。這些不再應該放在/lib中(可能導致依賴沖突),而應該放在/plugins/< some_directory>中。
- [FLINK-12639] Flink 文檔正在做一些返工,因此從 Flink 1.11 開始,內容的導航和組織會有所變化。
官方原文: https://flink.apache.org/news/2020/07/06/release-1.11.0.html
6.1. Table & SQL 支持 Change Data Capture(CDC)
CDC 被廣泛使用在復制數據、更新緩存、微服務間同步數據、審計日志等場景,很多公司都在使用開源的 CDC 工具,如 MySQL CDC。通過 Flink 支持在 Table & SQL 中接入和解析 CDC 是一個強需求,在過往的很多討論中都被提及過,可以幫助用戶以實時的方式處理 changelog 流,進一步擴展 Flink 的應用場景,例如把 MySQL 中的數據同步到 PG 或 ElasticSearch 中,低延時的 temporal join 一個 changelog 等。
除了考慮到上面的真實需求,Flink 中定義的“Dynamic Table”概念在流上有兩種模型:append 模式和 update 模式。通過 append 模式把流轉化為“Dynamic Table”在之前的版本中已經支持,因此在 1.11.0 中進一步支持 update 模式也從概念層面完整的實現了“Dynamic Table”。
為了支持解析和輸出 changelog,如何在外部系統和 Flink 系統之間編解碼這些更新操作是首要解決的問題。考慮到 source 和 sink 是銜接外部系統的一個橋梁,因此 FLIP-95 在定義全新的 Table source 和 Table sink 接口時解決了這個問題。
在公開的 CDC 調研報告中,Debezium 和 Canal 是用戶中最流行使用的 CDC 工具,這兩種工具用來同步 changelog 到其它的系統中,如消息隊列。據此,FLIP-105 首先支持了 Debezium 和 Canal 這兩種格式,而且 Kafka source 也已經可以支持解析上述格式並輸出更新事件,在后續的版本中會進一步支持 Avro(Debezium) 和 Protobuf(Canal)。
CREATE TABLE my_table (
...) WITH (
'connector'='...', -- e.g. 'kafka'
'format'='debezium-json',
'debezium-json.schema-include'='true' -- default: false (Debezium can be configured to include or exclude the message schema)
'debezium-json.ignore-parse-errors'='true' -- default: false
);
6.2. Table & SQL 支持 JDBC Catalog
1.11.0 之前,用戶如果依賴 Flink 的 source/sink 讀寫關系型數據庫或讀取 changelog 時,必須要手動創建對應的 schema。而且當數據庫中的 schema 發生變化時,也需要手動更新對應的 Flink 作業以保持一致和類型匹配,任何不匹配都會造成運行時報錯使作業失敗。用戶經常抱怨這個看似冗余且繁瑣的流程,體驗極差。
實際上對於任何和 Flink 連接的外部系統都可能有類似的上述問題,在 1.11.0 中重點解決了和關系型數據庫對接的這個問題。FLIP-93 提供了 JDBC catalog 的基礎接口以及 Postgres catalog 的實現,這樣方便后續實現與其它類型的關系型數據庫的對接。
1.11.0 版本后,用戶使用 Flink SQL 時可以自動獲取表的 schema 而不再需要輸入 DDL。除此之外,任何 schema 不匹配的錯誤都會在編譯階段提前進行檢查報錯,避免了之前運行時報錯造成的作業失敗。這是提升易用性和用戶體驗的一個典型例子。
6.3. Hive 實時數倉
從 1.9.0 版本開始 Flink 從生態角度致力於集成 Hive,目標打造批流一體的 Hive 數倉。經過前兩個版本的迭代,已經達到了 batch 兼容且生產可用,在 TPC-DS 10T benchmark 下性能達到 Hive 3.0 的 7 倍以上。
1.11.0 在 Hive 生態中重點實現了實時數倉方案,改善了端到端流式 ETL 的用戶體驗,達到了批流一體 Hive 數倉的目標。同時在兼容性、性能、易用性方面也進一步進行了加強。
在實時數倉的解決方案中,憑借 Flink 的流式處理優勢做到實時讀寫 Hive:
- Hive 寫入:FLIP-115 完善擴展了 FileSystem connector 的基礎能力和實現,Table/SQL 層的 sink 可以支持各種格式(CSV、Json、Avro、Parquet、ORC),而且支持 Hive table 的所有格式。
- Partition 支持:數據導入 Hive 引入 partition 提交機制來控制可見性,通過sink.partition-commit.trigger 控制 partition 提交的時機,通過 sink.partition-commit.policy.kind 選擇提交策略,支持 SUCCESS 文件和 metastore 提交。
- Hive 讀取:實時化的流式讀取 Hive,通過監控 partition 生成增量讀取新 partition,或者監控文件夾內新文件生成來增量讀取新文件。在 Hive 可用性方面的提升:
- FLIP-123 通過 Hive Dialect 為用戶提供語法兼容,這樣用戶無需在 Flink 和 Hive 的 CLI 之間切換,可以直接遷移 Hive 腳本到 Flink 中執行。
- 提供 Hive 相關依賴的內置支持,避免用戶自己下載所需的相關依賴。現在只需要單獨下載一個包,配置 HADOOP_CLASSPATH 就可以運行。
- 在 Hive 性能方面,1.10.0 中已經支持了 ORC(Hive 2+)的向量化讀取,1.11.0 中我們補全了所有版本的 Parquet 和 ORC 向量化支持來提升性能。
6.4. 全新 Source API
前面也提到過,source 和 sink 是 Flink 對接外部系統的一個橋梁,對於完善生態、可用性及端到端的用戶體驗是很重要的環節。社區早在一年前就已經規划了 source 端的徹底重構,從 FLIP-27 的 ID 就可以看出是很早的一個 feature。但是由於涉及到很多復雜的內部機制和考慮到各種 source connector 的實現,設計上需要考慮的很全面。從 1.10.0 就開始做 POC 的實現,最終趕上了 1.11.0 版本的發布。
先簡要回顧下 source 之前的主要問題:
對用戶而言,在 Flink 中改造已有的 source 或者重新實現一個生產級的 source connector 不是一件容易的事情,具體體現在沒有公共的代碼可以復用,而且需要理解很多 Flink 內部細節以及實現具體的 event time 分配、watermark 產出、idleness 監測、線程模型等。
批和流的場景需要實現不同的 source。
partitions/splits/shards 概念在接口中沒有顯式表達,比如 split 的發現邏輯和數據消費都耦合在 source function 的實現中,這樣在實現 Kafka 或 Kinesis 類型的 source 時增加了復雜性。
在 runtime 執行層,checkpoint 鎖被 source function 搶占會帶來一系列問題,框架很難進行優化。
FLIP-27 在設計時充分考慮了上述的痛點:
- 首先在 Job Manager 和 Task Manager 中分別引入兩種不同的組件 Split Enumerator 和 Source reader,解耦 split 發現和對應的消費處理,同時方便隨意組合不同的策略。比如現有的 Kafka connector 中有多種不同的 partition 發現策略和實現耦合在一起,在新的架構下,我們只需要實現一種 source reader,就可以適配多種 split enumerator 的實現來對應不同的 partition 發現策略。
- 在新架構下實現的 source connector 可以做到批流統一,唯一的小區別是對批場景的有限輸入,split enumerator 會產出固定數量的 split 集合並且每個 split 都是有限數據集;對於流場景的無限輸入,split enumerator 要么產出無限多的 split 或者 split 自身是無限數據集。
- 復雜的 timestamp assigner 以及 watermark generator 透明的內置在 source reader 模塊內運行,對用戶來說是無感知的。這樣用戶如果想實現新的 source connector,一般不再需要重復實現這部分功能。
目前 Flink 已有的 source connector 會在后續的版本中基於新架構來重新實現,legacy source 也會繼續維護幾個版本保持兼容性,用戶也可以按照 release 文檔中的說明來嘗試體驗新 source 的開發。
6.5. PyFlink 生態
眾所周知,Python 語言在機器學習和數據分析領域有着廣泛的使用。Flink 從 1.9.0 版本開始發力兼容 Python 生態,Python 和 Flink 合力為 PyFlink,把 Flink 的實時分布式處理能力輸出給 Python 用戶。前兩個版本 PyFlink 已經支持了 Python Table API 和 UDF,在 1.11.0 中擴大對 Python 生態庫 Pandas 的支持以及和 SQL DDL/Client 的集成,同時 Python UDF 性能有了極大的提升。
具體來說,之前普通的 Python UDF 每次調用只能處理一條數據,而且在 Java 端和 Python 端都需要序列化/反序列化,開銷很大。1.11.0 中 Flink 支持在 Table & SQL 作業中自定義和使用向量化 Python UDF,用戶只需要在 UDF 修飾中額外增加一個參數 udf_type=“pandas” 即可。這樣帶來的好處是:
- 每次調用可以處理 N 條數據。
- 數據格式基於 Apache Arrow,大大降低了 Java、Python 進程之間的序列化/反序列化開銷。
- 方便 Python 用戶基於 Numpy 和 Pandas 等數據分析領域常用的 Python 庫,開發高性能的 Python UDF。
除此之外,1.11.0 中 PyFlink 還支持:
- PyFlink table 和 Pandas DataFrame 之間無縫切換(FLIP-120),增強 Pandas 生態的易用性和兼容性。
- Table & SQL 中可以定義和使用 Python UDTF(FLINK-14500),不再必需 Java/Scala UDTF。
- Cython 優化 Python UDF 的性能(FLIP-121),對比 1.10.0 可以提升 30 倍。
- Python UDF 中用戶自定義 metric(FLIP-112),方便監控和調試 UDF 的執行。
上述解讀的都是側重 API 層面,用戶開發作業可以直接感知到的易用性的提升。下面我們看看執行引擎層在 1.11.0 中都有哪些值得關注的變化。
6.6. 生產可用性和穩定性提升
6.6.1 支持 application 模式和 Kubernetes 增強
1.11.0 版本前,Flink 主要支持如下兩種模式運行:
Session 模式:提前啟動一個集群,所有作業都共享這個集群的資源運行。優勢是避免每個作業單獨啟動集群帶來的額外開銷,缺點是隔離性稍差。如果一個作業把某個 Task Manager(TM)容器搞掛,會導致這個容器內的所有作業都跟着重啟。雖然每個作業有自己獨立的 Job Manager(JM)來管理,但是這些 JM 都運行在一個進程中,容易帶來負載上的瓶頸。
Per-job 模式:為了解決 session 模式隔離性差的問題,每個作業根據資源需求啟動獨立的集群,每個作業的 JM 也是運行在獨立的進程中,負載相對小很多。
以上兩種模式的共同問題是需要在客戶端執行用戶代碼,編譯生成對應的 Job Graph 提交到集群運行。在這個過程需要下載相關 jar 包並上傳到集群,客戶端和網絡負載壓力容易成為瓶頸,尤其當一個客戶端被多個用戶共享使用。
1.11.0 中引入了 application 模式(FLIP-85)來解決上述問題,按照 application 粒度來啟動一個集群,屬於這個 application 的所有 job 在這個集群中運行。核心是 Job Graph 的生成以及作業的提交不在客戶端執行,而是轉移到 JM 端執行,這樣網絡下載上傳的負載也會分散到集群中,不再有上述 client 單點上的瓶頸。
用戶可以通過 bin/flink run-application 來使用 application 模式,目前 Yarn 和 Kubernetes(K8s)都已經支持這種模式。Yarn application 會在客戶端將運行作業需要的依賴都通過 Yarn Local Resource 傳遞到 JM。K8s application 允許用戶構建包含用戶 jar 與依賴的鏡像,同時會根據作業自動創建 TM,並在結束后銷毀整個集群,相比 session 模式具有更好的隔離性。K8s 不再有嚴格意義上的 per-job 模式,application 模式相當於 per-job 在集群進行提交作業的實現。
除了支持 application 模式,Flink 原生 K8s 在 1.11.0 中還完善了很多基礎的功能特性(FLINK-14460),以達到生產可用性的標准。例如 Node Selector、Label、Annotation、Toleration 等。為了更方便的與 Hadoop 集成,也支持根據環境變量自動掛載 Hadoop 配置的功能。
6.6.2 Checkpoint & Savepoint 優化
checkpoint 和 savepoint 機制一直是 Flink 保持先進性的核心競爭力之一,社區在這個領域的改動很謹慎,最近的幾個大版本中幾乎沒有大的功能和架構上的調整。在用戶郵件列表中,我們經常能看到用戶反饋和抱怨的相關問題:比如 checkpoint 長時間做不出來失敗,savepoint 在作業重啟后不可用等等。1.11.0 有選擇的解決了一些這方面的常見問題,提高生產可用性和穩定性。
1.11.0 之前, savepoint 中 meta 數據和 state 數據分別保存在兩個不同的目錄中,這樣如果想遷移 state 目錄很難識別這種映射關系,也可能導致目錄被誤刪除,對於目錄清理也同樣有麻煩。1.11.0 把兩部分數據整合到一個目錄下,這樣方便整體轉移和復用。另外,之前 meta 引用 state 采用的是絕對路徑,這樣 state 目錄遷移后路徑發生變化也不可用,1.11.0 把 state 引用改成了相對路徑解決了這個問題(FLINK-5763),這樣 savepoint 的管理維護、復用更加靈活方便。
實際生產環境中,用戶經常遭遇 checkpoint 超時失敗、長時間不能完成帶來的困擾。一旦作業 failover 會造成回放大量的歷史數據,作業長時間沒有進度,端到端的延遲增加。1.11.0 從不同維度對 checkpoint 的優化和提速做了改進,目標實現分鍾甚至秒級的輕量型 checkpoint。
首先,增加了 Checkpoint Coordinator 通知 task 取消 checkpoint 的機制(FLINK-8871),這樣避免 task 端還在執行已經取消的 checkpoint 而對系統帶來不必要的壓力。同時 task 端放棄已經取消的 checkpoint,可以更快的參與執行 coordinator 新觸發的 checkpoint,某種程度上也可以避免新 checkpoint 再次執行超時而失敗。這個優化也對后面默認開啟 local recovery 提供了便利,task 端可以及時清理失效 checkpoint 的資源。
- 在反壓場景下,整個數據鏈路堆積了大量 buffer,導致 checkpoint barrier 排在數據 buffer 后面,不能被 task 及時處理對齊,也就導致了 checkpoint 長時間不能執行。1.11.0 中從兩個維度對這個問題進行解決:
1)嘗試減少數據鏈路中的 buffer 總量(FLINK-16428),這樣 checkpoint barrier 可以盡快被處理對齊。
上游輸出端控制單個 sub partition 堆積 buffer 的最大閾值(backlog),避免負載不均場景下單個鏈路上堆積大量 buffer。在不影響網絡吞吐性能的情況下合理修改上下游默認的 buffer 配置。上下游數據傳輸的基礎協議進行了調整,允許單個數據鏈路可以配置 0 個獨占 buffer 而不死鎖,這樣總的 buffer 數量和作業並發規模解耦。根據實際需求在吞吐性能和 checkpoint 速度兩者之間權衡,自定義 buffer 配比。這個優化有一部分工作已經在 1.11.0 中完成,剩余部分會在下個版本繼續推進完成。
2)實現了全新的 unaligned checkpoint 機制(FLIP-76)從根本上解決了反壓場景下 checkpoint barrier 對齊的問題。
實際上這個想法早在 1.10.0 版本之前就開始醞釀設計,由於涉及到很多模塊的大改動,實現機制和線程模型也很復雜。我們實現了兩種不同方案的原型 POC 進行了測試、性能對比,確定了最終的方案,因此直到 1.11.0 才完成了 MVP 版本,這也是 1.11.0 中執行引擎層唯一的一個重量級 feature。其基本思想可以概括為:
Checkpoint barrier 跨數據 buffer 傳輸,不在輸入輸出隊列排隊等待處理,這樣就和算子的計算能力解耦,barrier 在節點之間的傳輸只有網絡延時,可以忽略不計。每個算子多個輸入鏈路之間不需要等待 barrier 對齊來執行 checkpoint,第一個到的 barrier 就可以提前觸發 checkpoint,這樣可以進一步提速 checkpoint,不會因為個別鏈路的延遲而影響整體。
為了和之前 aligned checkpoint 的語義保持一致,所有未被處理的輸入輸出數據 buffer 都將作為 channel state 在 checkpoint 執行時進行快照持久化,在 failover 時連同 operator state 一同進行恢復。
換句話說,aligned 機制保證的是 barrier 前面所有數據必須被處理完,狀態實時體現到 operator state 中;而 unaligned 機制把 barrier 前面的未處理數據所反映的 operator state 延后到 failover restart 時通過 channel state 回放進行體現,從狀態恢復的角度來說最終都是一致的。 注意這里雖然引入了額外的 in-flight buffer 的持久化,但是這個過程實際是在 checkpoint 的異步階段完成的,同步階段只是進行了輕量級的 buffer 引用,所以不會過多占用算子的計算時間而影響吞吐性能。
Unaligned checkpoint 在反壓嚴重的場景下可以明顯加速 checkpoint 的完成時間,因為它不再依賴於整體的計算吞吐能力,而和系統的存儲性能更加相關,相當於計算和存儲的解耦。但是它的使用也有一定的局限性,它會增加整體 state 的大小,對存儲 IO 帶來額外的開銷,因此在 IO 已經是瓶頸的場景下就不太適合使用 unaligned checkpoint 機制。
1.11.0 中 unaligned checkpoint 還沒有作為默認模式,需要用戶手動配置來開啟,並且只在 exactly-once 模式下生效。但目前還不支持 savepoint 模式,因為 savepoint 涉及到作業的 rescale 場景,channel state 目前還不支持 state 拆分,在后面的版本會進一步支持,所以 savepoint 目前還是會使用之前的 aligned 模式,在反壓場景下有可能需要很長時間才能完成。
引用文章: https://developer.aliyun.com/article/767711
七 .Flink 1.12 版本 [重要版本]
- 在 DataStream API 上添加了高效的批執行模式的支持。這是批處理和流處理實現真正統一的運行時的一個重要里程碑。
- 實現了基於Kubernetes的高可用性(HA)方案,作為生產環境中,ZooKeeper方案之外的另外一種選擇。
- 擴展了 Kafka SQL connector,使其可以在 upsert 模式下工作,並且支持在 SQL DDL 中處理 connector 的 metadata。現在,時態表 Join 可以完全用 SQL 來表示,不再依賴於 Table API 了。
- PyFlink 中添加了對於 DataStream API 的支持,將 PyFlink 擴展到了更復雜的場景,比如需要對狀態或者定時器 timer 進行細粒度控制的場景。除此之外,現在原生支持將 PyFlink 作業部署到 Kubernetes上。
7.1. DataStream API 支持批執行模式
Flink 的核心 API 最初是針對特定的場景設計的,盡管 Table API / SQL 針對流處理和批處理已經實現了統一的 API,但當用戶使用較底層的 API 時,仍然需要在批處理(DataSet API)和流處理(DataStream API)這兩種不同的 API 之間進行選擇。鑒於批處理是流處理的一種特例,將這兩種 API 合並成統一的 API,有一些非常明顯的好處,比如:
- 可復用性:作業可以在流和批這兩種執行模式之間自由地切換,而無需重寫任何代碼。因此,用戶可以復用同一個作業,來處理實時數據和歷史數據。
- 維護簡單:統一的 API 意味着流和批可以共用同一組 connector,維護同一套代碼,並能夠輕松地實現流批混合執行,例如 backfilling 之類的場景。
考慮到這些優點,社區已朝着流批統一的 DataStream API 邁出了第一步:支持高效的批處理(FLIP-134)。從長遠來看,這意味着 DataSet API 將被棄用(FLIP-131),其功能將被包含在 DataStream API 和 Table API / SQL 中。
- 有限流上的批處理
您已經可以使用 DataStream API 來處理有限流(例如文件)了,但需要注意的是,運行時並不“知道”作業的輸入是有限的。為了優化在有限流情況下運行時的執行性能,新的 BATCH 執行模式,對於聚合操作,全部在內存中進行,且使用 sort-based shuffle(FLIP-140)和優化過的調度策略(請參見 Pipelined Region Scheduling 了解更多詳細信息)。因此,DataStream API 中的 BATCH 執行模式已經非常接近 Flink 1.12 中 DataSet API 的性能。有關性能的更多詳細信息,請查看 FLIP-140。
在 Flink 1.12 中,默認執行模式為 STREAMING,要將作業配置為以 BATCH 模式運行,可以在提交作業的時候,設置參數 execution.runtime-mode:
$ bin/flink run -Dexecution.runtime-mode=BATCH examples/streaming/WordCount.jar
或者通過編程的方式:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeMode.BATCH);
注意:盡管 DataSet API 尚未被棄用,但我們建議用戶優先使用具有 BATCH 執行模式的 DataStream API 來開發新的批作業,並考慮遷移現有的 DataSet 作業。
7.2. 新的 Data Sink API (Beta)
之前發布的 Flink 版本中[1],已經支持了 source connector 工作在流批兩種模式下,因此在 Flink 1.12 中,社區着重實現了統一的 Data Sink API(FLIP-143)。新的抽象引入了 write/commit 協議和一個更加模塊化的接口。Sink 的實現者只需要定義 what 和 how:SinkWriter,用於寫數據,並輸出需要 commit 的內容(例如,committables);Committer 和 GlobalCommitter,封裝了如何處理 committables。框架會負責 when 和 where:即在什么時間,以及在哪些機器或進程中 commit。
這種模塊化的抽象允許為 BATCH 和 STREAMING 兩種執行模式,實現不同的運行時策略,以達到僅使用一種 sink 實現,也可以使兩種模式都可以高效執行。Flink 1.12 中,提供了統一的 FileSink connector,以替換現有的 StreamingFileSink connector (FLINK-19758)。其它的 connector 也將逐步遷移到新的接口。
7.3. 基於 Kubernetes 的高可用 (HA) 方案
Flink 可以利用 Kubernetes 提供的內置功能來實現 JobManager 的 failover,而不用依賴 ZooKeeper。為了實現不依賴於 ZooKeeper 的高可用方案,社區在 Flink 1.12(FLIP-144)中實現了基於 Kubernetes 的高可用方案。該方案與 ZooKeeper 方案基於相同的接口[3],並使用 Kubernetes 的 ConfigMap[4] 對象來處理從 JobManager 的故障中恢復所需的所有元數據。關於如何配置高可用的 standalone 或原生 Kubernetes 集群的更多詳細信息和示例,請查閱文檔[5]。
注意:需要注意的是,這並不意味着 ZooKeeper 將被刪除,這只是為 Kubernetes 上的 Flink 用戶提供了另外一種選擇。
7.4. 其它功能改進
- 將現有的 connector 遷移到新的 Data Source API
在之前的版本中,Flink 引入了新的 Data Source API(FLIP-27),以允許實現同時適用於有限數據(批)作業和無限數據(流)作業使用的 connector 。在 Flink 1.12 中,社區從 FileSystem connector(FLINK-19161)出發,開始將現有的 source connector 移植到新的接口。
注意: 新的 source 實現,是完全不同的實現,與舊版本的實現不兼容。
- Pipelined Region 調度 (FLIP-119)
在之前的版本中,Flink 對於批作業和流作業有兩套獨立的調度策略。Flink 1.12 版本中,引入了統一的調度策略, 該策略通過識別 blocking 數據傳輸邊,將 ExecutionGraph 分解為多個 pipelined region。這樣一來,對於一個 pipelined region 來說,僅當有數據時才調度它,並且僅在所有其所需的資源都被滿足時才部署它;同時也可以支持獨立地重啟失敗的 region。對於批作業來說,新策略可顯著地提高資源利用率,並消除死鎖。
- 支持 Sort-Merge Shuffle (FLIP-148)
為了提高大規模批作業的穩定性、性能和資源利用率,社區引入了 sort-merge shuffle,以替代 Flink 現有的實現。這種方案可以顯著減少 shuffle 的時間,並使用較少的文件句柄和文件寫緩存(這對於大規模批作業的執行非常重要)。在后續版本中(FLINK-19614),Flink 會進一步優化相關性能。
注意:該功能是實驗性的,在 Flink 1.12 中默認情況下不啟用。要啟用 sort-merge shuffle,需要在 TaskManager 的網絡配置[6]中設置合理的最小並行度。
- Flink WebUI 的改進 (FLIP-75)
作為對上一個版本中,Flink WebUI 一系列改進的延續,Flink 1.12 在 WebUI 上暴露了 JobManager 內存相關的指標和配置參數(FLIP-104)。對於 TaskManager 的指標頁面也進行了更新,為 Managed Memory、Network Memory 和 Metaspace 添加了新的指標,以反映自 Flink 1.10(FLIP-102)開始引入的 TaskManager 內存模型的更改[7]。
7.5. Table API/SQL 變更
7.5.1. SQL Connectors 中的 Metadata 處理
如果可以將某些 source(和 format)的元數據作為額外字段暴露給用戶,對於需要將元數據與記錄數據一起處理的用戶來說很有意義。一個常見的例子是 Kafka,用戶可能需要訪問 offset、partition 或 topic 信息、讀寫 kafka 消息中的 key 或 使用消息 metadata中的時間戳進行時間相關的操作。
在 Flink 1.12 中,Flink SQL 支持了元數據列用來讀取和寫入每行數據中 connector 或 format 相關的列(FLIP-107)。這些列在 CREATE TABLE 語句中使用 METADATA(保留)關鍵字來聲明。
CREATE TABLE kafka_table (
id BIGINT,
name STRING,
event_time TIMESTAMP(3) METADATA FROM 'timestamp', -- access Kafka 'timestamp' metadata
headers MAP METADATA -- access Kafka 'headers' metadata
) WITH (
'connector' = 'kafka',
'topic' = 'test-topic',
'format' = 'avro'
);
在 Flink 1.12 中,已經支持 Kafka 和 Kinesis connector 的元數據,並且 FileSystem connector 上的相關工作也已經在計划中(FLINK-19903)。由於 Kafka record 的結構比較復雜,社區還專門為 Kafka connector 實現了新的屬性[8],以控制如何處理鍵/值對。關於 Flink SQL 中元數據支持的完整描述,請查看每個 connector 的文檔[9]以及 FLIP-107 中描述的用例。
7.5.2. Upsert Kafka Connector
在某些場景中,例如讀取 compacted topic 或者輸出(更新)聚合結果的時候,需要將 Kafka 消息記錄的 key 當成主鍵處理,用來確定一條數據是應該作為插入、刪除還是更新記錄來處理。為了實現該功能,社區為 Kafka 專門新增了一個 upsert connector(upsert-kafka),該 connector 擴展自現有的 Kafka connector,工作在 upsert 模式(FLIP-149)下。新的 upsert-kafka connector 既可以作為 source 使用,也可以作為 sink 使用,並且提供了與現有的 kafka connector 相同的基本功能和持久性保證,因為兩者之間復用了大部分代碼。
要使用 upsert-kafka connector,必須在創建表時定義主鍵,並為鍵(key.format)和值(value.format)指定序列化反序列化格式。完整的示例,請查看最新的文檔[10]。
7.5.3. SQL 中 支持 Temporal Table Join
在之前的版本中,用戶需要通過創建時態表函數(temporal table function) 來支持時態表 join(temporal table join) ,而在 Flink 1.12 中,用戶可以使用標准的 SQL 語句 FOR SYSTEM_TIME AS OF(SQL:2011)來支持 join。此外,現在任意包含時間列和主鍵的表,都可以作為時態表,而不僅僅是 append-only 表。這帶來了一些新的應用場景,比如將 Kafka compacted topic 或數據庫變更日志(來自 Debezium 等)作為時態表。
CREATE TABLE orders (
order_id STRING,
currency STRING,
amount INT,
order_time TIMESTAMP(3),
WATERMARK FOR order_time AS order_time - INTERVAL '30' SECOND
) WITH (
…
);
-- Table backed by a Kafka compacted topic
CREATE TABLE latest_rates (
currency STRING,
rate DECIMAL(38, 10),
currency_time TIMESTAMP(3),
WATERMARK FOR currency_time AS currency_time - INTERVAL ‘5’ SECOND,
PRIMARY KEY (currency) NOT ENFORCED
) WITH (
'connector' = 'upsert-kafka',
…
);
-- Event-time temporal table join
SELECT
o.order_id,
o.order_time,
o.amount * r.rate AS amount,
r.currency
FROM orders AS o, latest_rates FOR SYSTEM_TIME AS OF o.order_time r
ON o.currency = r.currency;
上面的示例同時也展示了如何在 temporal table join 中使用 Flink 1.12 中新增的 upsert-kafka connector。
- 使用 Hive 表進行 Temporal Table Join
用戶也可以將 Hive 表作為時態表來使用,Flink 既支持自動讀取 Hive 表的最新分區作為時態表(FLINK-19644),也支持在作業執行時追蹤整個 Hive 表的最新版本作為時態表。請參閱文檔,了解更多關於如何在 temporal table join 中使用 Hive 表的示例。
7.5.4. Table API/SQL 中的其它改進
- Kinesis Flink SQL Connector (FLINK-18858)
從 Flink 1.12 開始,Table API / SQL 原生支持將 Amazon Kinesis Data Streams(KDS)作為 source 和 sink 使用。新的 Kinesis SQL connector 提供了對於增強的Fan-Out(EFO)以及 Sink Partition 的支持。如需了解 Kinesis SQL connector 所有支持的功能、配置選項以及對外暴露的元數據信息,請查看最新的文檔。
- 在 FileSystem/Hive connector 的流式寫入中支持小文件合並 (FLINK-19345)
很多 bulk format,例如 Parquet,只有當寫入的文件比較大時,才比較高效。當 checkpoint 的間隔比較小時,這會成為一個很大的問題,因為會創建大量的小文件。在 Flink 1.12 中,File Sink 增加了小文件合並功能,從而使得即使作業 checkpoint 間隔比較小時,也不會產生大量的文件。要開啟小文件合並,可以按照文檔[11]中的說明在 FileSystem connector 中設置 auto-compaction = true 屬性。
- Kafka Connector 支持 Watermark 下推 (FLINK-20041)
為了確保使用 Kafka 的作業的結果的正確性,通常來說,最好基於分區來生成 watermark,因為分區內數據的亂序程度通常來說比分區之間數據的亂序程度要低很多。Flink 現在允許將 watermark 策略下推到 Kafka connector 里面,從而支持在 Kafka connector 內部構造基於分區的 watermark[12]。一個 Kafka source 節點最終所產生的 watermark 由該節點所讀取的所有分區中的 watermark 的最小值決定,從而使整個系統可以獲得更好的(即更接近真實情況)的 watermark。該功能也允許用戶配置基於分區的空閑檢測策略,以防止空閑分區阻礙整個作業的 event time 增長。
新增的 Formats
利用 Multi-input 算子進行 Join 優化 (FLINK-19621)
Shuffling 是一個 Flink 作業中最耗時的操作之一。為了消除不必要的序列化反序列化開銷、數據 spilling 開銷,提升 Table API / SQL 上批作業和流作業的性能, planner 當前會利用上一個版本中已經引入的N元算子(FLIP-92),將由 forward 邊所連接的多個算子合並到一個 Task 里執行。
Type Inference for Table API UDAFs (FLIP-65)
Flink 1.12 完成了從 Flink 1.9 開始的,針對 Table API 上的新的類型系統[2]的工作,並在聚合函數(UDAF)上支持了新的類型系統。從 Flink 1.12 開始,與標量函數和表函數類似,聚合函數也支持了所有的數據類型。
7.6. PyFlink: Python DataStream API
為了擴展 PyFlink 的可用性,Flink 1.12 提供了對於 Python DataStream API(FLIP-130)的初步支持,該版本支持了無狀態類型的操作(例如 Map,FlatMap,Filter,KeyBy 等)。如果需要嘗試 Python DataStream API,可以安裝PyFlink,然后按照該文檔[14]進行操作,文檔中描述了如何使用 Python DataStream API 構建一個簡單的流應用程序。
from pyflink.common.typeinfo import Types
from pyflink.datastream import MapFunction, StreamExecutionEnvironment
class MyMapFunction(MapFunction):
def map(self, value):
return value + 1
env = StreamExecutionEnvironment.get_execution_environment()
data_stream = env.from_collection([1, 2, 3, 4, 5], type_info=Types.INT())
mapped_stream = data_stream.map(MyMapFunction(), output_type=Types.INT())
mapped_stream.print()
env.execute("datastream job")
7.7.PyFlink 中的其它改進
- PyFlink Jobs on Kubernetes (FLINK-17480)
除了 standalone 部署和 YARN 部署之外,現在也原生支持將 PyFlink 作業部署在 Kubernetes 上。最新的文檔中詳細描述了如何在 Kubernetes 上啟動 session 或 application 集群。
- 用戶自定義聚合函數 (UDAFs)
從 Flink 1.12 開始,您可以在 PyFlink 作業中定義和使用 Python UDAF 了(FLIP-139)。普通的 UDF(標量函數)每次只能處理一行數據,而 UDAF(聚合函數)則可以處理多行數據,用於計算多行數據的聚合值。您也可以使用 Pandas UDAF[15](FLIP-137),來進行向量化計算(通常來說,比普通 Python UDAF 快10倍以上)。
注意: 普通 Python UDAF,當前僅支持在 group aggregations 以及流模式下使用。如果需要在批模式或者窗口聚合中使用,建議使用 Pandas UDAF。
原文: https://developer.aliyun.com/article/780123
官方原文: https://flink.apache.org/news/2020/12/10/release-1.12.0.html
你好,我是王知無,一個大數據領域的硬核原創作者。
做過后端架構、數據中間件、數據平台&架構、算法工程化。
專注大數據領域實時動態&技術提升&個人成長&職場進階,歡迎關注。