Delta Lake源碼分析
本文主要從代碼的具體實現方面進行講解,關於delta lake的事務日志原理,可以看這篇博客,講解的很詳細。
Delta Lake元數據
delta lake 包含Protocol、Metadata、FileAction(AddFile、RemoveFile)、CommitInfo和SetTransaction這幾種元數據action。
- Protocol:這是delta lake自身的版本管理,一般只出現在第一次的commit日志里(之后版本升級應該也會有);
- Metadata:存儲delta表的schema信息,第一次commit和每次修改schema時出現,以最后一次出現的為准;
- FileAction:文件的相關操作,delta lake的文件操作只有添加文件和刪除文件;
- CommitInfo:保存關於本次更改的原始信息,如修改時間,操作類型,讀取的數據版本等;
- SetTransaction:設置application的提交版本,一般用於流式計算的一致性控制(exactlyOnce)。
//初始的commit log會包含protocol和metaData的信息
{"commitInfo":{"timestamp":1576480709055,"operation":"WRITE","operationParameters":{"mode":"ErrorIfExists","partitionBy":"[]"},"isBlindAppend":true}}
{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}
{"metaData":{"id":"fe0948b9-8253-4942-9e28-3a89321a004d","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"azkaban_tag\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"project_name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"flow_name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"job_name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"application_name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"queue_name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"master_name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{},"createdTime":1576480707164}}
{"add":{"path":"part-00000-dc1d1431-1e4b-4337-b111-6a447bad15fc-c000.snappy.parquet","partitionValues":{},"size":1443338,"modificationTime":1576480711000,"dataChange":true}}
//之后的commit log會記錄下當前操作的信息
{"commitInfo":{"timestamp":1576481270646,"operation":"DELETE","operationParameters":{"predicate":"[\"(`master_name` = 'mob_analyse')\"]"},"readVersion":0,"isBlindAppend":false}}
{"remove":{"path":"part-00000-dc1d1431-1e4b-4337-b111-6a447bad15fc-c000.snappy.parquet","deletionTimestamp":1576481270643,"dataChange":true}}
{"add":{"path":"part-00000-d6431884-390d-4837-865c-f6e52f0e2cf5-c000.snappy.parquet","partitionValues":{},"size":1430267,"modificationTime":1576481273000,"dataChange":true}}
snapshot生成
當存在checkpoint文件時,DeltaLog類的currentSnapshot會根據checkpoint和之后的json日志來計算快照。
- 通過loadMetadataFromFile()方法讀取_last_checkpoint文件獲得最新的checkpoint路徑;
- 通過LogStore.listFrom()方法獲得checkpoint之后版本的delta log文件;
- 使用verifyDeltaVersions方法驗證delta log的文件是否是連續的(日志版本必須是連續的,每個commit log都需要被計算);
- 解析並聚合checkpoint和delta log為Seq[DeltaLogFileIndex],然后new Snapshot();
- Snapshot里的stateReconstruction會使用InMemoryLogReplay來計算日志中的各種action,獲得最終的狀態信息。
當沒有checkpoint文件時,通過DeltaLog類的update方法來計算快照。
- 當不存在_last_checkpoint文件時,new一個版本號為-1的Snapshot;
- 檢測到currentSnapshot的版本為-1,調用update方法,實際工作的是updateInternal方法,它會把當前的快照更新到最新版本;
- updateInternal會遍歷出版本號小於等於 max(當前版本,0)的checkpoint文件和delta log,並通過new Snapshot將這些更新添加到當前快照中。
@volatile private var currentSnapshot: Snapshot = lastCheckpoint.map { c =>
val checkpointFiles = c.parts
.map(p => checkpointFileWithParts(logPath, c.version, p)) //目前版本沒用到parts,疑似商業版功能
.getOrElse(Seq(checkpointFileSingular(logPath, c.version))) //返回最新checkpoint文件路徑
val deltas = store.listFrom(deltaFile(logPath, c.version + 1)) //返回checkpoint之后版本的json文件
.filter(f => isDeltaFile(f.getPath))
.toArray
val deltaVersions = deltas.map(f => deltaVersion(f.getPath))
verifyDeltaVersions(deltaVersions) //驗證版本日志是否連續
val newVersion = deltaVersions.lastOption.getOrElse(c.version)
logInfo(s"Loading version $newVersion starting from checkpoint ${c.version}")
try {
val deltaIndex = DeltaLogFileIndex(DeltaLog.COMMIT_FILE_FORMAT, deltas)
val checkpointIndex = DeltaLogFileIndex(DeltaLog.CHECKPOINT_FILE_FORMAT, fs, checkpointFiles)
val snapshot = new Snapshot( //創建快照
logPath,
newVersion,
None,
checkpointIndex :: deltaIndex :: Nil,
minFileRetentionTimestamp,
this,
// we don't want to make an additional RPC here to get commit timestamps when "deltas" is
// empty. The next "update" call will take care of that if there are delta files.
deltas.lastOption.map(_.getModificationTime).getOrElse(-1L))
validateChecksum(snapshot) //通過crc文件校驗版本,但是目前delta版本並沒有生成crc文件,后續會更新或者又是商業版的坑?
lastUpdateTimestamp = clock.getTimeMillis()
snapshot
} catch {
case e: FileNotFoundException
if Option(e.getMessage).exists(_.contains("parquet does not exist")) =>
recordDeltaEvent(this, "delta.checkpoint.error.partial")
throw DeltaErrors.missingPartFilesException(c, e)
case e: AnalysisException if Option(e.getMessage).exists(_.contains("Path does not exist")) =>
recordDeltaEvent(this, "delta.checkpoint.error.partial")
throw DeltaErrors.missingPartFilesException(c, e)
}
}.getOrElse {
new Snapshot(logPath, -1, None, Nil, minFileRetentionTimestamp, this, -1L) //沒有checkpoint文件時,從頭開始讀delta log計算
}
// Reconstruct the state by applying deltas in order to the checkpoint.
// We partition by path as it is likely the bulk of the data is add/remove.
// Non-path based actions will be collocated to a single partition.
private val stateReconstruction = {
val implicits = spark.implicits
import implicits._
val numPartitions = spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_SNAPSHOT_PARTITIONS)
val checkpointData = previousSnapshot.getOrElse(emptyActions)
val deltaData = load(files)
val allActions = checkpointData.union(deltaData)
val time = minFileRetentionTimestamp
val hadoopConf = new SerializableConfiguration(spark.sessionState.newHadoopConf())
val logPath = path.toUri // for serializability
allActions.as[SingleAction]
.mapPartitions { actions =>
val hdpConf = hadoopConf.value
actions.flatMap {
_.unwrap match {
case add: AddFile => Some(add.copy(path = canonicalizePath(add.path, hdpConf)).wrap)
case rm: RemoveFile => Some(rm.copy(path = canonicalizePath(rm.path, hdpConf)).wrap)
case other if other == null => None
case other => Some(other.wrap)
}
}
}
.withColumn("file", assertLogBelongsToTable(logPath)(input_file_name()))
.repartition(numPartitions, coalesce($"add.path", $"remove.path"))
.sortWithinPartitions("file")
.as[SingleAction]
.mapPartitions { iter =>
val state = new InMemoryLogReplay(time)
state.append(0, iter.map(_.unwrap))
state.checkpoint.map(_.wrap)
}
}
日志提交
日志的提交是在OptimisticTransactionImpl的commit()中實現的。
- 調用prepareCommit方法做各種檢查,如字段是否重復、是否第一次提交等;
- 判斷本次commit的隔離等級,目前只檢查是否修改了數據,若修改了數據則使用Serializable級別,否則用SnapshotIsolation,因為不修改數據的情況下,它可以提供和Serializable一樣的保證,且能在之后的沖突檢測中更容易通過(writeIsolation還沒有使用,后期會更新吧);
- 使用doCommit方法提交action日志,生成當前version+1的log文件,如果同名文件已存在,則提交失敗;
- doCommit失敗后會調用checkAndRetry進行重試,遍歷讀version后的所有commit log,進行沖突檢測,檢測通過后再次提交doCommit;
- 完成doCommit后,postCommit方法會檢查是否滿足checkpointInterval,如果滿足條件則調用deltaLog.checkpoint()方法生成新的checkpoint文件,並更新_last_checkpoint文件。
/**
* Modifies the state of the log by adding a new commit that is based on a read at
* the given `lastVersion`. In the case of a conflict with a concurrent writer this
* method will throw an exception.
*
* @param actions Set of actions to commit
* @param op Details of operation that is performing this transactional commit
*/
@throws(classOf[ConcurrentModificationException])
def commit(actions: Seq[Action], op: DeltaOperations.Operation): Long = recordDeltaOperation(
deltaLog,
"delta.commit") {
val version = try {
// Try to commit at the next version.
var finalActions = prepareCommit(actions, op) //各種檢查
// Find the isolation level to use for this commit
val noDataChanged = actions.collect { case f: FileAction => f.dataChange }.forall(_ == false)
val isolationLevelToUse = if (noDataChanged) { //0.5版本新特性,很簡單的隔離等級判定,writeIsolation還沒有使用,等后續更新吧
// If no data has changed (i.e. its is only being rearranged), then SnapshotIsolation
// provides Serializable guarantee. Hence, allow reduced conflict detection by using
// SnapshotIsolation of what the table isolation level is.
SnapshotIsolation
} else {
Serializable
}
val isBlindAppend = { //判斷是否不讀取delta數據且所有的文件操作都是AddFile
val dependsOnFiles = readPredicates.nonEmpty || readFiles.nonEmpty
val onlyAddFiles =
finalActions.collect { case f: FileAction => f }.forall(_.isInstanceOf[AddFile])
onlyAddFiles && !dependsOnFiles
}
if (spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_COMMIT_INFO_ENABLED)) { //默認會將commitInfo記錄到commit log里
commitInfo = CommitInfo(
clock.getTimeMillis(),
op.name,
op.jsonEncodedValues,
Map.empty,
Some(readVersion).filter(_ >= 0),
None,
Some(isBlindAppend))
finalActions = commitInfo +: finalActions
}
// Register post-commit hooks if any
lazy val hasFileActions = finalActions.collect { case f: FileAction => f }.nonEmpty
if (DeltaConfigs.SYMLINK_FORMAT_MANIFEST_ENABLED.fromMetaData(metadata) && hasFileActions) {
registerPostCommitHook(GenerateSymlinkManifest) //生成manifest支持Presto和Athena
}
val commitVersion = doCommit(snapshot.version + 1, finalActions, 0, isolationLevelToUse) //提交action日志
logInfo(s"Committed delta #$commitVersion to ${deltaLog.logPath}")
postCommit(commitVersion, finalActions) //檢測是否合並checkpoint
commitVersion
} catch {
case e: DeltaConcurrentModificationException =>
recordDeltaEvent(deltaLog, "delta.commit.conflict." + e.conflictType)
throw e
case NonFatal(e) =>
recordDeltaEvent(
deltaLog, "delta.commit.failure", data = Map("exception" -> Utils.exceptionString(e)))
throw e
}
runPostCommitHooks(version, actions) //0.5版本新特性,用來支持Presto和Amazon Athena
version
}
沖突檢測(並發控制)
- 如果后續commit升級了protocol版本,則不通過;
- 如果后續commit更改了metadata,則不通過;
- 如果后續commit更改了文件:
- 在0.5之前的版本,只要讀了delta表的文件,且后續其他commit log有FileAction操作,就不能通過檢測(除非是完全不依賴delta表,單純的灌數據才行,怪不得並發低);
- 0.5版本增加了Serializable,WriteSerializable,SnapshotIsolation三個隔離等級;(以下僅考慮源碼的具體實現,根據isolationLevels里的文檔注釋,它們應該具有更多的功能,尤其是WriteSerializable級別,目前的代碼並沒有使用,推測應該會在后續版本進行更新,或者在商業版里才有)
- Serializable最嚴格的,要求絕對的串行化,設置了這個級別,只要出現並發沖突,且后續commit log存在AddFile操作,就會報錯;
- WriteSerializable允許其他commit isBlindAppend時通過沖突檢測(即后續的commit僅AddFile,不RemoveFile),此種情況下最終結果和串行的結果可能不同;
- SnapshotIsolation最寬松,基本都可以通過這部分的沖突檢測,但是可能無法通過其他模塊的檢測。
- 如果后續commit刪除了本次讀取的文件,則不通過;
- 如果后續commit和本次commit刪除了同一個文件,則不通過;
- 如果冪等的事務發生了沖突(SetTransaction部分有相同的appId),則不通過。
(具體代碼在OptimisticTransaction.scala的checkAndRetry方法里,有興趣的可以看一下)
delete
調用DeltaTable里的delete方法可以刪除滿足指定條件的數據。
- DeltaTableOperations的executeDelete將任務解析成DeleteCommand,然后run;
- DeleteCommand.run會檢查目標delta表是否為appendOnly,若是,則禁止更新和刪除數據,否則performDelete;
- 在performDelete方法中,首先解析給定的刪除數據的條件,划分為只使用元數據就能計算的謂詞和其它謂詞兩類;(具體實現是檢查謂詞是否僅包含分區列和條件是否涉及子查詢表達式)
- 使用OptimisticTransaction里的filterFiles方法找出需要刪除的文件列表,
- 如果只有上述第一種情況,則不需要掃描文件數據,直接刪除文件就行,刪除調用的是removeWithTimestamp方法,返回RemoveFile action;
- 如果包含上述第二種情況,則需要掃描文件數據,找出文件列表中不需要被刪除的數據,使用TransactionalWrite.writeFiles方法寫到新的文件中,此時deleteActions包括AddFile和RemoveFile。
- 最后用commit方法提交deleteActions,並使用recordDeltaEvent記錄本次刪除操作的詳細信息。
(文件並沒有被物理刪除)
private def performDelete(
sparkSession: SparkSession, deltaLog: DeltaLog, txn: OptimisticTransaction) = {
import sparkSession.implicits._
var numTouchedFiles: Long = 0
var numRewrittenFiles: Long = 0
var scanTimeMs: Long = 0
var rewriteTimeMs: Long = 0
val startTime = System.nanoTime()
val numFilesTotal = deltaLog.snapshot.numOfFiles
val deleteActions: Seq[Action] = condition match {
case None => //沒有限定條件,需刪除整張表,此時遍歷所有文件,然后刪除就行
// Case 1: Delete the whole table if the condition is true
val allFiles = txn.filterFiles(Nil)
numTouchedFiles = allFiles.size
scanTimeMs = (System.nanoTime() - startTime) / 1000 / 1000
val operationTimestamp = System.currentTimeMillis()
allFiles.map(_.removeWithTimestamp(operationTimestamp)) //邏輯刪除數據文件
case Some(cond) => //有條件就需要區分不同情況了
val (metadataPredicates, otherPredicates) =
DeltaTableUtils.splitMetadataAndDataPredicates( //將條件解析成能用元數據定位的和其他
cond, txn.metadata.partitionColumns, sparkSession)
if (otherPredicates.isEmpty) { //第一種情況,只使用元數據就能定位所有數據
// Case 2: The condition can be evaluated using metadata only.
// Delete a set of files without the need of scanning any data files.
val operationTimestamp = System.currentTimeMillis()
val candidateFiles = txn.filterFiles(metadataPredicates) //返回涉及到的文件
scanTimeMs = (System.nanoTime() - startTime) / 1000 / 1000
numTouchedFiles = candidateFiles.size
candidateFiles.map(_.removeWithTimestamp(operationTimestamp)) //刪除
} else { //第二種情況,需要把文件中不需要刪除的數據重寫一份
// Case 3: Delete the rows based on the condition.
val candidateFiles = txn.filterFiles(metadataPredicates ++ otherPredicates)
numTouchedFiles = candidateFiles.size
val nameToAddFileMap = generateCandidateFileMap(deltaLog.dataPath, candidateFiles) //生成重寫后的文件名和對應的AddFile action
val fileIndex = new TahoeBatchFileIndex(
sparkSession, "delete", candidateFiles, deltaLog, tahoeFileIndex.path, txn.snapshot)
// Keep everything from the resolved target except a new TahoeFileIndex
// that only involves the affected files instead of all files.
val newTarget = DeltaTableUtils.replaceFileIndex(target, fileIndex) //替換文件索引,更新LogicalPlan
val data = Dataset.ofRows(sparkSession, newTarget)
val filesToRewrite =
withStatusCode("DELTA", s"Finding files to rewrite for DELETE operation") {
if (numTouchedFiles == 0) {
Array.empty[String]
} else {
data.filter(new Column(cond)).select(new Column(InputFileName())).distinct()
.as[String].collect()
}
}
scanTimeMs = (System.nanoTime() - startTime) / 1000 / 1000
if (filesToRewrite.isEmpty) {
// Case 3.1: no row matches and no delete will be triggered
Nil
} else {
// Case 3.2: some files need an update to remove the deleted files
// Do the second pass and just read the affected files
val baseRelation = buildBaseRelation(
sparkSession, txn, "delete", tahoeFileIndex.path, filesToRewrite, nameToAddFileMap)
// Keep everything from the resolved target except a new TahoeFileIndex
// that only involves the affected files instead of all files.
val newTarget = DeltaTableUtils.replaceFileIndex(target, baseRelation.location)
val targetDF = Dataset.ofRows(sparkSession, newTarget)
val filterCond = Not(EqualNullSafe(cond, Literal(true, BooleanType)))
val updatedDF = targetDF.filter(new Column(filterCond))
val rewrittenFiles = withStatusCode(
"DELTA", s"Rewriting ${filesToRewrite.size} files for DELETE operation") {
txn.writeFiles(updatedDF) //寫文件
}
numRewrittenFiles = rewrittenFiles.size
rewriteTimeMs = (System.nanoTime() - startTime) / 1000 / 1000 - scanTimeMs
val operationTimestamp = System.currentTimeMillis()
removeFilesFromPaths(deltaLog, nameToAddFileMap, filesToRewrite, operationTimestamp) ++ //刪文件
rewrittenFiles //寫文件
}
}
}
if (deleteActions.nonEmpty) {
txn.commit(deleteActions, DeltaOperations.Delete(condition.map(_.sql).toSeq)) //提交commit日志
}
recordDeltaEvent( //記錄本次操作的詳細信息
deltaLog,
"delta.dml.delete.stats",
data = DeleteMetric(
condition = condition.map(_.sql).getOrElse("true"),
numFilesTotal,
numTouchedFiles,
numRewrittenFiles,
scanTimeMs,
rewriteTimeMs)
)
}
update
調用DeltaTable里的update()方法可以更新滿足指定條件的數據。(和delete有些相似)
- DeltaTableOperations的executeUpdate將任務解析成UpdateCommand,然后run;
- UpdateCommand.run檢查目標delta表是否為appendOnly,若是,則禁止更新和刪除數據,否則performUpdate;
- 解析給定條件,划分為只使用元數據就能計算的謂詞和其它謂詞兩類;
- 使用OptimisticTransaction里的filterFiles方法找出需要刪除的文件列表,
- 如果只有上述第一種情況,removeWithTimestamp直接刪除文件,然后調用rewriteFiles方法,使用buildUpdatedColumns更新受影響的列,最后writeFiles;
- 如果包含上述第二種情況,掃描數據,找出需要更新的數據,(邏輯)刪除原文件,更新受影響的數據,然后rewriteFiles。
- 最后用commit方法提交deleteActions,並使用recordDeltaEvent記錄本次刪除操作的詳細信息。
(關鍵代碼詳見UpdateCommand.scala的performUpdate方法,和delete相似)
merge
DeltaTable里merge直接調用DeltaMergeBuilder方法,后續的whenMatched和whenNotMatched都是向mergeBuilder里面添加從句,最后使用execute()啟動執行;
-
whenMatched時可以執行update操作。
- update調用addUpdateClause方法,它使用MergeIntoClause.toActions將解析后的列名和update的表達式轉化為MergeAction,MergeIntoUpdateClause將它與whenMatched的條件結合,通過withClause()添加到mergeBuilder里;
- updateAll也是同樣的流程,只是MergeIntoClause.toActions(Nil, Nil)參數為空(類似於update set * ),后續execute時resolveClause方法會予以解析。
private def addUpdateClause(set: Map[String, Column]): DeltaMergeBuilder = { if (set.isEmpty && matchCondition.isEmpty) { // Nothing to update = no need to add an update clause mergeBuilder } else { val setActions = set.toSeq val updateActions = MergeIntoClause.toActions( //轉化為MergeAction colNames = setActions.map(x => UnresolvedAttribute.quotedString(x._1)), exprs = setActions.map(x => x._2.expr), isEmptySeqEqualToStar = false) val updateClause = MergeIntoUpdateClause(matchCondition.map(_.expr), updateActions) //和條件一起打包 mergeBuilder.withClause(updateClause) //加到mergeBuilder里 } }
-
whenMatched時可以執行delete操作,直接用MergeIntoDeleteClause封裝一下matchCondition,然后withClause添加進mergeBuilder;
/** Delete a matched row from the table */ def delete(): DeltaMergeBuilder = { val deleteClause = MergeIntoDeleteClause(matchCondition.map(_.expr)) mergeBuilder.withClause(deleteClause) }
-
whenNotMatched時可以執行insert操作,流程類似update,MergeIntoClause.toActions轉化,MergeIntoInsertClause封裝,然后添加到mergeBuilder里;
private def addInsertClause(setValues: Map[String, Column]): DeltaMergeBuilder = { val values = setValues.toSeq val insertActions = MergeIntoClause.toActions( colNames = values.map(x => UnresolvedAttribute.quotedString(x._1)), exprs = values.map(x => x._2.expr), isEmptySeqEqualToStar = false) val insertClause = MergeIntoInsertClause(notMatchCondition.map(_.expr), insertActions) mergeBuilder.withClause(insertClause) }
-
調用execute來執行。
- 使用MergeInto.resolveReferences解析mergeClause。首先會檢查merge的語法規則;
- 一個merge中至少存在一個whenClauses;
- 如果存在兩個whenMatched,則第一個必須有條件;
- whenMatched最多有兩個;
- update、delete和insert都只能出現一次。
- 具體的解析工作是由resolveClause和resolveOrFail來完成的(resolveOrFail提供了一個遞歸的調用)。
- 使用PreprocessTableMerge進行預處理,將MergeIntoInsertClause(notMatch)和MergeIntoMatchedClause(match:MergeIntoUpdateClause和MergeIntoDeleteClause都繼承自它)封裝成MergeIntoCommand;
- 調用MergeIntoCommand.run。
- 如果只有whenNotMatched,則只需要insert數據,調用writeInsertsOnlyWhenNoMatchedClauses方法,此時只需要left anti join 找到需要插入的數據,然后寫就行了,相關方法是OptimisticTransaction.filterFiles和TransactionalWrite.writeFiles;
- 如果包含whenMatched,
- 調用findTouchedFiles找到所有需要更改的文件(用withColumn把列編號和文件名加到數據上,然后inner join找到match的數據);
- 然后調用writeAllChanges方法處理需要改變的數據,具體流程是對sourceDF(merge的目標df)和targetDF(上一步找出來的delta文件DF)做full join,然后使用JoinedRowProcessor.processPartition處理相應的邏輯,最后writeFiles寫數據,然后remove找出的delta文件。
- 提交commit,然后recordDeltaEvent記錄本次的MergeStats。
- 使用MergeInto.resolveReferences解析mergeClause。首先會檢查merge的語法規則;
def execute(): Unit = {
val sparkSession = targetTable.toDF.sparkSession
val resolvedMergeInto =
MergeInto.resolveReferences(mergePlan)(tryResolveReferences(sparkSession) _) //解析
if (!resolvedMergeInto.resolved) {
throw DeltaErrors.analysisException("Failed to resolve\n", plan = Some(resolvedMergeInto))
}
// Preprocess the actions and verify
val mergeIntoCommand = PreprocessTableMerge(sparkSession.sessionState.conf)(resolvedMergeInto) //封裝
sparkSession.sessionState.analyzer.checkAnalysis(mergeIntoCommand) //檢查LogicalPlan
mergeIntoCommand.run(sparkSession) //執行
}