flink 有狀態udf 引起血案一


版權聲明:本文為博主原創文章,未經博主同意不得轉載。 https://blog.csdn.net/rlnLo2pNEfx9c/article/details/83422587

640

場景

近期在做一個畫像的任務,sql實現的,當中有一個udf,會做非常多事情,包含將從redis讀出歷史值加權,並將中間結果和加權后的結果更新到redis。

大家都知道,flink 是能夠支持事件處理的。也就是能夠沒有時間的概念,那么在聚合,join等操作的時候,flink內部會維護一個狀態,假如此時你也用redis維護了歷史狀態,也即是相似 result = currentState(flink)+lastState(redis)。且此時要針對計算的結果用where進行篩選.

SQL例如以下

 
          

CREATE VIEW view_count AS
select
 `time`,
 gid,
 cid,
 count(feed_id) * 1 as strength
FROM
 view_cid
GROUP BY
 gid,
 cid,`time`;

CREATE VIEW view_strength AS select
 `time`,
 gid,
 cid ,
 Get_Strength_Weaken(gid, cid, cast(strength as double), `time`, 0.95)  as `result`
FROM
 view_count
;

insert into
 hx_app_server_sink_common
SELECT
 gid,
 cid,
 `result`
FROM
 view_strength
where `result` <> '0.0'
GROUP BY
 gid,
 cid,
 `result`;

業務分析

第一個sql視圖完畢的是首先分組,然后統計某一個字段並乘以權重;

第二個sql視圖。udf :Get_Strength_Weaken完畢當前值和歷史值疊加工作,歷史值存儲在redis。同一時候將結果返回並更新redis,返回值作為result字段。

第三個sql在輸出的時候,result字段作為了where的條件和group by里的字段。

這時候生成的flink概圖例如以下:

640

觀察中間的結構圖能夠發現。Get_Strength_Weaken被調用兩次:

1. where條件。這個的生成是因為第三條sql

 
          

where `result` <> '0.0'

產生的運行計划,是不是看起來非常懵逼。。

2. select里面另一次調用Get_Strength_Weaken。這個非常明顯。

當然。能夠打印一下flink udf里eval函數的調用細節日志,非常easy發現反復調用的問題。浪院長這個也是通過分析日志。對照輸出結果來得出的論。

綜合上面分析和udf調用日志,結論就是udf被調用了兩次。

對於這個flink的udf被多次調用引起的結果偏大。整整調試了一下午。

因為上面分析能夠得出結論,flink將where條件下推了,where 條件推斷會先運行,而select里后運行,那么能夠調整SQL。例如以下:

 
          

CREATE VIEW view_count AS
select
`time`,
gid,
cid,
count(feed_id) * 1 as strength
FROM
view_cid
GROUP BY
gid,
cid,`time`;

CREATE VIEW view_strength AS select
`time`,
gid,
cid ,
getResult(gid,cid) as `result`
FROM
view_count
where Get_Strength_Weaken(gid, cid, cast(strength as double), `time`, 0.95)  as `result` <> '0.0'
;

insert into
hx_app_server_sink_common
SELECT
gid,
cid,
`result`
FROM
view_strength
GROUP BY
gid,
cid,
`result`;

那么實際上。select里的udf主要目的是取出來計算結果。那么這個時候能夠寫個簡單的udf--getResult,僅僅讓他從redis獲取 where條件里更新到redis里的結果,因為該udf是無狀態的即使多次調用。也無所謂。

所以。總結一下,對於flink 來說,因為基於事件的處理,聚合、join等操作會有狀態緩存,那么此時再用到含有外部存儲狀態的udf,一定要謹慎,結合運行計划,來合理放置udf的位置,避免出錯。

當然。調試階段最好是有具體的日志。便於分析和定位問題。

flink 狀態刪除

事實上。flink聚合等內部狀態有配置能夠使其自己主動刪除的,具體配置使用例如以下:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// obtain query configuration from TableEnvironment
StreamQueryConfig qConfig = tableEnv.queryConfig();
// set query parameters
qConfig.withIdleStateRetentionTime(Time.hours(12));

// define query
Table result = ...

// create TableSink
TableSink<Row> sink = ...

// emit result Table via a TableSink
result.writeToSink(sink, qConfig);

// convert result Table into a DataStream<Row>
DataStream<Row> stream = tableEnv.toAppendStream(result, Row.class, qConfig);

[完]

推薦閱讀:

推薦兩個不錯的flink項目

Spark SQL從入門到精通

重要 : 優化flink的四種方式

flink超越Spark的Checkpoint機制

640


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM