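Scenario: two days ago I took a savepoint of a Flink job, but I have since changed one of the job's operators in the code. As a result, resubmitting the job from that savepoint failed. The error message was: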
Caused by: java.lang.IllegalStateException: Failed to rollback to checkpoint/savepoint hdfs://cn-northwest-1.compute.internal:8020/tmp/tmp/savepoint-a2dd6f-695e0cd41db7. Cannot map checkpoint/savepoint state for operator cbc357ccb763df2852fee8c4fc7d55f2 to the new program, because the operator is not available in the new program. If you want to allow to skip this, you can set the --allowNonRestoredState option on the CLI.
    at org.apache.flink.runtime.checkpoint.Checkpoints.loadAndValidateCheckpoint(Checkpoints.java:205)
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1132)
    at org.apache.flink.runtime.scheduler.LegacyScheduler.tryRestoreExecutionGraphFromSavepoint(LegacyScheduler.java:237)
    at org.apache.flink.runtime.scheduler.LegacyScheduler.createAndRestoreExecutionGraph(LegacyScheduler.java:196)
    at org.apache.flink.runtime.scheduler.LegacyScheduler.<init>(LegacyScheduler.java:176)
    at org.apache.flink.runtime.scheduler.LegacySchedulerFactory.createInstance(LegacySchedulerFactory.java:70)
    at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:275)
    at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:265)
    at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:98)
    at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:40)
    at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:146)
    ... 10 more
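Roughly, this means the savepoint holds state for an operator that no longer exists in the new program, so Flink cannot map that state and refuses to restore unless a specific option is set. After searching around, I found the relevant documentation: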
Allowing Non-Restored State
$ bin/flink run -s :savepointPath -n [:runArgs]
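By default, the resume operation tries to map all state in the savepoint back to the program you are restoring with. If you have dropped an operator, you can skip the state that cannot be mapped to the new program with the --allowNonRestoredState option (short form: -n).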
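So when restoring, add the -n flag. It tells Flink to skip the savepoint state that can no longer be mapped to an operator in the new program and to restore everything else; the state of the removed operator is dropped.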
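To make the command form concrete, here is a minimal sketch of what the restore call could look like for this case. The savepoint path is the one from the error message above; the jar name my-flink-job.jar is a hypothetical placeholder, and the script only prints the command so it can be reviewed before it is run from the Flink installation directory.

```shell
# Savepoint path taken from the error message above.
SAVEPOINT_PATH="hdfs://cn-northwest-1.compute.internal:8020/tmp/tmp/savepoint-a2dd6f-695e0cd41db7"

# Hypothetical jar name (an assumption); replace it with your own job jar.
JOB_JAR="my-flink-job.jar"

# -s selects the savepoint to restore from.
# --allowNonRestoredState (short form: -n) skips state whose operator
# no longer exists in the new program.
RESTORE_CMD="bin/flink run -s $SAVEPOINT_PATH --allowNonRestoredState $JOB_JAR"

# Dry run: print the command instead of executing it.
echo "$RESTORE_CMD"
```

Writing the long form --allowNonRestoredState instead of -n is only a readability choice here; both forms select the same option.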
I tried it, and the saved data was not duplicated either. With that, the problem was solved.