場景
近期在做一個畫像的任務,sql實現的,當中有一個udf,會做非常多事情,包含將從redis讀出歷史值加權,並將中間結果和加權后的結果更新到redis。
大家都知道,flink 是能夠支持事件處理的。也就是能夠沒有時間的概念,那么在聚合,join等操作的時候,flink內部會維護一個狀態,假如此時你也用redis維護了歷史狀態,也即是相似 result = currentState(flink)+lastState(redis)。且此時要針對計算的結果用where進行篩選.
SQL例如以下
CREATE VIEW view_count AS
select
`time`,
gid,
cid,
count(feed_id) * 1 as strength
FROM
view_cid
GROUP BY
gid,
cid,`time`;
CREATE VIEW view_strength AS select
`time`,
gid,
cid ,
Get_Strength_Weaken(gid, cid, cast(strength as double), `time`, 0.95) as `result`
FROM
view_count
;
insert into
hx_app_server_sink_common
SELECT
gid,
cid,
`result`
FROM
view_strength
where `result` <> '0.0'
GROUP BY
gid,
cid,
`result`;
業務分析
第一個sql視圖完畢的是首先分組,然后統計某一個字段並乘以權重;
第二個sql視圖。udf :Get_Strength_Weaken完畢當前值和歷史值疊加工作,歷史值存儲在redis。同一時候將結果返回並更新redis,返回值作為result字段。
第三個sql在輸出的時候,result字段作為了where的條件和group by里的字段。
這時候生成的flink概圖例如以下:
觀察中間的結構圖能夠發現。Get_Strength_Weaken被調用兩次:
1. where條件。這個的生成是因為第三條sql
where `result` <> '0.0'
產生的運行計划,是不是看起來非常懵逼。。
。
2. select里面另一次調用Get_Strength_Weaken。這個非常明顯。
當然。能夠打印一下flink udf里eval函數的調用細節日志,非常easy發現反復調用的問題。浪院長這個也是通過分析日志。對照輸出結果來得出的論。
綜合上面分析和udf調用日志,結論就是udf被調用了兩次。
對於這個flink的udf被多次調用引起的結果偏大。整整調試了一下午。
因為上面分析能夠得出結論,flink將where條件下推了,where 條件推斷會先運行,而select里后運行,那么能夠調整SQL。例如以下:
CREATE VIEW view_count AS
select
`time`,
gid,
cid,
count(feed_id) * 1 as strength
FROM
view_cid
GROUP BY
gid,
cid,`time`;
CREATE VIEW view_strength AS select
`time`,
gid,
cid ,
getResult(gid,cid) as `result`
FROM
view_count
where Get_Strength_Weaken(gid, cid, cast(strength as double), `time`, 0.95) as `result` <> '0.0'
;
insert into
hx_app_server_sink_common
SELECT
gid,
cid,
`result`
FROM
view_strength
GROUP BY
gid,
cid,
`result`;
那么實際上。select里的udf主要目的是取出來計算結果。那么這個時候能夠寫個簡單的udf--getResult,僅僅讓他從redis獲取 where條件里更新到redis里的結果,因為該udf是無狀態的即使多次調用。也無所謂。
所以。總結一下,對於flink 來說,因為基於事件的處理,聚合、join等操作會有狀態緩存,那么此時再用到含有外部存儲狀態的udf,一定要謹慎,結合運行計划,來合理放置udf的位置,避免出錯。
當然。調試階段最好是有具體的日志。便於分析和定位問題。
flink 狀態刪除
事實上。flink聚合等內部狀態有配置能夠使其自己主動刪除的,具體配置使用例如以下:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
// obtain query configuration from TableEnvironment
StreamQueryConfig qConfig = tableEnv.queryConfig();
// set query parameters
qConfig.withIdleStateRetentionTime(Time.hours(12));
// define query
Table result = ...
// create TableSink
TableSink<Row> sink = ...
// emit result Table via a TableSink
result.writeToSink(sink, qConfig);
// convert result Table into a DataStream<Row>
DataStream<Row> stream = tableEnv.toAppendStream(result, Row.class, qConfig);
[完]
推薦閱讀: