壓測合理並行度的方法:
①獲得高峰期的qps,如每秒5w條
②消費該高峰期的數據,達到反壓狀態后查看每秒處理的數據量y,就是單並行度的處理上限
③x除以y,增加一點富余: 乘以1.2,就是合理的並行度。
在flink中,設置並行度的地方有:
①配置文件 ②提交任務時的參數 ③代碼env ④代碼算子
案例:
提交一個flink程序,內容是計算uv,設置並行度為5
測試時記得關閉chain來看到每一個算子的情況 : env.disableOperatorChaining();
在flink ui界面查看整個DAG圖,看到明顯產生了反壓情況(數據處理速度達到100%)
接着查看source的情況:
點擊subtask,因為並行度是5,可以看到5個task(ID為0-4)的數據接收量和發送量(流量大小和條數),因為是source端,所以接收都是0 。
從下圖看到ID為0的task發送量已經達到了35MB,共有62w條。這是總共的,我們要查看每秒的量
打開metric輸入persecond搜索:
從下圖中我們可以看到ID為4的子任務每秒發送的量在7000+
可以看到幾個task的發送量都差不多,由此可見單並行度的處理能力為7000條,假設該業務高峰期時每秒產生的數據量(qps)為7w,
所以7/0.7*1.2, 該業務合理的並行度為12
並行度的推薦配置
- source端
數據源端是kafka,source的並行度設置為kafka對應topic的分區數。
如果已經等於kafka的分區數,消費速度仍更不上數據生產速度,考慮下kafka要擴大分區,同時調大並行度等於分區數。
flink的一個並行度可以處理一至多個分區的數據,如果並行度多於kafka的分區數,那么就會造成有的並行度空閑,浪費資源
- Transform端
第一次keyby之前的算子,比如map、fliter、flatmap等處理較快的算子,並行度和source保持一致即可。
第一次keyby之后的算子,視具體情況而定,可以通過上面測試反壓的方法,得到keyby算子上游的數據發送量和該算子的處理能力來得到合理的並行度(在無傾斜情況下)
- sink端
sink端是數據流向下游的地方,可以根據sink端的數據量及下游的服務抗壓能力進行評估。
如果sink端是kafka,可以設為kafka對應topic的分區數。
sink端的數據量若比較小,比如一些高度聚合或者過濾比較大的數據(比如監控告警),可以將並行度設置的小一些。
如果source端的數據量最小,拿到source端流過來的數據后做了細粒度的拆分,數據量不斷的增加,到sink端的數據量非常大的這種情況,就需要提高並行度。