As the number of users and tables on the company's platform keeps growing, the data flows between tables have become increasingly complex. In particular, a single job may read source tables, apply a series of complex transformations, and then write new tables. We therefore need a table-lineage parsing mechanism that can clearly resolve the lineage chain produced by each job.
Implementation approach:
When Spark executes SQL it produces a DataFrame, and the DataFrame's LogicalPlan contains the syntax tree of that SQL. By walking this LogicalPlan tree we can obtain the input and output tables touched by the current stage; chaining all of these table relationships together and then removing intermediate tables yields the input-table and output-table information for the whole job.
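For example (a minimal sketch; the SQL statement and variable names are illustrative), the un-analyzed logical plan can be taken from a DataFrame and handed to the resolver shown in the core code below:

// Illustrative sketch: obtain the logical plan of a statement and resolve it.
val df = spark.sql("insert into bigdata.tdl_file_test select * from bigdata.tdl_spark_test")
val plan: LogicalPlan = df.queryExecution.logical
val (inputs, outputs) = resolveLogicPlan(plan, spark.catalog.currentDatabase)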
Core code:
// Imports assumed for a Spark 2.2-era build (adjust package paths for other versions).
import java.util

import scala.collection.JavaConversions._   // iterate java collections with Scala for-comprehensions

import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.catalog.CatalogRelation
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.execution.datasources.CreateTable

def resolveLogicPlan(plan: LogicalPlan, currentDB: String): (util.Set[DcTable], util.Set[DcTable]) = {
  val inputTables = new util.HashSet[DcTable]()
  val outputTables = new util.HashSet[DcTable]()
  resolveLogic(plan, currentDB, inputTables, outputTables)
  Tuple2(inputTables, outputTables)
}

def resolveLogic(plan: LogicalPlan, currentDB: String,
                 inputTables: util.Set[DcTable], outputTables: util.Set[DcTable]): Unit = {
  plan match {
    // Unary nodes: keep walking down into the child plan.
    case project: Project =>
      resolveLogic(project.child, currentDB, inputTables, outputTables)

    case union: Union =>
      for (child <- union.children) {
        resolveLogic(child, currentDB, inputTables, outputTables)
      }

    case join: Join =>
      resolveLogic(join.left, currentDB, inputTables, outputTables)
      resolveLogic(join.right, currentDB, inputTables, outputTables)

    case aggregate: Aggregate =>
      resolveLogic(aggregate.child, currentDB, inputTables, outputTables)

    case filter: Filter =>
      resolveLogic(filter.child, currentDB, inputTables, outputTables)

    case generate: Generate =>
      resolveLogic(generate.child, currentDB, inputTables, outputTables)

    case repartitionByExpr: RepartitionByExpression =>
      resolveLogic(repartitionByExpr.child, currentDB, inputTables, outputTables)

    case serialize: SerializeFromObject =>
      resolveLogic(serialize.child, currentDB, inputTables, outputTables)

    case mapPartitions: MapPartitions =>
      resolveLogic(mapPartitions.child, currentDB, inputTables, outputTables)

    case deserialize: DeserializeToObject =>
      resolveLogic(deserialize.child, currentDB, inputTables, outputTables)

    case repartition: Repartition =>
      resolveLogic(repartition.child, currentDB, inputTables, outputTables)

    case deduplicate: Deduplicate =>
      resolveLogic(deduplicate.child, currentDB, inputTables, outputTables)

    case window: Window =>
      resolveLogic(window.child, currentDB, inputTables, outputTables)

    case mapElements: MapElements =>
      resolveLogic(mapElements.child, currentDB, inputTables, outputTables)

    case typedFilter: TypedFilter =>
      resolveLogic(typedFilter.child, currentDB, inputTables, outputTables)

    case distinct: Distinct =>
      resolveLogic(distinct.child, currentDB, inputTables, outputTables)

    // A subquery alias either wraps a real relation (its child yields input tables)
    // or refers to a temp view, in which case the alias itself is recorded as an input.
    case subquery: SubqueryAlias =>
      val childInputTables = new util.HashSet[DcTable]()
      val childOutputTables = new util.HashSet[DcTable]()
      resolveLogic(subquery.child, currentDB, childInputTables, childOutputTables)
      if (childInputTables.size() > 0) {
        for (table <- childInputTables) inputTables.add(table)
      } else {
        inputTables.add(DcTable(currentDB, subquery.alias))
      }

    // Leaf relations: these are the input tables.
    case relation: CatalogRelation =>
      val identifier = relation.tableMeta.identifier
      val dcTable = DcTable(identifier.database.getOrElse(currentDB), identifier.table)
      inputTables.add(dcTable)

    case relation: UnresolvedRelation =>
      val dcTable = DcTable(relation.tableIdentifier.database.getOrElse(currentDB), relation.tableIdentifier.table)
      inputTables.add(dcTable)

    // Write nodes: the target table is an output. Note the swapped sets when resolving
    // the insert target, so the relation it contains lands in outputTables.
    case insert: InsertIntoTable =>
      resolveLogic(insert.table, currentDB, outputTables, inputTables)
      resolveLogic(insert.query, currentDB, inputTables, outputTables)

    case create: CreateTable =>
      if (create.query.isDefined) {
        resolveLogic(create.query.get, currentDB, inputTables, outputTables)
      }
      val tableIdentifier = create.tableDesc.identifier
      val dcTable = DcTable(tableIdentifier.database.getOrElse(currentDB), tableIdentifier.table)
      outputTables.add(dcTable)

    case globalLimit: GlobalLimit =>
      resolveLogic(globalLimit.child, currentDB, inputTables, outputTables)

    case localLimit: LocalLimit =>
      resolveLogic(localLimit.child, currentDB, inputTables, outputTables)

    // Any node type not handled above is only logged.
    case _ =>
      logger.info("******child plan******:\n" + plan)
  }
}
The code above resolves the LogicalPlan recursively: when the node is a LocalLimit, GlobalLimit, Window, or similar unary type, it continues into the child plan; when the node is a CatalogRelation or UnresolvedRelation, the resolved table name is recorded as an input table; when the node is a CreateTable or InsertIntoTable, the resolved table name is recorded as an output table.
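If you want to see which of these node types a particular statement produces, one quick way to explore (not part of the parser itself; the SQL here is illustrative) is to print the plan tree:

// treeString prints the logical-plan tree, showing the Aggregate / Filter /
// UnresolvedRelation nodes that resolveLogic walks through.
val plan = spark.sql("select name, count(*) from bigdata.tdl_spark_test group by name").queryExecution.logical
println(plan.treeString)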
One special case needs to be handled: some source tables are loaded via spark.read, in which case the node resolved from the LogicalPlan is a LogicalRDD and the table name cannot be obtained directly. Take the following Python code as an example:
from pyspark.sql import Row, SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

sparkSession = SparkSession.builder.enableHiveSupport().getOrCreate()

schema = StructType([StructField('id', IntegerType(), True),
                     StructField('name', StringType(), True),
                     StructField('age', IntegerType(), True)])
rdd = sparkSession.sparkContext.textFile('/user/hive/warehouse/bigdata.db/tdl_spark_test/testdata.txt') \
    .map(lambda r: r.split(',')) \
    .map(lambda p: Row(int(p[0]), p[1], int(p[2])))
df = sparkSession.createDataFrame(rdd, schema)
df.createOrReplaceTempView('tdl_spark_test')
sparkSession.sql('create table tdl_file_test as select * from tdl_spark_test')
The code above first reads a file with textFile to get an RDD, transforms that RDD, and finally registers it as a DataFrame. Resolving this DataFrame's LogicalPlan yields a LogicalRDD node. The solution for this case is to record the RDD produced each time textFile is called; when a LogicalRDD is encountered during plan resolution, take its underlying RDD and check whether any previously recorded RDD is an ancestor of it. If so, the table name associated with that earlier RDD is counted as an input.
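As a rough illustration of the recording step only (a plain wrapper instead of the aspect-based interception actually used, with a hypothetical helper name; rddTableMap is the RDD-to-table-name map used later in the LogicalRDD handler):

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

// Hypothetical sketch: remember which table a textFile-produced RDD came from,
// so a LogicalRDD met later can be traced back to it. In the real setup this
// recording happens inside an aspect around textFile, not an explicit wrapper.
val rddTableMap = new java.util.concurrent.ConcurrentHashMap[RDD[_], String]()

def recordTextFile(spark: SparkSession, path: String, tableName: String): RDD[String] = {
  val rdd = spark.sparkContext.textFile(path)
  rddTableMap.put(rdd, tableName)   // e.g. "bigdata.tdl_spark_test"
  rdd
}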
The logic for checking the dependency relationship between RDDs is:
// Returns true if rdd1 is rdd2 itself or an ancestor of rdd2 in the dependency graph.
def checkRddRelationShip(rdd1: RDD[_], rdd2: RDD[_]): Boolean = {
  if (rdd1.id == rdd2.id) return true
  dfsSearch(rdd1, rdd2.dependencies)
}

// Depth-first search up through the dependency chain looking for rdd1.
def dfsSearch(rdd1: RDD[_], dependencies: Seq[Dependency[_]]): Boolean = {
  for (dependency <- dependencies) {
    if (dependency.rdd.id == rdd1.id) return true
    if (dfsSearch(rdd1, dependency.rdd.dependencies)) return true
  }
  false
}
The handling of LogicalRDD is:
case logicalRDD: LogicalRDD =>
  try {
    // rddTableMap maps each RDD recorded at textFile time to its "db.table" name.
    for (rdd <- rddTableMap.keySet()) {
      if (checkRddRelationShip(rdd, logicalRDD.rdd)) {
        val tableName = rddTableMap.get(rdd)
        val db = StringUtils.substringBefore(tableName, ".")   // Apache Commons Lang StringUtils
        val table = StringUtils.substringAfter(tableName, ".")
        inputTables.add(DcTable(db, table))
      }
    }
  } catch {
    case e: Throwable => logger.error("resolve LogicalRDD error:", e)
  }
The code paths in Spark that produce DataFrames are intercepted with an aspect, and each DataFrame is resolved to obtain the table relationship chain. At this point the chain is a directed acyclic graph that may contain intermediate tables; removing the intermediate-table nodes yields the final data-flow graph.
For example, the left side of the figure above shows a raw table data flow in which tempC and tempE are temporary tables; removing the temporary-table nodes from that graph gives the data-flow graph on the right. For the Python code given earlier, the data flow obtained after execution is:
[bigdata.tdl_spark_test]--->bigdata.tdl_file_test
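As an illustration of the pruning step (a minimal sketch over a simple edge-set representation; the function and graph types here are hypothetical, not the platform's actual implementation):

// Hypothetical sketch: every path in -> temp -> out through a temp table is replaced
// by a direct edge in -> out, and the temp-table node is dropped from the graph.
def removeTempTables(edges: Set[(String, String)], tempTables: Set[String]): Set[(String, String)] = {
  var current = edges
  for (temp <- tempTables) {
    val sources = current.filter(_._2 == temp).map(_._1)
    val targets = current.filter(_._1 == temp).map(_._2)
    val bridged = for (src <- sources; dst <- targets) yield (src, dst)
    current = current.filter(e => e._1 != temp && e._2 != temp) ++ bridged
  }
  current
}

// removeTempTables(Set("A" -> "tempC", "tempC" -> "B"), Set("tempC")) returns Set("A" -> "B")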
Of course, this parsing approach has some limitations. For example, suppose data is first read with spark.read and registered as a temporary table, some column values from that table are pulled into a local cache, a new empty DataFrame is then created, and the cached values are inserted directly into it. Because the newly created DataFrame no longer has any dependency on the earlier one, an accurate data flow cannot be resolved in this case.