A new requirement came in: add instrumentation in Spark to extract the table-level lineage of each job, covering both SQL and code-based (DataFrame) jobs, but excluding intermediate temporary views created with createOrReplaceTempView.
A colleague had already covered the Hive side (https://www.cnblogs.com/wuxilc/p/9326130.html), but that approach does not work for Spark jobs, because Hive's ParseDriver cannot parse Spark SQL.
So the hook has to go into Spark itself.
Start from Spark's BaseSessionStateBuilder class:
/**
 * Build the [[SessionState]].
 */
def build(): SessionState = {
  new SessionState(
    session.sharedState,
    conf,
    experimentalMethods,
    functionRegistry,
    udfRegistration,
    () => catalog,
    sqlParser,
    () => analyzer,
    () => optimizer,
    planner,
    streamingQueryManager,
    listenerManager,
    () => resourceLoader,
    createQueryExecution,
    createClone)
}
optimizer here is Spark's logical-plan (Catalyst) optimizer, and HiveSessionStateBuilder extends BaseSessionStateBuilder.
All Spark optimizers extend the abstract class Optimizer:
/**
* Abstract class all optimizers should inherit of, contains the standard batches (extending
* Optimizers can override this.
*/
abstract class Optimizer(sessionCatalog: SessionCatalog)
  extends RuleExecutor[LogicalPlan] {
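To see what the patch plugs into: RuleExecutor drives a sequence of Batch entries, each pairing a name, a strategy (Once or FixedPoint(n)) and one or more Rule instances, and execute() applies them to a plan in order. A minimal, purely illustrative sketch against the Spark 2.x Catalyst API (LogPlanRule and DemoExecutor are made-up names):

import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.{Rule, RuleExecutor}

// A rule that only inspects the plan and hands it back unchanged.
object LogPlanRule extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = {
    logInfo(s"visiting plan:\n${plan.treeString}")
    plan
  }
}

// A stand-alone executor with a single run-once batch, the same shape as the
// Batch(...) entries appended to the optimizer below; DemoExecutor.execute(plan)
// applies the batch to a logical plan.
object DemoExecutor extends RuleExecutor[LogicalPlan] {
  override protected def batches: Seq[Batch] = Batch("Log plan", Once, LogPlanRule) :: Nil
}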
Register a new rule batch in the optimizer by overriding it in the HiveSessionStateBuilder class:
override lazy val optimizer: Optimizer = new SparkOptimizer(catalog, conf, experimentalMethods) {
  override def batches: Seq[Batch] = super.batches :+
    Batch("Determine stats of partitionedTable", Once,
      DeterminePartitionedTableStats(sparkSession)) :+
    Batch("Collect read and write tables", Once, DependencyCollect(sparkSession))
}
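As an aside: if rebuilding Spark is not an option, Spark 2.x also lets application code append optimizer rules through spark.experimental.extraOptimizations, which run in the optimizer's "User Provided Optimizers" batch. A rough sketch under that assumption (note that the DependencyCollect rule defined below touches sparkContext.listenerBus, which is private[spark], so as written it would still have to be compiled inside a Spark-internal package):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .enableHiveSupport()
  .getOrCreate()

// Extra rules run under a fixed-point strategy rather than Once, so a side-effecting
// rule registered this way should be idempotent.
spark.experimental.extraOptimizations ++= Seq(DependencyCollect(spark))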
Then add the rule itself, which matches INSERT and CREATE statements (plus plain table scans) and extracts the input and output tables:
import scala.collection.mutable

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, HiveTableRelation}
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.command.CreateTableCommand
import org.apache.spark.sql.execution.datasources.CreateTable
import org.apache.spark.sql.hive.execution.InsertIntoHiveTable

case class DependencyCollect(sparkSession: SparkSession) extends Rule[LogicalPlan] {

  def apply(plan: LogicalPlan): LogicalPlan = {
    // Collection can be switched off per job with spark.collectDependencies=false.
    if (sparkSession.sparkContext.conf.getBoolean("spark.collectDependencies", true)) {
      val readTables = mutable.HashSet[String]()
      val writeTables = mutable.HashSet[String]()
      // Walk the plan top-down; every case returns the node unchanged, so the rule
      // only observes the plan and never rewrites it.
      plan transformDown {
        case a @ InsertIntoHiveTable(table: CatalogTable, _, _, _, _, _) =>
          writeTables += s"${fillBlankDatabase(table)}.${table.identifier.table}"
          a
        case i @ InsertIntoTable(table: HiveTableRelation, _, _, _, _) =>
          writeTables += s"${table.tableMeta.database}.${table.tableMeta.identifier.table}"
          i
        case c @ CreateTable(table: CatalogTable, _, _) =>
          writeTables += s"${fillBlankDatabase(table)}.${table.identifier.table}"
          c
        case d @ CreateTableCommand(table: CatalogTable, _) =>
          writeTables += s"${fillBlankDatabase(table)}.${table.identifier.table}"
          d
        case p @ PhysicalOperation(_, _, table: HiveTableRelation) =>
          readTables += s"${table.tableMeta.database}.${table.tableMeta.identifier.table}"
          p
      }
      if (readTables.nonEmpty || writeTables.nonEmpty) {
        logInfo(s"src table -> ${readTables.mkString(",")} target table -> ${writeTables.mkString(",")}")
        // AsyncExecution and CallChain are in-house reporting helpers; their imports are omitted here.
        AsyncExecution.AsycnHandle(new CallChain.Event(
          s"${readTables.mkString(",")}#${writeTables.mkString(",")}",
          AsyncExecution.getSparkAppName(sparkSession.sparkContext.conf),
          "bloodlineage"))
        // listenerBus is private[spark], so this class has to be compiled inside Spark.
        sparkSession.sparkContext.listenerBus.post(DependencyEvent(readTables, writeTables))
      }
    }
    plan
  }

  // Tables in unqualified statements carry no database; fall back to the session's current one.
  private def fillBlankDatabase(table: CatalogTable): String =
    table.identifier.database.getOrElse(sparkSession.sessionState.catalog.getCurrentDatabase)
}
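DependencyEvent, AsyncExecution and CallChain are in-house classes that are not shown in this post. For the listener-bus half, a rough sketch of what the event and a SparkListener consuming it could look like (class and handler names here are assumptions, not the real implementation):

import scala.collection.mutable
import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}

// Custom events posted to the listener bus must extend SparkListenerEvent.
case class DependencyEvent(readTables: mutable.HashSet[String],
                           writeTables: mutable.HashSet[String]) extends SparkListenerEvent

// Receives the event and forwards it, e.g. to a log line or an external lineage service.
class DependencyListener extends SparkListener {
  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
    case DependencyEvent(reads, writes) =>
      println(s"lineage: ${reads.mkString(",")} -> ${writes.mkString(",")}")
    case _ => // not interested in other events
  }
}

// Registered once per application, e.g. right after the SparkSession is created:
// spark.sparkContext.addSparkListener(new DependencyListener)

With the rule in place, collection is on by default and can be turned off per job with --conf spark.collectDependencies=false.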