簡介: Flink 1.13.0 版本讓流處理應用的使用像普通應用一樣簡單和自然,並且讓用戶可以更好地理解流作業的性能。
翻譯 | 高贇
Review | 朱翥、馬國維
Flink 1.13 發布了!Flink 1.13 包括了超過 200 名貢獻者所提交的 1000 多項修復和優化。
這一版本中,Flink 的一個主要目標取得了重要進展,即讓流處理應用的使用和普通應用一樣簡單和自然。Flink 1.13 新引入的被動擴縮容使得流作業的擴縮容和其它應用一樣簡單,用戶僅需要修改並發度即可。
這個版本還包括一系列重要改動使用戶可以更好的理解流作業的性能。當流作業的性能不及預期的時候,這些改動可以使用戶可以更好的分析原因。這些改動包括用於識別瓶頸節點的負載和反壓可視化、分析算子熱點代碼的 CPU 火焰圖和分析 State Backend 狀態的 State 訪問性能指標。
除了這些特性外,Flink 社區還添加了大量的其它優化,我們會在本文后續討論其中的一些。我們希望用戶可以享受新的版本和特性帶來的便利,在本文最后,我們還會介紹升級Flink版本需要注意的一些變化。
我們鼓勵用戶下載試用新版 Flink 並且通過郵件列表和 JIRA 來反饋遇到的問題。
重要特性
被動擴縮容
Flink 項目的一個初始目標,就是希望流處理應用可以像普通應用一樣簡單和自然,被動擴縮容是 Flink 針對這一目標上的最新進展。
當考慮資源管理和部分的時候,Flink 有兩種可能的模式。用戶可以將 Flink 應用部署到 k8s、yarn 等資源管理系統之上,並且由 Flink 主動的來管理資源並按需分配和釋放資源。這一模式對於經常改變資源需求的作業和應用非常有用,比如批作業和實時 SQL 查詢。在這種模式下,Flink 所啟動的 Worker 數量是由應用設置的並發度決定的。在 Flink 中我們將這一模式叫做主動擴縮容。
對於長時間運行的流處理應用,一種更適合的模型是用戶只需要將作業像其它的長期運行的服務一樣啟動起來,而不需要考慮是部署在 k8s、yarn 還是其它的資源管理平台上,並且不需要考慮需要申請的資源的數量。相反,它的規模是由所分配的 worker 數量來決定的。當 worker 數量發生變化時,Flink 自動的改動應用的並發度。在 Flink 中我們將這一模式叫做被動擴縮容。
Flink 的 Application 部署模式開啟了使 Flink 作業更接近普通應用(即啟動 Flink 作業不需要執行兩個獨立的步驟來啟動集群和提交應用)的努力,而被動擴縮容完成了這一目標:用戶不再需要使用額外的工具(如腳本、K8s 算子)來讓 worker 的數量與應用並發度設置保持一致。
用戶現在可以將自動擴縮容的工具應用到 Flink 應用之上,就像普通的應用程序一樣,只要用戶了解擴縮容的代價:有狀態的流應用在擴縮容的時候需要將狀態重新分發。
如果想要嘗試被動擴縮容,用戶可以增加 scheduler-mode: reactive 這一配置項,然后啟動一個應用集群(Standalone 或者 K8s)。更多細節見被動擴縮容的文檔。
分析應用的性能
對所有應用程序來說,能夠簡單的分析和理解應用的性能是非常關鍵的功能。這一功能對 Flink 更加重要,因為 Flink 應用一般是數據密集的(即需要處理大量的數據)並且需要在(近)實時的延遲內給出結果。
當 Flink 應用處理的速度跟不上數據輸入的速度時,或者當一個應用占用的資源超過預期,下文介紹的這些工具可以幫你分析原因。
瓶頸檢測與反壓監控
Flink 性能分析首先要解決的問題經常是:哪個算子是瓶頸?
為了回答這一問題,Flink 引入了描述作業繁忙(即在處理數據)與反壓(由於下游算子不能及時處理結果而無法繼續輸出)程度的指標。應用中可能的瓶頸是那些繁忙並且上游被反壓的算子。
Flink 1.13 優化了反壓檢測的邏輯(使用基於任務 Mailbox 計時,而不在再於堆棧采樣),並且重新實現了作業圖的 UI 展示:Flink 現在在 UI 上通過顏色和數值來展示繁忙和反壓的程度。
Web UI 中的 CPU 火焰圖
Flink 關於性能另一個經常需要回答的問題:瓶頸算子中的哪部分計算邏輯消耗巨大?
針對這一問題,一個有效的可視化工具是火焰圖。它可以幫助回答以下問題:
- 哪個方法調現在在占用 CPU?
- 不同方法占用 CPU 的比例如何?
- 一個方法被調用的棧是什么樣子的?
火焰圖是通過重復采樣線程的堆棧來構建的。在火焰圖中,每個方法調用被表示為一個矩形,矩形的長度與這個方法出現在采樣中的次數成正比。火焰圖在 UI 上的一個例子如下圖所示。
火焰圖的文檔包括啟用這一功能的更多細節和指令。
State 訪問延遲指標
另一個可能的性能瓶頸是 state backend,尤其是當作業的 state 超過內存容量而必須使用 RocksDB state backend 時。
這里並不是想說 RocksDB 性能不夠好(我們非常喜歡 RocksDB!),但是它需要滿足一些條件才能達到最好的性能。例如,用戶可能很容易遇到非故意的在雲上由於使用了錯誤的磁盤資源類型而不能滿足 RockDB 的 IO 性能需求的問題。
基於 CPU 火焰圖,新的 State Backend 的延遲指標可以幫助用戶更好的判斷性能不符合預期是否是由 State Backend 導致的。例如,如果用戶發現 RocksDB 的單次訪問需要幾毫秒的時間,那么就需要查看內存和 I/O 的配置。這些指標可以通過設置 state.backend.rocksdb.latency-track-enabled 這一選項來啟用。這些指標是通過采樣的方式來監控性能的,所以它們對 RocksDB State Backend 的性能影響是微不足道的。
通過 Savepoint 來切換 State Backend
用戶現在可以在從一個 Savepoint 重啟時切換一個 Flink 應用的 State Backend。這使得 Flink 應用不再被限制只能使用應用首次運行時選擇的 State Backend。
基於這一功能,用戶現在可以首先使用一個 HashMap State Backend(純內存的 State Backend),如果后續狀態變得過大的話,就切換到 RocksDB State Backend 中。
在實現層,Flink 現在統一了所有 State Backend 的 Savepoint 格式來實現這一功能。
K8s 部署時使用用戶指定的 Pod 模式
原生 kubernetes 部署(Flink 主動要求 K8s 來啟動 Pod)中,現在可以使用自定義的 Pod 模板。
使用這些模板,用戶可以使用一種更符合 K8s 的方式來設置 JM 和 TM 的 Pod,這種方式比 Flink K8s 集成內置的配置項更加靈活。
生產可用的 Unaligned Checkpoint
Unaligned Checkpoint 目前已達到了生產可用的狀態,我們鼓勵用戶在存在反壓的情況下試用這一功能。
具體來說,Flink 1.13 中引入的這些功能使 Unaligned Checkpoint 更容易使用:
- 用戶現在使用 Unaligned Checkpoint 時也可以擴縮容應用。如果用戶需要因為性能原因不能使用 Savepoint而必須使用 Retained checkpoint 時,這一功能會非常方便。
- 對於沒有反壓的應用,啟用 Unaligned Checkpoint 現在代價更小。Unaligned Checkpoint 現在可以通過超時來自動觸發,即一個應用默認會使用 Aligned Checkpoint(不存儲傳輸中的數據),而只在對齊超過一定時間范圍時自動切換到 Unaligned Checkpoint(存儲傳輸中的數據)。
關於如何啟用 Unaligned Checkpoint 可以參考相關文檔。
機器學習遷移到單獨的倉庫
為了加速 Flink 機器學習的進展(流批統一的機器學習),現在 Flink 機器學習開啟了新的 flink-ml 倉庫。我們采用類似於 Stateful Function 項目的管理方式,通過使用一個單獨的倉庫從而簡化代碼合並的流程並且可以進行單獨的版本發布,從而提高開發的效率。
用戶可以關注 Flink 在機器學習方面的進展,比如與 Alink(Flink 常用機器學習算法套件)的互操作以及 Flink 與 Tensorflow 的集成。
SQL / Table API 進展
與之前的版本類似,SQL 和 Table API 仍然在所有開發中占用很大的比例。
通過 Table-valued 函數來定義時間窗口
在流式 SQL 查詢中,一個最經常使用的是定義時間窗口。Flink 1.13 中引入了一種新的定義窗口的方式:通過 Table-valued 函數。這一方式不僅有更強的表達能力(允許用戶定義新的窗口類型),並且與 SQL 標准更加一致。
Flink 1.13 在新的語法中支持 TUMBLE 和 HOP 窗口,在后續版本中也會支持 SESSION 窗口。我們通過以下兩個例子來展示這一方法的表達能力:
- 例 1:一個新引入的 CUMULATE 窗口函數,它可以支持按特定步長擴展的窗口,直到達到最大窗口大小:
SELECT window_time, window_start, window_end, SUM(price) AS total_price FROM TABLE(CUMULATE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES)) GROUP BY window_start, window_end, window_time;
- 例 2:用戶在 table-valued 窗口函數中可以訪問窗口的起始和終止時間,從而使用戶可以實現新的功能。例如,除了常規的基於窗口的聚合和 Join 之外,用戶現在也可以實現基於窗口的 Top-K 聚合:
SELECT window_time, ... FROM ( SELECT *, ROW_NUMBER() OVER (PARTITION BY window_start, window_end ORDER BY total_price DESC) as rank FROM t ) WHERE rank <= 100;
提高 DataStream API 與 Table API / SQL 的互操作能力
這一版本極大的簡化了 DataStream API 與 Table API 混合的程序。
Table API 是一種非常方便的應用開發接口,因為這經支持表達式的程序編寫並提供了大量的內置函數。但是有時候用戶也需要切換回 DataStream,例如當用戶存在表達能力、靈活性或者 State 訪問的需求時。
Flink 新引入的 StreamTableEnvironment.toDataStream()/.fromDataStream() 可以將一個 DataStream API 聲明的 Source 或者 Sink 當作 Table 的 Source 或者 Sink 來使用。主要的優化包括:
- DataStream 與 Table API 類型系統的自動轉換。
- Event Time 配置的無縫集成,Watermark 行為的高度一致性。
- Row 類型(即 Table API 中數據的表示)有了極大的增強,包括 toString() / hashCode() 和 equals() 方法的優化,按名稱訪問字段值的支持與稀疏表示的支持。
Table table = tableEnv.fromDataStream(
dataStream,
Schema.newBuilder()
.columnByMetadata("rowtime", "TIMESTAMP(3)") .watermark("rowtime", "SOURCE_WATERMARK()") .build()); DataStream<Row> dataStream = tableEnv.toDataStream(table) .keyBy(r -> r.getField("user")) .window(...);
SQL Client: 初始化腳本和語句集合 (Statement Sets)
SQL Client 是一種直接運行和部署 SQL 流或批作業的簡便方式,用戶不需要編寫代碼就可以從命令行調用 SQL,或者作為 CI / CD 流程的一部分。
這個版本極大的提高了 SQL Client 的功能。現在基於所有通過 Java 編程(即通過編程的方式調用 TableEnvironment 來發起查詢)可以支持的語法,現在 SQL Client 和 SQL 腳本都可以支持。這意味着 SQL 用戶不再需要添加膠水代碼來部署他們的SQL作業。
配置簡化和代碼共享
Flink 后續將不再支持通過 Yaml 的方式來配置 SQL Client(注:目前還在支持,但是已經被標記為廢棄)。作為替代,SQL Client 現在支持使用一個初始化腳本在主 SQL 腳本執行前來配置環境。
這些初始化腳本通常可以在不同團隊/部署之間共享。它可以用來加載常用的 catalog,應用通用的配置或者定義標准的視圖。
./sql-client.sh -i init1.sql init2.sql -f sqljob.sql
更多的配置項
通過增加配置項,優化 SET / RESET 命令,用戶可以更方便的在 SQL Client 和 SQL 腳本內部來控制執行的流程。
通過語句集合來支持多查詢
多查詢允許用戶在一個 Flink 作業中執行多個 SQL 查詢(或者語句)。這對於長期運行的流式 SQL 查詢非常有用。
語句集可以用來將一組查詢合並為一組同時執行。
以下是一個可以通過 SQL Client 來執行的 SQL 腳本的例子。它初始化和配置了執行多查詢的環境。這一腳本包括了所有的查詢和所有的環境初始化和配置的工作,從而使它可以作為一個自包含的部署組件。
-- set up a catalog CREATE CATALOG hive_catalog WITH ('type' = 'hive'); USE CATALOG hive_catalog; -- or use temporary objects CREATE TEMPORARY TABLE clicks ( user_id BIGINT, page_id BIGINT, viewtime TIMESTAMP ) WITH ( 'connector' = 'kafka', 'topic' = 'clicks', 'properties.bootstrap.servers' = '...', 'format' = 'avro' ); -- set the execution mode for jobs SET execution.runtime-mode=streaming; -- set the sync/async mode for INSERT INTOs SET table.dml-sync=false; -- set the job's parallelism SET parallism.default=10; -- set the job name SET pipeline.name = my_flink_job; -- restore state from the specific savepoint path SET execution.savepoint.path=/tmp/flink-savepoints/savepoint-bb0dab; BEGIN STATEMENT SET; INSERT INTO pageview_pv_sink SELECT page_id, count(1) FROM clicks GROUP BY page_id; INSERT INTO pageview_uv_sink SELECT page_id, count(distinct user_id) FROM clicks GROUP BY page_id; END;
Hive 查詢語法兼容性
用戶現在在 Flink 上也可以使用 Hive SQL 語法。除了 Hive DDL 方言之外,Flink現在也支持常用的 Hive DML 和 DQL 方言。
為了使用 Hive SQL 方言,需要設置 table.sql-dialect 為 hive 並且加載 HiveModule。后者非常重要,因為必須要加載 Hive 的內置函數后才能正確實現對 Hive 語法和語義的兼容性。例子如下:
CREATE CATALOG myhive WITH ('type' = 'hive'); -- setup HiveCatalog USE CATALOG myhive; LOAD MODULE hive; -- setup HiveModule USE MODULES hive,core; SET table.sql-dialect = hive; -- enable Hive dialect SELECT key, value FROM src CLUSTER BY key; -- run some Hive queries
需要注意的是, Hive 方言中不再支持 Flink 語法的 DML 和 DQL 語句。如果要使用 Flink 語法,需要切換回 default 的方言配置。
優化的 SQL 時間函數
在數據處理中時間處理是一個重要的任務。但是與此同時,處理不同的時區、日期和時間是一個日益復雜的任務。
在 Flink 1.13 中,我們投入了大量的精力來簡化時間函數的使用。我們調整了時間相關函數的返回類型使其更加精確,例如 PROCTIME(),CURRENT_TIMESTAMP() 和 NOW()。
其次,用戶現在還可以基於一個 TIMESTAMP_LTZ 類型的列來定義 Event Time 屬性,從而可以優雅的在窗口處理中支持夏令時。
用戶可以參考 Release Note 來查看該部分的完整變更。
PyFlink 核心優化
這個版本對 PyFlink 的改進主要是使基於 Python 的 DataStream API 與 Table API 與 Java/scala 版本的對應功能更加一致。
Python DataStream API 中的有狀態算子
在 Flink 1.13 中,Python 程序員可以享受到 Flink 狀態處理 API 的所有能力。在 Flink 1.12 版本重構過的 Python DataStream API 現在已經擁有完整的狀態訪問能力,從而使用戶可以將數據的信息記錄到 state 中並且在后續訪問。
帶狀態的處理能力是許多依賴跨記錄狀態共享(例如 Window Operator)的復雜數據處理場景的基礎。
以下例子展示了一個自定義的計算窗口的實現:
class CountWindowAverage(FlatMapFunction): def __init__(self, window_size): self.window_size = window_size def open(self, runtime_context: RuntimeContext): descriptor = ValueStateDescriptor("average", Types.TUPLE([Types.LONG(), Types.LONG()])) self.sum = runtime_context.get_state(descriptor) def flat_map(self, value): current_sum = self.sum.value() if current_sum is None: current_sum = (0, 0) # update the count current_sum = (current_sum[0] + 1, current_sum[1] + value[1]) # if the count reaches window_size, emit the average and clear the state if current_sum[0] >= self.window_size: self.sum.clear() yield value[0], current_sum[1] // current_sum[0] else: self.sum.update(current_sum) ds = ... # type: DataStream ds.key_by(lambda row: row[0]) \ .flat_map(CountWindowAverage(5))
PyFlink DataStream API 中的用戶自定義窗口
Flink 1.13 中 PyFlink DataStream 接口增加了對用戶自定義窗口的支持,現在用戶可以使用標准窗口之外的窗口定義。
由於窗口是處理無限數據流的核心機制 (通過將流切分為多個有限的『桶』),這一功能極大的提高的 API 的表達能力。
PyFlink Table API 中基於行的操作
Python Table API 現在支持基於行的操作,例如用戶對行數據的自定義函數。這一功能使得用戶可以使用非內置的數據處理函數。
一個使用 map() 操作的 Python Table API 示例如下:
@udf(result_type=DataTypes.ROW( [DataTypes.FIELD("c1", DataTypes.BIGINT()), DataTypes.FIELD("c2", DataTypes.STRING())])) def increment_column(r: Row) -> Row: return Row(r[0] + 1, r[1]) table = ... # type: Table mapped_result = table.map(increment_column)
除了 map(),這一 API 還支持 flat_map(),aggregate(),flat_aggregate() 和其它基於行的操作。這使 Python Table API 的功能與 Java Table API 的功能更加接近。
PyFlink DataStream API 支持 Batch 執行模式
對於有限流,PyFlink DataStream API 現在已經支持 Flink 1.12 DataStream API 中引入的 Batch 執行模式。
通過復用數據有限性來跳過 State backend 和 Checkpoint 的處理,Batch 執行模式可以簡化運維,並且提高有限流處理的性能。
其它優化
基於 Hugo 的 Flink 文檔
Flink 文檔從 JekyII 遷移到了 Hugo。如果您發現有問題,請務必通知我們,我們非常期待用戶對新的界面的感受。
Web UI 支持歷史異常
Flink Web UI 現在可以展示導致作業失敗的 n 次歷史異常,從而提升在一個異常導致多個后續異常的場景下的調試體驗。用戶可以在異常歷史中找到根異常。
優化失敗 Checkpoint 的異常和失敗原因的匯報
Flink 現在提供了失敗或被取消的 Checkpoint 的統計,從而使用戶可以更簡單的判斷 Checkpoint 失敗的原因,而不需要去查看日志。
Flink 之前的版本只有在 Checkpoint 成功的時候才會匯報指標(例如持久化數據的大小、觸發時間等)。
提供『恰好一次』一致性的 JDBC Sink
從 1.13 開始,通過使用事務提交數據,JDBC Sink 可以對支持 XA 事務的數據庫提供『恰好一次』的一致性支持。這一特性要求目標數據庫必須有(或鏈接到)一個 XA 事務處理器。
這一 Sink 現在只能在 DataStream API 中使用。用戶可以通過 JdbcSink.exactlyOnceSink(…) 來創建這一 Sink(或者通過顯式初始化一個 JdbcXaSinkFunction)。
PyFlink Table API 在 Group 窗口上支持用戶自定義的聚合函數
PyFlink Table API 現在對 Group 窗口同時支持基於 Python 的用戶自定義聚合函數(User-defined Aggregate Functions, UDAFs)以及 Pandas UDAFs。這些函數對許多數據分析或機器學習訓練的程序非常重要。
在 Flink 1.13 之前,這些函數僅能在無限的 Group-by 聚合場景下使用。Flink 1.13 優化了這一限制。
Batch 執行模式下 Sort-merge Shuffle 優化
Flink 1.13 優化了針對批處理程序的 Sort-merge Blocking Shuffle 的性能和內存占用情況。這一 Shuffle 模式是在Flink 1.12 的 FLIP-148 中引入的。
這一優化避免了大規模作業下不斷出現 OutOfMemoryError: Direct Memory 的問題,並且通過 I/O 調度和 broadcast 優化提高了性能(尤其是在機械硬盤上)。
HBase 連接器支持異步維表查詢和查詢緩存
HBase Lookup Table Source 現在可以支持異步查詢模式和查詢緩存。這極大的提高了使用這一 Source 的 Table / SQL 維表 Join 的性能,並且在一些典型情況下可以減少對 HBase 的 I/O 請求數量。
在之前的版本中,HBase Lookup Source 僅支持同步通信,從而導致作業吞吐以及資源利用率降低。
升級 Flink 1.13 需要注意的改動
- FLINK-21709 – 老的 Table & SQL API 計划器已經被標記為廢棄,並且將在 Flink 1.14 中被刪除。Blink 計划器在若干版本之前已經被設置為默認計划器,並且將成為未來版本中的唯一計划器。這意味着 BatchTableEnvironment 和 DataSet API 互操作后續也將不再支持。用戶需要切換到統一的 TableEnvironment 來編寫流或者批的作業。
- FLINK-22352 – Flink 社區決定廢棄對 Apache mesos 的支持,未來有可能會進一步刪除這部分功能。用戶最好能夠切換到其它的資源管理系統上。
- FLINK-21935 – state.backend.async 這一配置已經被禁用了,因為現在 Flink 總是會異步的來保存快照(即之前的配置默認值),並且現在沒有實現可以支持同步的快照保存操作。
- FLINK-17012 – Task 的 RUNNING 狀態被細分為兩步:INITIALIZING 和 RUNNING。Task 的 INITIALIZING 階段包括加載 state 和在啟用 unaligned checkpoint 時恢復 In-flight 數據的過程。通過顯式區分這兩種狀態,監控系統可以更好的區分任務是否已經在實際工作。
- FLINK-21698 – NUMERIC 和 TIMESTAMP 類型之間的直接轉換存在問題,現在已經被禁用,例如 CAST(numeric AS TIMESTAMP(3))。用戶應該使用 TO_TIMESTAMP(FROM_UNIXTIME(numeric)) 來代替。
- FLINK-22133 – 新的 Source 接口有一個小的不兼容的修改,即 SplitEnumerator.snapshotState() 方法現在多接受一個 checkpoint id 參數來表示正在進行的 snapshot 操作所屬的 checkpoint 的 id。
- FLINK-19463 – 由於老的 Statebackend 接口承載了過多的語義並且容易引起困惑,這一接口被標記為廢棄。這是一個純 API 層的改動,而並不會影響應用運行時。對於如何升級現有作業,請參考作業遷移指引 。
其它資源
二進制和代碼可以從 Flink 官網的下載頁面獲得,最新的 PyFlink 發布可以從 PyPI 獲得。
如果想要升級到 Flink 1.13,請參考發布說明。這一版本與之前 1.x 的版本在標記為@Public 的接口上是兼容的。
用戶也可以查看新版本修改列表與更新后的文檔來獲得修改和新功能的詳細列表。
本文為阿里雲原創內容,未經允許不得轉載。