Flink調優


第1章 資源配置調優

  Flink性能調優的第一步,就是為任務分配合適的資源,在一定范圍內,增加資源的分配與性能的提升是成正比的,實現了最優的資源配置后,在此基礎上再考慮進行后面論述的性能調優策略。

  提交方式主要是yarn-per-job,資源的分配在使用腳本提交Flink任務時進行指定。

  標准的Flink任務提交腳本Generic CLI 模式),1.11開始,增加了通用客戶端模式,參數使用-D <property=value>指定

bin/flink run \
-t yarn-per-job \
-d \
-p 5 \ 指定並行度
-Dyarn.application.queue=test \ 指定yarn隊列
-Djobmanager.memory.process.size=1024mb \ 指定JM的總進程大小
-Dtaskmanager.memory.process.size=1024mb \ 指定每個TM的總進程大小
-Dtaskmanager.numberOfTaskSlots=2 \ 指定每個TM的slot數
-c com.yuange.app.dwd.LogBaseApp \
/opt/module/gmall-flink/gmall-realtime-1.0-SNAPSHOT-jar-with-dependencies.jar

  參數列表:https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/config.html

1.1 內存設置

  生產資源配置:

bin/flink run \
-t yarn-per-job \
-d \
-p 5 \ 指定並行度
-Dyarn.application.queue=test \ 指定yarn隊列
-Djobmanager.memory.process.size=2048mb \ JM2~4G足夠
-Dtaskmanager.memory.process.size=6144mb \ 單個TM2~8G足夠
-Dtaskmanager.numberOfTaskSlots=2 \ 與容器核數1core:1slot或1core:2slot
-c com.yuange.app.dwd.LogBaseApp \
/opt/module/gmall-flink/gmall-realtime-1.0-SNAPSHOT-jar-with-dependencies.jar

  Flink是實時流處理,關鍵在於資源情況能不能抗住高峰時期每秒的數據量,通常用QPS/TPS來描述數據情況。

1.2 並行度設置

1.2.1 最優並行度計算

  開發完成后,先進行壓測。任務並行度給10以下,測試單個並行度的處理上限。然后 QPS/單並行度的處理能力 = 並行度,不能只從QPS去得出並行度,因為有些字段少、邏輯簡單的任務,單並行度一秒處理幾萬條數據。而有些數據字段多,處理邏輯復雜,單並行度一秒只能處理1000條數據。最好根據高峰期的QPS壓測,並行度*1.2倍,富余一些資源。

1.2.2 Source 端並行度的配置

  數據源端是 Kafka,Source的並行度設置為Kafka對應Topic的分區數。

  如果已經等於 Kafka 的分區數,消費速度仍跟不上數據生產速度,考慮下Kafka 要擴大分區,同時調大並行度等於分區數。

  Flink 的一個並行度可以處理一至多個分區的數據,如果並行度多於 Kafka 的分區數,那么就會造成有的並行度空閑,浪費資源。

1.2.3 Transform端並行度的配置

  1)Keyby之前的算子

    一般不會做太重的操作,都是比如map、filter、flatmap等處理較快的算子,並行度可以和source保持一致。

  2)Keyby之后的算子

    如果並發較大,建議設置並行度為 2 的整數次冪,例如:128、256、512;

    小並發任務的並行度不一定需要設置成 2 的整數次冪;

    大並發任務如果沒有 KeyBy,並行度也無需設置為 2 的整數次冪;

1.2.4 Sink 端並行度的配置

  Sink 端是數據流向下游的地方,可以根據 Sink 端的數據量及下游的服務抗壓能力進行評估。如果Sink端是Kafka,可以設為Kafka對應Topic的分區數。

  Sink 端的數據量小,比較常見的就是監控告警的場景,並行度可以設置的小一些。

  Source 端的數據量是最小的,拿到 Source 端流過來的數據后做了細粒度的拆分,數據量不斷的增加,到 Sink 端的數據量就非常大。那么在 Sink 到下游的存儲中間件的時候就需要提高並行度。

  另外 Sink 端要與下游的服務進行交互,並行度還得根據下游的服務抗壓能力來設置,如果在 Flink Sink 這端的數據量過大的話,且 Sink 處並行度也設置的很大,但下游的服務完全撐不住這么大的並發寫入,可能會造成下游服務直接被寫掛,所以最終還是要在 Sink 處的並行度做一定的權衡。

1.3 RocksDB大狀態調優

  RocksDB 是基於 LSM Tree 實現的(類似HBase),寫數據都是先緩存到內存中,所以RocksDB 的寫請求效率比較高。RocksDB 使用內存結合磁盤的方式來存儲數據,每次獲取數據時,先從內存中 blockcache 中查找,如果內存中沒有再去磁盤中查詢。優化后差不多單並行度 TPS 5000 record/s,性能瓶頸主要在於 RocksDB 對磁盤的讀請求,所以當處理性能不夠時,僅需要橫向擴展並行度即可提高整個Job 的吞吐量。以下幾個調優參數:

  1)設置本地 RocksDB 目錄,在flink-conf.yaml 中配置:

state.backend.rocksdb.localdir: /data1/flink/rocksdb,/data2/flink/rocksdb,/data3/flink/rocksdb

  注意:不要配置單塊磁盤的多個目錄,務必將目錄配置到多塊不同的磁盤上,讓多塊磁盤來分擔壓力。當設置多個 RocksDB 本地磁盤目錄時,Flink 會隨機選擇要使用的目錄,所以就可能存在三個並行度共用同一目錄的情況。如果服務器磁盤數較多,一般不會出現該情況,但是如果任務重啟后吞吐量較低,可以檢查是否發生了多個並行度共用同一塊磁盤的情況。

  當一個 TaskManager 包含 3 個 slot 時,那么單個服務器上的三個並行度都對磁盤造成頻繁讀寫,從而導致三個並行度的之間相互爭搶同一個磁盤 io,這樣務必導致三個並行度的吞吐量都會下降。設置多目錄實現三個並行度使用不同的硬盤從而減少資源競爭。

  如下所示是測試過程中磁盤的 IO 使用率,可以看出三個大狀態算子的並行度分別對應了三塊磁盤,這三塊磁盤的 IO 平均使用率都保持在 45% 左右,IO 最高使用率幾乎都是 100%,而其他磁盤的 IO 平均使用率相對低很多。由此可見使用 RocksDB 做為狀態后端且有大狀態的頻繁讀取時, 對磁盤IO性能消耗確實比較大。

  如下圖所示,其中兩個並行度共用了 sdb 磁盤,一個並行度使用 sdj磁盤。可以看到 sdb 磁盤的 IO 使用率已經達到了 91.6%,就會導致 sdb 磁盤對應的兩個並行度吞吐量大大降低,從而使得整個 Flink 任務吞吐量降低。如果每個服務器上有一兩塊 SSD,強烈建議將 RocksDB 的本地磁盤目錄配置到 SSD 的目錄下,從 HDD 改為 SSD 對於性能的提升可能比配置 10 個優化參數更有效。 

  1)state.backend.incremental開啟增量檢查點,默認false,改為true。

  2)state.backend.rocksdb.predefined-optionsSPINNING_DISK_OPTIMIZED_HIGH_MEM設置為機械硬盤+內存模式,有條件上SSD,指定為FLASH_SSD_OPTIMIZED

  3)state.backend.rocksdb.block.cache-size: 整個 RocksDB 共享一個 block cache,讀數據時內存的 cache 大小,該參數越大讀數據時緩存命中率越高,默認大小為 8 MB,建議設置到 64 ~ 256 MB。

  4)state.backend.rocksdb.thread.num: 用於后台 flush 和合並 sst 文件的線程數,默認為 1,建議調大,機械硬盤用戶可以改為 4 等更大的值。

  5)state.backend.rocksdb.writebuffer.size: RocksDB 中,每個 State 使用一個 Column Family,每個 Column Family 使用獨占的 write buffer,建議調大,例如:32M

  6)state.backend.rocksdb.writebuffer.count: 每個 Column Family 對應的 writebuffer 數目,默認值是 2,對於機械磁盤來說,如果內存⾜夠大,可以調大到 5 左右

  7)state.backend.rocksdb.writebuffer.number-to-merge: 將數據從 writebuffer 中 flush 到磁盤時,需要合並的 writebuffer 數量,默認值為 1,可以調成3。

  8)state.backend.local-recovery: 設置本地恢復,當 Flink 任務失敗時,可以基於本地的狀態信息進行恢復任務,可能不需要從 hdfs 拉取數據

1.4 Checkpoint設置

  一般我們的 Checkpoint 時間間隔可以設置為分鍾級別,例如 1 分鍾、3 分鍾,對於狀態很大的任務每次 Checkpoint 訪問 HDFS 比較耗時,可以設置為 5~10 分鍾一次Checkpoint,並且調大兩次 Checkpoint 之間的暫停間隔,例如設置兩次Checkpoint 之間至少暫停 4或8 分鍾

  如果 Checkpoint 語義配置為 EXACTLY_ONCE,那么在 Checkpoint 過程中還會存在 barrier 對齊的過程,可以通過 Flink Web UI 的 Checkpoint 選項卡來查看 Checkpoint 過程中各階段的耗時情況,從而確定到底是哪個階段導致 Checkpoint 時間過長然后針對性的解決問題。

  RocksDB相關參數在1.3中已說明,可以在flink-conf.yaml指定,也可以在Job的代碼中調用API單獨指定,這里不再列出。

 // 使⽤ RocksDBStateBackend 做為狀態后端,並開啟增量 Checkpoint
 RocksDBStateBackend rocksDBStateBackend = new RocksDBStateBackend("hdfs://hadoop1:8020/flink/checkpoints", true);

 env.setStateBackend(rocksDBStateBackend);

 // 開啟Checkpoint,間隔為 3 分鍾
 env.enableCheckpointing(TimeUnit.MINUTES.toMillis(3));

 // 配置 Checkpoint
 CheckpointConfig checkpointConf = env.getCheckpointConfig();
 checkpointConf.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)

 // 最小間隔 4分鍾
checkpointConf.setMinPauseBetweenCheckpoints(TimeUnit.MINUTES.toMillis(4))

 // 超時時間 10分鍾
 checkpointConf.setCheckpointTimeout(TimeUnit.MINUTES.toMillis(10));

 // 保存checkpoint
 checkpointConf.enableExternalizedCheckpoints(

CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

1.5 使 Flink ParameterTool 讀取置

  在實際開發中,有各種環境(開發、測試、預發、生產),作業也有很多的配置:算子的並行度配置、Kafka 數據源的配置(broker 地址、topic 名、group.id)、Checkpoint 是否開啟、狀態后端存儲路徑、數據庫地址、用戶名和密碼等各種各樣的配置,可能每個環境的這些配置對應的值都是不一樣的。

  如果你是直接在代碼⾥⾯寫死的配置,每次換個環境去運行測試作業,都要重新去修改代碼中的配置,然后編譯打包,提交運行,這樣就要花費很多時間在這些重復的勞動力上了。在 Flink 中可以通過使用 ParameterTool 類讀取配置,它可以讀取環境變量、運行參數、配置文件。

  ParameterTool 是可序列化的,所以你可以將它當作參數進行傳遞給算子的自定義函數類。

1.5.1 讀取運行參數

  我們可以在Flink的提交腳本添加運行參數,格式:

--參數名 參數值
--參數名 參數值

  在 Flink 程序中可以直接使用 ParameterTool.fromArgs(args) 獲取到所有的參數,也可以通過 parameterTool.get("username") 方法獲取某個參數對應的值。

  舉例:通過運行參數指定jobname

bin/flink run \
-t yarn-per-job \
-d \
-p 5 \ 指定並行度
-Dyarn.application.queue=test \ 指定yarn隊列
-Djobmanager.memory.process.size=1024mb \ 指定JM的總進程大小
-Dtaskmanager.memory.process.size=1024mb \ 指定每個TM的總進程大小
-Dtaskmanager.numberOfTaskSlots=2 \ 指定每個TM的slot數
-c com.yuange.app.dwd.LogBaseApp \
/opt/module/gmall-flink/gmall-realtime-1.0-SNAPSHOT-jar-with-dependencies.jar \
--jobname dwd-LogBaseApp  //參數名自己隨便起,代碼里對應上即可

  在代碼里獲取參數值:

ParameterTool parameterTool = ParameterTool.fromArgs(args);
String myJobname = parameterTool.get("jobname");  //參數名對應
env.execute(myJobname);

1.5.2 讀取系統屬性

  ParameterTool 還⽀持通過 ParameterTool.fromSystemProperties() 方法讀取系統屬性。做個打印:

ParameterTool parameterTool = ParameterTool.fromSystemProperties();
System.out.println(parameterTool.toMap().toString());

  可以得到全面的系統屬性,部分結果:

1.5.3 讀取配置文件

  可以使用ParameterTool.fromPropertiesFile("/application.properties") 讀取 properties 配置文件。可以將所有要配置的地方(比如並行度和一些 Kafka、MySQL 等配置)都寫成可配置的,然后其對應的 key 和 value 值都寫在配置文件中,最后通過 ParameterTool 去讀取配置文件獲取對應的值。

1.5.4 注冊全局參數

  在 ExecutionConfig 中可以將 ParameterTool 注冊為全作業參數的參數,這樣就可以被 JobManager 的web 端以及用戶⾃定義函數中以配置值的形式訪問。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.getConfig().setGlobalJobParameters(ParameterTool.fromArgs(args));

  可以不用將ParameterTool當作參數傳遞給算子的自定義函數,直接在用戶⾃定義的 Rich 函數中直接獲取到參數值了。

env.addSource(new RichSourceFunction() { 
@Override 
public void run(SourceContext sourceContext) throws Exception {
 while (true) { 
    ParameterTool parameterTool = (ParameterTool)getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
   }
  } 

  @Override 
  public void cancel() {
  }
})

1.6 壓測方式

  壓測的方式很簡單,先在kafka中積壓數據,之后開啟Flink任務,出現反壓,就是處理瓶頸。相當於水庫先積水,一下子泄洪。數據可以是自己造的模擬數據,也可以是生產中的部分數據。

第2章 反壓處理

  反壓(BackPressure)通常產生於這樣的場景:短時間的負載高峰導致系統接收數據的速率遠高於它處理數據的速率。許多日常問題都會導致反壓,例如,垃圾回收停頓可能會導致流入的數據快速堆積,或遇到大促、秒殺活動導致流量陡增。反壓如果不能得到正確的處理,可能會導致資源耗盡甚至系統崩潰。

  反壓機制是指系統能夠自己檢測到被阻塞的 Operator,然后自適應地降低源頭或上游數據的發送速率,從而維持整個系統的穩定。Flink 任務一般運行在多個節點上,數據從上游算子發送到下游算子需要網絡傳輸,若系統在反壓時想要降低數據源頭或上游算子數據的發送速率,那么肯定也需要網絡傳輸。所以下面先來了解一下 Flink 的網絡流控(Flink 對網絡數據流量的控制)機制。

2.1 反壓現象及定位

  Flink 的反壓太過於天然了,導致無法簡單地通過監控 BufferPool 的使用情況來判斷反壓狀態。Flink 通過對運行中的任務進行采樣來確定其反壓,如果一個 Task 因為反壓導致處理速度降低了,那么它肯定會卡在向 LocalBufferPool 申請內存塊上。那么該 Task 的 stack trace 應該是這樣:

java.lang.Object.wait(Native Method)

o.a.f.[...].LocalBufferPool.requestBuffer(LocalBufferPool.java:163) o.a.f.[...].LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133) [...]

  監控對正常的任務運行有一定影響,因此只有當 Web 頁面切換到 Job 的 BackPressure 頁面時,JobManager 才會對該 Job 觸發反壓監控。默認情況下,JobManager 會觸發 100 次 stack trace 采樣,每次間隔 50ms 來確定反壓。Web 界面看到的比率表示在內部方法調用中有多少 stack trace 被卡在LocalBufferPool.requestBufferBlocking(),例如: 0.01 表示在 100 個采樣中只有 1 個被卡在LocalBufferPool.requestBufferBlocking()。采樣得到的比例與反壓狀態的對應關系如下:

  1)OK: 0 <= 比例 <= 0.10

  2)LOW: 0.10 < 比例 <= 0.5

  3)HIGH: 0.5 < 比例 <= 1

  Task 的狀態為 OK 表示沒有反壓,HIGH 表示這個 Task 被反壓。

2.1.1 利用 Flink Web UI 定位產生反壓的位置

  在 Flink Web UI 中有 BackPressure 的頁面,通過該頁面可以查看任務中 subtask 的反壓狀態,如下兩圖所示,分別展示了狀態是 OK 和 HIGH 的場景。排查的時候,先把operator chain禁用,方便定位。

2.1.2 利用Metrics定位反壓位置

  當某個 Task 吞吐量下降時,基於 Credit 的反壓機制,上游不會給該 Task 發送數據,所以該 Task 不會頻繁卡在向 Buffer Pool 去申請 Buffer。反壓監控實現原理就是監控 Task 是否卡在申請 buffer 這一步,所以遇到瓶頸的 Task 對應的反壓⻚⾯必然會顯示 OK,即表示沒有受到反壓。

  如果該 Task 吞吐量下降,造成該Task 上游的 Task 出現反壓時,必然會存在: Task 對應的 InputChannel 變滿,已經申請不到可用的Buffer 空間。如果該 Task 的 InputChannel 還能申請到可用 Buffer,那么上游就可以給該 Task 發送數據,上游 Task 也就不會被反壓了,所以說遇到瓶頸且導致上游 Task 受到反壓的 Task 對應的 InputChannel 必然是滿的(這⾥不考慮⽹絡遇到瓶頸的情況)。從這個思路出發,可以對該 Task 的 InputChannel 的使用情況進行監控,如果 InputChannel 使用率 100%,那么該 Task 就是我們要找的反壓源。Flink 1.9 及以上版本inPoolUsage 表示 inputFloatingBuffersUsage 和inputExclusiveBuffersUsage 的總和。

  反壓時,可以看到遇到瓶頸的該Task的inPoolUage為1

2.2 反壓的原因及處理

  先檢查基本原因,然后再深入研究更復雜的原因,最后找出導致瓶頸的原因。下面列出從最基本到比較復雜的一些反壓潛在原因。

  注意:反壓可能是暫時的,可能是由於負載高峰、CheckPoint 或作業重啟引起的數據積壓而導致反壓。如果反壓是暫時的,應該忽略它。另外,請記住,斷斷續續的反壓會影響我們分析和解決問題。

2.2.1 系統資源

  檢查涉及服務器基本資源的使用情況,如CPU、網絡或磁盤I/O,目前 Flink 任務使用最主要的還是內存和 CPU 資源,本地磁盤、依賴的外部存儲資源以及網卡資源一般都不會是瓶頸。如果某些資源被充分利用或大量使用,可以借助分析工具,分析性能瓶頸(JVM Profiler+ FlameGraph生成火焰圖)。

  如何生成火焰圖:http://www.54tianzhisheng.cn/2020/10/05/flink-jvm-profiler/

  如何讀懂火焰圖:https://zhuanlan.zhihu.com/p/29952444

  1)針對特定的資源調優Flink

  2)通過增加並行度或增加集群中的服務器數量來橫向擴展

  3)減少瓶頸算子上游的並行度,從而減少瓶頸算子接收的數據量(不建議,可能造成整個Job數據延遲增大)

2.2.2 垃圾收集(GC)

  長時間GC暫停會導致性能問題。可以通過打印調試GC日志(通過-XX:+PrintGCDetails)或使用某些內存或 GC 分析器GCViewer工具)來驗證是否處於這種情況。

  1)Flink提交腳本中,設置JVM參數,打印GC日志:

bin/flink run \
-t yarn-per-job \
-d \
-p 5 \ 指定並行度
-Dyarn.application.queue=test \ 指定yarn隊列
-Djobmanager.memory.process.size=1024mb \ 指定JM的總進程大小
-Dtaskmanager.memory.process.size=1024mb \ 指定每個TM的總進程大小
-Dtaskmanager.numberOfTaskSlots=2 \ 指定每個TM的slot數
-Denv.java.opts="-XX:+PrintGCDetails -XX:+PrintGCDateStamps"
-c com.yuange.app.dwd.LogBaseApp \
/opt/module/gmall-flink/gmall-realtime-1.0-SNAPSHOT-jar-with-dependencies.jar

  2)下載GC日志的方式:

    因為是on yarn模式,運行的節點一個一個找比較麻煩。可以打開WebUI,選擇JobManager或者TaskManager,點擊Stdout,即可看到GC日志,點擊下載按鈕即可將GC日志通過HTTP的方式下載下來。

  分析GC日志:

    通過 GC 日志分析出單個 Flink Taskmanager 堆總大小、年輕代、老年代分配的內存空間、Full GC 后老年代剩余大小等,相關指標定義可以去 Github 具體查看。

    GCViewer地址:https://github.com/chewiebug/GCViewer

  擴展:最重要的指標是Full GC 后,老年代剩余大小這個指標,按照《Java 性能優化權威指南》這本書 Java 堆大小計算法則,設 Full GC 后老年代剩余大小空間為 M,那么堆的大小建議 3 ~ 4倍 M,新生代為 1 ~ 1.5 倍 M,老年代應為 2 ~ 3 倍 M。

2.2.3 CPU/線程瓶頸

  有時,一個或幾個線程導致 CPU 瓶頸,而整個機器的CPU使用率仍然相對較低,則可能無法看到 CPU 瓶頸。例如,48核的服務器上,單個 CPU 瓶頸的線程僅占用 2%的 CPU 使用率,就算單個線程發生 CPU 瓶頸,我們也看不出來。可以考慮使用2.2.1提到的分析工具,它們可以顯示每個線程的 CPU 使用情況來識別熱線程。

2.2.4 線程競爭

  與上⾯的 CPU/線程瓶頸問題類似,subtask 可能會因為共享資源上高負載線程的競爭而成為瓶頸。同樣,可以考慮使用2.2.1提到的分析工具,考慮在用戶代碼中查找同步開銷、鎖競爭,盡管避免在用戶代碼中添加同步。

2.2.5 負載不平衡

  如果瓶頸是由數據傾斜引起的,可以嘗試通過將數據分區的 key 進行加鹽或通過實現本地預聚合來減輕數據傾斜的影響。(關於數據傾斜的詳細解決方案,會在下一章節詳細討論)

2.2.6 外部依賴

  如果發現我們的 Source 端數據讀取性能比較低或者 Sink 端寫入性能較差,需要檢查第三方組件是否遇到瓶頸。例如,Kafka 集群是否需要擴容,Kafka 連接器是否並行度較低,HBase 的 rowkey 是否遇到熱點問題。關於第三方組件的性能問題,需要結合具體的組件來分析。

第3章 數據傾斜

3.1 判斷是否存在數據傾斜

  相同 Task 的多個 Subtask 中,個別Subtask 接收到的數據量明顯大於其他 Subtask 接收到的數據量,通過 Flink Web UI 可以精確地看到每個 Subtask 處理了多少數據,即可判斷出 Flink 任務是否存在數據傾斜。通常,數據傾斜也會引起反壓。

3.2 數據傾斜的解決

3.2.1 keyBy 后的聚合操作存在數據傾斜

  使用LocalKeyBy的思想:在 keyBy 上游算子數據發送之前,首先在上游算子的本地對數據進行聚合后再發送到下游,使下游接收到的數據量大大減少,從而使得 keyBy 之后的聚合操作不再是任務的瓶頸。類似MapReduce 中 Combiner 的思想,但是這要求聚合操作必須是多條數據或者一批數據才能聚合,單條數據沒有辦法通過聚合來減少數據量。從Flink LocalKeyBy 實現原理來講,必然會存在一個積攢批次的過程,在上游算子中必須攢夠一定的數據量,對這些數據聚合后再發送到下游。

  注意:Flink是實時流處理,如果keyby之后的聚合操作存在數據傾斜,且沒有開窗口的情況下,簡單的認為使用兩階段聚合,是不能解決問題的。因為這個時候Flink是來一條處理一條,且向下游發送一條結果,對於原來keyby的維度(第二階段聚合)來講,數據量並沒有減少,且結果重復計算(非FlinkSQL,未使用回撤流),如下圖所示:

  實現方式:以計算PV為例,keyby之前,使用flatMap實現LocalKeyby

class LocalKeyByFlatMap extends RichFlatMapFunction<String, Tuple2<String, 
 //Checkpoint 時為了保證 Exactly Once,將 buffer 中的數據保存到該 ListState 中
 private ListState<Tuple2<String, Long>> localPvStatListState;
 
 //本地 buffer,存放 local 端緩存的 app 的 pv 信息
 private HashMap<String, Long> localPvStat;
 
 //緩存的數據量大小,即:緩存多少數據再向下游發送
 private int batchSize;
 
 //計數器,獲取當前批次接收的數據量
 private AtomicInteger currentSize;

 //構造器,批次大小傳參
 LocalKeyByFlatMap(int batchSize){
     this.batchSize = batchSize;
 }

 @Override
 public void flatMap(String in, Collector collector) throws Exception {
     // 將新來的數據添加到 buffer 中
     Long pv = localPvStat.getOrDefault(in, 0L);
     localPvStat.put(in, pv + 1);
     // 如果到達設定的批次,則將 buffer 中的數據發送到下游
     if(currentSize.incrementAndGet() >= batchSize){
         // 遍歷 Buffer 中數據,發送到下游
         for(Map.Entry<String, Long> appIdPv: localPvStat.entrySet()) {
             collector.collect(Tuple2.of(appIdPv.getKey(), appIdPv.getValue()
         }
         // Buffer 清空,計數器清零
         localPvStat.clear();
         currentSize.set(0);
     }
 }

 @Override
 public void snapshotState(FunctionSnapshotContext functionSnapshotConte
     // 將 buffer 中的數據保存到狀態中,來保證 Exactly Once
     localPvStatListState.clear();
     for(Map.Entry<String, Long> appIdPv: localPvStat.entrySet()) {
         localPvStatListState.add(Tuple2.of(appIdPv.getKey(), appIdPv.ge
     }
 }

 @Override
 public void initializeState(FunctionInitializationContext context) {
     // 從狀態中恢復 buffer 中的數據
     localPvStatListState = context.getOperatorStateStore().getListState
     new ListStateDescriptor<>("localPvStat",
     TypeInformation.of(new TypeHint<Tuple2<String, Long>>})));
     localPvStat = new HashMap();
     if(context.isRestored()) {
         // 從狀態中恢復數據到 localPvStat 中
         for(Tuple2<String, Long> appIdPv: localPvStatListState.get()){
long pv = localPvStat.getOrDefault(appIdPv.f0, 0L);
             // 如果出現 pv != 0,說明改變了並行度,
             // ListState 中的數據會被均勻分發到新的 subtask中
             // 所以單個 subtask 恢復的狀態中可能包含兩個相同的 app 的數據
             localPvStat.put(appIdPv.f0, pv + appIdPv.f1);
         }
         // 從狀態恢復時,默認認為 buffer 中數據量達到了 batchSize,需要向下游發
         currentSize = new AtomicInteger(batchSize);
     } else {
         currentSize = new AtomicInteger(0);
     }
 }
}

3.2.2 keyBy 之前發生數據傾斜

  如果 keyBy 之前就存在數據傾斜,上游算子的某些實例可能處理的數據較多,某些實例可能處理的數據較少,產生該情況可能是因為數據源的數據本身就不均勻,例如由於某些原因 Kafka 的 topic 中某些 partition 的數據量較大,某些 partition 的數據量較少。對於不存在 keyBy 的 Flink 任務也會出現該情況。

  這種情況,需要讓 Flink 任務強制進行shuffle。使用shuffle、rebalance 或 rescale算子即可將數據均勻分配,從而解決數據傾斜的問題。

3.2.3 keyBy 后的窗口聚合操作存在數據傾斜

  因為使用了窗口,變成了有界數據的處理(3.2.1已分析過),窗口默認是觸發時才會輸出一條結果發往下游,所以可以使用兩階段聚合的方式:

  實現思路:

    1)第一階段聚合:key拼接隨機數前綴或后綴,進行keyby、開窗、聚合

      注意:聚合完不再是WindowedStream,要獲取WindowEnd作為窗口標記作為第二階段分組依據,避免不同窗口的結果聚合到一起)

    2)第二階段聚合:去掉隨機數前綴或后綴,按照原來的key及windowEnd作keyby、聚合

第4章 KafkaSource調優

4.1 動態發現分區

  當 FlinkKafkaConsumer 初始化時,每個 subtask 會訂閱一批 partition,但是當 Flink 任務運行過程中,如果被訂閱的 topic 創建了新的 partition,FlinkKafkaConsumer 如何實現動態發現新創建的 partition 並消費呢?

  在使用 FlinkKafkaConsumer 時,可以開啟 partition 的動態發現。通過 Properties指定參數開啟(單位是毫秒):

FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS 

  該參數表示間隔多久檢測一次是否有新創建的 partition。默認值是Long的最小值,表示不開啟,大於0表示開啟。開啟時會啟動一個線程根據傳入的interval定期獲取Kafka最新的元數據,新 partition 對應的那一個 subtask 會自動發現並從earliest 位置開始消費,新創建的 partition 對其他 subtask 並不會產生影響。

  代碼如下所示:

properties.setProperty(FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, 30 * 1000 + "");

4.2 kafka數據源生成watermark

  Kafka單分區內有序,多分區間無序。在這種情況下,可以使用 Flink 中可識別 Kafka 分區的 watermark 生成機制。使用此特性,將在 Kafka 消費端內部針對每個 Kafka 分區生成 watermark,並且不同分區 watermark 的合並方式與在數據流 shuffle 時的合並方式相同。

  在單分區內有序的情況下,使用時間戳單調遞增按分區生成的 watermark 將生成完美的全局 watermark。可以不使用 TimestampAssigner直接 Kafka 記錄自身的時間戳

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "hadoop1:9092,hadoop2:9092,hadoop3:9092");
properties.setProperty("group.id", "fffffffffff");


FlinkKafkaConsumer<String> kafkaSourceFunction = new FlinkKafkaConsumer<>(
                "flinktest",
                new SimpleStringSchema(),
                properties
        );

kafkaSourceFunction.assignTimestampsAndWatermarks(
                WatermarkStrategy
                        .forBoundedOutOfOrderness(Duration.ofMinutes(2))
);

env.addSource(kafkaSourceFunction)

4.3 設置空閑等待

  如果數據源中的某一個分區/分片在一段時間內未發送事件數據,則意味着 WatermarkGenerator 也不會獲得任何新數據去生成 watermark。我們稱這類數據源為空閑輸入或空閑源。在這種情況下,當某些其他分區仍然發送事件數據的時候就會出現問題。比如Kafka的Topic中,由於某些原因,造成個別Partition一直沒有新的數據。由於下游算子 watermark 的計算方式是取所有不同的上游並行數據源 watermark 的最小值,則其 watermark 將不會發生變化,導致窗口、定時器等不會被觸發。

  為了解決這個問題,你可以使用 WatermarkStrategy 來檢測空閑輸入並將其標記為空閑狀態。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "hadoop1:9092,hadoop2:9092,hadoop3:9092");
properties.setProperty("group.id", "fffffffffff");


FlinkKafkaConsumer<String> kafkaSourceFunction = new FlinkKafkaConsumer<>(
                "flinktest",
                new SimpleStringSchema(),
                properties
        );

kafkaSourceFunction.assignTimestampsAndWatermarks(
                WatermarkStrategy
                        .forBoundedOutOfOrderness(Duration.ofMinutes(2))
                        .withIdleness(Duration.ofMinutes(5))
);

env.addSource(kafkaSourceFunction)

4.4 Kafka的offset消費策略

  FlinkKafkaConsumer可以調用以下API,注意與”auto.offset.reset”區分開:

  1)setStartFromGroupOffsets():默認消費策略,默認讀取上次保存的offset信息,如果是應用第一次啟動,讀取不到上次的offset信息,則會根據這個參數auto.offset.reset的值來進行消費數據。建議使用這個。

  2)setStartFromEarliest():從最早的數據開始進行消費,忽略存儲的offset信息

  3)setStartFromLatest():從最新的數據進行消費,忽略存儲的offset信息

  4)setStartFromSpecificOffsets(Map):從指定位置進行消費

  5)setStartFromTimestamp(long):topic中指定的時間點開始消費,指定時間點之前的數據忽略

  6)當checkpoint機制開啟的時候,KafkaConsumer會定期把kafka的offset信息還有其他operator的狀態信息一塊保存起來。當job失敗重啟的時候,Flink會從最近一次的checkpoint中進行恢復數據,重新從保存的offset消費kafka中的數據(也就是說,上面幾種策略,只有第一次啟動的時候起作用)。

  7)為了能夠使用支持容錯的kafka Consumer,需要開啟checkpoint

第5章 FlinkSQL調優

  FlinkSQL官網配置參數:https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/config.html

5.1 Group Aggregate優化

5.1.1 開啟MiniBatch(提升吞吐)

  MiniBatch是微批處理,原理是緩存一定的數據后再觸發處理,以減少對State的訪問,從而提升吞吐並減少數據的輸出量MiniBatch主要依靠在每個Task上注冊的Timer線程來觸發微批,需要消耗一定的線程調度性能。

  1)MiniBatch默認關閉,開啟方式如下:

// 初始化table environment
TableEnvironment tEnv = ...

// 獲取 tableEnv的配置對象
Configuration configuration = tEnv.getConfig().getConfiguration();

// 設置參數:
// 開啟miniBatch
configuration.setString("table.exec.mini-batch.enabled", "true");
// 批量輸出的間隔時間
configuration.setString("table.exec.mini-batch.allow-latency", "5 s");
// 防止OOM設置每個批次最多緩存數據的條數,可以設為2萬條
configuration.setString("table.exec.mini-batch.size", "20000");

  2)FlinkSQL參數配置列表:https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/config.html

  3)適用場景

    微批處理通過增加延遲換取高吞吐,如果有超低延遲的要求,不建議開啟微批處理。通常對於聚合的場景,微批處理可以顯著的提升系統性能,建議開啟。

  4)注意事項:

    (1)目前,key-value 配置項僅被 Blink planner 支持。

    (2)1.12之前的版本有bug,開啟miniBatch,不會清理過期狀態,也就是說如果設置狀態的TTL,無法清理過期狀態。1.12版本才修復這個問題。

    (3)參考ISSUE:https://issues.apache.org/jira/browse/FLINK-17096

5.1.2 開啟LocalGlobal(解決常見數據熱點問題)

  LocalGlobal優化將原先的Aggregate分成Local+Global兩階段聚合,即MapReduce模型中的Combine+Reduce處理模式。第一階段在上游節點本地攢一批數據進行聚合(localAgg),並輸出這次微批的增量值(Accumulator)。第二階段再將收到的Accumulator合並(Merge),得到最終的結果(GlobalAgg)。

  LocalGlobal本質上能夠靠LocalAgg的聚合篩除部分傾斜數據,從而降低GlobalAgg的熱點,提升性能。結合下圖理解LocalGlobal如何解決數據傾斜的問題。

  由上圖可知:

    未開啟LocalGlobal優化,由於流中的數據傾斜,Key為紅色的聚合算子實例需要處理更多的記錄,這就導致了熱點問題。

    開啟LocalGlobal優化后,先進行本地聚合,再進行全局聚合。可大大減少GlobalAgg的熱點,提高性能。

  1)LocalGlobal開啟方式:

    (1)LocalGlobal優化需要先開啟MiniBatch,依賴於MiniBatch的參數。

    (2)table.optimizer.agg-phase-strategy: 聚合策略。默認AUTO,支持參數AUTO、TWO_PHASE(使用LocalGlobal兩階段聚合)、ONE_PHASE(僅使用Global一階段聚合)。

// 初始化table environment
TableEnvironment tEnv = ...

// 獲取 tableEnv的配置對象
Configuration configuration = tEnv.getConfig().getConfiguration();

// 設置參數:
// 開啟miniBatch
configuration.setString("table.exec.mini-batch.enabled", "true");
// 批量輸出的間隔時間
configuration.setString("table.exec.mini-batch.allow-latency", "5 s");
// 防止OOM設置每個批次最多緩存數據的條數,可以設為2萬條
configuration.setString("table.exec.mini-batch.size", "20000");
// 開啟LocalGlobal
configuration.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE");

  2)判斷是否生效

    觀察最終生成的拓撲圖的節點名字中是否包含GlobalGroupAggregate或LocalGroupAggregate。

  3)適用場景

    LocalGlobal適用於提升如SUM、COUNT、MAX、MIN和AVG等普通聚合的性能,以及解決這些場景下的數據熱點問題。

  4)注意事項:

    (1)需要先開啟MiniBatch

    (2)開啟LocalGlobal需要UDAF實現Merge方法。

5.1.3 開啟Split Distinct(解決COUNT DISTINCT熱點問題)

  LocalGlobal優化針對普通聚合(例如SUM、COUNT、MAX、MIN和AVG)有較好的效果,對於COUNT DISTINCT收效不明顯,因為COUNT DISTINCT在Local聚合時,對於DISTINCT KEY的去重率不高,導致在Global節點仍然存在熱點。

  之前,為了解決COUNT DISTINCT的熱點問題,通常需要手動改寫為兩層聚合(增加按Distinct Key取模的打散層)。

  從Flink1.9.0版本開始,提供了COUNT DISTINCT自動打散功能,不需要手動重寫。Split Distinct和LocalGlobal的原理對比參見下圖。

  舉例:統計一天的UV

SELECT day, COUNT(DISTINCT user_id)
FROM T
GROUP BY day

  如果手動實現兩階段聚合:

SELECT day, SUM(cnt)
FROM (
    SELECT day, COUNT(DISTINCT user_id) as cnt
    FROM T
    GROUP BY day, MOD(HASH_CODE(user_id), 1024)
)
GROUP BY day

    第一層聚合: 將Distinct Key打散求COUNT DISTINCT。

    第二層聚合: 對打散去重后的數據進行SUM匯總。

  Split Distinct開啟方式,默認不開啟,使用參數顯式開啟:

    table.optimizer.distinct-agg.split.enabled: true,默認false。

    table.optimizer.distinct-agg.split.bucket-num: Split Distinct優化在第一層聚合中,被打散bucket數目。默認1024。

// 初始化table environment
TableEnvironment tEnv = ...

// 獲取 tableEnv的配置對象
Configuration configuration = tEnv.getConfig().getConfiguration();

// 設置參數:
// 開啟Split Distinct
configuration.setString("table.optimizer.distinct-agg.split.enabled", "true");
// 第一層打散的bucket數目
configuration.setString("table.optimizer.distinct-agg.split.bucket-num", "1024");

  判斷是否生效

    觀察最終生成的拓撲圖的節點名中是否包含Expand節點,或者原來一層的聚合變成了兩層的聚合。

  適用場景

    使用COUNT DISTINCT,但無法滿足聚合節點性能要求。

  注意事項:

    1)目前不能在包含UDAF的Flink SQL中使用Split Distinct優化方法。

    2)拆分出來的兩個GROUP聚合還可參與LocalGlobal優化。

    3)從Flink1.9.0版本開始,提供了COUNT DISTINCT自動打散功能,不需要手動重寫(不用像上面的例子去手動實現)。

5.1.4 改寫為AGG WITH FILTER語法(提升大量COUNT DISTINCT場景性能)

  在某些場景下,可能需要從不同維度來統計UV,如Android中的UV,iPhone中的UV,Web中的UV和總UV,這時,可能會使用如下CASE WHEN語法。

SELECT
 day,
 COUNT(DISTINCT user_id) AS total_uv,
 COUNT(DISTINCT CASE WHEN flag IN ('android', 'iphone') THEN user_id ELSE NULL END) AS app_uv,
 COUNT(DISTINCT CASE WHEN flag IN ('wap', 'other') THEN user_id ELSE NULL END) AS web_uv
FROM T
GROUP BY day

  在這種情況下,建議使用FILTER語法, 目前的Flink SQL優化器可以識別同一唯一鍵上的不同FILTER參數。如,在上面的示例中,三個COUNT DISTINCT都作用在user_id列上。此時,經過優化器識別后,Flink可以只使用一個共享狀態實例,而不是三個狀態實例,可減少狀態的大小和對狀態的訪問。

  將上邊的CASE WHEN替換成FILTER后,如下所示:

SELECT
 day,
 COUNT(DISTINCT user_id) AS total_uv,
 COUNT(DISTINCT user_id) FILTER (WHERE flag IN ('android', 'iphone')) AS app_uv,
 COUNT(DISTINCT user_id) FILTER (WHERE flag IN ('wap', 'other')) AS web_uv
FROM T
GROUP BY day

5.2 TopN優化

5.2.1 使用最優算法

  當TopN的輸入是非更新流(例如Source),TopN只有一種算法AppendRank。當TopN的輸入是更新流時(例如經過了AGG/JOIN計算),TopN有2種算法,性能從高到低分別是:UpdateFastRank 和RetractRank。算法名字會顯示在拓撲圖的節點名字上。

  注意:apache社區版的Flink1.12目前還沒有UnaryUpdateRank,阿里雲實時計算版Flink才有

  UpdateFastRank :最優算法

  需要具備2個條件:

    1)輸入流有PK(Primary Key)信息,例如ORDER BY AVG。

    2)排序字段的更新是單調的,且單調方向與排序方向相反。例如,ORDER BY COUNT/COUNT_DISTINCT/SUM(正數)DESC。

      如果要獲取到優化Plan,則您需要在使用ORDER BY SUM DESC時,添加SUM為正數的過濾條件。

  AppendFast:結果只追加,不更新

  RetractRank:普通算法,性能差

    不建議在生產環境使用該算法。請檢查輸入流是否存在PK信息,如果存在,則可進行UpdateFastRank優化。

5.2.2 無排名優化(解決數據膨脹問題)

  1)TopN語法:

SELECT *
FROM (
  SELECT *,
    ROW_NUMBER() OVER ([PARTITION BY col1[, col2..]]
    ORDER BY col1 [asc|desc][, col2 [asc|desc]...]) AS rownum
  FROM table_name)
WHERE rownum <= N [AND conditions]

  2)數據膨脹問題:

    根據TopN的語法,rownum字段會作為結果表的主鍵字段之一寫入結果表。但是這可能導致數據膨脹的問題。例如,收到一條原排名9的更新數據,更新后排名上升到1,則從1到9的數據排名都發生變化了,需要將這些數據作為更新都寫入結果表。這樣就產生了數據膨脹,導致結果表因為收到了太多的數據而降低更新速度。

  3)使用方式 

    TopN的輸出結果無需要顯示rownum值,僅需在最終前端顯式時進行1次排序,極大地減少輸入結果表的數據量。只需要在外層查詢中將rownum字段裁剪掉即可

// 最外層的字段,不寫 rownum
SELECT col1, col2, col3
FROM (
 SELECT col1, col2, col3
   ROW_NUMBER() OVER ([PARTITION BY col1[, col2..]]
   ORDER BY col1 [asc|desc][, col2 [asc|desc]...]) AS rownum
 FROM table_name)
WHERE rownum <= N [AND conditions]

    在無rownum的場景中,對於結果表主鍵的定義需要特別小心。如果定義有誤,會直接導致TopN結果的不正確。 無rownum場景中,主鍵應為TopN上游GROUP BY節點的KEY列表。

5.2.3 增加TopN的Cache大小

  TopN為了提升性能有一個State Cache層,Cache層能提升對State的訪問效率。TopN的Cache命中率的計算公式為:

cache_hit = cache_size*parallelism/top_n/partition_key_num

  例如,Top100配置緩存10000條,並發50,當PatitionBy的key維度較大時,例如10萬級別時,Cache命中率只有10000*50/100/100000=5%,命中率會很低,導致大量的請求都會擊中State(磁盤),性能會大幅下降。因此當PartitionKey維度特別大時,可以適當加大TopN的CacheS ize,相對應的也建議適當加大TopN節點的Heap Memory。

  使用方式

// 初始化table environment
TableEnvironment tEnv = ...

// 獲取 tableEnv的配置對象
Configuration configuration = tEnv.getConfig().getConfiguration();

// 設置參數:
// 默認10000條,調整TopN cahce到20萬,那么理論命中率能達200000*50/100/100000 = 100%
configuration.setString("table.exec.topn.cache-size", "200000");

  注意:目前源碼中標記為實驗項,官網中未列出該參數

5.2.4 PartitionBy的字段中要有時間類字段

  例如每天的排名,要帶上Day字段。否則TopN的結果到最后會由於State ttl有錯亂。

5.2.5 優化后的SQL示例

insert
  into print_test
SELECT
  cate_id,
  seller_id,
  stat_date,
  pay_ord_amt  --不輸出rownum字段,能減小結果表的輸出量(無排名優化)
FROM (
    SELECT
      *,
      ROW_NUMBER () OVER (
        PARTITION BY cate_id,
        stat_date  --注意要有時間字段,否則state過期會導致數據錯亂(分區字段優化)
        ORDER
          BY pay_ord_amt DESC  --根據上游sum結果排序。排序字段的更新是單調的,且單調方向與排序方向相反(走最優算法)
      ) as rownum  
    FROM (
        SELECT
          cate_id,
          seller_id,
          stat_date,
          --重點。聲明Sum的參數都是正數,所以Sum的結果是單調遞增的,因此TopN能使用優化算法,只獲取前100個數據(走最優算法)
          sum (total_fee) filter (
            where
              total_fee >= 0
          ) as pay_ord_amt
        FROM
          random_test
        WHERE
          total_fee >= 0
        GROUP
          BY cate_name,
          seller_id,
          stat_date
      ) a
    WHERE
      rownum <= 100
  );

5.3 高效去重方案

  由於SQL上沒有直接支持去重的語法,還要靈活的保留第一條或保留最后一條。因此我們使用了SQL的ROW_NUMBER OVER WINDOW功能來實現去重語法。去重本質上是一種特殊的TopN。

5.3.1 保留首行的去重策略(Deduplicate Keep FirstRow

  保留KEY下第一條出現的數據,之后出現該KEY下的數據會被丟棄掉。因為STATE中只存儲了KEY數據,所以性能較優,示例如下:

SELECT *
FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY b ORDER BY proctime) as rowNum
  FROM T
)
WHERE rowNum = 1

  以上示例是將T表按照b字段進行去重,並按照系統時間保留第一條數據。Proctime在這里是源表T中的一個具有Processing Time屬性的字段。如果按照系統時間去重,也可以將Proctime字段簡化PROCTIME()函數調用,可以省略Proctime字段的聲明。

5.3.2 保留末行的去重策略(Deduplicate Keep LastRow

  保留KEY下最后一條出現的數據。保留末行的去重策略性能略優於LAST_VALUE函數,示例如下:

SELECT *
FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY b, d ORDER BY rowtime DESC) as rowNum
  FROM T
)
WHERE rowNum = 1

  以上示例是將T表按照b和d字段進行去重,並按照業務時間保留最后一條數據。Rowtime在這里是源表T中的一個具有Event Time屬性的字段。

5.4 高效的內置函數

5.4.1 使用內置函數替換自定義函數

  Flink的內置函數在持續的優化當中,請盡量使用內部函數替換自定義函數。使用內置函數好處:

    1)優化數據序列化和反序列化的耗時。

    2)新增直接對字節單位進行操作的功能。

  支持的系統內置函數:https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/functions/systemFunctions.html

5.4.2 LIKE操作注意事項

  1)如果需要進行StartWith操作,使用LIKE 'xxx%'

  2)如果需要進行EndWith操作,使用LIKE '%xxx'。

  3)如果需要進行Contains操作,使用LIKE '%xxx%'。

  4)如果需要進行Equals操作,使用LIKE 'xxx',等價於str = 'xxx'。

  5)如果需要匹配 _ 字符,請注意要完成轉義LIKE '%seller/id%' ESCAPE '/'。_在SQL中屬於單字符通配符,能匹配任何字符。

  6)如果聲明為 LIKE '%seller_id%',則不單會匹配seller_id還會匹配seller#id、sellerxid或seller1id 等,導致結果錯誤。

5.4.3 慎用正則函數(REGEXP)

  正則表達式是非常耗時的操作,對比加減乘除通常有百倍的性能開銷,而且正則表達式在某些極端情況下可能會進入無限循環,導致作業阻塞。建議使用LIKE。正則函數包括:

  1)REGEXP

  2)REGEXP_EXTRACT

  3)REGEXP_REPLACE

5.5 指定時區

  本地時區定義了當前會話時區id。當本地時區的時間戳進行轉換時使用。在內部,帶有本地時區的時間戳總是以UTC時區表示。但是,當轉換為不包含時區的數據類型時(例如TIMESTAMP, TIME或簡單的STRING),會話時區在轉換期間被使用。為了避免時區錯亂的問題,可以參數指定時區。

// 初始化table environment
TableEnvironment tEnv = ...

// 獲取 tableEnv的配置對象
Configuration configuration = tEnv.getConfig().getConfiguration();

// 設置參數:
// 指定時區
configuration.setString("table.local-time-zone", "Asia/Shanghai");

5.6 設置參數總結

  總結以上的調優參數,代碼如下:

// 初始化table environment
TableEnvironment tEnv = ...

// 獲取 tableEnv的配置對象
Configuration configuration = tEnv.getConfig().getConfiguration();

// 設置參數:
// 開啟miniBatch
configuration.setString("table.exec.mini-batch.enabled", "true");
// 批量輸出的間隔時間
configuration.setString("table.exec.mini-batch.allow-latency", "5 s");
// 防止OOM設置每個批次最多緩存數據的條數,可以設為2萬條
configuration.setString("table.exec.mini-batch.size", "20000");
// 開啟LocalGlobal
configuration.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE");
// 開啟Split Distinct
configuration.setString("table.optimizer.distinct-agg.split.enabled", "true");
// 第一層打散的bucket數目
configuration.setString("table.optimizer.distinct-agg.split.bucket-num", "1024");
// TopN 的緩存條數
configuration.setString("table.exec.topn.cache-size", "200000");
// 指定時區
configuration.setString("table.local-time-zone", "Asia/Shanghai");


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM