Persistence
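Streaming does nothing special for persistence. A DStream is ultimately scheduled as jobs over its individual RDDs, so persistence works per RDD, exactly as in plain Spark. The one difference is that a stream is unbounded, so cleanup has to be considered.
In clearMetadata, expired RDDs are removed, and they are unpersisted at the same time.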
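The cleanup step can be sketched without Spark. The snippet below is a minimal model, not the Spark implementation: FakeRdd, FakeStream and the rememberMs parameter are hypothetical stand-ins for an RDD, a DStream and its remember duration, and the "at or before" cutoff is an assumption.

```scala
import scala.collection.mutable

object ClearMetadataSketch {
  // Hypothetical stand-in for an RDD; it only tracks whether it is still persisted.
  final class FakeRdd(val name: String) {
    var persisted: Boolean = true
    def unpersist(): Unit = { persisted = false }
  }

  // Hypothetical stand-in for a DStream holding one RDD per batch time (in ms).
  final class FakeStream(rememberMs: Long) {
    val generatedRDDs = mutable.HashMap[Long, FakeRdd]()

    // Forget every RDD generated at or before (time - rememberMs) and unpersist it.
    // Returns the batch times that were dropped, oldest first.
    def clearMetadata(time: Long): List[Long] = {
      val expired = generatedRDDs.filter { case (t, _) => t <= time - rememberMs }
      expired.values.foreach(_.unpersist())
      generatedRDDs --= expired.keys
      expired.keys.toList.sorted
    }
  }
}
```

With a remember duration of 2000 ms, calling clearMetadata(4000L) on a stream holding batches 1000 to 4000 drops and unpersists batches 1000 and 2000 and keeps the rest.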
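NetworkInputDStream is a special case: it always persists. The incoming stream data is first turned into persisted blocks, and NetworkInputDStream then reads its data directly from those blocks.
The design notes say that NetworkInputDStream stores two copies of the source data to guard against loss, but I could not find that logic in the code; I only saw a single copy being written to the blockManager. One possible explanation is that replication is driven by the block's StorageLevel (levels ending in _2 keep two replicas) rather than by an explicit second write.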
Checkpoint
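Checkpointing has a special significance in Streaming.
In plain Spark, the absence of a checkpoint does not affect correctness, because any data can be replayed from the source, and the source data usually lives on HDFS. Checkpointing there is only an optimization.
Spark also handles failover only at the worker level: if a worker dies, its tasks are simply replayed on another worker. There is no driver failover; if the driver dies, the job fails.
That is acceptable because Spark sees itself as a query engine: a failed query loses little and can simply be run again.
For Spark Streaming the problem is not that simple. If the driver dies and nothing is done about it, where should processing resume after recovery?
Data would certainly be lost, which hurts correctness: stream data is unbounded, so you cannot replay all of it as in Spark, even when the source supports replay, as Kafka does.
Checkpointing in Streaming therefore has two parts: checkpointing RDDs and checkpointing the DStreamGraph.
RDD checkpointing is the same as in Spark; there is no difference.
What follows covers DStreamGraph checkpointing, whose purpose is to restore the graph's runtime state from the checkpoint after the StreamingContext is restarted.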
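a. The whole graph object is serialized to a file, and the key field in it is outputStreams. It looks as if only the final outputStreams are persisted, but in fact every DStream in the graph is.
This is because each DStream references its parent DStreams (exposed through def dependencies: List[DStream[_]]), so serialization follows those references recursively and reaches every DStream object.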
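The recursive effect is easy to reproduce with plain Java serialization. This is a minimal sketch, not Spark code: Node is a hypothetical miniature of a DStream that keeps its parents in a field, and roundTrip and lineage are helper names made up for the illustration.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

object GraphSerializationSketch {
  // Hypothetical miniature of a DStream: a name plus references to its parents.
  class Node(val name: String, val dependencies: List[Node]) extends Serializable

  // Serialize a value to bytes and read it back, as a checkpoint write/read would.
  def roundTrip[A <: AnyRef](value: A): A = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(value)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[A] finally in.close()
  }

  // Walk upwards through the parents and collect every reachable name.
  def lineage(node: Node): List[String] =
    node.name :: node.dependencies.flatMap(lineage)
}
```

Serializing only a list containing the output node still brings back its parent and grandparent, which is the behaviour described above for outputStreams.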
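Once the DStream objects are restored, how is the RDD state from that moment recovered? generatedRDDs is marked @transient, so it is not serialized.
The answer is in the DStream's DStreamCheckpointData: at checkpoint time, currentCheckpointFiles records a (time, checkpoint file name) pair for every RDD in generatedRDDs that has finished checkpointing.
On recovery, the RDDs only need to be read back from their checkpoint files and added to generatedRDDs.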
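A small model of this mechanism, under stated assumptions: FakeStream is a hypothetical stand-in for a DStream, RDDs are represented by plain strings, and the load function passed to restore is a made-up replacement for reading an RDD from its checkpoint file.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import scala.collection.mutable

object TransientRestoreSketch {
  class FakeStream extends Serializable {
    // Not serialized: this field is null right after deserialization.
    @transient var generatedRDDs: mutable.HashMap[Long, String] = mutable.HashMap()
    // Serialized: batch time -> checkpoint file name of the RDD for that batch.
    val currentCheckpointFiles: mutable.HashMap[Long, String] = mutable.HashMap()

    // Rebuild generatedRDDs from the recorded checkpoint files.
    def restore(load: String => String): Unit = {
      generatedRDDs = mutable.HashMap()
      currentCheckpointFiles.foreach { case (time, file) => generatedRDDs(time) = load(file) }
    }
  }

  def roundTrip(stream: FakeStream): FakeStream = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(stream)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[FakeStream] finally in.close()
  }
}
```

After a round trip the transient map is gone, but the (time, file) pairs survive, so restore can repopulate generatedRDDs from them.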
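Checkpoint files also need cleaning up. Each time a DStreamGraph checkpoint completes, the checkpoint files of all RDDs older than the oldest RDD referenced by that graph checkpoint can be deleted, because those older RDDs can never be used again.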
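The bookkeeping behind this rule can be sketched as follows. It is a simplified model of the idea, with times as plain Long milliseconds and a hypothetical Tracker class; it returns the file names that would be deleted instead of touching a file system.

```scala
import scala.collection.mutable

object CheckpointCleanupSketch {
  class Tracker {
    // Every checkpointed RDD seen so far: batch time -> checkpoint file.
    val timeToCheckpointFile = mutable.HashMap[Long, String]()
    // Graph checkpoint time -> time of the oldest RDD that checkpoint refers to.
    val timeToOldestCheckpointFileTime = mutable.HashMap[Long, Long]()

    // Called when a graph checkpoint for `time` is initiated.
    def update(time: Long, current: Map[Long, String]): Unit =
      if (current.nonEmpty) {
        timeToCheckpointFile ++= current
        timeToOldestCheckpointFileTime(time) = current.keys.min
      }

    // Called after the graph checkpoint for `time` has been written.
    // Returns the files that are older than anything that checkpoint refers to.
    def cleanup(time: Long): List[String] =
      timeToOldestCheckpointFileTime.remove(time) match {
        case Some(oldest) =>
          val stale = timeToCheckpointFile.filter { case (t, _) => t < oldest }.toList.sortBy(_._1)
          stale.foreach { case (t, _) => timeToCheckpointFile -= t }
          stale.map(_._2)
        case None => Nil
      }
  }
}
```

If the checkpoint at time 5000 refers to RDDs from 3000 onwards, then files recorded for 1000 and 2000 are returned by cleanup(5000L), and nothing newer is touched.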
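b. Besides the graph object, the other important field in the Checkpoint object is pendingTimes, which records the batch times whose jobs had not yet been run when the checkpoint was taken.
When the JobScheduler starts again, it resubmits these jobs. This gives at-least-once semantics: there is no way to know how long after the checkpoint the crash happened, so some of these jobs may already have completed successfully.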
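Creating a checkpoint:
1. In JobGenerator, each time a set of jobs has been submitted to Spark, DoCheckpoint is handled by serializing the Checkpoint object to a file (the Checkpoint object contains the graph object, among other information).
2. After DoCheckpoint completes, ClearCheckpointData is invoked to delete the checkpoint files of expired RDDs.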
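Not every batch produces a graph checkpoint: doCheckpoint only writes when the time elapsed since zeroTime is a multiple of the checkpoint duration. A minimal sketch of that condition, with times as plain Long milliseconds and hypothetical function names:

```scala
object CheckpointScheduleSketch {
  // True when `time` falls on a checkpoint boundary counted from zeroTime.
  def shouldCheckpoint(time: Long, zeroTime: Long, checkpointMs: Long): Boolean =
    (time - zeroTime) % checkpointMs == 0

  // Batch times (out of the first `batches` batches) at which a checkpoint is written.
  def checkpointTimes(zeroTime: Long, batchMs: Long, checkpointMs: Long, batches: Int): List[Long] =
    (1 to batches)
      .map(n => zeroTime + n * batchMs)
      .filter(t => shouldCheckpoint(t, zeroTime, checkpointMs))
      .toList
}
```

For example, with a 1000 ms batch interval and a 3000 ms checkpoint interval, only every third batch triggers a write.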
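Using a checkpoint:
1. Call StreamingContext.getOrCreate, which uses CheckpointReader.read to deserialize the Checkpoint object from file and then initializes the StreamingContext from it.
2. StreamingContext calls cp_.graph.restoreCheckpointData to restore generatedRDDs in each DStream.
3. JobGenerator calls restart, which resubmits the jobs that had not been submitted as of the checkpoint.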
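The control flow of step 1 can be modelled in a few lines. This is a sketch of the pattern only: Context, the read function and the create function are hypothetical stand-ins for StreamingContext, CheckpointReader.read and the user's creating function.

```scala
object GetOrCreateSketch {
  // Hypothetical stand-in for a StreamingContext; `origin` says how it was built.
  final case class Context(origin: String)

  def getOrCreate(
      read: () => Option[String],   // stand-in for reading a checkpoint
      create: () => Context,        // stand-in for the user's creating function
      createOnError: Boolean = false): Context = {
    val checkpoint: Option[String] =
      try read()
      catch { case e: Exception => if (createOnError) None else throw e }
    // Restore from the checkpoint when there is one, otherwise build a fresh context.
    checkpoint.map(cp => Context("restored from " + cp)).getOrElse(create())
  }
}
```

The fresh-context path is taken only when no checkpoint is found, or when reading fails and createOnError is set; otherwise a read error propagates to the caller.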
DStreamGraph
final private[streaming] class DStreamGraph extends Serializable with Logging {

  private val inputStreams = new ArrayBuffer[InputDStream[_]]()
  private val outputStreams = new ArrayBuffer[DStream[_]]()

  var rememberDuration: Duration = null
  var checkpointInProgress = false

  var zeroTime: Time = null
  var startTime: Time = null
  var batchDuration: Duration = null

  def updateCheckpointData(time: Time) {
    logInfo("Updating checkpoint data for time " + time)
    this.synchronized {
      outputStreams.foreach(_.updateCheckpointData(time))
    }
    logInfo("Updated checkpoint data for time " + time)
  }

  def clearCheckpointData(time: Time) {
    logInfo("Clearing checkpoint data for time " + time)
    this.synchronized {
      outputStreams.foreach(_.clearCheckpointData(time))
    }
    logInfo("Cleared checkpoint data for time " + time)
  }

  def restoreCheckpointData() {
    logInfo("Restoring checkpoint data")
    this.synchronized {
      outputStreams.foreach(_.restoreCheckpointData())
    }
    logInfo("Restored checkpoint data")
  }
}
DStreamCheckpointData
private[streaming]
class DStreamCheckpointData[T: ClassTag] (dstream: DStream[T])
  extends Serializable with Logging {

  protected val data = new HashMap[Time, AnyRef]()

  // Mapping of the batch time to the checkpointed RDD file of that time
  // (holds (time, checkpoint file name) for every RDD that has been checkpointed)
  @transient private var timeToCheckpointFile = new HashMap[Time, String]

  // Mapping of the batch time to the time of the oldest checkpointed RDD
  // in that batch's checkpoint data
  // (for each graph checkpoint: checkpoint time -> time of the oldest RDD in it)
  @transient private var timeToOldestCheckpointFileTime = new HashMap[Time, Time]

  @transient private var fileSystem : FileSystem = null

  // (time, checkpoint file name) of the RDDs covered by the current checkpoint
  protected[streaming] def currentCheckpointFiles = data.asInstanceOf[HashMap[Time, String]]

  /**
   * Updates the checkpoint data of the DStream. This gets called every time
   * the graph checkpoint is initiated. Default implementation records the
   * checkpoint files to which the generated RDDs of the DStream has been saved.
   */
  def update(time: Time) {
    // Get the checkpointed RDDs from the generated RDDs
    // (keep only the RDDs in dstream.generatedRDDs that have finished checkpointing)
    val checkpointFiles = dstream.generatedRDDs.filter(_._2.getCheckpointFile.isDefined)
                                       .map(x => (x._1, x._2.getCheckpointFile.get))

    // Add the checkpoint files to the data to be serialized
    if (!checkpointFiles.isEmpty) {
      currentCheckpointFiles.clear()
      currentCheckpointFiles ++= checkpointFiles  // refresh currentCheckpointFiles
      // Add the current checkpoint files to the map of all checkpoint files
      // This will be used to delete old checkpoint files
      timeToCheckpointFile ++= currentCheckpointFiles
      // Remember the time of the oldest checkpoint RDD in current state
      // (the time of the oldest RDD covered by this checkpoint)
      timeToOldestCheckpointFileTime(time) = currentCheckpointFiles.keys.min(Time.ordering)
    }
  }

  /**
   * Cleanup old checkpoint data. This gets called after a checkpoint of `time` has been
   * written to the checkpoint directory.
   */
  def cleanup(time: Time) {
    // Get the time of the oldest checkpointed RDD that was written as part of the
    // checkpoint of `time`
    // (timeToOldestCheckpointFileTime records, for the checkpoint at `time`,
    // the time of its oldest RDD: lastCheckpointFileTime)
    timeToOldestCheckpointFileTime.remove(time) match {
      case Some(lastCheckpointFileTime) =>
        // Find all the checkpointed RDDs (i.e. files) that are older than `lastCheckpointFileTime`
        // This is because checkpointed RDDs older than this are not going to be needed
        // even after master fails, as the checkpoint data of `time` does not refer to those files
        // (delete every checkpoint file older than lastCheckpointFileTime)
        val filesToDelete = timeToCheckpointFile.filter(_._1 < lastCheckpointFileTime)
        logDebug("Files to delete:\n" + filesToDelete.mkString(","))
        filesToDelete.foreach {
          case (time, file) =>
            try {
              val path = new Path(file)
              if (fileSystem == null) {
                fileSystem = path.getFileSystem(dstream.ssc.sparkContext.hadoopConfiguration)
              }
              fileSystem.delete(path, true)
              timeToCheckpointFile -= time
              logInfo("Deleted checkpoint file '" + file + "' for time " + time)
            } catch {
              // exception handlers elided in this excerpt
            }
        }
      case None =>
        logDebug("Nothing to delete")
    }
  }

  /**
   * Restore the checkpoint data. This gets called once when the DStream graph
   * (along with its DStreams) are being restored from a graph checkpoint file.
   * Default implementation restores the RDDs from their checkpoint files.
   */
  def restore() {
    // Create RDDs from the checkpoint data
    currentCheckpointFiles.foreach {
      case(time, file) => {
        // Restore: rebuild the RDD from its checkpoint file and add it to dstream.generatedRDDs
        dstream.generatedRDDs += ((time, dstream.context.sparkContext.checkpointFile[T](file)))
      }
    }
  }
}
DStream
// DStream

  // Checkpoint details
  private[streaming] val mustCheckpoint = false
  private[streaming] var checkpointDuration: Duration = null
  private[streaming] val checkpointData = new DStreamCheckpointData(this)

  /**
   * Enable periodic checkpointing of RDDs of this DStream
   * @param interval Time interval after which generated RDD will be checkpointed
   */
  def checkpoint(interval: Duration): DStream[T] = {
    if (isInitialized) {
      throw new UnsupportedOperationException(
        "Cannot change checkpoint interval of an DStream after streaming context has started")
    }
    persist()
    checkpointDuration = interval
    this
  }

  /**
   * Refresh the list of checkpointed RDDs that will be saved along with checkpoint of
   * this stream. This is an internal method that should not be called directly. This is
   * a default implementation that saves only the file names of the checkpointed RDDs to
   * checkpointData. Subclasses of DStream (especially those of InputDStream) may override
   * this method to save custom checkpoint data.
   */
  private[streaming] def updateCheckpointData(currentTime: Time) {
    checkpointData.update(currentTime)
    dependencies.foreach(_.updateCheckpointData(currentTime))
  }

  private[streaming] def clearCheckpointData(time: Time) {
    checkpointData.cleanup(time)
    dependencies.foreach(_.clearCheckpointData(time))
  }

  /**
   * Restore the RDDs in generatedRDDs from the checkpointData. This is an internal method
   * that should not be called directly. This is a default implementation that recreates RDDs
   * from the checkpoint file names stored in checkpointData. Subclasses of DStream that
   * override the updateCheckpointData() method would also need to override this method.
   */
  private[streaming] def restoreCheckpointData() {
    // Create RDDs from the checkpoint data
    checkpointData.restore()
    dependencies.foreach(_.restoreCheckpointData())
  }
JobGenerator
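1. At the end of each runJobs call, that is, each time a new set of jobs has been submitted, DoCheckpoint is handled and the Checkpoint object is written to file.
2. On restart, the jobs for pendingTimes + downTimes are run again, which guarantees at-least-once semantics.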
// JobGenerator

  private lazy val checkpointWriter =
    if (ssc.checkpointDuration != null && ssc.checkpointDir != null) {
      new CheckpointWriter(this, ssc.conf, ssc.checkpointDir, ssc.sparkContext.hadoopConfiguration)
    } else {
      null
    }

  /** Generate jobs and perform checkpoint for the given `time`. */
  private def generateJobs(time: Time) {
    SparkEnv.set(ssc.env)
    Try(graph.generateJobs(time)) match {
      case Success(jobs) => jobScheduler.runJobs(time, jobs)
      case Failure(e) => jobScheduler.reportError("Error generating jobs for time " + time, e)
    }
    eventActor ! DoCheckpoint(time)  // after runJobs, checkpoint the DStreamGraph
  }

  /** Perform checkpoint for the given `time`. */
  private def doCheckpoint(time: Time) = synchronized {
    if (checkpointWriter != null && (time - graph.zeroTime).isMultipleOf(ssc.checkpointDuration)) {
      // first refresh currentCheckpointFiles of the DStreams in the graph
      ssc.graph.updateCheckpointData(time)
      // then have checkpointWriter write the Checkpoint object to file
      checkpointWriter.write(new Checkpoint(ssc, time))
    }
  }

  def onCheckpointCompletion(time: Time) {
    // once the DStreamGraph checkpoint is done, the checkpoint files of the
    // older RDDs of each DStream have to be cleared
    eventActor ! ClearCheckpointData(time)
  }

  /** Clear DStream checkpoint data for the given `time`. */
  private def clearCheckpointData(time: Time) {
    ssc.graph.clearCheckpointData(time)
  }

  /** Restarts the generator based on the information in checkpoint */
  private def restart() {
    // If manual clock is being used for testing, then
    // either set the manual clock to the last checkpointed time,
    // or if the property is defined set it to that time
    if (clock.isInstanceOf[ManualClock]) {
      val lastTime = ssc.initialCheckpoint.checkpointTime.milliseconds
      val jumpTime = ssc.sc.conf.getLong("spark.streaming.manualClock.jump", 0)
      clock.asInstanceOf[ManualClock].setTime(lastTime + jumpTime)
    }

    val batchDuration = ssc.graph.batchDuration

    // Batches when the master was down, that is,
    // between the checkpoint and current restart time
    // (the batch times between the last checkpoint and the restart)
    val checkpointTime = ssc.initialCheckpoint.checkpointTime
    val restartTime = new Time(timer.getRestartTime(graph.zeroTime.milliseconds))
    val downTimes = checkpointTime.until(restartTime, batchDuration)
    logInfo("Batches during down time (" + downTimes.size + " batches): "
      + downTimes.mkString(", "))

    // Batches that were unprocessed before failure
    // (job sets that had not been run when the graph was checkpointed)
    val pendingTimes = ssc.initialCheckpoint.pendingTimes.sorted(Time.ordering)
    logInfo("Batches pending processing (" + pendingTimes.size + " batches): " +
      pendingTimes.mkString(", "))

    // Reschedule jobs for these times (pendingTimes + downTimes)
    val timesToReschedule = (pendingTimes ++ downTimes).distinct.sorted(Time.ordering)
    logInfo("Batches to reschedule (" + timesToReschedule.size + " batches): " +
      timesToReschedule.mkString(", "))
    timesToReschedule.foreach(time =>
      jobScheduler.runJobs(time, graph.generateJobs(time))
    )

    // Restart the timer
    timer.start(restartTime.milliseconds)
    logInfo("JobGenerator restarted at " + restartTime)
  }
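The arithmetic in restart can be checked in isolation. The sketch below uses plain Long milliseconds instead of Time and assumes that Time.until behaves like an end-exclusive range stepped by the batch duration; downTimes and timesToReschedule are hypothetical helper names mirroring the variables above.

```scala
object RestartScheduleSketch {
  // Batch times from the checkpoint (inclusive) up to the restart (exclusive).
  def downTimes(checkpointTime: Long, restartTime: Long, batchMs: Long): List[Long] =
    (checkpointTime until restartTime by batchMs).toList

  // Union of pending and down-time batches, de-duplicated and in time order.
  def timesToReschedule(pending: Seq[Long], down: Seq[Long]): List[Long] =
    (pending ++ down).distinct.sorted.toList
}
```

A batch that is both pending and inside the down-time window is scheduled once by this step, but it may still have run before the crash, which is why the overall guarantee is at-least-once rather than exactly-once.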
StreamingContext
When a checkpoint file exists, the Checkpoint object is read first and then used to initialize the StreamingContext,
which in turn uses the Checkpoint to restore every DStream in the graph.
// StreamingContext

class StreamingContext private[streaming] (
    sc_ : SparkContext,
    cp_ : Checkpoint,
    batchDur_ : Duration
  ) extends Logging {

  private[streaming] val isCheckpointPresent = (cp_ != null)

  private[streaming] val graph: DStreamGraph = {
    if (isCheckpointPresent) {
      cp_.graph.setContext(this)
      cp_.graph.restoreCheckpointData()
      cp_.graph
    } else {
      assert(batchDur_ != null, "Batch duration for streaming context cannot be null")
      val newGraph = new DStreamGraph()
      newGraph.setBatchDuration(batchDur_)
      newGraph
    }
  }

  /**
   * Set the context to periodically checkpoint the DStream operations for driver
   * fault-tolerance.
   * @param directory HDFS-compatible directory where the checkpoint data will be reliably stored.
   *                  Note that this must be a fault-tolerant file system like HDFS for
   */
  def checkpoint(directory: String) {  // only sets up checkpointDir; the method name is misleading
    if (directory != null) {
      val path = new Path(directory)
      val fs = path.getFileSystem(sparkContext.hadoopConfiguration)
      fs.mkdirs(path)
      val fullPath = fs.getFileStatus(path).getPath().toString
      sc.setCheckpointDir(fullPath)
      checkpointDir = fullPath
    } else {
      checkpointDir = null
    }
  }

  private[streaming] def initialCheckpoint: Checkpoint = {
    if (isCheckpointPresent) cp_ else null
  }
}

object StreamingContext extends Logging {

  /**
   * Either recreate a StreamingContext from checkpoint data or create a new StreamingContext.
   * If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be
   * recreated from the checkpoint data. If the data does not exist, then the StreamingContext
   * will be created by called the provided `creatingFunc`.
   *
   * @param checkpointPath Checkpoint directory used in an earlier StreamingContext program
   * @param creatingFunc   Function to create a new StreamingContext
   * @param hadoopConf     Optional Hadoop configuration if necessary for reading from the
   *                       file system
   * @param createOnError  Optional, whether to create a new StreamingContext if there is an
   *                       error in reading checkpoint data. By default, an exception will be
   *                       thrown on error.
   */
  def getOrCreate(
      checkpointPath: String,
      creatingFunc: () => StreamingContext,
      hadoopConf: Configuration = new Configuration(),
      createOnError: Boolean = false
    ): StreamingContext = {
    val checkpointOption = try {
      // read the Checkpoint object from the checkpoint file
      CheckpointReader.read(checkpointPath, new SparkConf(), hadoopConf)
    } catch {
      case e: Exception =>
        if (createOnError) {
          None
        } else {
          throw e
        }
    }
    // initialize the StreamingContext from the Checkpoint object
    checkpointOption.map(new StreamingContext(null, _, null)).getOrElse(creatingFunc())
  }
}
Checkpoint (org.apache.spark.streaming)
Checkpoint exists mainly to checkpoint the DStreamGraph object; CheckpointWriter serializes the Checkpoint to a file.
private[streaming]
class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
  extends Logging with Serializable {
  val master = ssc.sc.master
  val framework = ssc.sc.appName
  val sparkHome = ssc.sc.getSparkHome.getOrElse(null)
  val jars = ssc.sc.jars
  val graph = ssc.graph  // the key piece: the graph to be checkpointed
  val checkpointDir = ssc.checkpointDir
  val checkpointDuration = ssc.checkpointDuration
  // times of the job sets in JobScheduler.jobSets that have not been run yet
  val pendingTimes = ssc.scheduler.getPendingTimes().toArray
  val delaySeconds = MetadataCleaner.getDelaySeconds(ssc.conf)
  val sparkConfPairs = ssc.conf.getAll
}
CheckpointWriter, which writes the Checkpoint object to a file
/**
 * Convenience class to handle the writing of graph checkpoint to file
 */
private[streaming]
class CheckpointWriter(
    jobGenerator: JobGenerator,
    conf: SparkConf,
    checkpointDir: String,
    hadoopConf: Configuration
  ) extends Logging {
  val MAX_ATTEMPTS = 3
  val executor = Executors.newFixedThreadPool(1)
  val compressionCodec = CompressionCodec.createCodec(conf)
  private var stopped = false
  private var fs_ : FileSystem = _

  class CheckpointWriteHandler(checkpointTime: Time, bytes: Array[Byte]) extends Runnable {
    def run() {
      var attempts = 0
      val startTime = System.currentTimeMillis()
      val tempFile = new Path(checkpointDir, "temp")  // temporary file
      val checkpointFile = Checkpoint.checkpointFile(checkpointDir, checkpointTime)  // the real checkpoint file
      val backupFile = Checkpoint.checkpointBackupFile(checkpointDir, checkpointTime)  // backup file

      while (attempts < MAX_ATTEMPTS && !stopped) {
        attempts += 1
        try {
          logInfo("Saving checkpoint for time " + checkpointTime + " to file '" + checkpointFile + "'")

          // Write checkpoint to temp file (the temp file is written first)
          fs.delete(tempFile, true)   // just in case it exists
          val fos = fs.create(tempFile)
          fos.write(bytes)
          fos.close()

          // If the checkpoint file exists, back it up
          // If the backup exists as well, just delete it, otherwise rename will fail
          if (fs.exists(checkpointFile)) {
            fs.delete(backupFile, true) // just in case it exists
            if (!fs.rename(checkpointFile, backupFile)) {  // rename the current checkpoint to the backup file
              logWarning("Could not rename " + checkpointFile + " to " + backupFile)
            }
          }

          // Rename temp file to the final checkpoint file
          if (!fs.rename(tempFile, checkpointFile)) {
            logWarning("Could not rename " + tempFile + " to " + checkpointFile)
          }

          // Delete old checkpoint files
          val allCheckpointFiles = Checkpoint.getCheckpointFiles(checkpointDir, fs)
          if (allCheckpointFiles.size > 4) {
            allCheckpointFiles.take(allCheckpointFiles.size - 10).foreach(file => {
              logInfo("Deleting " + file)
              fs.delete(file, true)
            })
          }

          // All done, print success
          val finishTime = System.currentTimeMillis()
          // when the checkpoint completes, trigger jobGenerator.onCheckpointCompletion
          jobGenerator.onCheckpointCompletion(checkpointTime)
          return
        } catch {
          // exception handlers elided in this excerpt
        }
      }
    }
  }

  def write(checkpoint: Checkpoint) {
    val bos = new ByteArrayOutputStream()
    val zos = compressionCodec.compressedOutputStream(bos)
    val oos = new ObjectOutputStream(zos)
    oos.writeObject(checkpoint)  // serialize the Checkpoint object
    oos.close()
    bos.close()
    try {
      // run a CheckpointWriteHandler on the executor thread to write the bytes to file
      executor.execute(new CheckpointWriteHandler(checkpoint.checkpointTime, bos.toByteArray))
    } catch {
      // exception handlers elided in this excerpt
    }
  }
}
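The temp-file, backup, rename sequence used by CheckpointWriteHandler can be tried on a local directory. This sketch swaps the Hadoop FileSystem for java.nio.file, so it is an analogy rather than the Spark code path; the write helper and the ".bk" backup suffix are assumptions made for the example.

```scala
import java.nio.file.{Files, Path, StandardCopyOption}

object CheckpointWriteSketch {
  // Writes `bytes` as checkpoint `name` inside `dir`, keeping the previous version as a backup.
  def write(dir: Path, name: String, bytes: Array[Byte]): Path = {
    val temp = dir.resolve("temp")
    val target = dir.resolve(name)
    val backup = dir.resolve(name + ".bk")

    // 1. Write the new content to a temp file, so a crash never leaves a half-written checkpoint.
    Files.write(temp, bytes)
    // 2. If a checkpoint with this name exists, keep it as the backup (replacing any older backup).
    if (Files.exists(target)) {
      Files.move(target, backup, StandardCopyOption.REPLACE_EXISTING)
    }
    // 3. Publish the temp file under the final name.
    Files.move(temp, target)
    target
  }
}
```

Writing the same checkpoint name twice leaves the second payload under the final name, the first payload under the backup name, and no temp file behind.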
CheckpointReader
private[streaming]
object CheckpointReader extends Logging {

  def read(checkpointDir: String, conf: SparkConf, hadoopConf: Configuration): Option[Checkpoint] =
  {
    val checkpointPath = new Path(checkpointDir)
    def fs = checkpointPath.getFileSystem(hadoopConf)

    // Try to find the checkpoint files
    val checkpointFiles = Checkpoint.getCheckpointFiles(checkpointDir, fs).reverse
    if (checkpointFiles.isEmpty) {
      return None
    }

    // Try to read the checkpoint files in the order
    logInfo("Checkpoint files found: " + checkpointFiles.mkString(","))
    val compressionCodec = CompressionCodec.createCodec(conf)
    checkpointFiles.foreach(file => {
      logInfo("Attempting to load checkpoint from file " + file)
      try {
        val fis = fs.open(file)
        // ObjectInputStream uses the last defined user-defined class loader in the stack
        // to find classes, which maybe the wrong class loader. Hence, a inherited version
        // of ObjectInputStream is used to explicitly use the current thread's default class
        // loader to find and load classes. This is a well know Java issue and has popped up
        // in other places (e.g., http://jira.codehaus.org/browse/GROOVY-1627)
        val zis = compressionCodec.compressedInputStream(fis)
        val ois = new ObjectInputStreamWithLoader(zis,
          Thread.currentThread().getContextClassLoader)
        val cp = ois.readObject.asInstanceOf[Checkpoint]  // deserialize the file content into a Checkpoint object
        ois.close()
        fs.close()
        cp.validate()
        return Some(cp)
      } catch {
        // exception handlers elided in this excerpt
      }
    })
  }
}
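The reader's strategy, trying candidates from newest to oldest and returning the first one that deserializes, can be sketched on in-memory bytes. Note the substitutions: GZIP stands in for Spark's CompressionCodec, and Snapshot is a hypothetical, much smaller stand-in for the Checkpoint class.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import java.util.zip.{GZIPInputStream, GZIPOutputStream}
import scala.util.{Success, Try}

object CheckpointReadSketch {
  // Hypothetical stand-in for the Checkpoint object.
  final case class Snapshot(checkpointTime: Long, pendingTimes: List[Long])

  // Serialize and compress, mirroring the object -> compressed stream -> bytes chain of the writer.
  def encode(snapshot: Snapshot): Array[Byte] = {
    val bos = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(new GZIPOutputStream(bos))
    oos.writeObject(snapshot)
    oos.close()
    bos.toByteArray
  }

  // Decompress and deserialize; any failure (corrupt or truncated data) becomes a Failure.
  def decode(bytes: Array[Byte]): Try[Snapshot] = Try {
    val ois = new ObjectInputStream(new GZIPInputStream(new ByteArrayInputStream(bytes)))
    try ois.readObject().asInstanceOf[Snapshot] finally ois.close()
  }

  // `candidates` are ordered newest first; return the first one that can be read.
  def readLatest(candidates: Seq[Array[Byte]]): Option[Snapshot] =
    candidates.iterator.map(decode).collectFirst { case Success(snapshot) => snapshot }
}
```

If the newest candidate is corrupt, readLatest falls back to the next one, which is the reason for keeping several checkpoint files plus a backup around.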