Scenario:
In Spark Structured Streaming, the job reads topics from Kafka and writes the aggregated results to HDFS, with the HDFS output directory partitioned by month, day, and hour:
1) The program is submitted to Spark on YARN (yarn-client or yarn-cluster) and sinks results to the output directory normally (executors are allocated, tasks are assigned to them, and results appear on HDFS);
2) A bug shows up; after fixing it, the checkpoint directory is deleted (so that the Kafka topics can be re-consumed from the beginning), but the HDFS sink output directory is left as it is. Now the problem appears: after the job starts, batches keep being triggered, and the "Monitoring Streaming Queries -> Metrics" log shows the topic offsets advancing with every triggered batch and being committed successfully, with the offsets recorded under the checkpoint directory; yet the DAGScheduler never splits stages, submits stages, or submits tasks.
3) A short test shows that after the job is submitted successfully, executors are allocated, but no tasks are ever assigned to them;
4) A long-running test shows that the job was submitted at about 2019-07-25 00:00:00, and only at 2019-07-25 03:06:00 did code generation start and the DAGScheduler begin to split stages, submit stages, and submit tasks.
The program is as follows:
// Outputs 2 files
Dataset<Row> dataset = this.sparkSession.readStream().format("kafka")
        .options(this.getSparkKafkaCommonOptions(sparkSession)) // Reads spark-testapp.conf for custom configuration.
        .option("kafka.bootstrap.servers", "192.168.1.1:9092,192.168.1.2:9092")
        .option("subscribe", "myTopic1,myTopic2")
        .option("startingOffsets", "earliest")
        .load();

String mdtTempView = "mdtbasetemp";
ExpressionEncoder<Row> Rowencoder = this.getSchemaEncoder(new Schema.Parser().parse(baseschema.getValue()));

Dataset<Row> parseVal = dataset.select("value").as(Encoders.BINARY())
        .map(new MapFunction<byte[], Row>() {
            ....
        }, Rowencoder);
parseVal.createOrReplaceGlobalTempView(mdtTempView);

Dataset<Row> queryResult = this.sparkSession.sql(
        "select ... from global_temp." + mdtTempView + " where start_time<>\"\"");

/* Output paths */
String outputPath = "/user/dx/streaming/data/testapp";
String checkpointLocation = "/user/dx/streaming/checkpoint/testapp";

// Sink approach 1:
StreamingQuery query = queryResult.writeStream().format("parquet")
        .option("path", outputPath)
        .option("checkpointLocation", checkpointLocation)
        .partitionBy("month", "day", "hour")
        .outputMode(OutputMode.Append())
        .trigger(Trigger.ProcessingTime(1, TimeUnit.MINUTES))
        .start();

try {
    query.awaitTermination();
} catch (StreamingQueryException e) {
    e.printStackTrace();
}
長時間執行日志:
程序在00:00左右開始提交,分配executor,然后開始讀取kafka數據,開始多次觸發批次,在03:06開始生成自行代碼,DAGScheduler划分stage、提交stage、提交task到executor,回落正常情況。
......
19/07/25 03:05:00 INFO internals.Fetcher: [Consumer clientId=consumer-1, groupId=spark-kafka-source-032985c5-b382-41ab-a115-ec44d9ba26bc-462898594-driver-0] Resetting offset for partition myTopic-97 to offset 147327.
19/07/25 03:05:00 INFO internals.Fetcher: [Consumer clientId=consumer-1, groupId=spark-kafka-source-032985c5-b382-41ab-a115-ec44d9ba26bc-462898594-driver-0] Resetting offset for partition myTopic-101 to offset 147329.
19/07/25 03:05:00 INFO internals.Fetcher: [Consumer clientId=consumer-1, groupId=spark-kafka-source-032985c5-b382-41ab-a115-ec44d9ba26bc-462898594-driver-0] Resetting offset for partition myTopic-37 to offset 147327.
19/07/25 03:05:00 INFO streaming.CheckpointFileManager: Writing atomically to hdfs://hadoop7:8020/user/dx/streaming/checkpoint/testapp/offsets/202 using temp file hdfs://hadoop7:8020/user/dx/streaming/checkpoint/testapp/offsets/.202.22f580ca-bfb3-422d-9e45-d83186088b42.tmp
19/07/25 03:05:00 INFO streaming.CheckpointFileManager: Renamed temp file hdfs://hadoop7:8020/user/dx/streaming/checkpoint/testapp/offsets/.202.22f580ca-bfb3-422d-9e45-d83186088b42.tmp to hdfs://hadoop7:8020/user/dx/streaming/checkpoint/testapp/offsets/202
19/07/25 03:05:00 INFO streaming.MicroBatchExecution: Committed offsets for batch 202. Metadata OffsetSeqMetadata(0,1563995100011,Map(spark.sql.streaming.stateStore.providerClass -> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider, spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion -> 2, spark.sql.streaming.multipleWatermarkPolicy -> min, spark.sql.streaming.aggregation.stateFormatVersion -> 2, spark.sql.shuffle.partitions -> 100))
19/07/25 03:05:00 INFO kafka010.KafkaMicroBatchReader: Partitions added: Map()
19/07/25 03:05:00 INFO streaming.FileStreamSink: Skipping already committed batch 202
19/07/25 03:05:00 INFO streaming.CheckpointFileManager: Renamed temp file hdfs://hadoop7:8020/user/dx/streaming/checkpoint/testapp/commits/.202.0cb8f1ad-7b00-46fd-b65f-7bf055eda4ae.tmp to hdfs://hadoop7:8020/user/dx/streaming/checkpoint/testapp/commits/202
Trigger load count accumulator value: 192
# There are actually two topics here; only the metrics for one topic are shown.
19/07/25 03:05:00 INFO streaming.MicroBatchExecution: Streaming query made progress: {
  "id" : "c5537e0c-979d-4575-b4d2-1f8a746d2673",
  "runId" : "d7c5a8f6-e876-45c4-8a02-984045e031ec",
  "name" : null,
  "timestamp" : "2019-07-24T19:05:00.000Z",
  "batchId" : 202,
  "numInputRows" : 0,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "addBatch" : 52,
    "getBatch" : 1,
    "getEndOffset" : 1,
    "queryPlanning" : 319,
    "setOffsetRange" : 10,
    "triggerExecution" : 486,
    "walCommit" : 67
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KafkaV2[Subscribe[myTopic-2]]",
    "startOffset" : {
      "myTopic-2" : {
        ......
        "6" : 146978,
        "0" : 146978
      }
    },
    "endOffset" : {
      "myTopic-2" : {
        ......
        "6" : 147329,
        "0" : 147329
      }
    },
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0
  } ],
  "sink" : {
    "description" : "FileSink[/user/dx/streaming/data/testapp]"
  }
}
....
19/07/25 03:06:00 INFO internals.Fetcher: [Consumer clientId=consumer-1, groupId=spark-kafka-source-032985c5-b382-41ab-a115-ec44d9ba26bc-462898594-driver-0] Resetting offset for partition myTopic-2 to offset 147659.
19/07/25 03:06:00 INFO internals.Fetcher: [Consumer clientId=consumer-1, groupId=spark-kafka-source-032985c5-b382-41ab-a115-ec44d9ba26bc-462898594-driver-0] Resetting offset for partition myTopic-66 to offset 147662.
19/07/25 03:06:00 INFO streaming.CheckpointFileManager: Writing atomically to hdfs://hadoop7:8020/user/dx/streaming/checkpoint/testapp/offsets/203 using temp file hdfs://hadoop7:8020/user/dx/streaming/checkpoint/testapp/offsets/.203.72235ff6-aa6d-4e8f-b924-888335fe2035.tmp
19/07/25 03:06:00 INFO streaming.CheckpointFileManager: Renamed temp file hdfs://hadoop7:8020/user/dx/streaming/checkpoint/testapp/offsets/.203.72235ff6-aa6d-4e8f-b924-888335fe2035.tmp to hdfs://hadoop7:8020/user/dx/streaming/checkpoint/testapp/offsets/203
19/07/25 03:06:00 INFO streaming.MicroBatchExecution: Committed offsets for batch 203. Metadata OffsetSeqMetadata(0,1563995160011,Map(spark.sql.streaming.stateStore.providerClass -> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider, spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion -> 2, spark.sql.streaming.multipleWatermarkPolicy -> min, spark.sql.streaming.aggregation.stateFormatVersion -> 2, spark.sql.shuffle.partitions -> 100))
19/07/25 03:06:00 INFO kafka010.KafkaMicroBatchReader: Partitions added: Map()
19/07/25 03:06:00 INFO parquet.ParquetFileFormat: Using default output committer for Parquet: parquet.hadoop.ParquetOutputCommitter
19/07/25 03:06:01 INFO codegen.CodeGenerator: Code generated in 316.896471 ms
19/07/25 03:06:01 INFO spark.SparkContext: Starting job: start at MdtStreamDriver.java:184
19/07/25 03:06:01 INFO scheduler.DAGScheduler: Got job 0 (start at MdtStreamDriver.java:184) with 128 output partitions
19/07/25 03:06:01 INFO scheduler.DAGScheduler: Final stage: ResultStage 0 (start at MdtStreamDriver.java:184)
19/07/25 03:06:01 INFO scheduler.DAGScheduler: Parents of final stage: List()
19/07/25 03:06:01 INFO scheduler.DAGScheduler: Missing parents: List()
19/07/25 03:06:01 INFO scheduler.DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[7] at start at MdtStreamDriver.java:184), which has no missing parents
19/07/25 03:06:02 INFO memory.MemoryStore: Block broadcast_4 stored as values in memory (estimated size 255.5 KB, free 5.6 GB)
19/07/25 03:06:02 INFO memory.MemoryStore: Block broadcast_4_piece0 stored as bytes in memory (estimated size 84.7 KB, free 5.6 GB)
19/07/25 03:06:02 INFO storage.BlockManagerInfo: Added broadcast_4_piece0 in memory on hadoop7:35113 (size: 84.7 KB, free: 6.2 GB)
19/07/25 03:06:02 INFO spark.SparkContext: Created broadcast 4 from broadcast at DAGScheduler.scala:1161
19/07/25 03:06:02 INFO scheduler.DAGScheduler: Submitting 128 missing tasks from ResultStage 0 (MapPartitionsRDD[7] at start at MdtStreamDriver.java:184) (first 15 tasks are for partitions Vector(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14))
19/07/25 03:06:02 INFO cluster.YarnScheduler: Adding task set 0.0 with 128 tasks
19/07/25 03:06:02 INFO scheduler.TaskSetManager: Starting task 11.0 in stage 0.0 (TID 0, hadoop6, executor 6, partition 11, PROCESS_LOCAL, 8821 bytes)
19/07/25 03:06:02 INFO scheduler.TaskSetManager: Starting task 6.0 in stage 0.0 (TID 1, hadoop6, executor 2, partition 6, PROCESS_LOCAL, 8821 bytes)
19/07/25 03:06:02 INFO scheduler.TaskSetManager: Starting task 21.0 in stage 0.0 (TID 2, hadoop37, executor 9, partition 21, PROCESS_LOCAL, 8822 bytes)
19/07/25 03:06:02 INFO scheduler.TaskSetManager: Starting task 2.0 in stage 0.0 (TID 3, hadoop37, executor 10, partition 2, PROCESS_LOCAL, 8823 bytes)
19/07/25 03:06:02 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 0.0 (TID 4, hadoop6, executor 1, partition 0, PROCESS_LOCAL, 8821 bytes)
19/07/25 03:06:02 INFO scheduler.TaskSetManager: Starting task 7.0 in stage 0.0 (TID 5, hadoop6, executor 3, partition 7, PROCESS_LOCAL, 8821 bytes)
19/07/25 03:06:02 INFO scheduler.TaskSetManager: Starting task 4.0 in stage 0.0 (TID 6, hadoop6, executor 4, partition 4, PROCESS_LOCAL, 8821 bytes)
19/07/25 03:06:02 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 0.0 (TID 7, hadoop37, executor 8, partition 1, PROCESS_LOCAL, 8822 bytes)
19/07/25 03:06:02 INFO scheduler.TaskSetManager: Starting task 3.0 in stage 0.0 (TID 8, hadoop37, executor 7, partition 3, PROCESS_LOCAL, 8822 bytes)
19/07/25 03:06:02 INFO scheduler.TaskSetManager: Starting task 5.0 in stage 0.0 (TID 9, hadoop6, executor 5, partition 5, PROCESS_LOCAL, 8821 bytes)
19/07/25 03:06:02 INFO scheduler.TaskSetManager: Starting task 41.0 in stage 0.0 (TID 10, hadoop6, executor 6, partition 41, PROCESS_LOCAL, 8821 bytes)
19/07/25 03:06:02 INFO scheduler.TaskSetManager: Starting task 31.0 in stage 0.0 (TID 11, hadoop6, executor 2, partition 31, PROCESS_LOCAL, 8821 bytes)
19/07/25 03:06:02 INFO scheduler.TaskSetManager: Starting task 27.0 in stage 0.0 (TID 12, hadoop37, executor 9, partition 27, PROCESS_LOCAL, 8822 bytes)
19/07/25 03:06:02 INFO scheduler.TaskSetManager: Starting task 12.0 in stage 0.0 (TID 13, hadoop37, executor 10, partition 12, PROCESS_LOCAL, 8823 bytes)
19/07/25 03:06:02 INFO scheduler.TaskSetManager: Starting task 9.0 in stage 0.0 (TID 14, hadoop6, executor 1, partition 9, PROCESS_LOCAL, 8821 bytes)
19/07/25 03:06:02 INFO scheduler.TaskSetManager: Starting task 19.0 in stage 0.0 (TID 15, hadoop6, executor 3, partition 19, PROCESS_LOCAL, 8821 bytes)
19/07/25 03:06:02 INFO scheduler.TaskSetManager: Starting task 23.0 in stage 0.0 (TID 16, hadoop6, executor 4, partition 23, PROCESS_LOCAL, 8821 bytes)
19/07/25 03:06:02 INFO scheduler.TaskSetManager: Starting task 8.0 in stage 0.0 (TID 17, hadoop37, executor 8, partition 8, PROCESS_LOCAL, 8822 bytes)
19/07/25 03:06:02 INFO scheduler.TaskSetManager: Starting task 20.0 in stage 0.0 (TID 18, hadoop37, executor 7, partition 20, PROCESS_LOCAL, 8822 bytes)
19/07/25 03:06:02 INFO scheduler.TaskSetManager: Starting task 10.0 in stage 0.0 (TID 19, hadoop6, executor 5, partition 10, PROCESS_LOCAL, 8821 bytes)
19/07/25 03:06:02 INFO scheduler.TaskSetManager: Starting task 42.0 in stage 0.0 (TID 20, hadoop6, executor 6, partition 42, PROCESS_LOCAL, 8821 bytes)
19/07/25 03:06:02 INFO scheduler.TaskSetManager: Starting task 39.0 in stage 0.0 (TID 21, hadoop6, executor 2, partition 39, PROCESS_LOCAL, 8821 bytes)
19/07/25 03:06:02 INFO scheduler.TaskSetManager: Starting task 53.0 in stage 0.0 (TID 22, hadoop37, executor 9, partition 53, PROCESS_LOCAL, 8822 bytes)
19/07/25 03:06:02 INFO scheduler.TaskSetManager: Starting task 29.0 in stage 0.0 (TID 23, hadoop37, executor 10, partition 29, PROCESS_LOCAL, 8823 bytes)
19/07/25 03:06:02 INFO scheduler.TaskSetManager: Starting task 15.0 in stage 0.0 (TID 24, hadoop6, executor 1, partition 15, PROCESS_LOCAL, 8821 bytes)
19/07/25 03:06:02 INFO scheduler.TaskSetManager: Starting task 26.0 in stage 0.0 (TID 25, hadoop6, executor 3, partition 26, PROCESS_LOCAL, 8821 bytes)
19/07/25 03:06:02 INFO scheduler.TaskSetManager: Starting task 32.0 in stage 0.0 (TID 26, hadoop6, executor 4, partition 32, PROCESS_LOCAL, 8821 bytes)
19/07/25 03:06:02 INFO scheduler.TaskSetManager: Starting task 13.0 in stage 0.0 (TID 27, hadoop37, executor 8, partition 13, PROCESS_LOCAL, 8822 bytes)
19/07/25 03:06:02 INFO scheduler.TaskSetManager: Starting task 22.0 in stage 0.0 (TID 28, hadoop37, executor 7, partition 22, PROCESS_LOCAL, 8822 bytes)
19/07/25 03:06:02 INFO scheduler.TaskSetManager: Starting task 16.0 in stage 0.0 (TID 29, hadoop6, executor 5, partition 16, PROCESS_LOCAL, 8821 bytes)
19/07/25 03:06:02 INFO storage.BlockManagerInfo: Added broadcast_4_piece0 in memory on hadoop37:49766 (size: 84.7 KB, free: 4.6 GB)
19/07/25 03:06:02 INFO storage.BlockManagerInfo: Added broadcast_4_piece0 in memory on hadoop37:59401 (size: 84.7 KB, free: 4.6 GB)
19/07/25 03:06:02 INFO storage.BlockManagerInfo: Added broadcast_4_piece0 in memory on hadoop37:39051 (size: 84.7 KB, free: 4.6 GB)
19/07/25 03:06:02 INFO storage.BlockManagerInfo: Added broadcast_4_piece0 in memory on hadoop37:53105 (size: 84.7 KB, free: 4.6 GB)
19/07/25 03:06:02 INFO storage.BlockManagerInfo: Added broadcast_4_piece0 in memory on hadoop6:60796 (size: 84.7 KB, free: 4.6 GB)
19/07/25 03:06:02 INFO storage.BlockManagerInfo: Added broadcast_4_piece0 in memory on hadoop6:40022 (size: 84.7 KB, free: 4.6 GB)
19/07/25 03:06:02 INFO storage.BlockManagerInfo: Added broadcast_4_piece0 in memory on hadoop6:37348 (size: 84.7 KB, free: 4.6 GB)
19/07/25 03:06:02 INFO storage.BlockManagerInfo: Added broadcast_4_piece0 in memory on hadoop6:40556 (size: 84.7 KB, free: 4.6 GB)
19/07/25 03:06:02 INFO storage.BlockManagerInfo: Added broadcast_4_piece0 in memory on hadoop6:58914 (size: 84.7 KB, free: 4.6 GB)
19/07/25 03:06:02 INFO storage.BlockManagerInfo: Added broadcast_4_piece0 in memory on hadoop6:38491 (size: 84.7 KB, free: 4.6 GB)
19/07/25 03:06:07 INFO storage.BlockManagerInfo: Added broadcast_1_piece0 in memory on hadoop37:49766 (size: 1527.0 B, free: 4.6 GB)
19/07/25 03:06:07 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on hadoop37:49766 (size: 1527.0 B, free: 4.6 GB)
19/07/25 03:06:07 INFO scheduler.TaskSetManager: Starting task 33.0 in stage 0.0 (TID 30, hadoop37, executor 7, partition 33, PROCESS_LOCAL, 8822 bytes)
19/07/25 03:06:07 INFO scheduler.TaskSetManager: Starting task 40.0 in stage 0.0 (TID 31, hadoop37, executor 7, partition 40, PROCESS_LOCAL, 8822 bytes)
19/07/25 03:06:07 INFO scheduler.TaskSetManager: Starting task 49.0 in stage 0.0 (TID 32, hadoop37, executor 7, partition 49, PROCESS_LOCAL, 8822 bytes)
A particularly important log entry in the output above:
19/07/25 03:05:00 INFO streaming.FileStreamSink: Skipping already committed batch 202
The log above covers two batch triggers:
Trigger 1) "19/07/25 03:05:00": an empty trigger before tasks finally got assigned (the last trigger with no tasks assigned); this is a faulty trigger batch. Note that every one of these faulty trigger batches actually contains the log line quoted above, "INFO streaming.FileStreamSink: Skipping already committed batch $batchId".
Trigger 2) "19/07/25 03:06:00": as the log above clearly shows, this trigger generated execution code, and the DAGScheduler successfully split stages, submitted stages, and submitted tasks, which the executors then started running. This is a normal trigger batch.
Solutions:
Option 1) (replace the file sink with a foreachBatch sink):
Testing showed that if the final sink format is changed to console, every trigger works normally: already on the first trigger, stages are created and tasks are assigned to executors. With a file sink format of csv or parquet, triggers do not behave normally. At the time I guessed the problem was in how the sink was triggered, so I switched the sink to foreachBatch (the foreachBatch code follows the console sketch below).
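For reference, the console-sink variant used in that comparison test looked roughly like the following sketch (my reconstruction, not the original test code; it assumes the same queryResult and a hypothetical separate checkpoint path for the test):

// Console sink used only for debugging (sketch; "checkpointLocationTest" is a hypothetical test path).
String checkpointLocationTest = "/user/dx/streaming/checkpoint/testapp-console";
StreamingQuery debugQuery = queryResult.writeStream()
        .format("console")
        .option("truncate", "false")                          // print full column values
        .option("checkpointLocation", checkpointLocationTest)
        .outputMode(OutputMode.Append())
        .trigger(Trigger.ProcessingTime(1, TimeUnit.MINUTES))
        .start();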
// Sink approach 2:
/* Consider adding .repartition(1): it reduces the number of small files (the smaller the
   repartition count, the fewer small files), at some cost in performance. */
StreamingQuery query = queryResult.writeStream()
        .option("checkpointLocation", checkpointLocation)
        .foreachBatch(new VoidFunction2<Dataset<Row>, Long>() {
            private static final long serialVersionUID = 2689158908273637554L;

            @Override
            public void call(Dataset<Row> v1, Long v2) throws Exception {
                v1.write().partitionBy("month", "day", "hour").mode(SaveMode.Append).save(outputPath);
            }
        })
        .outputMode(OutputMode.Append())
        .trigger(Trigger.ProcessingTime(1, TimeUnit.MINUTES))
        .start();
Repeated tests confirmed that this does solve the problem. Still, the original file sink (parquet/csv) is an officially supported sink and should not simply be buggy.
Option 2) (not yet verified):
The log entry "INFO streaming.FileStreamSink: Skipping already committed batch $batchId" quoted above is the key clue. It shows up in every trigger that fails to sink data to HDFS, and it is emitted at line 108 of org/apache/spark/sql/execution/streaming/FileStreamSink.scala (Spark 2.4):
/**
 * A sink that writes out results to parquet files. Each batch is written out to a unique
 * directory. After all of the files in a batch have been successfully written, the list of
 * file paths is appended to the log atomically. In the case of partial failures, some duplicate
 * data may be present in the target directory, but only one copy of each file will be present
 * in the log.
 */
class FileStreamSink(
    sparkSession: SparkSession,
    path: String,
    fileFormat: FileFormat,
    partitionColumnNames: Seq[String],
    options: Map[String, String]) extends Sink with Logging {

  private val basePath = new Path(path)
  private val logPath = new Path(basePath, FileStreamSink.metadataDir)
  private val fileLog =
    new FileStreamSinkLog(FileStreamSinkLog.VERSION, sparkSession, logPath.toUri.toString)
  private val hadoopConf = sparkSession.sessionState.newHadoopConf()

  private def basicWriteJobStatsTracker: BasicWriteJobStatsTracker = {
    val serializableHadoopConf = new SerializableConfiguration(hadoopConf)
    new BasicWriteJobStatsTracker(serializableHadoopConf, BasicWriteJobStatsTracker.metrics)
  }

  override def addBatch(batchId: Long, data: DataFrame): Unit = {
    if (batchId <= fileLog.getLatest().map(_._1).getOrElse(-1L)) {
      logInfo(s"Skipping already committed batch $batchId")
    } else {
      val committer = FileCommitProtocol.instantiate(
        className = sparkSession.sessionState.conf.streamingFileCommitProtocolClass,
        jobId = batchId.toString,
        outputPath = path)

      committer match {
        case manifestCommitter: ManifestFileCommitProtocol =>
          manifestCommitter.setupManifestOptions(fileLog, batchId)
        case _ => // Do nothing
      }

      // Get the actual partition columns as attributes after matching them by name with
      // the given columns names.
      val partitionColumns: Seq[Attribute] = partitionColumnNames.map { col =>
        val nameEquality = data.sparkSession.sessionState.conf.resolver
        data.logicalPlan.output.find(f => nameEquality(f.name, col)).getOrElse {
          throw new RuntimeException(s"Partition column $col not found in schema ${data.schema}")
        }
      }
      val qe = data.queryExecution

      FileFormatWriter.write(
        sparkSession = sparkSession,
        plan = qe.executedPlan,
        fileFormat = fileFormat,
        committer = committer,
        outputSpec = FileFormatWriter.OutputSpec(path, Map.empty, qe.analyzed.output),
        hadoopConf = hadoopConf,
        partitionColumns = partitionColumns,
        bucketSpec = None,
        statsTrackers = Seq(basicWriteJobStatsTracker),
        options = options)
    }
  }

  override def toString: String = s"FileSink[$path]"
}
The code above shows that FileStreamSink handles this case explicitly: when the current batchId is less than or equal to the latest batchId recorded in the sink's metadata log, it writes nothing to HDFS and only prints "Skipping already committed batch $batchId".
That metadata log is stored under the file sink's output path; my guess is that it lives under path/_spark_metadata.
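A quick way to confirm that guess is to list the sink's metadata directory. Below is a minimal sketch using the Hadoop FileSystem API (my addition, not part of the original test; it assumes the same outputPath as above and the cluster's default Hadoop configuration):

// List the batch files under <outputPath>/_spark_metadata; their names ("0", "1", ..., "201", ...)
// correspond to the batchIds the sink has already committed.
org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
org.apache.hadoop.fs.FileSystem fs = org.apache.hadoop.fs.FileSystem.get(conf);
org.apache.hadoop.fs.Path metadataDir = new org.apache.hadoop.fs.Path(outputPath, "_spark_metadata");
if (fs.exists(metadataDir)) {
    for (org.apache.hadoop.fs.FileStatus status : fs.listStatus(metadataDir)) {
        System.out.println(status.getPath().getName());
    }
}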
So when re-running the job from scratch, deleting the checkpoint alone is apparently not enough: the metadata log under the sink's HDFS output directory, which records the committed batchIds, must be deleted as well before batches will be processed from the first trigger again. That is the cause suggested by the code analysis, but it has not been verified yet.
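If that analysis holds, a clean restart would need to remove both directories. A minimal sketch (unverified, my addition; it reuses the checkpointLocation and outputPath defined above):

// Delete both the checkpoint directory and the sink's batch log before restarting,
// so that batch numbering restarts and FileStreamSink no longer skips batches.
// WARNING: unverified; deleting _spark_metadata also means previously written files
// are no longer tracked by the sink's log.
org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
org.apache.hadoop.fs.FileSystem fs = org.apache.hadoop.fs.FileSystem.get(conf);
fs.delete(new org.apache.hadoop.fs.Path(checkpointLocation), true);            // checkpoint
fs.delete(new org.apache.hadoop.fs.Path(outputPath, "_spark_metadata"), true); // sink batch log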