hive血緣關系之輸入表與目標表的解析


  接了一個新需求:需要做數據倉庫的血緣關系。正所謂兵來將擋水來土掩,那咱就動手吧。

  血緣關系是數據治理的一塊,其實有專門的第三方數據治理框架,但考慮到目前的線上環境已經趨於穩定,引入新的框架無疑是勞民傷財,傷筋動骨,所以就想以最小的代價把這個事情給做了。目前我們考慮做的血緣關系呢只是做輸入表和輸出表,最后會形成一張表與表之間的鏈路圖。這個東西的好處就是有助於倉庫人員梳理業務,后面可能還會做字段之間的血緣關系等,后面做了再說,今天只是記錄一下輸入表和輸出表的血緣關系。

  我們線上的環境用來做etl的是hive sql和spark sql,所以想到的就是將hive sql和spark sql都攔截下來,然后通過語法解析,解析出其中的輸入表和輸出表。搞完這個這事就算是大功告成了。

  首先第一步是攔截hive sql和spark sql,這個呢我們是直接在hive和spark的源碼中切入了一小段代碼用來攔截sql,然后將攔截到的sql存入到mysql中,這個暫且不做記錄,后續補充,這里的重點是sql的解析。

  其實做hive的血緣關系分析在源碼中是有一個類可以參考的:org.apache.hadoop.hive.ql.tools.LineageInfo, 不過呢,這個例子不全面,不能覆蓋到我們線上的情況。比如 hive中的with語法,create table語法就不能覆蓋到,好巧不巧,跟倉庫的同事聊過之后,with這種語法也是用的很多的,所以只需要在這個例子上加上一些東東,就基本可以滿足我們的需求啦。

  總結一下:其實做表與表之間的血緣關系只需要考慮到下面幾種語法就差不多了:with, create table , insert, select; 可能我說的不全面,但我目前了解的情況大概就是這些。下面是hive的解析的核心代碼:

  

public class LineageUtils implements NodeProcessor { // 存放輸入表
    TreeSet<String> inputTableList = new TreeSet<String>(); // 存放目標表
    TreeSet<String> outputTableList = new TreeSet<String>(); //存放with子句中的別名, 最終的輸入表是 inputTableList減去withTableList
    TreeSet<String> withTableList = new TreeSet<String>(); public TreeSet getInputTableList() { return inputTableList; } public TreeSet getOutputTableList() { return outputTableList; } public TreeSet getWithTableList() { return withTableList; } public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, Object... nodeOutputs) throws SemanticException { ASTNode pt = (ASTNode) nd; switch (pt.getToken().getType()) { //create語句
            case HiveParser.TOK_CREATETABLE: { String createName = BaseSemanticAnalyzer.getUnescapedName((ASTNode) pt.getChild(0)); outputTableList.add(createName); break; } //insert語句
            case HiveParser.TOK_TAB: { // System.out.println(pt.getChildCount() + "tab");
                String insertName = BaseSemanticAnalyzer.getUnescapedName((ASTNode) pt.getChild(0)); outputTableList.add(insertName); // System.out.println("insertName " + insertName);
                break; } //from語句
            case HiveParser.TOK_TABREF: { ASTNode tabTree = (ASTNode) pt.getChild(0); String fromName = (tabTree.getChildCount() == 1) ? BaseSemanticAnalyzer.getUnescapedName((ASTNode) tabTree.getChild(0)) : BaseSemanticAnalyzer.getUnescapedName((ASTNode) tabTree.getChild(0)) + "." + tabTree.getChild(1); inputTableList.add(fromName); break; } // with.....語句
            case HiveParser.TOK_CTE: { for (int i = 0; i < pt.getChildCount(); i++) { ASTNode temp = (ASTNode) pt.getChild(i); String cteName = BaseSemanticAnalyzer.getUnescapedName((ASTNode) temp.getChild(1)); withTableList.add(cteName); } break; } } return null; } public void getLineageInfo(String query) throws ParseException, SemanticException { ParseDriver pd = new ParseDriver(); ASTNode tree = pd.parse(query); while ((tree.getToken() == null) && (tree.getChildCount() > 0)) { tree = (ASTNode) tree.getChild(0); } inputTableList.clear(); outputTableList.clear(); withTableList.clear(); Map<Rule, NodeProcessor> rules = new LinkedHashMap<Rule, NodeProcessor>(); Dispatcher disp = new DefaultRuleDispatcher(this, rules, null); GraphWalker ogw = new DefaultGraphWalker(disp); ArrayList topNodes = new ArrayList(); topNodes.add(tree); ogw.startWalking(topNodes, null); } //進行測試,sql語句是瞎寫的,但是語法是對的
    public static void main(String[] args) throws IOException, ParseException, SemanticException { //String query = "insert into qc.tables_lins_cnt partition(dt='2016-09-15') select a.x from (select x from cc group by x) a left join yy b on a.id = b.id left join (select * from zz where id=1) c on c.id=b.id"; // String query ="from (select id,name from xx where id=1) a insert overwrite table dsl.dwm_all_als_active_d partition (dt='main') select id group by id insert overwrite table dsl.dwm_all_als_active_d2 partition (dt='main') select name group by name";
        String query = "with q1 as ( select key from src where key = '5'), q2 as ( select key from with1 a inner join with2 b on a.id = b.id) insert overwrite table temp.dt_mobile_play_d_tmp2 partition(dt='2018-07-17') select * from q1 cross join q2"; LineageUtils lep = new LineageUtils(); lep.getLineageInfo(query); System.out.println("Input tables = " + lep.getInputTableList()); System.out.println("Output tables = " + lep.getOutputTableList()); System.out.println("with tables = " + lep.getWithTableList()); } }

  上述代碼的運行需要引入maven依賴:

  <dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-exec</artifactId>
    <version>1.1.0</version>
  </dependency>

  這樣核心的解析基本上就大功告成啦,spark sql雖然與hive sql有些差別,但是核心的解析還是照樣可以用到這個類的,只不過有些地方需要注意,后面會繼續進行記錄。

 

  注:這個類只是進行解析的工具類,還有一些細節需要進行考慮,比如sql語句中沒有帶 database,也就是如果sql中的表不是 database.tablename的形式,該怎么處理?這個時候就需要考慮上下文中的切庫(比如 use temp)處理了, 不過這不是什么大問題,是可以解決的。我們想到的解決步驟就在攔截sql的那一層進行的處理,如果有切庫的操作,就先把庫記錄下來,等sql解析完成之后,再去遍歷哪些表沒有帶數據庫,將沒有帶庫的表面拼接上先前記錄的庫即可。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM