date: 2020-07-29 09:54:00
updated: 2020-08-04 17:09:00
Lineage Analysis
1. LineageLogger
First, rewrite the org.apache.hadoop.hive.ql.hooks.LineageLogger class. The stock class prints the column lineage information to the log; we need it to return the lineage string to the caller instead.
String lineage = out.toString();
if (testMode) {
    log(lineage);
} else {
    LOG.info(lineage);
}

=>

String lineage = out.toString();
return lineage;
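Rather than letting the hook machinery call this class, the project invokes it directly. Below is a minimal sketch of the rewritten class, assuming the JSON-building body of the stock run() method is lifted into a getJsonString method whose signature mirrors the call made from LineageInfo in step 3; the elided part is unchanged from the original class.

package cn.edata.Lineage;

import java.util.List;
import java.util.Set;

import org.apache.commons.io.output.StringBuilderWriter;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
import org.apache.hadoop.hive.ql.optimizer.lineage.LineageCtx;

public class LineageLogger {
    public String getJsonString(String query, List<FieldSchema> fieldSchemas,
                                Set<WriteEntity> outputs, LineageCtx.Index index) {
        StringBuilderWriter out = new StringBuilderWriter(1024);
        // ... vertex/edge JSON construction copied unchanged from
        // org.apache.hadoop.hive.ql.hooks.LineageLogger#run ...
        String lineage = out.toString();
        return lineage; // returned to the caller instead of LOG.info(lineage)
    }
}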
2. Adding the hook
Hive exposes several hooks to developers. For column lineage analysis, register the LineageLogger as a post-execution hook in the configuration: hConf.set("hive.exec.post.hooks", "org.apache.hadoop.hive.ql.hooks.LineageLogger")
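The same setting in context (a sketch; hConf is the HiveConf later handed to LineageInfo in step 3 — registering a lineage-aware post hook is what makes Hive's optimizer run its lineage transform during compilation, so without it the index read in step 3 stays empty):

import org.apache.hadoop.hive.conf.HiveConf;

HiveConf hConf = new HiveConf();
// registering the lineage hook makes the optimizer populate the lineage index
hConf.set("hive.exec.post.hooks", "org.apache.hadoop.hive.ql.hooks.LineageLogger");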
3. LineageInfo
package cn.edata.StageTest;
import cn.edata.Lineage.LineageLogger;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveVariableSource;
import org.apache.hadoop.hive.conf.VariableSubstitution;
import org.apache.hadoop.hive.metastore.MetaStoreUtils;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Schema;
import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.QueryDisplay;
import org.apache.hadoop.hive.ql.QueryPlan;
import org.apache.hadoop.hive.ql.exec.FetchTask;
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
import org.apache.hadoop.hive.ql.lockmgr.LockException;
import org.apache.hadoop.hive.ql.optimizer.lineage.LineageCtx;
import org.apache.hadoop.hive.ql.parse.*;
import org.apache.hadoop.hive.ql.plan.FetchWork;
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.ql.processors.SetProcessor;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
/**
 * Re-runs Hive's own compilation pipeline to extract column-level dependencies.
 */
public class LineageInfo {
private static final String KEY_ADD = "add ";
private static final String KEY_DROP = "drop ";
private static final String KEY_SET = "set ";
private static final String KEY_FUNCTION = "create temporary function ";
private static final SetProcessor setProcessor = new SetProcessor();
static Logger LOG = LoggerFactory.getLogger("LineageInfo");
private static final LineageLogger lineageLogger = new LineageLogger();
public void getLineageInfo(HiveConf conf, String filePath, boolean testTableDependency) throws LockException, IOException, ParseException, SemanticException {
SessionState ss = SessionState.start(conf);
ss.initTxnMgr(conf);
System.out.println("filePath: " + filePath);
// TODO: 2020/8/3 add support for reading the SQL directly from filePath; for now the statements below are hard-coded samples
List<String> commandList = new LinkedList<>();
commandList.add("use model_ennenergy_ccs");
commandList.add("select * from a_md_ccs_common_h limit 2");
for(String command : commandList){
String lowerSql = command.toLowerCase();
// add / drop statements need no parsing
if (lowerSql.startsWith(KEY_ADD) || lowerSql.startsWith(KEY_DROP)) {
continue;
}
// set a parameter
if (lowerSql.startsWith(KEY_SET)) {
setProcessor.run(command.substring(KEY_SET.length()));
continue;
}
command = new VariableSubstitution(new HiveVariableSource() {
@Override
public Map<String, String> getHiveVariable() {
return ss.getHiveVariables();
}
}).substitute(conf, command);
Context ctx = new Context(conf);
ctx.setCmd(command);
System.out.println("ctx: " + ctx);
ParseDriver pd = new ParseDriver();
ASTNode tree = pd.parse(command, ctx);
tree = ParseUtils.findRootNonNullToken(tree);
System.out.println("tree: " + tree);
// switch the current database
if (tree.getToken().getType() == HiveParser.TOK_SWITCHDATABASE) {
ss.setCurrentDatabase(tree.getChild(0).getText());
continue;
}
ss.setupQueryCurrentTimestamp();
System.out.println("ss: " + ss);
// CDH 6.2.0 version:
// QueryState queryState = new QueryState(conf);
// System.out.println("queryState: " + queryState);
// BaseSemanticAnalyzer sem = SemanticAnalyzerFactory.get(queryState, tree);
BaseSemanticAnalyzer sem = SemanticAnalyzerFactory.get(conf, tree);
sem.analyze(tree, ctx);
sem.validate();
System.out.println("sem: " + sem);
Schema schema = getSchema(sem, conf);
// System.out.println("schema: " + schema);
// query plan
// QueryDisplay queryDisplay = new QueryDisplay();
// queryDisplay.setQueryStr(command);
// queryDisplay.setQueryId(QueryPlan.makeQueryId());
// QueryPlan queryPlan = new QueryPlan(command, sem, 0L, QueryPlan.makeQueryId(), SessionState.get().getHiveOperation(), schema, queryDisplay);
// System.out.println("queryPlan: " + queryPlan);
List<FieldSchema> fieldSchemas = schema.getFieldSchemas();
// System.out.println("fieldSchemas: " + fieldSchemas);
// Some statements cannot be fully analyzed into a schema, e.g.:
// ALTER TABLE model_icome_cheme.cheme_icome_kpi_month_h SET
// TBLPROPERTIES('comment' = 'chemical monthly KPI')
// Statements starting with "alter" could therefore be filtered out up front; they need no parsing.
if (fieldSchemas == null) {
continue;
}
HashSet<WriteEntity> outputs = sem.getOutputs();
System.out.println("outputs: " + outputs);
// emit the column-lineage analysis output
LineageCtx.Index index = ss.getLineageState().getIndex();
// CDH 6.2.0 version:
// LineageCtx.Index index = queryState.getLineageState().getIndex();
System.out.println("index: " + index);
String result = lineageLogger.getJsonString(command, fieldSchemas, outputs, index);
System.out.println("result: " + result);
if (testTableDependency) {
DependencyInfo dependencyInfo = new DependencyInfo();
dependencyInfo.getDependencyInfo(result);
}
}
}
// copied from Hive's Driver class
private Schema getSchema(BaseSemanticAnalyzer sem, HiveConf conf) {
Schema schema = null;
if (sem != null) {
if (sem.getResultSchema() != null) {
List<FieldSchema> lst = sem.getResultSchema();
schema = new Schema(lst, (Map) null);
} else if (sem.getFetchTask() != null) {
FetchTask ft = sem.getFetchTask();
TableDesc td = ft.getTblDesc();
if (td == null && ft.getWork() != null && ((FetchWork) ft.getWork()).getPartDesc() != null && ((FetchWork) ft.getWork()).getPartDesc().size() > 0) {
td = ((PartitionDesc) ((FetchWork) ft.getWork()).getPartDesc().get(0)).getTableDesc();
}
if (td != null) {
String tableName = "result";
List lst = null;
try {
lst = MetaStoreUtils.getFieldsFromDeserializer(tableName, td.getDeserializer(conf));
} catch (Exception e) {
System.out.println("Error getting schema: " + StringUtils.stringifyException(e));
}
if (lst != null) {
schema = new Schema(lst, (Map) null);
}
}
}
}
if (schema == null) {
schema = new Schema();
}
// System.out.println("Returning Hive schema: " + schema);
return schema;
}
}
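Putting steps 1–3 together, a minimal driver might look like this (a sketch: the classpath remark and the /tmp/query.sql path are placeholders, and filePath is not actually read yet, per the TODO above):

package cn.edata.StageTest;

import org.apache.hadoop.hive.conf.HiveConf;

public class LineageInfoTest {
    public static void main(String[] args) throws Exception {
        HiveConf conf = new HiveConf(); // assumes hive-site.xml is on the classpath
        // required so the optimizer collects lineage during compilation (step 2)
        conf.set("hive.exec.post.hooks", "org.apache.hadoop.hive.ql.hooks.LineageLogger");
        new LineageInfo().getLineageInfo(conf, "/tmp/query.sql", true);
    }
}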
4. TableDependency
Beyond column-level lineage, we can go one step further and derive the dependencies between tables.
package cn.edata.StageTest;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import java.util.HashSet;
/**
 * Parses the returned column-dependency JSON and extracts table-level dependencies.
 */
public class DependencyInfo {
public void getDependencyInfo(String lineageInfo){
JSONObject result = JSONObject.parseObject(lineageInfo);
System.out.println("#########");
JSONArray verticesArray = JSONArray.parseArray(result.getString("vertices"));
HashSet<String> modelTables = new HashSet<>();
HashSet<String> originTables = new HashSet<>();
verticesArray.forEach(data -> {
    JSONObject tmp = JSONObject.parseObject(data.toString());
    String vertexId = tmp.getString("vertexId");
    // a COLUMN vertexId has the form "db.table.column"; keep only "db.table"
    String[] parts = vertexId.split("\\.");
    String table = parts.length >= 2 ? parts[0] + "." + parts[1] : vertexId;
    if (vertexId.startsWith("model")) {
        modelTables.add(table);
    }
    if (vertexId.startsWith("origin")) {
        originTables.add(table);
    }
});
System.out.println("####");
System.out.println("modelTables: " + modelTables.toString());
System.out.println("originTables: " + originTables.toString());
}
}
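A quick way to sanity-check the parser is to feed it a hand-written string in the shape LineageLogger emits (the vertex IDs below are made up for illustration; a real payload also carries fields such as queryText and edge expressions):

package cn.edata.StageTest;

public class DependencyInfoTest {
    public static void main(String[] args) {
        // illustrative payload: one projection edge from an "origin" column to a "model" column
        String lineage = "{\"vertices\":["
                + "{\"id\":0,\"vertexType\":\"COLUMN\",\"vertexId\":\"model_db.target_table.col_a\"},"
                + "{\"id\":1,\"vertexType\":\"COLUMN\",\"vertexId\":\"origin_db.source_table.col_a\"}],"
                + "\"edges\":[{\"sources\":[1],\"targets\":[0],\"edgeType\":\"PROJECTION\"}]}";
        new DependencyInfo().getDependencyInfo(lineage);
        // prints: modelTables: [model_db.target_table]
        //         originTables: [origin_db.source_table]
    }
}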
5. pom.xml
<properties>
<!-- CDH 6.2.0 versions -->
<!-- <cdh.hadoop.version>3.0.0-cdh6.2.0</cdh.hadoop.version>-->
<!-- <cdh.hive.version>2.1.1-cdh6.2.0</cdh.hive.version>-->
<cdh.hadoop.version>2.6.0-cdh5.14.4</cdh.hadoop.version>
<cdh.hive.version>1.1.0-cdh5.14.4</cdh.hive.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.4</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.4</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>${cdh.hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-common</artifactId>
<version>${cdh.hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>${cdh.hive.version}</version>
<exclusions>
<exclusion>
<artifactId>
hadoop-yarn-server-resourcemanager
</artifactId>
<groupId>org.apache.hadoop</groupId>
</exclusion>
<!-- <exclusion>-->
<!-- <artifactId>gson</artifactId>-->
<!-- <groupId>com.google.code.gson</groupId>-->
<!-- </exclusion>-->
<exclusion>
<artifactId>hive-shims</artifactId>
<groupId>org.apache.hive</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-jdbc</artifactId>
<version>${cdh.hive.version}</version>
<exclusions>
<exclusion>
<artifactId>hadoop-common</artifactId>
<groupId>org.apache.hadoop</groupId>
</exclusion>
<exclusion>
<artifactId>hbase-server</artifactId>
<groupId>org.apache.hbase</groupId>
</exclusion>
<exclusion>
<artifactId>jasper-compiler</artifactId>
<groupId>tomcat</groupId>
</exclusion>
<exclusion>
<artifactId>jasper-runtime</artifactId>
<groupId>tomcat</groupId>
</exclusion>
<exclusion>
<artifactId>hadoop-hdfs</artifactId>
<groupId>org.apache.hadoop</groupId>
</exclusion>
<exclusion>
<artifactId>jetty-rewrite</artifactId>
<groupId>org.eclipse.jetty</groupId>
</exclusion>
<exclusion>
<artifactId>jetty-server</artifactId>
<groupId>org.eclipse.jetty</groupId>
</exclusion>
<exclusion>
<artifactId>jetty-runner</artifactId>
<groupId>org.eclipse.jetty</groupId>
</exclusion>
<exclusion>
<artifactId>jetty-servlet</artifactId>
<groupId>org.eclipse.jetty</groupId>
</exclusion>
<exclusion>
<artifactId>jetty-webapp</artifactId>
<groupId>org.eclipse.jetty</groupId>
</exclusion>
<exclusion>
<artifactId>hadoop-yarn-registry</artifactId>
<groupId>org.apache.hadoop</groupId>
</exclusion>
<exclusion>
<artifactId>jetty</artifactId>
<groupId>org.mortbay.jetty</groupId>
</exclusion>
<exclusion>
<artifactId>hbase-hadoop2-compat</artifactId>
<groupId>org.apache.hbase</groupId>
</exclusion>
<exclusion>
<artifactId>gson</artifactId>
<groupId>com.google.code.gson</groupId>
</exclusion>
<exclusion>
<artifactId>hive-shims-0.23</artifactId>
<groupId>org.apache.hive.shims</groupId>
</exclusion>
<exclusion>
<artifactId>hive-common</artifactId>
<groupId>org.apache.hive</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>${cdh.hadoop.version}</version>
</dependency>
</dependencies>