Flink: "Heartbeat of TaskManager timed out" and "Heartbeat of ResourceManager timed out"


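I recently deployed a Flink job that stopped on its own after running for a while, which was frustrating. I looked up the time of the last checkpoint and went through the logs around that point: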

2019-12-13 07:25:24.566 flink [flink-akka.actor.default-dispatcher-41] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Job PayOrder (88c9cc0c85875332cc5e4ed6418cd667) switched from state RUNNING to FAILING.
java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id container_1566481621886_4397244_01_000004 timed out.
    at org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1656)
    at org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl$HeartbeatMonitor.run(HeartbeatManagerImpl.java:339)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:154)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
2019-12-13 07:25:24.519 flink [flink-akka.actor.default-dispatcher-20] INFO  org.apache.flink.runtime.jobmaster.JobMaster - Close ResourceManager connection 9b7931812dbed76060b48a696d72a869: The heartbeat of ResourceManager with id 9b7931812dbed76060b48a696d72a869 timed out..

Searching the Flink source for the messages "Heartbeat of TaskManager with id" and "The heartbeat of ResourceManager with id" leads to the following code in JobMaster:

    private class TaskManagerHeartbeatListener implements HeartbeatListener<AccumulatorReport, Void> {

        private final JobMasterGateway jobMasterGateway;

        private TaskManagerHeartbeatListener(JobMasterGateway jobMasterGateway) {
            this.jobMasterGateway = Preconditions.checkNotNull(jobMasterGateway);
        }

        @Override
        public void notifyHeartbeatTimeout(ResourceID resourceID) {
            jobMasterGateway.disconnectTaskManager(
                resourceID,
                new TimeoutException("Heartbeat of TaskManager with id " + resourceID + " timed out."));
        }

        @Override
        public void reportPayload(ResourceID resourceID, AccumulatorReport payload) {
            for (AccumulatorSnapshot snapshot : payload.getAccumulatorSnapshots()) {
                schedulerNG.updateAccumulators(snapshot);
            }
        }

        @Override
        public CompletableFuture<Void> retrievePayload(ResourceID resourceID) {
            return CompletableFuture.completedFuture(null);
        }
    }

    private class ResourceManagerHeartbeatListener implements HeartbeatListener<Void, Void> {

        @Override
        public void notifyHeartbeatTimeout(final ResourceID resourceId) {
            runAsync(() -> {
                log.info("The heartbeat of ResourceManager with id {} timed out.", resourceId);

                if (establishedResourceManagerConnection != null
                        && establishedResourceManagerConnection.getResourceManagerResourceID().equals(resourceId)) {
                    reconnectToResourceManager(
                        new JobMasterException(
                            String.format("The heartbeat of ResourceManager with id %s timed out.", resourceId)));
                }
            });
        }

        @Override
        public void reportPayload(ResourceID resourceID, Void payload) {
            // nothing to do since the payload is of type Void
        }

        @Override
        public CompletableFuture<Void> retrievePayload(ResourceID resourceID) {
            return CompletableFuture.completedFuture(null);
        }
    }
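
To make the idea behind notifyHeartbeatTimeout concrete, here is a minimal sketch of a heartbeat timeout check. It is only an illustration under my own assumptions: the class HeartbeatTimeoutSketch and its methods are hypothetical names, not Flink's API, and it compares explicit timestamps to stay deterministic, whereas the stack trace above suggests Flink runs a scheduled HeartbeatMonitor task instead.

```java
/** Simplified, hypothetical model of a heartbeat timeout check; not Flink's actual class. */
final class HeartbeatTimeoutSketch {

    private final long timeoutMillis;
    private long lastHeartbeatMillis;

    HeartbeatTimeoutSketch(long timeoutMillis, long startMillis) {
        if (timeoutMillis <= 0L) {
            throw new IllegalArgumentException("timeout must be positive");
        }
        this.timeoutMillis = timeoutMillis;
        this.lastHeartbeatMillis = startMillis;
    }

    /** Records a heartbeat from the monitored target, which restarts the timeout window. */
    void reportHeartbeat(long nowMillis) {
        lastHeartbeatMillis = nowMillis;
    }

    /** Returns true once more than timeoutMillis has passed since the last heartbeat. */
    boolean isTimedOut(long nowMillis) {
        return nowMillis - lastHeartbeatMillis > timeoutMillis;
    }

    public static void main(String[] args) {
        // 50000 ms is the default heartbeat.timeout quoted later in this post.
        HeartbeatTimeoutSketch monitor = new HeartbeatTimeoutSketch(50_000L, 0L);
        monitor.reportHeartbeat(10_000L);

        // 45 s after the last heartbeat: still within the window.
        System.out.println(monitor.isTimedOut(55_000L)); // false
        // 51 s after the last heartbeat: this is where a listener would be notified.
        System.out.println(monitor.isTimedOut(61_000L)); // true
    }
}
```

The point of the sketch is that every heartbeat that arrives in time resets the window, so a timeout means no heartbeat at all got through for the whole timeout period, for example because the TaskManager or the network was stalled for that long.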

The listener is then instantiated here:

this.taskManagerHeartbeatManager = heartbeatServices.createHeartbeatManagerSender(
    resourceId,
    new TaskManagerHeartbeatListener(selfGateway),
    rpcService.getScheduledExecutor(),
    log);

From there, follow the call into HeartbeatServices:

/**
 * HeartbeatServices gives access to all services needed for heartbeating. This includes the
 * creation of heartbeat receivers and heartbeat senders.
 */
public class HeartbeatServices {

    /** Heartbeat interval for the created services. */
    protected final long heartbeatInterval;

    /** Heartbeat timeout for the created services. */
    protected final long heartbeatTimeout;

    public HeartbeatServices(long heartbeatInterval, long heartbeatTimeout) {
        Preconditions.checkArgument(0L < heartbeatInterval, "The heartbeat interval must be larger than 0.");
        Preconditions.checkArgument(heartbeatInterval <= heartbeatTimeout, "The heartbeat timeout should be larger or equal than the heartbeat interval.");

        this.heartbeatInterval = heartbeatInterval;
        this.heartbeatTimeout = heartbeatTimeout;
    }

    /**
     * Creates a heartbeat manager which does not actively send heartbeats.
     *
     * @param resourceId Resource Id which identifies the owner of the heartbeat manager
     * @param heartbeatListener Listener which will be notified upon heartbeat timeouts for registered
     *                          targets
     * @param scheduledExecutor Scheduled executor to be used for scheduling heartbeat timeouts
     * @param log Logger to be used for the logging
     * @param <I> Type of the incoming payload
     * @param <O> Type of the outgoing payload
     * @return A new HeartbeatManager instance
     */
    public <I, O> HeartbeatManager<I, O> createHeartbeatManager(
            ResourceID resourceId,
            HeartbeatListener<I, O> heartbeatListener,
            ScheduledExecutor scheduledExecutor,
            Logger log) {

        return new HeartbeatManagerImpl<>(
            heartbeatTimeout,
            resourceId,
            heartbeatListener,
            scheduledExecutor,
            scheduledExecutor,
            log);
    }

    /**
     * Creates a heartbeat manager which actively sends heartbeats to monitoring targets.
     *
     * @param resourceId Resource Id which identifies the owner of the heartbeat manager
     * @param heartbeatListener Listener which will be notified upon heartbeat timeouts for registered
     *                          targets
     * @param scheduledExecutor Scheduled executor to be used for scheduling heartbeat timeouts
     * @param log Logger to be used for the logging
     * @param <I> Type of the incoming payload
     * @param <O> Type of the outgoing payload
     * @return A new HeartbeatManager instance which actively sends heartbeats
     */
    public <I, O> HeartbeatManager<I, O> createHeartbeatManagerSender(
            ResourceID resourceId,
            HeartbeatListener<I, O> heartbeatListener,
            ScheduledExecutor scheduledExecutor,
            Logger log) {

        return new HeartbeatManagerSenderImpl<>(
            heartbeatInterval,
            heartbeatTimeout,
            resourceId,
            heartbeatListener,
            scheduledExecutor,
            scheduledExecutor,
            log);
    }

    /**
     * Creates an HeartbeatServices instance from a {@link Configuration}.
     *
     * @param configuration Configuration to be used for the HeartbeatServices creation
     * @return An HeartbeatServices instance created from the given configuration
     */
    public static HeartbeatServices fromConfiguration(Configuration configuration) {
        long heartbeatInterval = configuration.getLong(HeartbeatManagerOptions.HEARTBEAT_INTERVAL);

        long heartbeatTimeout = configuration.getLong(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT);

        return new HeartbeatServices(heartbeatInterval, heartbeatTimeout);
    }
}
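
The two checkArgument calls in the constructor mean that any heartbeat.timeout you pick has to be at least as large as the heartbeat interval. As a rough sketch of that constraint, the snippet below mirrors the two checks with plain Java exceptions so it runs without Flink on the classpath. HeartbeatSettingsCheck is a hypothetical helper of mine, and the 10000 ms interval is an assumption (I believe it is the default heartbeat.interval, but the excerpt above does not show it).

```java
/** Hypothetical helper that mirrors the two precondition checks in HeartbeatServices. */
final class HeartbeatSettingsCheck {

    /** Throws IllegalArgumentException if the pair would be rejected by the constructor checks. */
    static void validate(long intervalMillis, long timeoutMillis) {
        if (intervalMillis <= 0L) {
            throw new IllegalArgumentException("The heartbeat interval must be larger than 0.");
        }
        if (intervalMillis > timeoutMillis) {
            throw new IllegalArgumentException(
                "The heartbeat timeout should be larger or equal than the heartbeat interval.");
        }
    }

    public static void main(String[] args) {
        // Assumed interval of 10000 ms with the raised timeout of 180000 ms: accepted.
        validate(10_000L, 180_000L);
        System.out.println("interval=10000, timeout=180000 is valid");

        // A timeout smaller than the interval is rejected.
        try {
            validate(10_000L, 5_000L);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```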

Sure enough, the timeout is defined by HeartbeatManagerOptions.HEARTBEAT_TIMEOUT:

    /** Timeout for requesting and receiving heartbeat for both sender and receiver sides. */
    public static final ConfigOption<Long> HEARTBEAT_TIMEOUT =
        key("heartbeat.timeout")
            .defaultValue(50000L)
            .withDescription("Timeout for requesting and receiving heartbeat for both sender and receiver sides.");

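One possible cause of the heartbeat timeout is heavy load on YARN. As a temporary measure, I raised this value from the default 50000 ms to 180000 ms in conf/flink-conf.yaml and will keep observing the job.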

#Timeout for requesting and receiving heartbeat for both sender and receiver sides.
heartbeat.timeout: 180000
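
To double check which value a given flink-conf.yaml would yield, a small sketch like the one below can help. It is a simplified reader for "key: value" lines that I wrote for illustration, not Flink's own configuration loader. FlinkConfHeartbeatCheck is a hypothetical name, the 50000 ms fallback is the default shown above, and the 10000 ms interval fallback is an assumption.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, simplified reader for flat "key: value" lines; not Flink's loader. */
final class FlinkConfHeartbeatCheck {

    /** Parses flat "key: value" lines, skipping blank lines and lines that start with '#'. */
    static Map<String, String> parse(String confText) {
        Map<String, String> entries = new HashMap<>();
        for (String rawLine : confText.split("\n")) {
            String line = rawLine.trim();
            if (line.isEmpty() || line.startsWith("#")) {
                continue;
            }
            int separator = line.indexOf(':');
            if (separator < 0) {
                continue; // not a key/value line
            }
            entries.put(line.substring(0, separator).trim(), line.substring(separator + 1).trim());
        }
        return entries;
    }

    /** Returns the configured long value for key, or defaultValue if the key is absent. */
    static long getLong(Map<String, String> entries, String key, long defaultValue) {
        String value = entries.get(key);
        return value == null ? defaultValue : Long.parseLong(value);
    }

    public static void main(String[] args) {
        String confText =
            "#Timeout for requesting and receiving heartbeat for both sender and receiver sides.\n"
                + "heartbeat.timeout: 180000\n";
        Map<String, String> entries = parse(confText);

        long timeout = getLong(entries, "heartbeat.timeout", 50_000L);
        long interval = getLong(entries, "heartbeat.interval", 10_000L); // assumed default

        System.out.println("heartbeat.timeout=" + timeout + " ms, about "
            + (timeout / interval) + " heartbeat intervals");
    }
}
```

With the assumed 10000 ms interval, the new setting tolerates roughly 18 consecutive missed heartbeats instead of 5, which gives a loaded cluster more slack but also delays the detection of a TaskManager that is really dead.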

 

 

