flink入門到實戰(8)flink 有狀態 udf引發的大坑一


場景

最近在做一個畫像的任務,sql實現的,其中有一個udf,會做很多事情,包括將從redis讀出歷史值加權,並將中間結果和加權后的結果更新到redis。

大家都知道,flink 是可以支持事件處理的,也就是可以沒有時間的概念,那么在聚合,join等操作的時候,flink內部會維護一個狀態,假如此時你也用redis維護了歷史狀態,也即是類似 result = currentState(flink)+lastState(redis),且此時要針對計算的結果用where進行篩選.

SQL如下

CREATE VIEW view_count AS
select
 `time`,
 gid,
 cid,
 count(feed_id) * 1 as strength
FROM
 view_cid
GROUP BY
 gid,
 cid,`time`;

CREATE VIEW view_strength AS select
 `time`,
 gid,
 cid ,
 Get_Strength_Weaken(gid, cid, cast(strength as double), `time`, 0.95)  as `result`
FROM
 view_count
;

insert into
 hx_app_server_sink_common
SELECT
 gid,
 cid,
 `result`
FROM
 view_strength
where `result` <> '0.0' 
GROUP BY
 gid,
 cid,
 `result`;

 

業務分析

第一個sql視圖完成的是首先分組,然后統計某一個字段並乘以權重;

第二個sql視圖,udf :Get_Strength_Weaken完成當前值和歷史值疊加工作,歷史值存儲在redis,同時將結果返回並更新redis,返回值作為result字段。

第三個sql在輸出的時候,result字段作為了where的條件和group by里的字段。

這時候生成的flink概圖如下:

觀察中間的結構圖可以發現,Get_Strength_Weaken被調用兩次:

1. where條件,這個的生成是由於第三條sql

產生的執行計划,是不是看起來很懵逼。。。

2. select里面還有一次調用Get_Strength_Weaken,這個很明顯。

當然,可以打印一下flink udf里eval函數的調用細節日志,很容易發現重復調用的問題,浪院長這個也是通過分析日志,對比輸出結果來得出的論。

綜合上面分析和udf調用日志,結論就是udf被調用了兩次。

對於這個flink的udf被多次調用引起的結果偏大,整整調試了一下午。

由於上面分析可以得出結論,flink將where條件下推了,where 條件判斷會先執行,而select里后執行,那么可以調整SQL,如下:

CREATE VIEW view_count AS
select
`time`,
gid,
cid,
count(feed_id) * 1 as strength
FROM
view_cid
GROUP BY
gid,
cid,`time`;

CREATE VIEW view_strength AS select
`time`,
gid,
cid ,
getResult(gid,cid) as `result`
FROM
view_count
where Get_Strength_Weaken(gid, cid, cast(strength as double), `time`, 0.95)  as `result` <> '0.0'
;

insert into
hx_app_server_sink_common
SELECT
gid,
cid,
`result`
FROM
view_strength
GROUP BY
gid,
cid,
`result`;

 

那么實際上,select里的udf主要目的是取出來計算結果,那么這個時候可以寫個簡單的udf--getResult,只讓他從redis獲取 where條件里更新到redis里的結果,由於該udf是無狀態的即使多次調用,也無所謂。

所以,總結一下,對於flink 來說,由於基於事件的處理,聚合、join等操作會有狀態緩存,那么此時再用到含有外部存儲狀態的udf,一定要慎重,結合執行計划,來合理放置udf的位置,避免出錯。

當然,調試階段最好是有詳細的日志,便於分析和定位問題。

flink 狀態刪除

其實,flink聚合等內部狀態有配置可以使其自動刪除的,具體配置使用如下:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// obtain query configuration from TableEnvironment
StreamQueryConfig qConfig = tableEnv.queryConfig();
// set query parameters
qConfig.withIdleStateRetentionTime(Time.hours(12), Time.hours(24));

// define query
Table result = ...

// create TableSink
TableSink<Row> sink = ...

// register TableSink
tableEnv.registerTableSink(
  "outputTable",               // table name
  new String[]{...},           // field names
  new TypeInformation[]{...},  // field types
  sink);                       // table sink

// emit result Table via a TableSink
result.insertInto("outputTable", qConfig);

// convert result Table into a DataStream<Row>
DataStream<Row> stream = tableEnv.toAppendStream(result, Row.class, qConfig);

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM