一 .前言
官方發布了Flink1.14版本,但是遺憾的是,中文官網中的案例和資料還都是基於很古老的版本。所以大家照着官網資料跑不通基本代碼也是很正常的。
所以整理一下從1.7 版本到1.14版本之間的相對大的變動. 做到在學習的過程中可以做到心里有數。
二 .Flink 1.7 版本
在 Flink 1.7.0,我們更關注實現快速數據處理以及以無縫方式為 Flink 社區構建數據密集型應用程序。我們最新版本包括一些令人興奮的新功能和改進,例如對 Scala 2.12 的支持,Exactly-Once 語義的 S3 文件接收器,復雜事件處理與流SQL的集成.
2.1. Flink中的Scala 2.12支持
Flink 1.7.0 是第一個完全支持 Scala 2.12 的版本。這可以讓用戶使用新的 Scala 版本編寫 Flink 應用程序以及利用 Scala 2.12 的生態系統。
2.2. 狀態變化
在許多情況下,由於需求的變化,長期運行的 Flink 應用程序會在其生命周期內發生變化。在不丟失當前應用程序進度狀態的情況下更改用戶狀態是應用程序變化的關鍵要求。Flink 1.7.0 版本中社區添加了狀態變化,允許我們靈活地調整長時間運行的應用程序的用戶狀態模式,同時保持與先前保存點的兼容。通過狀態變化,我們可以在狀態模式中添加或刪除列。當使用 Avro 生成類作為用戶狀態時,狀態模式變化可以開箱即用,這意味着狀態模式可以根據 Avro 的規范進行變化。雖然 Avro 類型是 Flink 1.7 中唯一支持模式變化的內置類型,但社區仍在繼續致力於在未來的 Flink 版本中進一步擴展對其他類型的支持。
2.3. Exactly-once語義的S3 StreamingFileSink
Flink 1.6.0 中引入的 StreamingFileSink 現在已經擴展到 S3 文件系統,並保證 Exactly-once 語義。使用此功能允許所有 S3 用戶構建寫入 S3 的 Exactly-once 語義端到端管道。
2.4. Streaming SQL中支持MATCH_RECOGNIZE
這是 Apache Flink 1.7.0 的一個重要補充,它為 Flink SQL 提供了 MATCH_RECOGNIZE 標准的初始支持。此功能融合了復雜事件處理(CEP)和SQL,可以輕松地對數據流進行模式匹配,從而實現一整套新的用例。此功能目前處於測試階段。
2.5. Streaming SQL中的 Temporal Tables 和 Temporal Joins
Temporal Tables 是 Apache Flink 中的一個新概念,它為表的更改歷史記錄提供(參數化)視圖,可以返回表在任何時間點的內容。例如,我們可以使用具有歷史貨幣匯率的表。隨着時間的推移,表會不斷發生變化,並增加更新的匯率。Temporal Table 是一種視圖,可以返回匯率在任何時間點的實際狀態。通過這樣的表,可以使用正確的匯率將不同貨幣的訂單流轉換為通用貨幣。
Temporal Joins 允許 Streaming 數據與不斷變化/更新的表的內存和計算效率的連接,使用處理時間或事件時間,同時符合ANSI SQL。
流式 SQL 的其他功能除了上面提到的主要功能外,Flink 的 Table&SQL API 已經擴展到更多用例。以下內置函數被添加到API:TO_BASE64,LOG2,LTRIM,REPEAT,REPLACE,COSH,SINH,TANH。SQL Client 現在支持在環境文件和 CLI 會話中自定義視圖。此外,CLI 中還添加了基本的 SQL 語句自動完成功能。社區添加了一個 Elasticsearch 6 table sink,允許存儲動態表的更新結果。
2.6. 版本化REST API
從 Flink 1.7.0 開始,REST API 已經版本化。這保證了 Flink REST API 的穩定性,因此可以在 Flink 中針對穩定的 API開發第三方應用程序。因此,未來的 Flink 升級不需要更改現有的第三方集成。
2.7. Kafka 2.0 Connector
Apache Flink 1.7.0 繼續添加更多的連接器,使其更容易與更多外部系統進行交互。在此版本中,社區添加了 Kafka 2.0 連接器,可以從 Kafka 2.0 讀寫數據時保證 Exactly-Once 語義。
2.8. 本地恢復
Apache Flink 1.7.0 通過擴展 Flink 的調度來完成本地恢復功能,以便在恢復時考慮之前的部署位置。如果啟用了本地恢復,Flink 將在運行任務的機器上保留一份最新檢查點的本地副本。將任務調度到之前的位置,Flink 可以通過從本地磁盤讀取檢查點狀態來最小化恢復狀態的網絡流量。此功能大大提高了恢復速度。
2.9. 刪除Flink的傳統模式
Apache Flink 1.7.0 標志着 Flip-6 工作已經完全完成並且與傳統模式達到功能奇偶校驗。因此,此版本刪除了對傳統模式的支持。
三 .Flink 1.8 版本
- 新特性和改進:
- Schema Evolution Story 最終版
- 基於 TTL 持續清除舊狀態
- 使用用戶定義的函數和聚合進行 SQL 模式檢測
- 符合 RFC 的 CSV 格式
- 新的 KafkaDeserializationSchema,可以直接訪問 ConsumerRecord
- FlinkKinesisConsumer 中的分片水印選項
- DynamoDB Streams 的新用戶捕獲表更改
- 支持用於子任務協調的全局聚合
重要變化:
- 使用 Flink 捆綁 Hadoop 庫的更改:不再發布包含 hadoop 的便捷二進制文件
- FlinkKafkaConsumer 現在將根據主題規范過濾已恢復的分區
- 表 API 的 Maven 依賴更改:之前具有flink-table依賴關系的用戶需要將依賴關系從flink-table-planner更新為正確的依賴關系 flink-table-api-,具體取決於是使用 Java 還是 Scala:flink-table-api-java-bridge或者flink-table-api-scala-bridge
3.1. 使用TTL(生存時間)連續增量清除舊的Key狀態
我們在Flink 1.6(FLINK-9510)中為Key狀態引入了TTL(生存時間)。此功能允許在訪問時清理並使Key狀態條目無法訪問。另外,在編寫保存點/檢查點時,現在也將清理狀態。Flink 1.8引入了對RocksDB狀態后端(FLINK-10471)和堆狀態后端(FLINK-10473)的舊條數的連續清理。這意味着舊的條數將(根據TTL設置)不斷被清理掉。
3.2. 恢復保存點時對模式遷移的新支持
使用Flink 1.7.0,我們在使用AvroSerializer時添加了對更改狀態模式的支持。使用Flink 1.8.0,我們在TypeSerializers將所有內置遷移到新的序列化器快照抽象方面取得了很大進展,該抽象理論上允許模式遷移。在Flink附帶的序列化程序中,我們現在支持PojoSerializer (FLINK-11485)和Java EnumSerializer (FLINK-11334)以及有限情況下的Kryo(FLINK-11323)的模式遷移格式。
3.3. 保存點兼容性
TraversableSerializer 此序列化程序(FLINK-11539)中的更新,包含Scala的Flink 1.2中的保存點將不再與Flink 1.8兼容。可以通過升級到Flink 1.3和Flink 1.7之間的版本,然后再更新至Flink 1.8來解決此限制。
3.4. RocksDB版本沖突並切換到FRocksDB(FLINK-10471)
需要切換到名為FRocksDB的RocksDB的自定義構建,因為需要RocksDB中的某些更改來支持使用TTL進行連續狀態清理。FRocksDB的已使用版本基於RocksDB的升級版本5.17.2。對於Mac OS X,僅支持OS X版本> =10.13的RocksDB版本5.17.2。
另見:https://github.com/facebook/rocksdb/issues/4862
3.5. Maven 依賴
使用Flink捆綁Hadoop庫的更改(FLINK-11266)
包含hadoop的便捷二進制文件不再發布。
如果部署依賴於flink-shaded-hadoop2包含 flink-dist,則必須從下載頁面的可選組件部分手動下載並打包Hadoop jar並將其復制到/lib目錄中。另外一種方法,可以通過打包flink-dist和激活 include-hadoopmaven配置文件來構建包含hadoop的Flink分發。
由於hadoop flink-dist默認不再包含在內,因此指定-DwithoutHadoop何時打包flink-dist將不再影響構建。
3.6. TaskManager配置(FLINK-11716)
TaskManagers現在默認綁定到主機IP地址而不是主機名。可以通過配置選項控制此行為taskmanager.network.bind-policy。如果你的Flink集群在升級后遇到莫名其妙的連接問題,嘗試設置taskmanager.network.bind-policy: name在flink-conf.yaml 返回前的1.8的設置行為。
3.7. Table API 的變動
- 直接表構造函數使用的取消預測(FLINK-11447) Flink 1.8不贊成Table在Table API中直接使用該類的構造函數。此構造函數以前將用於執行與橫向表的連接。你現在應該使用table.joinLateral()或 table.leftOuterJoinLateral()代替。這種更改對於將Table類轉換為接口是必要的,這將使Table API在未來更易於維護和更清潔。
- 引入新的CSV格式符(FLINK-9964)
此版本為符合RFC4180的CSV文件引入了新的格式符。新描述符可用作 org.apache.flink.table.descriptors.Csv。
目前,這只能與Kafka一起使用。舊描述符可org.apache.flink.table.descriptors.OldCsv用於文件系統連接器。
靜態生成器方法在TableEnvironment(FLINK-11445)上的棄用,為了將API與實際實現分開:TableEnvironment.getTableEnvironment()。
不推薦使用靜態方法。你現在應該使用Batch/StreamTableEnvironment.create()。
- 表API Maven模塊中的更改(FLINK-11064)
之前具有flink-table依賴關系的用戶需要更新其依賴關系flink-table-planner以及正確的依賴關系flink-table-api-?,具體取決於是使用Java還是Scala:flink-table-api-java-bridge或者flink-table-api-scala-bridge。
- 更改為外部目錄表構建器(FLINK-11522)
ExternalCatalogTable.builder()不贊成使用ExternalCatalogTableBuilder()。
- 更改為表API連接器jar的命名(FLINK-11026)
Kafka/elasticsearch6 sql-jars的命名方案已經更改。在maven術語中,它們不再具有sql-jar限定符,而artifactId現在以前綴為例,flink-sql而不是flink例如flink-sql-connector-kafka。
- 更改為指定Null的方式(FLINK-11785)
現在Table API中的Null需要定義nullof(type)而不是Null(type)。舊方法已被棄用。
3.8. 連接器變動
- 引入可直接訪問ConsumerRecord的新KafkaDeserializationSchema(FLINK-8354)
對於FlinkKafkaConsumers,我們推出了一個新的KafkaDeserializationSchema ,可以直接訪問KafkaConsumerRecord。這包含了該 KeyedSerializationSchema功能,該功能已棄用但目前仍可以使用。
- FlinkKafkaConsumer現在將根據主題規范過濾恢復的分區(FLINK-10342)
從Flink 1.8.0開始,現在FlinkKafkaConsumer總是過濾掉已恢復的分區,這些分區不再與要在還原的執行中訂閱的指定主題相關聯。此行為在以前的版本中不存在FlinkKafkaConsumer。
如果您想保留以前的行為。請使用上面的
disableFilterRestoredPartitionsWithSubscribedTopics()
配置方法FlinkKafkaConsumer。
考慮這個例子:如果你有一個正在消耗topic的Kafka Consumer A,你做了一個保存點,然后改變你的Kafka消費者而不是從topic消費B,然后從保存點重新啟動你的工作。在此更改之前,您的消費者現在將使用這兩個主題A,B因為它存儲在消費者正在使用topic消費的狀態A。通過此更改,您的使用者將僅B在還原后使用topic,因為我們使用配置的topic過濾狀態中存儲的topic。
其它接口改變:
1、從TypeSerializer接口(FLINK-9803)中刪除了canEqual()方法
這些canEqual()方法通常用於跨類型層次結構進行適當的相等性檢查。在TypeSerializer實際上並不需要這個屬性,因此該方法現已刪除。
2、刪除CompositeSerializerSnapshot實用程序類(FLINK-11073)
該CompositeSerializerSnapshot實用工具類已被刪除。
現在CompositeTypeSerializerSnapshot,你應該使用復合序列化程序的快照,該序列化程序將序列化委派給多個嵌套的序列化程序。有關使用的說明,請參閱此處CompositeTypeSerializerSnapshot。
四 .Flink 1.9 版本
2019年 8月22日,Apache Flink 1.9.0 版本正式發布,這也是阿里內部版本 Blink 合並入 Flink 后的首次版本發布。
此次版本更新帶來的重大功能包括批處理作業的批式恢復,以及 Table API 和 SQL 的基於 Blink 的新查詢引擎(預覽版)。同時,這一版本還推出了 State Processor API,這是社區最迫切需求的功能之一,該 API 使用戶能夠用 Flink DataSet 作業靈活地讀寫保存點。此外,Flink 1.9 還包括一個重新設計的 WebUI 和新的 Python Table API (預覽版)以及與 Apache Hive 生態系統的集成(預覽版)。
新功能和改進
- 細粒度批作業恢復 (FLIP-1)
- State Processor API (FLIP-43)
- Stop-with-Savepoint (FLIP-34)
- 重構 Flink WebUI
- 預覽新的 Blink SQL 查詢處理器
- Table API / SQL 的其他改進
- 預覽 Hive 集成 (FLINK-10556)
- 預覽新的 Python Table API (FLIP-38)
4.1. 細粒度批作業恢復 (FLIP-1)
批作業(DataSet、Table API 和 SQL)從 task 失敗中恢復的時間被顯著縮短了。在 Flink 1.9 之前,批處理作業中的 task 失敗是通過取消所有 task 並重新啟動整個作業來恢復的,即作業從頭開始,所有進度都會廢棄。在此版本中,Flink 將中間結果保留在網絡 shuffle 的邊緣,並使用此數據去恢復那些僅受故障影響的 task。所謂 task 的 “failover regions” (故障區)是指通過 pipelined 方式連接的數據交換方式,定義了 task 受故障影響的邊界。
要使用這個新的故障策略,需要確保 flink-conf.yaml 中有 jobmanager.execution.failover-strategy: region 的配置。
注意:1.9 發布包中默認就已經包含了該配置項,不過當從之前版本升級上來時,如果要復用之前的配置的話,需要手動加上該配置。
“Region” 的故障策略也能同時提升 “embarrassingly parallel” 類型的流作業的恢復速度,也就是沒有任何像 keyBy() 和 rebalance 的 shuffle 的作業。當這種作業在恢復時,只有受影響的故障區的 task 需要重啟。對於其他類型的流作業,故障恢復行為與之前的版本一樣。
4.2. State Processor API (FLIP-43)
直到 Flink 1.9,從外部訪問作業的狀態僅局限於:Queryable State(可查詢狀態)實驗性功能。此版本中引入了一種新的、強大的類庫,基於 DataSet 支持讀取、寫入、和修改狀態快照。在實踐上,這意味着:
- Flink 作業的狀態可以自主構建了,可以通過讀取外部系統的數據(例如外部數據庫),然后轉換成 savepoint。
- Savepoint 中的狀態可以使用任意的 Flink 批處理 API 查詢(DataSet、Table、SQL)。例如,分析相關的狀態模式或檢查狀態差異以支持應用程序審核或故障排查。
- Savepoint 中的狀態 schema 可以離線遷移了,而之前的方案只能在訪問狀態時進行,是一種在線遷移。
- Savepoint 中的無效數據可以被識別出來並糾正。
新的 State Processor API 覆蓋了所有類型的快照:savepoint,full checkpoint 和 incremental checkpoint。
4.3. Stop-with-Savepoint (FLIP-34)
“Cancel-with-savepoint” 是停止、重啟、fork、或升級 Flink 作業的一個常用操作。然而,當前的實現並沒有保證輸出到 exactly-once sink 的外部存儲的數據持久化。為了改進停止作業時的端到端語義,Flink 1.9 引入了一種新的 SUSPEND 模式,可以帶 savepoint 停止作業,保證了輸出數據的一致性。你可以使用 Flink CLI 來 suspend 一個作業:
bin/flink stop -p [:targetSavepointDirectory] :jobId
4.4. 重構 Flink WebUI
社區討論了現代化 Flink WebUI 的提案,決定采用 Angular 的最新穩定版來重構這個組件。從 Angular 1.x 躍升到了 7.x 。重新設計的 UI 是 1.9.0 的默認版本,不過有一個按鈕可以切換到舊版的 WebUI。
4.5. 新 Blink SQL 查詢處理器預覽
在 Blink 捐贈給 Apache Flink 之后,社區就致力於為 Table API 和 SQL 集成 Blink 的查詢優化器和 runtime。第一步,我們將 flink-table 單模塊重構成了多個小模塊(FLIP-32)。這對於 Java 和 Scala API 模塊、優化器、以及 runtime 模塊來說,有了一個更清晰的分層和定義明確的接口。
緊接着,我們擴展了 Blink 的 planner 以實現新的優化器接口,所以現在有兩個插件化的查詢處理器來執行 Table API 和 SQL:1.9 以前的 Flink 處理器和新的基於 Blink 的處理器。基於 Blink 的查詢處理器提供了更好地 SQL 覆蓋率(1.9 完整支持 TPC-H,TPC-DS 的支持在下一個版本的計划中)並通過更廣泛的查詢優化(基於成本的執行計划選擇和更多的優化規則)、改進的代碼生成機制、和調優過的算子實現來提升批處理查詢的性能。除此之外,基於 Blink 的查詢處理器還提供了更強大的流處理能力,包括一些社區期待已久的新功能(如維表 Join,TopN,去重)和聚合場景緩解數據傾斜的優化,以及內置更多常用的函數。
注:兩個查詢處理器之間的語義和功能大部分是一致的,但並未完全對齊。具體請查看發布日志。
不過, Blink 的查詢處理器的集成還沒有完全完成。因此,1.9 之前的 Flink 處理器仍然是1.9 版本的默認處理器,建議用於生產設置。你可以在創建 TableEnvironment 時通過 EnvironmentSettings 配置啟用 Blink 處理器。被選擇的處理器必須要在正在執行的 Java 進程的類路徑中。對於集群設置,默認兩個查詢處理器都會自動地加載到類路徑中。當從 IDE 中運行一個查詢時,需要在項目中顯式地增加一個處理器的依賴。
4.6. Table API / SQL 的其他改進
除了圍繞 Blink Planner 令人興奮的進展外,社區還做了一系列的改進,包括:
- 為 Table API / SQL 的 Java 用戶去除 Scala 依賴 (FLIP-32) 作為重構和拆分 flink-table 模塊工作的一部分,我們為 Java 和 Scala 創建了兩個單獨的 API 模塊。對於 Scala 用戶來說,沒有什么改變。不過現在 Java 用戶在使用 Table API 和 SQL 時,可以不用引入一堆 Scala 依賴了。
- 重構 Table API / SQL 的類型系統(FLIP-37) 我們實現了一個新的數據類型系統,以便從 Table API 中移除對 Flink TypeInformation 的依賴,並提高其對 SQL 標准的遵從性。不過還在進行中,預計將在下一版本完工,在 Flink 1.9 中,UDF 尚未移植到新的類型系統上。
- Table API 的多行多列轉換(FLIP-29) Table API 擴展了一組支持多行和多列、輸入和輸出的轉換的功能。這些轉換顯著簡化了處理邏輯的實現,同樣的邏輯使用關系運算符來實現是比較麻煩的。
- 嶄新的統一的 Catalog API Catalog 已有的一些接口被重構和(某些)被替換了,從而統一了內部和外部 catalog 的處理。這項工作主要是為了 Hive 集成(見下文)而啟動的,不過也改進了 Flink 在管理 catalog 元數據的整體便利性。
- SQL API 中的 DDL 支持 (FLINK-10232) 到目前為止,Flink SQL 已經支持 DML 語句(如 SELECT,INSERT)。但是外部表(table source 和 table sink)必須通過 Java/Scala 代碼的方式或配置文件的方式注冊。1.9 版本中,我們支持 SQL DDL 語句的方式注冊和刪除表(CREATE TABLE,DROP TABLE)。然而,我們還沒有增加流特定的語法擴展來定義時間戳抽取和 watermark 生成策略等。流式的需求將會在下一版本完整支持。
五 .Flink 1.10 版本 [重要版本 : Blink 整合完成]
作為 Flink 社區迄今為止規模最大的一次版本升級,Flink 1.10 容納了超過 200 位貢獻者對超過 1200 個 issue 的開發實現,包含對 Flink 作業的整體性能及穩定性的顯著優化、對原生 Kubernetes 的初步集成(beta 版本)以及對 Python 支持(PyFlink)的重大優化。
Flink 1.10 同時還標志着對 Blink[1] 的整合宣告完成,隨着對 Hive 的生產級別集成及對 TPC-DS 的全面覆蓋,Flink 在增強流式 SQL 處理能力的同時也具備了成熟的批處理能力。
5.1. 內存管理及配置優化
Flink 目前的 TaskExecutor 內存模型存在着一些缺陷,導致優化資源利用率比較困難,例如:
流和批處理內存占用的配置模型不同;流處理中的 RocksDB state backend 需要依賴用戶進行復雜的配置。為了讓內存配置變的對於用戶更加清晰、直觀,Flink 1.10 對 TaskExecutor 的內存模型和配置邏輯進行了較大的改動 (FLIP-49 [7])。這些改動使得 Flink 能夠更好地適配所有部署環境(例如 Kubernetes, Yarn, Mesos),讓用戶能夠更加嚴格的控制其內存開銷。
- Managed 內存擴展
Managed 內存的范圍有所擴展,還涵蓋了 RocksDB state backend 使用的內存。盡管批處理作業既可以使用堆內內存也可以使用堆外內存,使用 RocksDB state backend 的流處理作業卻只能利用堆外內存。因此為了讓用戶執行流和批處理作業時無需更改集群的配置,我們規定從現在起 managed 內存只能在堆外。
- 簡化 RocksDB 配置
此前,配置像 RocksDB 這樣的堆外 state backend 需要進行大量的手動調試,例如減小 JVM 堆空間、設置 Flink 使用堆外內存等。現在,Flink 的開箱配置即可支持這一切,且只需要簡單地改變 managed 內存的大小即可調整 RocksDB state backend 的內存預算。
另一個重要的優化是,Flink 現在可以限制 RocksDB 的 native 內存占用(FLINK-7289 [8]),以避免超過總的內存預算——這對於 Kubernetes 等容器化部署環境尤為重要。關於如何開啟、調試該特性,請參考 RocksDB 調試[9]。
注:FLIP-49 改變了集群的資源配置過程,因此從以前的 Flink 版本升級時可能需要對集群配置進行調整。詳細的變更日志及調試指南請參考文檔[10]。
5.2. 統一的作業提交邏輯
在此之前,提交作業是由執行環境負責的,且與不同的部署目標(例如 Yarn, Kubernetes, Mesos)緊密相關。這導致用戶需要針對不同環境保留多套配置,增加了管理的成本。
在 Flink 1.10 中,作業提交邏輯被抽象到了通用的 Executor 接口(FLIP-73 [11])。新增加的 ExecutorCLI (FLIP-81 [12])引入了為任意執行目標[13]指定配置參數的統一方法。此外,隨着引入 JobClient(FLINK-74 [14])負責獲取 JobExecutionResult,獲取作業執行結果的邏輯也得以與作業提交解耦。
5.3. 原生 Kubernetes 集成(Beta)
對於想要在容器化環境中嘗試 Flink 的用戶來說,想要在 Kubernetes 上部署和管理一個 Flink standalone 集群,首先需要對容器、算子及像 kubectl 這樣的環境工具有所了解。
在 Flink 1.10 中,我們推出了初步的支持 session 模式的主動 Kubernetes 集成(FLINK-9953 [15])。其中,“主動”指 Flink ResourceManager (K8sResMngr) 原生地與 Kubernetes 通信,像 Flink 在 Yarn 和 Mesos 上一樣按需申請 pod。用戶可以利用 namespace,在多租戶環境中以較少的資源開銷啟動 Flink。這需要用戶提前配置好 RBAC 角色和有足夠權限的服務賬號。
正如在統一的作業提交邏輯一節中提到的,Flink 1.10 將命令行參數映射到了統一的配置。因此,用戶可以參閱 Kubernetes 配置選項,在命令行中使用以下命令向 Kubernetes 提交 Flink 作業。
./bin/flink run -d -e kubernetes-session -Dkubernetes.cluster-id= examples/streaming/WindowJoin.jar
5.4. Table API/SQL: 生產可用的 Hive 集成
Flink 1.9 推出了預覽版的 Hive 集成。該版本允許用戶使用 SQL DDL 將 Flink 特有的元數據持久化到 Hive Metastore、調用 Hive 中定義的 UDF 以及讀、寫 Hive 中的表。Flink 1.10 進一步開發和完善了這一特性,帶來了全面兼容 Hive 主要版本[17]的生產可用的 Hive 集成。
- Batch SQL 原生分區支持
此前,Flink 只支持寫入未分區的 Hive 表。在 Flink 1.10 中,Flink SQL 擴展支持了 INSERT OVERWRITE 和 PARTITION 的語法(FLIP-63 [18]),允許用戶寫入 Hive 中的靜態和動態分區。
寫入靜態分區
INSERT { INTO | OVERWRITE } TABLE tablename1 [PARTITION (partcol1=val1, partcol2=val2 …)] select_statement1 FROM from_statement;
寫入動態分區
INSERT { INTO | OVERWRITE } TABLE tablename1 select_statement1 FROM from_statement;
對分區表的全面支持,使得用戶在讀取數據時能夠受益於分區剪枝,減少了需要掃描的數據量,從而大幅提升了這些操作的性能。
- 其他優化
除了分區剪枝,Flink 1.10 的 Hive 集成還引入了許多數據讀取[19]方面的優化,例如:
投影下推:Flink 采用了投影下推技術,通過在掃描表時忽略不必要的域,最小化 Flink 和 Hive 表之間的數據傳輸量。這一優化在表的列數較多時尤為有效。
LIMIT 下推:對於包含 LIMIT 語句的查詢,Flink 在所有可能的地方限制返回的數據條數,以降低通過網絡傳輸的數據量。讀取數據時的 ORC 向量化:為了提高讀取 ORC 文件的性能,對於 Hive 2.0.0 及以上版本以及非復合數據類型的列,Flink 現在默認使用原生的 ORC 向量化讀取器。
- 將可插拔模塊作為 Flink 內置對象(Beta)
Flink 1.10 在 Flink table 核心引入了通用的可插拔模塊機制,目前主要應用於系統內置函數(FLIP-68 [20])。通過模塊,用戶可以擴展 Flink 的系統對象,例如像使用 Flink 系統函數一樣使用 Hive 內置函數。新版本中包含一個預先實現好的 HiveModule,能夠支持多個 Hive 版本,當然用戶也可以選擇編寫自己的可插拔模塊。
5.5. 其他 Table API/SQL 優化
- SQL DDL 中的 watermark 和計算列
Flink 1.10 在 SQL DDL 中增加了針對流處理定義時間屬性及產生 watermark 的語法擴展(FLIP-66 [22])。這使得用戶可以在用 DDL 語句創建的表上進行基於時間的操作(例如窗口)以及定義 watermark 策略。
CREATE TABLE table_name (
WATERMARK FOR columnName AS <watermark_strategy_expression>
) WITH (
...
)
- 其他 SQL DDL 擴展
Flink 現在嚴格區分臨時/持久、系統/目錄函數(FLIP-57 [24])。這不僅消除了函數引用中的歧義,還帶來了確定的函數解析順序(例如,當存在命名沖突時,比起目錄函數、持久函數 Flink 會優先使用系統函數、臨時函數)。
在 FLIP-57 的基礎上,我們擴展了 SQL DDL 的語法,支持創建目錄函數、臨時函數以及臨時系統函數(FLIP-79):
CREATE [TEMPORARY|TEMPORARY SYSTEM] FUNCTION
[IF NOT EXISTS] [catalog_name.][db_name.]function_name
AS identifier [LANGUAGE JAVA|SCALA]
關於目前完整的 Flink SQL DDL 支持,請參考最新的文檔[26]。
注:為了今后正確地處理和保證元對象(表、視圖、函數)上的行為一致性,Flink 廢棄了 Table API 中的部分對象申明方法,以使留下的方法更加接近標准的 SQL DDL(FLIP-64)。
- 批處理完整的 TPC-DS 覆蓋
TPC-DS 是廣泛使用的業界標准決策支持 benchmark,用於衡量基於 SQL 的數據處理引擎性能。Flink 1.10 端到端地支持所有 TPC-DS 查詢(FLINK-11491 [28]),標志着 Flink SQL 引擎已經具備滿足現代數據倉庫及其他類似的處理需求的能力。
5.6. PyFlink: 支持原生用戶自定義函數(UDF)
作為 Flink 全面支持 Python 的第一步,在之前版本中我們發布了預覽版的 PyFlink。在新版本中,我們專注於讓用戶在 Table API/SQL 中注冊並使用自定義函數(UDF,另 UDTF / UDAF 規划中)(FLIP-58)。
如果你對這一特性的底層實現(基於 Apache Beam 的可移植框架)感興趣,請參考 FLIP-58 的 Architecture 章節以及 FLIP-78。這些數據結構為支持 Pandas 以及今后將 PyFlink 引入到 DataStream API 奠定了基礎。
從 Flink 1.10 開始,用戶只要執行以下命令就可以輕松地通過 pip 安裝 PyFlink:
pip install apache-flink
5.7. 重要變更
- FLINK-10725[34]:Flink 現在可以使用 Java 11 編譯和運行。
- FLINK-15495[35]:SQL 客戶端現在默認使用 Blink planner,向用戶提供最新的特性及優化。Table API 同樣計划在下個版本中從舊的 planner 切換到 Blink planner,我們建議用戶現在就開始嘗試和熟悉 Blink planner。
- FLINK-13025[36]:新的 Elasticsearch sink connector[37] 全面支持 Elasticsearch 7.x 版本。
- FLINK-15115[38]:Kafka 0.8 和 0.9 的 connector 已被標記為廢棄並不再主動支持。如果你還在使用這些版本或有其他相關問題,請通過 @dev 郵件列表聯系我們。
- FLINK-14516[39]:非基於信用的網絡流控制已被移除,同時移除的還有配置項“taskmanager.network.credit.model”。今后,Flink 將總是使用基於信用的網絡流控制。
- FLINK-12122[40]:在 Flink 1.5.0 中,FLIP-6[41] 改變了 slot 在 TaskManager 之間的分布方式。要想使用此前的調度策略,既盡可能將負載分散到所有當前可用的 TaskManager,用戶可以在 flink-conf.yaml 中設置 “cluster.evenly-spread-out-slots: true”。
- FLINK-11956[42]:s3-hadoop 和 s3-presto 文件系統不再使用類重定位加載方式,而是使用插件方式加載,同時無縫集成所有認證提供者。我們強烈建議其他文件系統也只使用插件加載方式,並將陸續移除重定位加載方式。
Flink 1.9 推出了新的 Web UI,同時保留了原來的 Web UI 以備不時之需。截至目前,我們沒有收到關於新的 UI 存在問題的反饋,因此社區投票決定在 Flink 1.10 中移除舊的 Web UI。
原文: https://developer.aliyun.com/article/744734
官方地址: https://flink.apache.org/news/2020/02/11/release-1.10.0.html
5.7. 重要變更 FLINK-10725[34]:Flink 現在可以使用 Java 11 編譯和運行。FLINK-15495[35]:SQL 客戶端現在默認使用 Blink planner,向用戶提供最新的特性及優化。Table API 同樣計划在下個版本中從舊的 planner 切換到 Blink planner,我們建議用戶現在就開始嘗試和熟悉 Blink planner。FLINK-13025[36]:新的 Elasticsearch sink connector[37] 全面支持 Elasticsearch 7.x 版本。FLINK-15115[38]:Kafka 0.8 和 0.9 的 connector 已被標記為廢棄並不再主動支持。如果你還在使用這些版本或有其他相關問題,請通過 @dev 郵件列表聯系我們。FLINK-14516[39]:非基於信用的網絡流控制已被移除,同時移除的還有配置項“taskmanager.network.credit.model”。今后,Flink 將總是使用基於信用的網絡流控制。FLINK-12122[40]:在 Flink 1.5.0 中,FLIP-6[41] 改變了 slot 在 TaskManager 之間的分布方式。要想使用此前的調度策略,既盡可能將負載分散到所有當前可用的 TaskManager,用戶可以在 flink-conf.yaml 中設置 “cluster.evenly-spread-out-slots: true”。FLINK-11956[42]:s3-hadoop 和 s3-presto 文件系統不再使用類重定位加載方式,而是使用插件方式加載,同時無縫集成所有認證提供者。我們強烈建議其他文件系統也只使用插件加載方式,並將陸續移除重定位加載方式。Flink 1.9 推出了新的 Web UI,同時保留了原來的 Web UI 以備不時之需。截至目前,我們沒有收到關於新的 UI 存在問題的反饋,因此社區投票決定[43]在 Flink 1.10 中移除舊的 Web UI。
原文: https://developer.aliyun.com/article/744734 官方地址: https://flink.apache.org/news/2020/02/11/release-1.10.0.html?spm=a2c6h.12873639.0.0.749e4fc0c1D14m
六 .Flink 1.11 版本 [重要版本]
Flink 1.11.0 正式發布。歷時近 4 個月,Flink 在生態、易用性、生產可用性、穩定性等方面都進行了增強和改善。
core engine 引入了 unaligned checkpoints,這是對 Flink 的容錯機制的重大更改,該機制可改善在高背壓下的檢查點性能。
一個新的 Source API 通過統一批處理和 streaming 執行以及將內部組件(例如事件時間處理、水印生成或空閑檢測)卸載到 Flink 來簡化(自定義)sources 的實現。
Flink SQL 引入了對變更數據捕獲(CDC)的支持,以輕松使用和解釋來自 Debezium 之類的工具的數據庫變更日志。更新的 FileSystem 連接器還擴展了 Table API/SQL 支持的用例和格式集,從而實現了直接啟用從 Kafka 到 Hive 的 streaming 數據傳輸等方案。
PyFlink 的多項性能優化,包括對矢量化用戶定義函數(Pandas UDF)的支持。這改善了與 Pandas 和 NumPy 之類庫的互操作性,使 Flink 在數據科學和 ML 工作負載方面更強大。
重要變化
- [FLINK-17339] 從 Flink 1.11 開始,Blink planner 是 Table API/SQL中的默認設置。自 Flink 1.10 起,SQL 客戶端已經存在這種情況。仍支持舊的 Flink 規划器,但未積極開發。
- [FLINK-5763] Savepoints 現在將其所有狀態包含在一個目錄中(元數據和程序狀態)。這樣可以很容易地找出組成 savepoint 狀態的文件,並允許用戶通過簡單地移動目錄來重新定位 savepoint。
- [FLINK-16408] 為了減輕對 JVM metaspace 的壓力,只要任務分配了至少一個插槽,TaskExecutor就會重用用戶代碼類加載器。這會稍微改變 Flink 的恢復行為,從而不會重新加載靜態字段。
- [FLINK-11086] Flink 現在支持 Hadoop 3.0.0 以上的 Hadoop 版本。請注意,Flink 項目不提供任何更新的flink-shaded-hadoop-x jars。用戶需要通過HADOOP_CLASSPATH環境變量(推薦)或 lib/ folder 提供 Hadoop 依賴項。
- [FLINK-16963] Flink 隨附的所有MetricReporters均已轉換為插件。這些不再應該放在/lib中(可能導致依賴沖突),而應該放在/plugins/< some_directory>中。
- [FLINK-12639] Flink 文檔正在做一些返工,因此從 Flink 1.11 開始,內容的導航和組織會有所變化。
官方原文: https://flink.apache.org/news/2020/07/06/release-1.11.0.html
6.1. Table & SQL 支持 Change Data Capture(CDC)
CDC 被廣泛使用在復制數據、更新緩存、微服務間同步數據、審計日志等場景,很多公司都在使用開源的 CDC 工具,如 MySQL CDC。通過 Flink 支持在 Table & SQL 中接入和解析 CDC 是一個強需求,在過往的很多討論中都被提及過,可以幫助用戶以實時的方式處理 changelog 流,進一步擴展 Flink 的應用場景,例如把 MySQL 中的數據同步到 PG 或 ElasticSearch 中,低延時的 temporal join 一個 changelog 等。
除了考慮到上面的真實需求,Flink 中定義的“Dynamic Table”概念在流上有兩種模型:append 模式和 update 模式。通過 append 模式把流轉化為“Dynamic Table”在之前的版本中已經支持,因此在 1.11.0 中進一步支持 update 模式也從概念層面完整的實現了“Dynamic Table”。
為了支持解析和輸出 changelog,如何在外部系統和 Flink 系統之間編解碼這些更新操作是首要解決的問題。考慮到 source 和 sink 是銜接外部系統的一個橋梁,因此 FLIP-95 在定義全新的 Table source 和 Table sink 接口時解決了這個問題。
在公開的 CDC 調研報告中,Debezium 和 Canal 是用戶中最流行使用的 CDC 工具,這兩種工具用來同步 changelog 到其它的系統中,如消息隊列。據此,FLIP-105 首先支持了 Debezium 和 Canal 這兩種格式,而且 Kafka source 也已經可以支持解析上述格式並輸出更新事件,在后續的版本中會進一步支持 Avro(Debezium) 和 Protobuf(Canal)。
CREATE TABLE my_table (
...) WITH (
'connector'='...', -- e.g. 'kafka'
'format'='debezium-json',
'debezium-json.schema-include'='true' -- default: false (Debezium can be configured to include or exclude the message schema)
'debezium-json.ignore-parse-errors'='true' -- default: false
);
6.2. Table & SQL 支持 JDBC Catalog
1.11.0 之前,用戶如果依賴 Flink 的 source/sink 讀寫關系型數據庫或讀取 changelog 時,必須要手動創建對應的 schema。而且當數據庫中的 schema 發生變化時,也需要手動更新對應的 Flink 作業以保持一致和類型匹配,任何不匹配都會造成運行時報錯使作業失敗。用戶經常抱怨這個看似冗余且繁瑣的流程,體驗極差。
實際上對於任何和 Flink 連接的外部系統都可能有類似的上述問題,在 1.11.0 中重點解決了和關系型數據庫對接的這個問題。FLIP-93 提供了 JDBC catalog 的基礎接口以及 Postgres catalog 的實現,這樣方便后續實現與其它類型的關系型數據庫的對接。
1.11.0 版本后,用戶使用 Flink SQL 時可以自動獲取表的 schema 而不再需要輸入 DDL。除此之外,任何 schema 不匹配的錯誤都會在編譯階段提前進行檢查報錯,避免了之前運行時報錯造成的作業失敗。這是提升易用性和用戶體驗的一個典型例子。
6.3. Hive 實時數倉
從 1.9.0 版本開始 Flink 從生態角度致力於集成 Hive,目標打造批流一體的 Hive 數倉。經過前兩個版本的迭代,已經達到了 batch 兼容且生產可用,在 TPC-DS 10T benchmark 下性能達到 Hive 3.0 的 7 倍以上。
1.11.0 在 Hive 生態中重點實現了實時數倉方案,改善了端到端流式 ETL 的用戶體驗,達到了批流一體 Hive 數倉的目標。同時在兼容性、性能、易用性方面也進一步進行了加強。
在實時數倉的解決方案中,憑借 Flink 的流式處理優勢做到實時讀寫 Hive:
- Hive 寫入:FLIP-115 完善擴展了 FileSystem connector 的基礎能力和實現,Table/SQL 層的 sink 可以支持各種格式(CSV、Json、Avro、Parquet、ORC),而且支持 Hive table 的所有格式。
- Partition 支持:數據導入 Hive 引入 partition 提交機制來控制可見性,通過sink.partition-commit.trigger 控制 partition 提交的時機,通過 sink.partition-commit.policy.kind 選擇提交策略,支持 SUCCESS 文件和 metastore 提交。
- Hive 讀取:實時化的流式讀取 Hive,通過監控 partition 生成增量讀取新 partition,或者監控文件夾內新文件生成來增量讀取新文件。在 Hive 可用性方面的提升:
- FLIP-123 通過 Hive Dialect 為用戶提供語法兼容,這樣用戶無需在 Flink 和 Hive 的 CLI 之間切換,可以直接遷移 Hive 腳本到 Flink 中執行。
- 提供 Hive 相關依賴的內置支持,避免用戶自己下載所需的相關依賴。現在只需要單獨下載一個包,配置 HADOOP_CLASSPATH 就可以運行。
- 在 Hive 性能方面,1.10.0 中已經支持了 ORC(Hive 2+)的向量化讀取,1.11.0 中我們補全了所有版本的 Parquet 和 ORC 向量化支持來提升性能。
6.4. 全新 Source API
前面也提到過,source 和 sink 是 Flink 對接外部系統的一個橋梁,對於完善生態、可用性及端到端的用戶體驗是很重要的環節。社區早在一年前就已經規划了 source 端的徹底重構,從 FLIP-27 的 ID 就可以看出是很早的一個 feature。但是由於涉及到很多復雜的內部機制和考慮到各種 source connector 的實現,設計上需要考慮的很全面。從 1.10.0 就開始做 POC 的實現,最終趕上了 1.11.0 版本的發布。
先簡要回顧下 source 之前的主要問題:
對用戶而言,在 Flink 中改造已有的 source 或者重新實現一個生產級的 source connector 不是一件容易的事情,具體體現在沒有公共的代碼可以復用,而且需要理解很多 Flink 內部細節以及實現具體的 event time 分配、watermark 產出、idleness 監測、線程模型等。
批和流的場景需要實現不同的 source。
partitions/splits/shards 概念在接口中沒有顯式表達,比如 split 的發現邏輯和數據消費都耦合在 source function 的實現中,這樣在實現 Kafka 或 Kinesis 類型的 source 時增加了復雜性。
在 runtime 執行層,checkpoint 鎖被 source function 搶占會帶來一系列問題,框架很難進行優化。
FLIP-27 在設計時充分考慮了上述的痛點:
- 首先在 Job Manager 和 Task Manager 中分別引入兩種不同的組件 Split Enumerator 和 Source reader,解耦 split 發現和對應的消費處理,同時方便隨意組合不同的策略。比如現有的 Kafka connector 中有多種不同的 partition 發現策略和實現耦合在一起,在新的架構下,我們只需要實現一種 source reader,就可以適配多種 split enumerator 的實現來對應不同的 partition 發現策略。
- 在新架構下實現的 source connector 可以做到批流統一,唯一的小區別是對批場景的有限輸入,split enumerator 會產出固定數量的 split 集合並且每個 split 都是有限數據集;對於流場景的無限輸入,split enumerator 要么產出無限多的 split 或者 split 自身是無限數據集。
- 復雜的 timestamp assigner 以及 watermark generator 透明的內置在 source reader 模塊內運行,對用戶來說是無感知的。這樣用戶如果想實現新的 source connector,一般不再需要重復實現這部分功能。
目前 Flink 已有的 source connector 會在后續的版本中基於新架構來重新實現,legacy source 也會繼續維護幾個版本保持兼容性,用戶也可以按照 release 文檔中的說明來嘗試體驗新 source 的開發。
6.5. PyFlink 生態
眾所周知,Python 語言在機器學習和數據分析領域有着廣泛的使用。Flink 從 1.9.0 版本開始發力兼容 Python 生態,Python 和 Flink 合力為 PyFlink,把 Flink 的實時分布式處理能力輸出給 Python 用戶。前兩個版本 PyFlink 已經支持了 Python Table API 和 UDF,在 1.11.0 中擴大對 Python 生態庫 Pandas 的支持以及和 SQL DDL/Client 的集成,同時 Python UDF 性能有了極大的提升。
具體來說,之前普通的 Python UDF 每次調用只能處理一條數據,而且在 Java 端和 Python 端都需要序列化/反序列化,開銷很大。1.11.0 中 Flink 支持在 Table & SQL 作業中自定義和使用向量化 Python UDF,用戶只需要在 UDF 修飾中額外增加一個參數 udf_type=“pandas” 即可。這樣帶來的好處是:
- 每次調用可以處理 N 條數據。
- 數據格式基於 Apache Arrow,大大降低了 Java、Python 進程之間的序列化/反序列化開銷。
- 方便 Python 用戶基於 Numpy 和 Pandas 等數據分析領域常用的 Python 庫,開發高性能的 Python UDF。
除此之外,1.11.0 中 PyFlink 還支持:
- PyFlink table 和 Pandas DataFrame 之間無縫切換(FLIP-120),增強 Pandas 生態的易用性和兼容性。
- Table & SQL 中可以定義和使用 Python UDTF(FLINK-14500),不再必需 Java/Scala UDTF。
- Cython 優化 Python UDF 的性能(FLIP-121),對比 1.10.0 可以提升 30 倍。
- Python UDF 中用戶自定義 metric(FLIP-112),方便監控和調試 UDF 的執行。
上述解讀的都是側重 API 層面,用戶開發作業可以直接感知到的易用性的提升。下面我們看看執行引擎層在 1.11.0 中都有哪些值得關注的變化。
6.6. 生產可用性和穩定性提升
6.6.1 支持 application 模式和 Kubernetes 增強
1.11.0 版本前,Flink 主要支持如下兩種模式運行:
Session 模式:提前啟動一個集群,所有作業都共享這個集群的資源運行。優勢是避免每個作業單獨啟動集群帶來的額外開銷,缺點是隔離性稍差。如果一個作業把某個 Task Manager(TM)容器搞掛,會導致這個容器內的所有作業都跟着重啟。雖然每個作業有自己獨立的 Job Manager(JM)來管理,但是這些 JM 都運行在一個進程中,容易帶來負載上的瓶頸。
Per-job 模式:為了解決 session 模式隔離性差的問題,每個作業根據資源需求啟動獨立的集群,每個作業的 JM 也是運行在獨立的進程中,負載相對小很多。
以上兩種模式的共同問題是需要在客戶端執行用戶代碼,編譯生成對應的 Job Graph 提交到集群運行。在這個過程需要下載相關 jar 包並上傳到集群,客戶端和網絡負載壓力容易成為瓶頸,尤其當一個客戶端被多個用戶共享使用。
1.11.0 中引入了 application 模式(FLIP-85)來解決上述問題,按照 application 粒度來啟動一個集群,屬於這個 application 的所有 job 在這個集群中運行。核心是 Job Graph 的生成以及作業的提交不在客戶端執行,而是轉移到 JM 端執行,這樣網絡下載上傳的負載也會分散到集群中,不再有上述 client 單點上的瓶頸。
用戶可以通過 bin/flink run-application 來使用 application 模式,目前 Yarn 和 Kubernetes(K8s)都已經支持這種模式。Yarn application 會在客戶端將運行作業需要的依賴都通過 Yarn Local Resource 傳遞到 JM。K8s application 允許用戶構建包含用戶 jar 與依賴的鏡像,同時會根據作業自動創建 TM,並在結束后銷毀整個集群,相比 session 模式具有更好的隔離性。K8s 不再有嚴格意義上的 per-job 模式,application 模式相當於 per-job 在集群進行提交作業的實現。
除了支持 application 模式,Flink 原生 K8s 在 1.11.0 中還完善了很多基礎的功能特性(FLINK-14460),以達到生產可用性的標准。例如 Node Selector、Label、Annotation、Toleration 等。為了更方便的與 Hadoop 集成,也支持根據環境變量自動掛載 Hadoop 配置的功能。
6.6.2 Checkpoint & Savepoint 優化
checkpoint 和 savepoint 機制一直是 Flink 保持先進性的核心競爭力之一,社區在這個領域的改動很謹慎,最近的幾個大版本中幾乎沒有大的功能和架構上的調整。在用戶郵件列表中,我們經常能看到用戶反饋和抱怨的相關問題:比如 checkpoint 長時間做不出來失敗,savepoint 在作業重啟后不可用等等。1.11.0 有選擇的解決了一些這方面的常見問題,提高生產可用性和穩定性。
1.11.0 之前, savepoint 中 meta 數據和 state 數據分別保存在兩個不同的目錄中,這樣如果想遷移 state 目錄很難識別這種映射關系,也可能導致目錄被誤刪除,對於目錄清理也同樣有麻煩。1.11.0 把兩部分數據整合到一個目錄下,這樣方便整體轉移和復用。另外,之前 meta 引用 state 采用的是絕對路徑,這樣 state 目錄遷移后路徑發生變化也不可用,1.11.0 把 state 引用改成了相對路徑解決了這個問題(FLINK-5763),這樣 savepoint 的管理維護、復用更加靈活方便。
實際生產環境中,用戶經常遭遇 checkpoint 超時失敗、長時間不能完成帶來的困擾。一旦作業 failover 會造成回放大量的歷史數據,作業長時間沒有進度,端到端的延遲增加。1.11.0 從不同維度對 checkpoint 的優化和提速做了改進,目標實現分鍾甚至秒級的輕量型 checkpoint。
首先,增加了 Checkpoint Coordinator 通知 task 取消 checkpoint 的機制(FLINK-8871),這樣避免 task 端還在執行已經取消的 checkpoint 而對系統帶來不必要的壓力。同時 task 端放棄已經取消的 checkpoint,可以更快的參與執行 coordinator 新觸發的 checkpoint,某種程度上也可以避免新 checkpoint 再次執行超時而失敗。這個優化也對后面默認開啟 local recovery 提供了便利,task 端可以及時清理失效 checkpoint 的資源。
- 在反壓場景下,整個數據鏈路堆積了大量 buffer,導致 checkpoint barrier 排在數據 buffer 后面,不能被 task 及時處理對齊,也就導致了 checkpoint 長時間不能執行。1.11.0 中從兩個維度對這個問題進行解決:
1)嘗試減少數據鏈路中的 buffer 總量(FLINK-16428),這樣 checkpoint barrier 可以盡快被處理對齊。
上游輸出端控制單個 sub partition 堆積 buffer 的最大閾值(backlog),避免負載不均場景下單個鏈路上堆積大量 buffer。在不影響網絡吞吐性能的情況下合理修改上下游默認的 buffer 配置。上下游數據傳輸的基礎協議進行了調整,允許單個數據鏈路可以配置 0 個獨占 buffer 而不死鎖,這樣總的 buffer 數量和作業並發規模解耦。根據實際需求在吞吐性能和 checkpoint 速度兩者之間權衡,自定義 buffer 配比。這個優化有一部分工作已經在 1.11.0 中完成,剩余部分會在下個版本繼續推進完成。
2)實現了全新的 unaligned checkpoint 機制(FLIP-76)從根本上解決了反壓場景下 checkpoint barrier 對齊的問題。
實際上這個想法早在 1.10.0 版本之前就開始醞釀設計,由於涉及到很多模塊的大改動,實現機制和線程模型也很復雜。我們實現了兩種不同方案的原型 POC 進行了測試、性能對比,確定了最終的方案,因此直到 1.11.0 才完成了 MVP 版本,這也是 1.11.0 中執行引擎層唯一的一個重量級 feature。其基本思想可以概括為:
Checkpoint barrier 跨數據 buffer 傳輸,不在輸入輸出隊列排隊等待處理,這樣就和算子的計算能力解耦,barrier 在節點之間的傳輸只有網絡延時,可以忽略不計。每個算子多個輸入鏈路之間不需要等待 barrier 對齊來執行 checkpoint,第一個到的 barrier 就可以提前觸發 checkpoint,這樣可以進一步提速 checkpoint,不會因為個別鏈路的延遲而影響整體。
為了和之前 aligned checkpoint 的語義保持一致,所有未被處理的輸入輸出數據 buffer 都將作為 channel state 在 checkpoint 執行時進行快照持久化,在 failover 時連同 operator state 一同進行恢復。
換句話說,aligned 機制保證的是 barrier 前面所有數據必須被處理完,狀態實時體現到 operator state 中;而 unaligned 機制把 barrier 前面的未處理數據所反映的 operator state 延后到 failover restart 時通過 channel state 回放進行體現,從狀態恢復的角度來說最終都是一致的。 注意這里雖然引入了額外的 in-flight buffer 的持久化,但是這個過程實際是在 checkpoint 的異步階段完成的,同步階段只是進行了輕量級的 buffer 引用,所以不會過多占用算子的計算時間而影響吞吐性能。
Unaligned checkpoint 在反壓嚴重的場景下可以明顯加速 checkpoint 的完成時間,因為它不再依賴於整體的計算吞吐能力,而和系統的存儲性能更加相關,相當於計算和存儲的解耦。但是它的使用也有一定的局限性,它會增加整體 state 的大小,對存儲 IO 帶來額外的開銷,因此在 IO 已經是瓶頸的場景下就不太適合使用 unaligned checkpoint 機制。
1.11.0 中 unaligned checkpoint 還沒有作為默認模式,需要用戶手動配置來開啟,並且只在 exactly-once 模式下生效。但目前還不支持 savepoint 模式,因為 savepoint 涉及到作業的 rescale 場景,channel state 目前還不支持 state 拆分,在后面的版本會進一步支持,所以 savepoint 目前還是會使用之前的 aligned 模式,在反壓場景下有可能需要很長時間才能完成。
引用文章: https://developer.aliyun.com/article/767711
七 .Flink 1.12 版本 [重要版本]
- 在 DataStream API 上添加了高效的批執行模式的支持。這是批處理和流處理實現真正統一的運行時的一個重要里程碑。
- 實現了基於Kubernetes的高可用性(HA)方案,作為生產環境中,ZooKeeper方案之外的另外一種選擇。
- 擴展了 Kafka SQL connector,使其可以在 upsert 模式下工作,並且支持在 SQL DDL 中處理 connector 的 metadata。現在,時態表 Join 可以完全用 SQL 來表示,不再依賴於 Table API 了。
- PyFlink 中添加了對於 DataStream API 的支持,將 PyFlink 擴展到了更復雜的場景,比如需要對狀態或者定時器 timer 進行細粒度控制的場景。除此之外,現在原生支持將 PyFlink 作業部署到 Kubernetes上。
7.1. DataStream API 支持批執行模式
Flink 的核心 API 最初是針對特定的場景設計的,盡管 Table API / SQL 針對流處理和批處理已經實現了統一的 API,但當用戶使用較底層的 API 時,仍然需要在批處理(DataSet API)和流處理(DataStream API)這兩種不同的 API 之間進行選擇。鑒於批處理是流處理的一種特例,將這兩種 API 合並成統一的 API,有一些非常明顯的好處,比如:
- 可復用性:作業可以在流和批這兩種執行模式之間自由地切換,而無需重寫任何代碼。因此,用戶可以復用同一個作業,來處理實時數據和歷史數據。
- 維護簡單:統一的 API 意味着流和批可以共用同一組 connector,維護同一套代碼,並能夠輕松地實現流批混合執行,例如 backfilling 之類的場景。
考慮到這些優點,社區已朝着流批統一的 DataStream API 邁出了第一步:支持高效的批處理(FLIP-134)。從長遠來看,這意味着 DataSet API 將被棄用(FLIP-131),其功能將被包含在 DataStream API 和 Table API / SQL 中。
- 有限流上的批處理
您已經可以使用 DataStream API 來處理有限流(例如文件)了,但需要注意的是,運行時並不“知道”作業的輸入是有限的。為了優化在有限流情況下運行時的執行性能,新的 BATCH 執行模式,對於聚合操作,全部在內存中進行,且使用 sort-based shuffle(FLIP-140)和優化過的調度策略(請參見 Pipelined Region Scheduling 了解更多詳細信息)。因此,DataStream API 中的 BATCH 執行模式已經非常接近 Flink 1.12 中 DataSet API 的性能。有關性能的更多詳細信息,請查看 FLIP-140。
在 Flink 1.12 中,默認執行模式為 STREAMING,要將作業配置為以 BATCH 模式運行,可以在提交作業的時候,設置參數 execution.runtime-mode:
$ bin/flink run -Dexecution.runtime-mode=BATCH examples/streaming/WordCount.jar
或者通過編程的方式:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeMode.BATCH);
注意:盡管 DataSet API 尚未被棄用,但我們建議用戶優先使用具有 BATCH 執行模式的 DataStream API 來開發新的批作業,並考慮遷移現有的 DataSet 作業。
7.2. 新的 Data Sink API (Beta)
之前發布的 Flink 版本中[1],已經支持了 source connector 工作在流批兩種模式下,因此在 Flink 1.12 中,社區着重實現了統一的 Data Sink API(FLIP-143)。新的抽象引入了 write/commit 協議和一個更加模塊化的接口。Sink 的實現者只需要定義 what 和 how:SinkWriter,用於寫數據,並輸出需要 commit 的內容(例如,committables);Committer 和 GlobalCommitter,封裝了如何處理 committables。框架會負責 when 和 where:即在什么時間,以及在哪些機器或進程中 commit。
這種模塊化的抽象允許為 BATCH 和 STREAMING 兩種執行模式,實現不同的運行時策略,以達到僅使用一種 sink 實現,也可以使兩種模式都可以高效執行。Flink 1.12 中,提供了統一的 FileSink connector,以替換現有的 StreamingFileSink connector (FLINK-19758)。其它的 connector 也將逐步遷移到新的接口。
7.3. 基於 Kubernetes 的高可用 (HA) 方案
Flink 可以利用 Kubernetes 提供的內置功能來實現 JobManager 的 failover,而不用依賴 ZooKeeper。為了實現不依賴於 ZooKeeper 的高可用方案,社區在 Flink 1.12(FLIP-144)中實現了基於 Kubernetes 的高可用方案。該方案與 ZooKeeper 方案基於相同的接口[3],並使用 Kubernetes 的 ConfigMap[4] 對象來處理從 JobManager 的故障中恢復所需的所有元數據。關於如何配置高可用的 standalone 或原生 Kubernetes 集群的更多詳細信息和示例,請查閱文檔[5]。
注意:需要注意的是,這並不意味着 ZooKeeper 將被刪除,這只是為 Kubernetes 上的 Flink 用戶提供了另外一種選擇。
7.4. 其它功能改進
- 將現有的 connector 遷移到新的 Data Source API
在之前的版本中,Flink 引入了新的 Data Source API(FLIP-27),以允許實現同時適用於有限數據(批)作業和無限數據(流)作業使用的 connector 。在 Flink 1.12 中,社區從 FileSystem connector(FLINK-19161)出發,開始將現有的 source connector 移植到新的接口。
注意: 新的 source 實現,是完全不同的實現,與舊版本的實現不兼容。
- Pipelined Region 調度 (FLIP-119)
在之前的版本中,Flink 對於批作業和流作業有兩套獨立的調度策略。Flink 1.12 版本中,引入了統一的調度策略, 該策略通過識別 blocking 數據傳輸邊,將 ExecutionGraph 分解為多個 pipelined region。這樣一來,對於一個 pipelined region 來說,僅當有數據時才調度它,並且僅在所有其所需的資源都被滿足時才部署它;同時也可以支持獨立地重啟失敗的 region。對於批作業來說,新策略可顯著地提高資源利用率,並消除死鎖。
- 支持 Sort-Merge Shuffle (FLIP-148)
為了提高大規模批作業的穩定性、性能和資源利用率,社區引入了 sort-merge shuffle,以替代 Flink 現有的實現。這種方案可以顯著減少 shuffle 的時間,並使用較少的文件句柄和文件寫緩存(這對於大規模批作業的執行非常重要)。在后續版本中(FLINK-19614),Flink 會進一步優化相關性能。
注意:該功能是實驗性的,在 Flink 1.12 中默認情況下不啟用。要啟用 sort-merge shuffle,需要在 TaskManager 的網絡配置[6]中設置合理的最小並行度。
- Flink WebUI 的改進 (FLIP-75)
作為對上一個版本中,Flink WebUI 一系列改進的延續,Flink 1.12 在 WebUI 上暴露了 JobManager 內存相關的指標和配置參數(FLIP-104)。對於 TaskManager 的指標頁面也進行了更新,為 Managed Memory、Network Memory 和 Metaspace 添加了新的指標,以反映自 Flink 1.10(FLIP-102)開始引入的 TaskManager 內存模型的更改[7]。
7.5. Table API/SQL 變更
7.5.1. SQL Connectors 中的 Metadata 處理
如果可以將某些 source(和 format)的元數據作為額外字段暴露給用戶,對於需要將元數據與記錄數據一起處理的用戶來說很有意義。一個常見的例子是 Kafka,用戶可能需要訪問 offset、partition 或 topic 信息、讀寫 kafka 消息中的 key 或 使用消息 metadata中的時間戳進行時間相關的操作。
在 Flink 1.12 中,Flink SQL 支持了元數據列用來讀取和寫入每行數據中 connector 或 format 相關的列(FLIP-107)。這些列在 CREATE TABLE 語句中使用 METADATA(保留)關鍵字來聲明。
CREATE TABLE kafka_table (
id BIGINT,
name STRING,
event_time TIMESTAMP(3) METADATA FROM 'timestamp', -- access Kafka 'timestamp' metadata
headers MAP METADATA -- access Kafka 'headers' metadata
) WITH (
'connector' = 'kafka',
'topic' = 'test-topic',
'format' = 'avro'
);
在 Flink 1.12 中,已經支持 Kafka 和 Kinesis connector 的元數據,並且 FileSystem connector 上的相關工作也已經在計划中(FLINK-19903)。由於 Kafka record 的結構比較復雜,社區還專門為 Kafka connector 實現了新的屬性[8],以控制如何處理鍵/值對。關於 Flink SQL 中元數據支持的完整描述,請查看每個 connector 的文檔[9]以及 FLIP-107 中描述的用例。
7.5.2. Upsert Kafka Connector
在某些場景中,例如讀取 compacted topic 或者輸出(更新)聚合結果的時候,需要將 Kafka 消息記錄的 key 當成主鍵處理,用來確定一條數據是應該作為插入、刪除還是更新記錄來處理。為了實現該功能,社區為 Kafka 專門新增了一個 upsert connector(upsert-kafka),該 connector 擴展自現有的 Kafka connector,工作在 upsert 模式(FLIP-149)下。新的 upsert-kafka connector 既可以作為 source 使用,也可以作為 sink 使用,並且提供了與現有的 kafka connector 相同的基本功能和持久性保證,因為兩者之間復用了大部分代碼。
要使用 upsert-kafka connector,必須在創建表時定義主鍵,並為鍵(key.format)和值(value.format)指定序列化反序列化格式。完整的示例,請查看最新的文檔[10]。
7.5.3. SQL 中 支持 Temporal Table Join
在之前的版本中,用戶需要通過創建時態表函數(temporal table function) 來支持時態表 join(temporal table join) ,而在 Flink 1.12 中,用戶可以使用標准的 SQL 語句 FOR SYSTEM_TIME AS OF(SQL:2011)來支持 join。此外,現在任意包含時間列和主鍵的表,都可以作為時態表,而不僅僅是 append-only 表。這帶來了一些新的應用場景,比如將 Kafka compacted topic 或數據庫變更日志(來自 Debezium 等)作為時態表。
CREATE TABLE orders (
order_id STRING,
currency STRING,
amount INT,
order_time TIMESTAMP(3),
WATERMARK FOR order_time AS order_time - INTERVAL '30' SECOND
) WITH (
…
);
-- Table backed by a Kafka compacted topic
CREATE TABLE latest_rates (
currency STRING,
rate DECIMAL(38, 10),
currency_time TIMESTAMP(3),
WATERMARK FOR currency_time AS currency_time - INTERVAL ‘5’ SECOND,
PRIMARY KEY (currency) NOT ENFORCED
) WITH (
'connector' = 'upsert-kafka',
…
);
-- Event-time temporal table join
SELECT
o.order_id,
o.order_time,
o.amount * r.rate AS amount,
r.currency
FROM orders AS o, latest_rates FOR SYSTEM_TIME AS OF o.order_time r
ON o.currency = r.currency;
上面的示例同時也展示了如何在 temporal table join 中使用 Flink 1.12 中新增的 upsert-kafka connector。
- 使用 Hive 表進行 Temporal Table Join
用戶也可以將 Hive 表作為時態表來使用,Flink 既支持自動讀取 Hive 表的最新分區作為時態表(FLINK-19644),也支持在作業執行時追蹤整個 Hive 表的最新版本作為時態表。請參閱文檔,了解更多關於如何在 temporal table join 中使用 Hive 表的示例。
7.5.4. Table API/SQL 中的其它改進
- Kinesis Flink SQL Connector (FLINK-18858)
從 Flink 1.12 開始,Table API / SQL 原生支持將 Amazon Kinesis Data Streams(KDS)作為 source 和 sink 使用。新的 Kinesis SQL connector 提供了對於增強的Fan-Out(EFO)以及 Sink Partition 的支持。如需了解 Kinesis SQL connector 所有支持的功能、配置選項以及對外暴露的元數據信息,請查看最新的文檔。
- 在 FileSystem/Hive connector 的流式寫入中支持小文件合並 (FLINK-19345)
很多 bulk format,例如 Parquet,只有當寫入的文件比較大時,才比較高效。當 checkpoint 的間隔比較小時,這會成為一個很大的問題,因為會創建大量的小文件。在 Flink 1.12 中,File Sink 增加了小文件合並功能,從而使得即使作業 checkpoint 間隔比較小時,也不會產生大量的文件。要開啟小文件合並,可以按照文檔[11]中的說明在 FileSystem connector 中設置 auto-compaction = true 屬性。
- Kafka Connector 支持 Watermark 下推 (FLINK-20041)
為了確保使用 Kafka 的作業的結果的正確性,通常來說,最好基於分區來生成 watermark,因為分區內數據的亂序程度通常來說比分區之間數據的亂序程度要低很多。Flink 現在允許將 watermark 策略下推到 Kafka connector 里面,從而支持在 Kafka connector 內部構造基於分區的 watermark[12]。一個 Kafka source 節點最終所產生的 watermark 由該節點所讀取的所有分區中的 watermark 的最小值決定,從而使整個系統可以獲得更好的(即更接近真實情況)的 watermark。該功能也允許用戶配置基於分區的空閑檢測策略,以防止空閑分區阻礙整個作業的 event time 增長。
新增的 Formats
利用 Multi-input 算子進行 Join 優化 (FLINK-19621)
Shuffling 是一個 Flink 作業中最耗時的操作之一。為了消除不必要的序列化反序列化開銷、數據 spilling 開銷,提升 Table API / SQL 上批作業和流作業的性能, planner 當前會利用上一個版本中已經引入的N元算子(FLIP-92),將由 forward 邊所連接的多個算子合並到一個 Task 里執行。
Type Inference for Table API UDAFs (FLIP-65)
Flink 1.12 完成了從 Flink 1.9 開始的,針對 Table API 上的新的類型系統[2]的工作,並在聚合函數(UDAF)上支持了新的類型系統。從 Flink 1.12 開始,與標量函數和表函數類似,聚合函數也支持了所有的數據類型。
7.6. PyFlink: Python DataStream API
為了擴展 PyFlink 的可用性,Flink 1.12 提供了對於 Python DataStream API(FLIP-130)的初步支持,該版本支持了無狀態類型的操作(例如 Map,FlatMap,Filter,KeyBy 等)。如果需要嘗試 Python DataStream API,可以安裝PyFlink,然后按照該文檔[14]進行操作,文檔中描述了如何使用 Python DataStream API 構建一個簡單的流應用程序。
from pyflink.common.typeinfo import Types
from pyflink.datastream import MapFunction, StreamExecutionEnvironment
class MyMapFunction(MapFunction):
def map(self, value):
return value + 1
env = StreamExecutionEnvironment.get_execution_environment()
data_stream = env.from_collection([1, 2, 3, 4, 5], type_info=Types.INT())
mapped_stream = data_stream.map(MyMapFunction(), output_type=Types.INT())
mapped_stream.print()
env.execute("datastream job")
7.7.PyFlink 中的其它改進
- PyFlink Jobs on Kubernetes (FLINK-17480)
除了 standalone 部署和 YARN 部署之外,現在也原生支持將 PyFlink 作業部署在 Kubernetes 上。最新的文檔中詳細描述了如何在 Kubernetes 上啟動 session 或 application 集群。
- 用戶自定義聚合函數 (UDAFs)
從 Flink 1.12 開始,您可以在 PyFlink 作業中定義和使用 Python UDAF 了(FLIP-139)。普通的 UDF(標量函數)每次只能處理一行數據,而 UDAF(聚合函數)則可以處理多行數據,用於計算多行數據的聚合值。您也可以使用 Pandas UDAF[15](FLIP-137),來進行向量化計算(通常來說,比普通 Python UDAF 快10倍以上)。
注意: 普通 Python UDAF,當前僅支持在 group aggregations 以及流模式下使用。如果需要在批模式或者窗口聚合中使用,建議使用 Pandas UDAF。
原文: https://developer.aliyun.com/article/780123
官方原文: https://flink.apache.org/news/2020/12/10/release-1.12.0.html
八 .Flink 1.13 版本
概要
這個版本是一些永久性的更新,幫助用戶更好理解Flink程序的性能。當我們的流處理的速度並不是我們希望看到的性能的時候,這些新特性能幫助我們找到原因:數據加載和背壓圖能幫助定位性能瓶頸所在, CPU火焰圖可以定位哪些代碼是程序中的熱點代碼,State Access Latencies可以查看狀態的保存情況
除了上述的特征,Flink社區還改進了系統的許多地方,其中有一些會在下面展示。
主要功能點
響應式伸縮
響應式伸縮是Flink的最新功能,它使流處理應用程序和其他應用程序一樣自然,一樣管理簡單。
Flink的資源管理和部署具有雙重特性,我們可以將Flink應用程序部署到K8S或者YARN等資源協調器上,這樣Flink會積極管理和分配資源,並釋放workers. 這對於快速修改Jobs或者Application的資源要求是非常有用的,比如批處理的應用或者ad-hoc的SQL查詢。worker的數量將遵循Flink應用的並行度。
對於長時間運行的無限流程序,部署模式和其他長期運行的程序一樣:應用程序不知道自己是運行在K8S,EKS或者YARN平台上,也不需要嘗試獲取一定數量的worker;相反,只需要提供給應用程序worker的數量,應用程序會根據提供的worker的數量自動調節並行度,我們稱這種特性為響應式伸縮。
應用程序的部署模式開啟了這項工作,類似於其他的程序部署一樣(通過避開兩個單獨的步驟部署:1. 開啟一個集群 2. 提交一個應用)。Flink的響應式伸縮模型完成了這點,使用者不需要使用額外的工具(腳本或者K8S命令)來保持worder的數量和程序的並行度的一致。
現在可以像對待其他典型應用程序一樣,在Flink的應用程序中放一個自動伸縮器。同時在配置自動伸縮器的時候,你需要關心重新縮放的成本,因為有狀態的流處理在伸縮的時候需要移動它的狀態。
如果你需要嘗試這個伸縮器的,需要添加scheduler-mode: reactive 這個配置到集群(只能是standalone 或者K8S集群)。詳細參考
分析應用程序性能
和其他的程序一樣,分析和理解Flink應用程序的性能是非常重要的。通常更關鍵的是,在了解性能的同時我們希望Flink能夠在(近)實時延遲內提供結果,因為Flink應用程序通常是數據密集型的應用。
當程序處理的速度跟不上數據進來的速度,或者應用程序占用的資源超過了預期,下面的功能能幫助我們追蹤到原因:
Bottleneck detection, Back Pressure monitoring
性能分析期間的第一個問題通常是:哪個Operation是瓶頸?
為了幫助回答這個問題,Flink公開了一些指標來描述那些當前處於繁忙或者背壓狀態的tasks的繁忙程度或者被壓程度(背壓是指有能力工作但不能工作,因為它們的后續操作符不能接受更多數據)。瓶頸所在都是那些繁忙的operators, 它們的上游operator實際承擔大數據量的壓力。
Flink 1.13帶來了一個改進的背壓度量系統(使用任務郵箱計時而不是線程堆棧采樣),以及一個重新設計的作業數據流圖形表示,用顏色編碼和繁忙度和背壓比率表示。
- CPU flame graphs in Web UI
性能分析的第二個問題是:在所有有性能瓶頸的operators中,哪些operator的工作開銷是最昂貴的?
回答這個問題,最直觀的就是看CPU的火焰圖:
- 當前哪些方法在消耗CPU的資源?
- 各個方法消耗的CPU的資源的多少對比?
- 堆棧上的哪些調用會導致執行特定的方法?
火焰圖是跟蹤堆棧線程然后重復多次采樣而生成的。每個方法的調用都會有一個長方型表示,長方型的長度和它在采樣中出現的次數成正比。啟用后,可以在Operator UI上查看:
Access Latency Metrics for State
還有一個性能瓶頸的地方可能是backend state, 特別是當你的狀態大小大於Flink當前可用的主內存並且你使用的是RockDB存儲你的狀態。
這並不是說RockDB慢,而是它在一定的條件下才能實現良好的性能。如果在雲上使用了錯誤的硬盤資源類型,可有可能導致RockDB對磁盤IOPs的需求不足。
在CPU火焰圖之上,新的后端狀態延遲指標可以幫助解狀態是否響應。e.g. 如果您看到RocksDB狀態訪問開始花費幾毫秒的時間,可能需要查看您的內存和I/O配置。這些指標可以通過設置state.backend.rocksdb.latency-track-enabled可選項來激活使用。指標抽樣收集應該對RocksDB狀態后端性能有很小的影響。
Switching State Backend with savepoints
當需要從savepoint中回復Flink Job的時候,現在可以更改state backend。 這就意味着Flink的應用的狀態不再鎖定在程序最初啟動時使用的狀態了。e.g. 基於這個特性,我們可以在開始時使用HashMap來記錄狀態(純粹在JVM中), 然后再狀態增長太大的時候切換到RockDB來記錄狀態。
實際上,Flink現在有了規范的Savepoint格式,當為Savepoint創建數據快照時,所有狀態后端都使用這種格式。
User-specified pod templates for Kubernetes deployments 在native K8S 部署模式下,用戶可以指定pod模板。
使用這些模板,用戶可以以Kubernetes-y的方式配置JobManagers和TaskManagers,其靈活性超出了直接內置到Flink的Kubernetes集成中的配置選項。
Unaligned Checkpoints - production-ready
非對齊的checkpoint可以在生產中使用了。如果你想在背壓狀態下看到程序的問題,鼓勵使用unaligned checkpoints.
下面這些改變使unaligned checkpoints更容易使用:
- 在可以從unaligned checkpoints重新調整應用程序。如果您的應用程序由於無法(負擔不起)創建Savepoints而需要從checkpoints進行擴展,那么這將非常方便
- 對於沒有back-pressured的應用程序,啟用unaligned checkpoints成本更低。unaligned checkpoints現在可以通過超時自適應地觸發,這意味着一個checkpoint作為一個對齊的checkpoint開始(不存儲任何飛行中的事件),並回落到一個未對齊的checkpoint(存儲一些飛行中的事件),如果對齊階段花費的時間超過了一定的時間.
Machine Learning Library moving to a separate repository
為了加快Flink機器學習(流、批處理和統一機器學習)的開發, 我們把主要經歷放在Flink項目下的新庫Flink-ml。在這里,我們遵循類似於Stateful Functions的方法,通過允許更多輕量級貢獻工作流和單獨的發布周期,單獨的存儲庫幫助加快了開發。
請繼續關注機器學習方面的更多更新,比如與ALink (Flink上許多常見的機器學習算法套件)的相互作用,或者Flink與TensorFlow的集成。
SQL和表的接口改進
與以前的版本一樣,SQL和Table API仍然需求量最大的部分。
Windows via Table-valued functions 表值函數定義窗口
定義時間窗口是流SQL查詢中最常見的操作之一。Flink 1.13介紹了一種定義窗口的新方法: 通過表值函數。這種方法不僅表達能力更強(允許您定義新的窗口類型),而且完全符合SQL標准。
Flink 1.13支持新的語法中的TUMBLE和HOP窗口,后續的版本中還會有SESSION窗口。為了演示增加的表達能力,考慮下面兩個例子。
-- 一種新的 CUMULATE 窗函數,它給窗口分配一個擴展步長直到達到最大窗口大小:
SELECT window_time, window_start, window_end, SUM(price) AS total_price
FROM TABLE(CUMULATE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES))
GROUP BY window_start, window_end, window_time;
可以引用表值窗口函數的窗口開始時間和窗口結束時間,從而使新類型的構造成為可能。除了常規的窗口聚合和窗口連接之外,現在可以表示窗口Top-K聚合:
SELECT window_time, ...
FROM (
SELECT *, ROW_NUMBER() OVER (PARTITION BY window_start, window_end ORDER BY total_price DESC)
as rank
FROM t
) WHERE rank <= 100;
改進DataStream API和Table API/SQL之間的互操轉換
這個版本從根本上簡化了DataStream API和Table API程序的混合。Table API是開發應用程序的好方法,它具有聲明特性和許多內置函數。 但有時需要轉到使用DataStream API,以獲得其表達性、靈活性和對狀態的顯式控制。
新的方法StreamTableEnvironment.toDataStream()/.fromDataStream() 從DataStream API創建一個DataStream作為表的Source或者Sink.
顯著的改進有:
- DataStream和Table API類型自動類型轉換
- Event Time配置的無縫集成; 為了高一致性,水印在邊界流動
- 對Row類(表示來自Table API的行事件)的增強已經得到了重大改進(改進了toString()/hashCode()/equals()方法的行為),現在支持通過名稱訪問實例屬性值,並支持稀疏表示。
Table table=tableEnv.fromDataStream(
dataStream,Schema.newBuilder()
.columnByMetadata("rowtime","TIMESTAMP(3)")
.watermark("rowtime","SOURCE_WATERMARK()")
.build());
DataStream<Row> dataStream=tableEnv.toDataStream(table)
.keyBy(r->r.getField("user"))
.window(...)
SQL Client: Init scripts and Statement Sets
SQL客戶端是一種直接運行和部署SQL流作業和批處理作業的簡便方法,不需要命令行編寫代碼,也不需要CI/CD的支持。
本版本改進了許多SQL客戶端的功能,幾乎Java應用中可以使用所有的operations都可以在SQL客戶端或者SQL腳本中使用。也就是說SQL用戶將寫更少的代碼。
Easier Configuration and Code Sharing 更簡單的配置和代碼共享
SQL客戶端將停止對YAML文件的支持,轉而在執行主SQL腳本前接受一個或者多個初始化腳本來配置session.
這些初始化的腳本通常在軟對或者部署之間共享,可以用於加載公共的catalogs,應用公共配置設置或者定義標准視圖。
./sql-client.sh -i init1.sql init2.sql -f sqljob.sql
更多的配置選項
一組更大的可識別配置選項和改進的set/RESET命令使得在SQL客戶端和SQL腳本中定義和控制應用程序的執行變得更容易。
在一個上下文中支持多查詢
支持在一個Flink Job中執行多個SQL語句查詢,這對無限流中的SQL查詢非常有用。
Statement Set是將應該放在一起執行的查詢分組在一起的機制
下面是一個可以通過Flink SQL命令行客戶端運行的SQL腳本例子。 它設置和配置環境並執行多個查詢。 該腳本捕獲端到端查詢和所有環境構建和配置工作,使其成為自包含的artifact。
-- set up a catalog
CREATE CATALOG hive_catalog WITH ('type' = 'hive');
USE CATALOG hive_catalog;
-- or use temporary objects
CREATE TEMPORARY TABLE clicks (
user_id BIGINT,
page_id BIGINT,
viewtime TIMESTAMP
) WITH (
'connector' = 'kafka',
'topic' = 'clicks',
'properties.bootstrap.servers' = '...',
'format' = 'avro'
);
-- set the execution mode for jobs
SET execution.runtime-mode=streaming;
-- set the sync/async mode for INSERT INTOs
SET table.dml-sync=false;
-- set the job's parallelism
SET parallism.default=10;
-- set the job name
SET pipeline.name = my_flink_job;
-- restore state from the specific savepoint path
SET execution.savepoint.path=/tmp/flink-savepoints/savepoint-bb0dab;
BEGIN STATEMENT SET;
INSERT INTO pageview_pv_sink
SELECT page_id, count(1) FROM clicks GROUP BY page_id;
INSERT INTO pageview_uv_sink
SELECT page_id, count(distinct user_id) FROM clicks GROUP BY page_id;
END;
Hive查詢語法兼容性
現在可以使用Hive SQL語法編寫針對Flink的SQL查詢。 除了Hive的DDL方言,Flink現在也接受常用的Hive DML和DQL方言。
要使用Hive SQL方言,設置 table.sql-dialect 為 hive並加載 HiveModule 。 HiveModule 的加載很重要,因為Hive的內置函數需要適當的語法和語義兼容性。 下面的例子說明了這一點:
CREATE CATALOG myhive WITH ('type' = 'hive'); -- setup HiveCatalog
USE CATALOG myhive;
LOAD MODULE hive; -- setup HiveModule
USE MODULES hive,core;
SET table.sql-dialect = hive; -- enable Hive dialect
SELECT key, value FROM src CLUSTER BY key; -- run some Hive queries
請注意,Hive方言不再支持Flink的SQL語法的DML和DQL語句, 需要切換回Flink語法的默認方言。
改進SQL時間函數的行為
處理時間是任何數據處理的關鍵要素。 但同時,處理包含不同的時區、日期和時間的數據時是一項非常精細的任務。
在Flink 1.13。 官方花了很多精力簡化與時間相關的函數的使用。 調整了(更具體地)函數的返回類型,如 PROCTIME() 、 CURRENT_TIMESTAMP 、 NOW()。
此外,您現在還可以在 TIMESTAMP_LTZ 列上定義event time屬性,以便在Daylight Saving Time的支持下優雅地進行窗口處理。
PyFlink的改進
PyFlink這個版本的主要的主題是讓Python DataStream API和Table API在特性上更接近Java/Scala API。
Python DataStream API中的有狀態操作
在Flink 1.13中, Python程序員現在也可以充分享受Apache Flink的有狀態流處理api的潛力。 Flink 1.12中引入的重新架構過的Python DataStream API,現在擁有完整的狀態功能,允許用戶記住狀態中的事件的信息,並在以后對其進行操作。
這種有狀態處理能力是許多更復雜的處理操作的基礎,這些操作需要記住跨單個事件的信息(例如,Windowing operations)。
下面這個例子展示了一個自定義計數窗口的實現,使用state:
class CountWindowAverage(FlatMapFunction):
def __init__(self, window_size):
self.window_size = window_size
def open(self, runtime_context: RuntimeContext):
descriptor = ValueStateDescriptor("average", Types.TUPLE([Types.LONG(), Types.LONG()]))
self.sum = runtime_context.get_state(descriptor)
def flat_map(self, value):
current_sum = self.sum.value()
if current_sum is None:
current_sum = (0, 0)
# update the count
current_sum = (current_sum[0] + 1, current_sum[1] + value[1])
# if the count reaches window_size, emit the average and clear the state
if current_sum[0] >= self.window_size:
self.sum.clear()
yield value[0], current_sum[1] // current_sum[0]
else:
self.sum.update(current_sum)
ds = ... # type: DataStream
ds.key_by(lambda row: row[0]) \
.flat_map(CountWindowAverage(5))
PyFlink DataStream API中用戶自定義窗口
Flink 1.13為PyFlink DataStream API增加了對用戶定義的窗口的支持。 Flink程序現在可以在標准窗口定義之外使用窗口。
因為窗口是所有處理無限流的程序的核心(通過將無限流分割成有大小的“桶”),這大大提高了API的表達能力。
PyFlink Table API中基於行的操作
Python Table API現在支持基於行的操作,例如,自定義的行轉換函數。 這些函數是在內置函數之外對表應用數據轉換的一種簡單方法。
這是一個在Python Table API中使用map()操作的例子:
@udf(result_type=DataTypes.ROW(
[DataTypes.FIELD("c1", DataTypes.BIGINT()),
DataTypes.FIELD("c2", DataTypes.STRING())]))
def increment_column(r: Row) -> Row:
return Row(r[0] + 1, r[1])
table = ... # type: Table
mapped_result = table.map(increment_column)
除了map()之外,該API還支持flat_map()、aggregate()、flat_aggregate()和其他基於行操作的函數。 這使得Python Table API與Java Table API在特性上更相近了。
PyFlink DataStream程序的批處理執行模式
PyFlink DataStream API現在也支持有界流的批處理執行模式,這是在Flink 1.12中為Java DataStream API引入的。
批處理執行模式通過利用有界流的特性,繞過狀態后端和檢查點,簡化了有界流上的操作並提高了程序的性能。
其他改進
使用Hugo查看Flink Documentation
Flink文檔已經從Jekyll遷移到了Hugo。
Web UI中的歷史異常
Flink Web UI將顯示多個(n個)導致作業失敗的異常。 這有助於調試由根故障導致后續故障的場景。 可以在異常歷史記錄中找到失敗的根本原因。
更好的報告失敗的checkpoints的異常或者失敗的原因
Flink現在提供了失敗或被中止的檢查點的統計信息,以便更容易地確定失敗原因,而不必分析日志
以前版本的Flink只有在檢查點成功的情況下才會報告指標(例如,持久化數據的大小、觸發時間)。
PyFlink Table API支持用戶在Group Windows自定義聚合函數
PyFlink的Table API中的組窗口現在同時支持一般的Python用戶定義聚合函數(udaf)和Pandas udaf。 這些功能對於許多分析和ML訓練程序是至關重要的。
Flink 1.13對以前的版本進行了改進,在以前的版本中,這些函數只支持無界的Group-by聚合。
改進Batch Execution下的Sort-Merge Shuffle
Flink 1.13提高了批執行程序的內存穩定性和Sort-Merge Shuffle的性能,Flink 1.12最初是通過FLIP-148引入的。
具有更高並行度(1000秒)的程序應該不再頻繁觸發OutOfMemoryError: Direct Memory。 通過更好的I/O調度和廣播優化提高了性能(特別是在旋轉磁盤上)。
HBase連接器支持異步查找和查找緩存
HBase Lookup Table Source現在支持異步查找模式和查找緩存。 這極大地提高了對HBase進行查找連接的Table/SQL作業的性能,同時減少了典型場景下對HBase的I/O請求。
在以前的版本中,HBase Lookup Source只進行同步通信,導致管道利用率和吞吐量較低。
九、Flink1.14版本
批流一體
流批一體其實從 Flink 1.9 版本開始就受到持續的關注,它作為社區 RoadMap 的重要組成部分,是大數據實時化必然的趨勢。但是另一方面,傳統離線的計算需求其實並不會被實時任務完全取代,而是會長期存在。
在實時和離線的需求同時存在的狀態下,以往的流批獨立技術方案存在着一些痛點,比如:
- 需要維護兩套系統,相應的就需要兩組開發人員,人力的投入成本很高;
- 另外,兩套數據鏈路處理相似內容帶來維護的風險性和冗余;
- 最重要的一點是,如果流批使用的不是同一套數據處理系統,引擎本身差異可能會存在數據口徑不一致的問題,從而導致業務數據存在一定的誤差。這種誤差對於大數據分析會有比較大的影響。
在這樣的背景下,Flink 社區認定了實時離線一體化的技術路線是比較重要的技術趨勢和方向。
Flink 在過去的幾個版本中,在流批一體方面做了很多的工作。可以認為 Flink 在引擎層面,API 層面和算子的執行層面上做到了真正的流與批用同一套機制運行。但是在任務具體的執行模式上會有 2 種不同的模式:
對於無限的數據流,統一采用了流的執行模式。流的執行模式指的是所有計算節點是通過 Pipeline 模式去連接的,Pipeline 是指上游和下游計算任務是同時運行的,隨着上游不斷產出數據,下游同時在不斷消費數據。這種全 Pipeline 的執行方式可以:
- 通過 eventTime 表示數據是什么時候產生的;
- 通過 watermark 得知在哪個時間點,數據已經到達了;
- 通過 state 來維護計算中間狀態;
- 通過 Checkpoint 做容錯的處理。
下圖是不同的執行模式:
- 對於有限的數據集有 2 種執行模式,我們可以把它看成一個有限的數據流去做處理,也可以把它看成批的執行模式。批的執行模式雖然也有 eventTime,但是對於 watermark 來說只支持正無窮。對數據和 state 排序后,它在任務的調度和 shuffle 上會有更多的選擇。
流批的執行模式是有區別的,最主要的就是批的執行模式會有落盤的中間過程,只有當前面任務執行完成,下游的任務才會觸發,這個容錯機制是通過 shuffle 進行容錯的。
這 2 者也各有各的執行優勢:
- 對於流的執行模式來說,它沒有落盤的壓力,同時容錯是基於數據的分段,通過不斷對數據進行打點 Checkpoint 去保證斷點恢復;
- 然而在批處理上,因為要經過 shuffle 落盤,所以對磁盤會有壓力。但是因為數據是經過排序的,所以對批來說,后續的計算效率可能會有一定的提升。同時,在執行時候是經過分段去執行任務的,無需同時執行。在容錯計算方面是根據 stage 進行容錯。
這兩種各有優劣,可以根據作業的具體場景來進行選擇。
Flink 1.14 的優化點主要是針對在流的執行模式下,如何去處理有限數據集。之前處理無限數據集,和現在處理有限數據集最大的區別在於引入了 "任務可能會結束" 的概念。在這種情況下帶來一些新的問題,如下圖:
在流的執行模式下的 Checkpoint 機制
- 對於無限流,它的 Checkpoint 是由所有的 source 節點進行觸發的,由 source 節點發送 Checkpoint Barrier ,當 Checkpoint Barrier 流過整個作業時候,同時會存儲當前作業所有的 state 狀態。
- 而在有限流的 Checkpoint 機制中,Task 是有可能提早結束的。上游的 Task 有可能先處理完任務提早退出了,但下游的 Task 卻還在執行中。在同一個 stage 不同並發下,有可能因為數據量不一致導致部分任務提早完成了。這種情況下,在后續的執行作業中,如何進行 Checkpoint?
在 1.14 中,JobManager 動態根據當前任務的執行情況,去明確 Checkpoint Barrier 是從哪里開始觸發。同時在部分任務結束后,后續的 Checkpoint 只會保存仍在運行 Task 所對應的 stage,通過這種方式能夠讓任務執行完成后,還可以繼續做 Checkpoint ,在有限流執行中提供更好的容錯保障。
Task 結束后的兩階段提交
我們在部分 Sink 使用上,例如下圖的 Kafka Sink 上,涉及到 Task 需要依靠 Checkpoint 機制,進行二階段提交,從而保證數據的 Exactly-once 一致性。
具體可以這樣說:在 Checkpoint 過程中,每個算子只會進行准備提交的操作。比如數據會提交到外部的臨時存儲目錄下,所有任務都完成這次 Checkpoint 后會收到一個信號,之后才會執行正式的 commit,把所有分布式的臨時文件一次性以事務的方式提交到外部系統。
這種算法在當前有限流的情況下,作業結束后並不能保證有 Checkpoint,那么最后一部分數據如何提交?
在 1.14 中,這個問題得到了解決。Task 處理完所有數據之后,必須等待 Checkpoint 完成后才可以正式的退出,這是流批一體方面針對有限流任務結束的一些改進。
Checkpoint 機制
1. 現有 Checkpoint 機制痛點
目前 Flink 觸發 Checkpoint 是依靠 barrier 在算子間進行流通,barrier 隨着算子一直往下游進行發送,當算子下游遇到 barrier 的時候就會進行快照操作,然后再把 barrier 往下游繼續發送。對於多路的情況我們會把 barrier 進行對齊,把先到 barrier 的這一路數據暫時性的 block,等到兩路 barrier 都到了之后再做快照,最后才會去繼續往下發送 barrier。
現有的 Checkpoint 機制存在以下問題:
- 反壓時無法做出 Checkpoint :在反壓時候 barrier 無法隨着數據往下游流動,造成反壓的時候無法做出 Checkpoint。但是其實在發生反壓情況的時候,我們更加需要去做出對數據的 Checkpoint,因為這個時候性能遇到了瓶頸,是更加容易出問題的階段;
- Barrier 對齊阻塞數據處理 :阻塞對齊對於性能上存在一定的影響;
- 恢復性能受限於 Checkpoint 間隔 :在做恢復的時候,延遲受到多大的影響很多時候是取決於 Checkpoint 的間隔,間隔越大,需要 replay 的數據就會越多,從而造成中斷的影響也就會越大。但是目前 Checkpoint 間隔受制於持久化操作的時間,所以沒辦法做的很快。
2. Unaligned Checkpoint
針對這些痛點,Flink 在最近幾個版本一直在持續的優化,Unaligned Checkpoint 就是其中一個機制。barrier 算子在到達 input buffer 最前面的時候,就會開始觸發 Checkpoint 操作。它會立刻把 barrier 傳到算子的 OutPut Buffer 的最前面,相當於它會立刻被下游的算子所讀取到。通過這種方式可以使得 barrier 不受到數據阻塞,解決反壓時候無法進行 Checkpoint 的問題。
當我們把 barrier 發下去后,需要做一個短暫的暫停,暫停的時候會把算子的 State 和 input output buffer 中的數據進行一個標記,以方便后續隨時准備上傳。對於多路情況會一直等到另外一路 barrier 到達之前數據,全部進行標注。
通過這種方式整個在做 Checkpoint 的時候,也不需要對 barrier 進行對齊,唯一需要做的停頓就是在整個過程中對所有 buffer 和 state 標注。這種方式可以很好的解決反壓時無法做出 Checkpoint ,和 Barrier 對齊阻塞數據影響性能處理的問題。
3. Generalized Incremental Checkpoint
Generalized Incremental Checkpoint 主要是用於減少 Checkpoint 間隔,如左圖 1 所示,在 Incremental Checkpoint 當中,先讓算子寫入 state 的 changelog。寫完后才把變化真正的數據寫入到 StateTable 上。state 的 changelog 不斷向外部進行持久的存儲化。在這個過程中我們其實無需等待整個 StateTable 去做一個持久化操作,我們只需要保證對應的 Checkpoint 這一部分的 changelog 能夠持久化完成,就可以開始做下一次 Checkpoint。StateTable 是以一個周期性的方式,獨立的去對外做持續化的一個過程。
這兩個過程進行拆分后,就有了從之前的需要做全量持久化 (Per Checkpoint) 變成 增量持久化 (Per Checkpoint) + 后台周期性全量持久化,從而達到同樣容錯的效果。在這個過程中,每一次 Checkpoint 需要做持久化的數據量減少了,從而使得做 Checkpoint 的間隔能夠大幅度減少。
其實在 RocksDB 也是能支持 Incremental Checkpoint 。但是有兩個問題:
- 第一個問題是 RocksDB 的 Incremental Checkpoint 是依賴它自己本身的一些實現,當中會存在一些數據壓縮,壓縮所消耗的時間以及壓縮效果具有不確定性,這個是和數據是相關的;
- 第二個問題是只能針對特定的 StateBackend 來使用,目前在做的 Generalized Incremental Checkpoint 實際上能夠保證的是,它與 StateBackend 是無關的,從運行時的機制來保證了一個比較穩定、更小的 Checkpoint 間隔。 目前 Unaligned Checkpoint 是在 Flink 1.13 就已經發布了,在 1.14 版本主要是針對 bug 的修復和補充,針對 Generalized Incremental Checkpoint,目前社區還在做最后的沖刺,比較有希望在 1.14 中和大家見面。
性能與效率
- 大規模作業調度的優化
構建 Pipeline Region 的性能提升:所有由 pipline 邊所連接構成的子圖 。在 Flink 任務調度中需要通過識別 Pipeline Region 來保證由同一個 Pipline 邊所連接的任務能夠同時進行調度。否則有可能上游的任務開始調度,但是下游的任務並沒有運行。從而導致上游運行完的數據無法給下游的節點進行消費,可能會造成死鎖的情況 任務部署階段:每個任務都要從哪些上游讀取數據,這些信息會生成 Result Partition Deployment Descriptor。 這兩個構建過程在之前的版本都有 O (n^2) 的時間復雜度,主要問題需要對於每個下游節點去遍歷每一個上游節點的情況。例如去遍歷每一個上游是不是一個 Pipeline 邊連接的關系,或者去遍歷它的每一個上游生成對應的 Result Partition 信息。
目前通過引入 group 概念,假設已知上下游 2 個任務的連接方式是 all-to-all,那相當於把所有 Pipeline Region 信息或者 Result Partition 信息以 Group 的形式進行組合,這樣只需知道下游對應的是上游的哪一個 group,就可以把一個 O (n^2) 的復雜度優化到了 O (n)。我們用 wordcount 任務做了一下測試,對比優化前后的性能。
從表格中可以看到構建速度具有大幅度提升,構建 Pipeline Region 的性能從秒級提升至毫秒級別。任務部署我們是從第一個任務開始部署到所有任務開始運行的狀態,這邊只統計了流,因為批需要上游結束后才能結束調度。從整體時間來看,整個任務初始化,調度以及部署的階段,大概能夠減少分鍾級的時間消耗。
- 細粒度資源管理
細粒度資源管理在過去很多的版本都一直在做,在 Flink1.14 終於可以把這一部分 API 開放出來在 DataSteam 提供給用戶使用了。用戶可以在 DataStream 中自定義 SlotSharingGroup 的划分情況,如下圖所示的方式去定義 Slot 的資源划分,實現了支持 DataStream API,自定義 SSG 划分方式以及資源配置 TaskManager 動態資源扣減。
對於每一個 Slot 可以通過比較細粒度的配置,我們在 Runtime 上會自動根據用戶資源配置進行動態的資源切割。
這樣做的好處是不會像之前那樣有固定資源的 Slot,而是做資源的動態扣減,通過這樣的方式希望能夠達到更加精細的資源管理和資源的使用率。
Table / SQL / Python API
1. Table API / SQL
Window Table-Valued Function 支持更多算子與窗口類型 ,可以看如下表格的對比:
從表格中可以看出對於原有的三個窗口類型進行加強,同時新增 Session 窗口類型,目前支持 Aggregate 的操作。
1.1 支持聲明式注冊 Source/Sink
- Table API 支持使用聲明式的方式注冊 Source / Sink 功能對齊 SQL DDL;
- 同時支持 FLIP-27 新的 Source 接口;
- new Source 替代舊的 connect() 接口。
1.2 全新代碼生成器
解決了大家在生成代碼超過 Java 最長代碼限制,新的代碼生成器會對代碼進行拆解,徹底解決代碼超長的問題。
1.3 移除 Flink Planner
新版本中,Blink Planner 將成為 Flink Planner 的唯一實現。
2. Python API
在之前的版本中,如果有先后執行的兩個 UDF,它的執行過程如下圖左方。在 JVM 上面有 Java 的 Operator,先把數據發給 Python 下面的 UDF 去執行,執行后又發回給 Java,然后傳送給下游的 Operator,最后再進行一次 Python 的這種跨進程的傳輸去處理,會導致存在很多次冗余的數據傳輸。
在 1.14 版本中,改進如右圖,可以把它們連接在一起,只需要一個來回的 Java 和 Python 進行數據通信,通過減少傳輸數據次數就能夠達到比較好的性能上的提升。
3. 支持 LoopBack 模式
在以往本地執行實際是在 Python 的進程中去運行客戶端程序,提交 Java 進程啟動一個迷你集群去執行 Java 部分代碼。Java 部分代碼也會和生產環境部分的一樣,去啟動一個新的 Python 進程去執行對應的 Python UDF,從圖下可以看出新的進程其實在本地調試中是沒有必要存在的。
所以支持 lookback 模式后可以讓 Java 的 opt 直接把 UDF 運行在之前 Python client 所運行的相同的進程內,通過這種方式:
- 首先是避免了啟動額外進程所帶來的開銷;
- 最重要的是在本地調試中,我們可以在同一個進程內能夠更好利用一些工具進行 debug,這個是對開發者體驗上的一個提升。
原文:https://developer.aliyun.com/article/789352
官方地址:https://nightlies.apache.org/flink/flink-docs-release-1.14/release-notes/flink-1.14/