Flink akka AskTimeoutException問題排查


最近遇到一個很奇怪的問題,Flink任務正常啟動正常運行一段時間后就會報錯,,錯誤詳情如下

2019-12-11 17:20:57.757 flink [flink-scheduler-1] ERROR o.apache.flink.runtime.rest.handler.job.JobsOverviewHandler - Unhandled exception.akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/dispatcher#761962841]] after [10000 ms]. Sender[null] sent message of type "org.apache.flink.runtime.rpc.messages.LocalFencedMessage".
    at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:604) at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126) at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601) at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109) at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599) at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329) at akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280) at akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284) at akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236) at java.lang.Thread.run(Thread.java:745)

初步判斷應該是觸發了akka的超時機制,那就先調整集群的akka超時間配置,在conf/flink-conf.yaml最后增加下面參數

akka.ask.timeout: 100 s

觀察Job Manager Configuration,配置參數已經生效

再將任務起起來,在運行十幾個小時之后還是出現同樣的錯誤,這就很奇怪了,明明更改了參數也生效了,為啥還是10000 ms就超時了

似乎有點邪門,只能Google一把了,在Apache Flink 中文用戶郵件列表找到類似的問題,給出的建議是調整akka.ask.timeout和web.timeout兩個參數

異常原因如上所說是 akka ask timeout 的問題,這個問題前兩天有人在部署 k8s 的時候也遇過[1] 他的情況是配置資源過少導致 JM 未能及時響應。除了調整上述參數外也可看看是不是這個問題。 Best, tison. [1] https://lists.apache.org/thread.html/84db9dca2e990dd0ebc30aa35390ac75a0e9c7cbfcdbc2029986d4d7@%3Cuser-zh.flink.apache.org%3E
 Biao Liu <[hidden email]> 於2019年8月8日周四 下午8:00寫道: > 你好, >
> 異常里可以看出 AskTimeoutException, 可以調整兩個參數 akka.ask.timeout 和 web.timeout > 再試一下,默認值如下 >
> akka.ask.timeout: 10 s > web.timeout: 10000
>
> PS: 搜 “AskTimeoutException Flink” 可以搜到很多相關答案 >
> Thanks, > Biao /'bɪ.aʊ/

這我就有點奇怪了,錯誤和web.timeout有啥關系啊,只能順着錯誤翻看Flink源碼了。在源碼重直接找到JobsOverviewHandler類,發現引用這個類就兩個:一個是JobsOverviewHeaders,主要是作為一個link;另外一個是WebMonitorEndpoint,

在這實例化了一個JobsOverviewHandler,傳入了timeout參數

protected final RestHandlerConfiguration restConfiguration;

final Time timeout = restConfiguration.getTimeout();

JobsOverviewHandler jobsOverviewHandler = new JobsOverviewHandler(leaderRetriever,timeout,responseHeaders,JobsOverviewHeaders.getInstance());

再追蹤timeout來源發現取自RestHandlerConfiguration Timeout字段,而這個值來自WebOptions.TIMEOUT

    public static RestHandlerConfiguration fromConfiguration(Configuration configuration) { final long refreshInterval = configuration.getLong(WebOptions.REFRESH_INTERVAL); final int maxCheckpointStatisticCacheEntries = configuration.getInteger(WebOptions.CHECKPOINTS_HISTORY_SIZE); final Time timeout = Time.milliseconds(configuration.getLong(WebOptions.TIMEOUT)); final String rootDir = "flink-web-ui"; final File webUiDir = new File(configuration.getString(WebOptions.TMP_DIR), rootDir); return new RestHandlerConfiguration( refreshInterval, maxCheckpointStatisticCacheEntries, timeout, webUiDir); }

    /** * Timeout for asynchronous operations by the web monitor in milliseconds. */
    public static final ConfigOption<Long> TIMEOUT = key("web.timeout") .defaultValue(10L * 1000L) .withDescription("Timeout for asynchronous operations by the web monitor in milliseconds.");

這下真相大白,果然需要調整web.timeout參數,是web監視器的異步操作超時時間,默認10000ms。將該值提高到300000ms,繼續觀察。

 

參考

https://zhuanlan.zhihu.com/p/49095640

http://apache-flink.147419.n8.nabble.com/Fwd-need-help-td321.html#a349


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM