Metadata management is the core of a data warehouse: it defines not only what the warehouse contains, but also the content and location of its data, describes the extraction and transformation rules, and stores the business information related to the warehouse's subject areas. This article introduces Hive Hooks and Metastore Listeners, which can be used to automate metadata management. After reading it you will understand:
- Metadata management
- Hive Hooks and Metastore Listeners
- Basic usage of Hive Hooks
- Basic usage of Metastore Listeners
Metadata management
Defining metadata
By the traditional definition, metadata is data about data. Metadata connects source data, the data warehouse, and data applications, recording the full lifecycle of data from production to consumption. It mainly records the definitions of the warehouse's models, the mappings between its layers, the state of the warehouse's data, and the run status of ETL jobs. In a data warehouse system, metadata helps administrators and developers quickly find the data they care about, guiding their data management and development work and improving their efficiency. By purpose, metadata falls into two classes: technical metadata and business metadata. Technical metadata stores the technical details of the warehouse system and is used to develop and manage the warehouse.
Metadata classification
Technical metadata
- Storage metadata of the distributed computing system
Information about Hive tables, columns, partitions, and so on: the table name, partition information, owner, file size, and table type, as well as each column's name, type, and comment, and whether it is a partition column.
- Runtime metadata of the distributed computing system
Job logs such as Hive's, including the job type, instance name, inputs and outputs, SQL, run parameters, and execution time.
- Task-scheduling metadata
Task dependency types and relationships, plus the run logs of the various kinds of scheduled tasks.
Business metadata
Business metadata describes the data in the warehouse from a business perspective. It provides a semantic layer between users and the underlying system, so that business staff with no computing background can still "read" the warehouse's data. Common business metadata includes standardized definitions of dimensions and their attributes, business processes, and metrics, used to manage and consume data better, as well as data-application metadata such as the configuration and runtime metadata of reports and data products.
Applications of metadata
The real value of data lies in data-driven decision making: using data to guide operations. With a data-driven approach we can spot trends, act on them effectively, uncover problems, and drive innovation or solutions. The same idea applies to metadata: it can guide the daily work of everyone who touches data, enabling a data-driven "operation" of the platform itself. For data consumers, metadata helps them find the data they need quickly; for ETL engineers, it guides model design, job tuning, job retirement, and other routine ETL work; for operations engineers, it guides cluster-wide storage, compute, and system optimization.
Hive Hooks and Metastore Listeners
Hive Hooks
Many open-source systems exist for data governance and metadata management, such as Apache Atlas, and they can satisfy metadata-management needs in complex scenarios. For Hive metadata, Apache Atlas itself relies on Hive's hooks, configured as follows:
<property>
    <name>hive.exec.post.hooks</name>
    <value>org.apache.atlas.hive.hook.HiveHook</value>
</property>
The hook listens for Hive events such as table creation and alteration, pushes the collected data to Kafka in a specific format, and a consumer finally reads and stores the metadata.
Types of Hive Hooks
So what exactly is a hook?
A hook is an event-and-message mechanism that binds events into Hive's internal execution flow without recompiling Hive. Hooks provide a way to extend Hive with external components, and depending on the hook type they run at different stages. The main types are:
- hive.exec.pre.hooks
As the name suggests, these hooks are called before the execution engine runs the query, and only after Hive has optimized the query plan. The hook must implement the interface org.apache.hadoop.hive.ql.hooks.ExecuteWithHookContext and is configured in hive-site.xml as follows:
<property>
    <name>hive.exec.pre.hooks</name>
    <value>fully qualified name of the implementation class</value>
</property>
- hive.exec.post.hooks
Called after the execution plan finishes, before the result is returned to the user. The hook must implement the interface org.apache.hadoop.hive.ql.hooks.ExecuteWithHookContext and is configured in hive-site.xml as follows:
<property>
    <name>hive.exec.post.hooks</name>
    <value>fully qualified name of the implementation class</value>
</property>
- hive.exec.failure.hooks
Called after the execution plan fails. The hook must implement the interface org.apache.hadoop.hive.ql.hooks.ExecuteWithHookContext and is configured in hive-site.xml as follows:
<property>
    <name>hive.exec.failure.hooks</name>
    <value>fully qualified name of the implementation class</value>
</property>
- hive.metastore.init.hooks
Called when the HMSHandler is initialized. The hook must implement the interface org.apache.hadoop.hive.metastore.MetaStoreInitListener and is configured in hive-site.xml as follows:
<property>
    <name>hive.metastore.init.hooks</name>
    <value>fully qualified name of the implementation class</value>
</property>
- hive.exec.driver.run.hooks
Runs at the start and end of Driver.run. The hook must implement the interface org.apache.hadoop.hive.ql.HiveDriverRunHook and is configured in hive-site.xml as follows:
<property>
    <name>hive.exec.driver.run.hooks</name>
    <value>fully qualified name of the implementation class</value>
</property>
- hive.semantic.analyzer.hook
Called when Hive performs semantic analysis of the query. The hook must extend the abstract class org.apache.hadoop.hive.ql.parse.AbstractSemanticAnalyzerHook and is configured in hive-site.xml as follows:
<property>
    <name>hive.semantic.analyzer.hook</name>
    <value>fully qualified name of the implementation class</value>
</property>
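The dispatch mechanism behind all of these hook types (register a class name in hive-site.xml, have the engine invoke it at fixed points of the query lifecycle) can be sketched with a few lines of plain Java. The names ExecHook and HookRunner below are illustrative stand-ins, not part of the Hive API, and registration is done directly rather than reflectively as Hive does:

```java
import java.util.ArrayList;
import java.util.List;

// Conceptual sketch of Hive's hook dispatch. In Hive, hook classes are named
// in hive-site.xml and instantiated reflectively; here they are registered
// directly. ExecHook and HookRunner are illustrative, not Hive classes.
interface ExecHook {
    void run(String query);
}

class HookRunner {
    private final List<ExecHook> preHooks = new ArrayList<>();
    private final List<ExecHook> postHooks = new ArrayList<>();

    void registerPre(ExecHook hook)  { preHooks.add(hook); }
    void registerPost(ExecHook hook) { postHooks.add(hook); }

    // Mirrors the query lifecycle: pre-hooks fire before execution,
    // post-hooks fire after the result is produced but before it is returned.
    String execute(String sql) {
        for (ExecHook hook : preHooks) hook.run(sql);
        String result = "result-of:" + sql; // stand-in for real query execution
        for (ExecHook hook : postHooks) hook.run(sql);
        return result;
    }
}
```

The key property the sketch illustrates is that hooks observe every statement from inside the execution path, which is why they can both capture metadata and slow the query down.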
Pros and cons of Hive Hooks
- Pros
  - Custom code can easily be embedded and run at the various query stages
  - Can be used to update metadata
- Cons
  - The metadata obtained through hooks usually needs further parsing before it is intelligible
  - Hooks run inside the query path and affect it
This article demonstrates hive.exec.post.hooks, which runs after the query executes and before results are returned.
Metastore Listeners
Metastore listeners are listeners on the Hive metastore: users supply custom code that is invoked when metastore events occur.
Looking at the source of the HiveMetaStore class, we find that the init() method of HMSHandler creates three kinds of listeners, MetaStorePreEventListener, MetaStoreEventListener, and MetaStoreEndFunctionListener, which listen at each step of an event.
public class HiveMetaStore extends ThriftHiveMetastore {
    // ... code omitted
    public static class HMSHandler extends FacebookBase implements IHMSHandler {
        // ... code omitted
        public void init() throws MetaException {
            // ... code omitted
            // Load the MetaStorePreEventListeners
            preListeners = MetaStoreUtils.getMetaStoreListeners(MetaStorePreEventListener.class,
                    hiveConf,
                    hiveConf.getVar(HiveConf.ConfVars.METASTORE_PRE_EVENT_LISTENERS));
            // Load the MetaStoreEventListeners
            listeners = MetaStoreUtils.getMetaStoreListeners(MetaStoreEventListener.class,
                    hiveConf,
                    hiveConf.getVar(HiveConf.ConfVars.METASTORE_EVENT_LISTENERS));
            listeners.add(new SessionPropertiesListener(hiveConf));
            // Load the MetaStoreEndFunctionListeners
            endFunctionListeners = MetaStoreUtils.getMetaStoreListeners(
                    MetaStoreEndFunctionListener.class,
                    hiveConf,
                    hiveConf.getVar(HiveConf.ConfVars.METASTORE_END_FUNCTION_LISTENERS));
            // ... code omitted
        }
    }
}
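The three chains built in init() can be modeled with a small self-contained sketch that shows the order in which they fire around a metastore operation. MetaListener and MetastoreModel are hypothetical names for illustration, not Hive classes:

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative model (not the Hive API) of the three listener chains that
// HMSHandler.init() builds: pre-event listeners run before a metastore
// operation, event listeners run when it happens, and end-function
// listeners run when the metastore function returns.
interface MetaListener {
    void onEvent(String event);
}

class MetastoreModel {
    final List<MetaListener> preListeners = new ArrayList<>();
    final List<MetaListener> listeners = new ArrayList<>();
    final List<MetaListener> endFunctionListeners = new ArrayList<>();

    final List<String> trace = new ArrayList<>();

    void createTable(String name) {
        for (MetaListener l : preListeners) l.onEvent("pre:CREATE_TABLE:" + name);
        trace.add("table-created:" + name); // stand-in for the real DDL work
        for (MetaListener l : listeners) l.onEvent("on:CREATE_TABLE:" + name);
        for (MetaListener l : endFunctionListeners) l.onEvent("end:create_table");
    }
}
```

A pre-event listener in Hive can veto the operation (for example, for authorization), while event listeners such as the ones this article implements only observe the result.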
Types of Metastore Listeners
- hive.metastore.pre.event.listeners
Extend this abstract class to provide actions that must run before a specific event occurs on the metastore; the methods are called just before the event happens.
The listener must extend the abstract class org.apache.hadoop.hive.metastore.MetaStorePreEventListener and is configured in hive-site.xml as:
<property>
    <name>hive.metastore.pre.event.listeners</name>
    <value>fully qualified name of the implementation class</value>
</property>
- hive.metastore.event.listeners
Extend this abstract class to provide actions that must run when a specific event occurs on the metastore; the methods are called whenever an event happens.
The listener must extend the abstract class org.apache.hadoop.hive.metastore.MetaStoreEventListener and is configured in hive-site.xml as:
<property>
    <name>hive.metastore.event.listeners</name>
    <value>fully qualified name of the implementation class</value>
</property>
- hive.metastore.end.function.listeners
The methods are called whenever a metastore function ends.
The listener must extend the abstract class org.apache.hadoop.hive.metastore.MetaStoreEndFunctionListener and is configured in hive-site.xml as:
<property>
    <name>hive.metastore.end.function.listeners</name>
    <value>fully qualified name of the implementation class</value>
</property>
Pros and cons of Metastore Listeners
- Pros
  - The metadata arrives already parsed and is easy to understand
  - Listeners are read-only and do not affect the query path
- Cons
  - Inflexible: only the objects belonging to the current event are accessible
This article demonstrates MetaStoreEventListener by implementing two methods: onCreateTable and onAlterTable.
Basic usage of Hive Hooks
Code
The implementation is as follows:
public class CustomPostHook implements ExecuteWithHookContext {
    private static final Logger LOGGER = LoggerFactory.getLogger(CustomPostHook.class);

    // The Hive SQL operation types to monitor.
    // HiveOperation is an enum wrapping Hive's SQL operation types.
    private static final HashSet<String> OPERATION_NAMES = new HashSet<>();

    static {
        // Create table
        OPERATION_NAMES.add(HiveOperation.CREATETABLE.getOperationName());
        // Alter database properties
        OPERATION_NAMES.add(HiveOperation.ALTERDATABASE.getOperationName());
        // Alter database owner
        OPERATION_NAMES.add(HiveOperation.ALTERDATABASE_OWNER.getOperationName());
        // Alter table: add columns
        OPERATION_NAMES.add(HiveOperation.ALTERTABLE_ADDCOLS.getOperationName());
        // Alter table: storage location
        OPERATION_NAMES.add(HiveOperation.ALTERTABLE_LOCATION.getOperationName());
        // Alter table properties
        OPERATION_NAMES.add(HiveOperation.ALTERTABLE_PROPERTIES.getOperationName());
        // Rename table
        OPERATION_NAMES.add(HiveOperation.ALTERTABLE_RENAME.getOperationName());
        // Rename column
        OPERATION_NAMES.add(HiveOperation.ALTERTABLE_RENAMECOL.getOperationName());
        // Replace columns: drop the current columns, then add the new ones
        OPERATION_NAMES.add(HiveOperation.ALTERTABLE_REPLACECOLS.getOperationName());
        // Create database
        OPERATION_NAMES.add(HiveOperation.CREATEDATABASE.getOperationName());
        // Drop database
        OPERATION_NAMES.add(HiveOperation.DROPDATABASE.getOperationName());
        // Drop table
        OPERATION_NAMES.add(HiveOperation.DROPTABLE.getOperationName());
    }

    @Override
    public void run(HookContext hookContext) throws Exception {
        assert (hookContext.getHookType() == HookType.POST_EXEC_HOOK);
        // The query plan
        QueryPlan plan = hookContext.getQueryPlan();
        // The operation name
        String operationName = plan.getOperationName();
        logWithHeader("Executed SQL: " + plan.getQueryString());
        logWithHeader("Operation name: " + operationName);
        if (OPERATION_NAMES.contains(operationName) && !plan.isExplain()) {
            logWithHeader("Monitored SQL operation");
            Set<ReadEntity> inputs = hookContext.getInputs();
            Set<WriteEntity> outputs = hookContext.getOutputs();
            for (Entity entity : inputs) {
                logWithHeader("Hook metadata input: " + toJson(entity));
            }
            for (Entity entity : outputs) {
                logWithHeader("Hook metadata output: " + toJson(entity));
            }
        } else {
            logWithHeader("Not in monitoring scope, ignoring this hook!");
        }
    }

    private static String toJson(Entity entity) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        // The entity types are mainly:
        // DATABASE, TABLE, PARTITION, DUMMYPARTITION, DFS_DIR, LOCAL_DIR, FUNCTION
        switch (entity.getType()) {
            case DATABASE:
                Database db = entity.getDatabase();
                return mapper.writeValueAsString(db);
            case TABLE:
                return mapper.writeValueAsString(entity.getTable().getTTable());
        }
        return null;
    }

    /**
     * Log with a uniform header.
     *
     * @param obj the object to log
     */
    private void logWithHeader(Object obj) {
        LOGGER.info("[CustomPostHook][Thread: " + Thread.currentThread().getName() + "] | " + obj);
    }
}
How to use it
First compile the code above into a jar and place it under $HIVE_HOME/lib, or add the jar from the Hive client:
0: jdbc:hive2://localhost:10000> add jar /opt/softwares/com.jmx.hive-1.0-SNAPSHOT.jar;
Then configure hive-site.xml; for convenience, we set the property directly from the client:
0: jdbc:hive2://localhost:10000> set hive.exec.post.hooks=com.jmx.hooks.CustomPostHook;
Showing tables
The code above monitors a set of operations, and when one is detected it triggers custom code (such as writing a log entry). When we run the following in the Hive beeline client:
0: jdbc:hive2://localhost:10000> show tables;
we can see in $HIVE_HOME/logs/hive.log:
[CustomPostHook][Thread: cab9a763-c63e-4f25-9f9a-affacb3cecdb main] | Executed SQL: show tables
[CustomPostHook][Thread: cab9a763-c63e-4f25-9f9a-affacb3cecdb main] | Operation name: SHOWTABLES
[CustomPostHook][Thread: cab9a763-c63e-4f25-9f9a-affacb3cecdb main] | Not in monitoring scope, ignoring this hook!
The show-tables operation is not in the monitoring scope, so there is no corresponding metadata log.
Creating a table
When we create a table in the Hive beeline client, as follows:
CREATE TABLE testposthook(
id int COMMENT "id",
name string COMMENT "姓名"
)COMMENT "建表_測試Hive Hooks"
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
LOCATION '/user/hive/warehouse/';
and then check hive.log, the hook logs two metadata outputs: the first is the database's metadata, the second the table's.
- Database metadata
{
"name":"default",
"description":"Default Hive database",
"locationUri":"hdfs://kms-1.apache.com:8020/user/hive/warehouse",
"parameters":{
},
"privileges":null,
"ownerName":"public",
"ownerType":"ROLE",
"setParameters":true,
"parametersSize":0,
"setOwnerName":true,
"setOwnerType":true,
"setPrivileges":false,
"setName":true,
"setDescription":true,
"setLocationUri":true
}
- Table metadata
{
"tableName":"testposthook",
"dbName":"default",
"owner":"anonymous",
"createTime":1597985444,
"lastAccessTime":0,
"retention":0,
"sd":{
"cols":[
],
"location":null,
"inputFormat":"org.apache.hadoop.mapred.SequenceFileInputFormat",
"outputFormat":"org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat",
"compressed":false,
"numBuckets":-1,
"serdeInfo":{
"name":null,
"serializationLib":"org.apache.hadoop.hive.serde2.MetadataTypedColumnsetSerDe",
"parameters":{
"serialization.format":"1"
},
"setSerializationLib":true,
"setParameters":true,
"parametersSize":1,
"setName":false
},
"bucketCols":[
],
"sortCols":[
],
"parameters":{
},
"skewedInfo":{
"skewedColNames":[
],
"skewedColValues":[
],
"skewedColValueLocationMaps":{
},
"skewedColNamesIterator":[
],
"skewedColValuesSize":0,
"skewedColValuesIterator":[
],
"skewedColValueLocationMapsSize":0,
"setSkewedColNames":true,
"setSkewedColValues":true,
"setSkewedColValueLocationMaps":true,
"skewedColNamesSize":0
},
"storedAsSubDirectories":false,
"colsSize":0,
"setParameters":true,
"parametersSize":0,
"setOutputFormat":true,
"setSerdeInfo":true,
"setBucketCols":true,
"setSortCols":true,
"setSkewedInfo":true,
"colsIterator":[
],
"setCompressed":false,
"setNumBuckets":true,
"bucketColsSize":0,
"bucketColsIterator":[
],
"sortColsSize":0,
"sortColsIterator":[
],
"setStoredAsSubDirectories":false,
"setCols":true,
"setLocation":false,
"setInputFormat":true
},
"partitionKeys":[
],
"parameters":{
},
"viewOriginalText":null,
"viewExpandedText":null,
"tableType":"MANAGED_TABLE",
"privileges":null,
"temporary":false,
"rewriteEnabled":false,
"partitionKeysSize":0,
"setDbName":true,
"setSd":true,
"setParameters":true,
"setCreateTime":true,
"setLastAccessTime":false,
"parametersSize":0,
"setTableName":true,
"setPrivileges":false,
"setOwner":true,
"setPartitionKeys":true,
"setViewOriginalText":false,
"setViewExpandedText":false,
"setTableType":true,
"setRetention":false,
"partitionKeysIterator":[
],
"setTemporary":false,
"setRewriteEnabled":false
}
Notice that in the table metadata above the **cols[]** array is empty: it contains no information about the id and name columns defined at creation time. To obtain that information, run:
ALTER TABLE testposthook
ADD COLUMNS (age int COMMENT '年齡');
Check the log again. This time the hook logs exactly one input and one output, both representing the table's metadata.
- Input
{
"tableName":"testposthook",
"dbName":"default",
"owner":"anonymous",
"createTime":1597985445,
"lastAccessTime":0,
"retention":0,
"sd":{
"cols":[
{
"name":"id",
"type":"int",
"comment":"id",
"setName":true,
"setType":true,
"setComment":true
},
{
"name":"name",
"type":"string",
"comment":"姓名",
"setName":true,
"setType":true,
"setComment":true
}
],
"location":"hdfs://kms-1.apache.com:8020/user/hive/warehouse",
"inputFormat":"org.apache.hadoop.mapred.TextInputFormat",
"outputFormat":"org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat",
"compressed":false,
"numBuckets":-1,
"serdeInfo":{
"name":null,
"serializationLib":"org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe",
"parameters":{
"serialization.format":" ",
"field.delim":" "
},
"setSerializationLib":true,
"setParameters":true,
"parametersSize":2,
"setName":false
},
"bucketCols":[
],
"sortCols":[
],
"parameters":{
},
"skewedInfo":{
"skewedColNames":[
],
"skewedColValues":[
],
"skewedColValueLocationMaps":{
},
"skewedColNamesIterator":[
],
"skewedColValuesSize":0,
"skewedColValuesIterator":[
],
"skewedColValueLocationMapsSize":0,
"setSkewedColNames":true,
"setSkewedColValues":true,
"setSkewedColValueLocationMaps":true,
"skewedColNamesSize":0
},
"storedAsSubDirectories":false,
"colsSize":2,
"setParameters":true,
"parametersSize":0,
"setOutputFormat":true,
"setSerdeInfo":true,
"setBucketCols":true,
"setSortCols":true,
"setSkewedInfo":true,
"colsIterator":[
{
"name":"id",
"type":"int",
"comment":"id",
"setName":true,
"setType":true,
"setComment":true
},
{
"name":"name",
"type":"string",
"comment":"姓名",
"setName":true,
"setType":true,
"setComment":true
}
],
"setCompressed":true,
"setNumBuckets":true,
"bucketColsSize":0,
"bucketColsIterator":[
],
"sortColsSize":0,
"sortColsIterator":[
],
"setStoredAsSubDirectories":true,
"setCols":true,
"setLocation":true,
"setInputFormat":true
},
"partitionKeys":[
],
"parameters":{
"transient_lastDdlTime":"1597985445",
"comment":"建表_測試Hive Hooks",
"totalSize":"0",
"numFiles":"0"
},
"viewOriginalText":null,
"viewExpandedText":null,
"tableType":"MANAGED_TABLE",
"privileges":null,
"temporary":false,
"rewriteEnabled":false,
"partitionKeysSize":0,
"setDbName":true,
"setSd":true,
"setParameters":true,
"setCreateTime":true,
"setLastAccessTime":true,
"parametersSize":4,
"setTableName":true,
"setPrivileges":false,
"setOwner":true,
"setPartitionKeys":true,
"setViewOriginalText":false,
"setViewExpandedText":false,
"setTableType":true,
"setRetention":true,
"partitionKeysIterator":[
],
"setTemporary":false,
"setRewriteEnabled":true
}
The **cols** field in the JSON above now carries the columns' metadata. Now look at the output JSON:
- Output
{
"tableName":"testposthook",
"dbName":"default",
"owner":"anonymous",
"createTime":1597985445,
"lastAccessTime":0,
"retention":0,
"sd":{
"cols":[
{
"name":"id",
"type":"int",
"comment":"id",
"setName":true,
"setType":true,
"setComment":true
},
{
"name":"name",
"type":"string",
"comment":"姓名",
"setName":true,
"setType":true,
"setComment":true
}
],
"location":"hdfs://kms-1.apache.com:8020/user/hive/warehouse",
"inputFormat":"org.apache.hadoop.mapred.TextInputFormat",
"outputFormat":"org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat",
"compressed":false,
"numBuckets":-1,
"serdeInfo":{
"name":null,
"serializationLib":"org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe",
"parameters":{
"serialization.format":" ",
"field.delim":" "
},
"setSerializationLib":true,
"setParameters":true,
"parametersSize":2,
"setName":false
},
"bucketCols":[
],
"sortCols":[
],
"parameters":{
},
"skewedInfo":{
"skewedColNames":[
],
"skewedColValues":[
],
"skewedColValueLocationMaps":{
},
"skewedColNamesIterator":[
],
"skewedColValuesSize":0,
"skewedColValuesIterator":[
],
"skewedColValueLocationMapsSize":0,
"setSkewedColNames":true,
"setSkewedColValues":true,
"setSkewedColValueLocationMaps":true,
"skewedColNamesSize":0
},
"storedAsSubDirectories":false,
"colsSize":2,
"setParameters":true,
"parametersSize":0,
"setOutputFormat":true,
"setSerdeInfo":true,
"setBucketCols":true,
"setSortCols":true,
"setSkewedInfo":true,
"colsIterator":[
{
"name":"id",
"type":"int",
"comment":"id",
"setName":true,
"setType":true,
"setComment":true
},
{
"name":"name",
"type":"string",
"comment":"姓名",
"setName":true,
"setType":true,
"setComment":true
}
],
"setCompressed":true,
"setNumBuckets":true,
"bucketColsSize":0,
"bucketColsIterator":[
],
"sortColsSize":0,
"sortColsIterator":[
],
"setStoredAsSubDirectories":true,
"setCols":true,
"setLocation":true,
"setInputFormat":true
},
"partitionKeys":[
],
"parameters":{
"transient_lastDdlTime":"1597985445",
"comment":"建表_測試Hive Hooks",
"totalSize":"0",
"numFiles":"0"
},
"viewOriginalText":null,
"viewExpandedText":null,
"tableType":"MANAGED_TABLE",
"privileges":null,
"temporary":false,
"rewriteEnabled":false,
"partitionKeysSize":0,
"setDbName":true,
"setSd":true,
"setParameters":true,
"setCreateTime":true,
"setLastAccessTime":true,
"parametersSize":4,
"setTableName":true,
"setPrivileges":false,
"setOwner":true,
"setPartitionKeys":true,
"setViewOriginalText":false,
"setViewExpandedText":false,
"setTableType":true,
"setRetention":true,
"partitionKeysIterator":[
],
"setTemporary":false,
"setRewriteEnabled":true
}
The output object does not contain the new column age; it represents the table's metadata before the ALTER.
Basic usage of Metastore Listeners
Code
The implementation is as follows:
public class CustomListener extends MetaStoreEventListener {
    private static final Logger LOGGER = LoggerFactory.getLogger(CustomListener.class);
    private static final ObjectMapper objMapper = new ObjectMapper();

    public CustomListener(Configuration config) {
        super(config);
        logWithHeader(" created ");
    }

    // Listen for table creation
    @Override
    public void onCreateTable(CreateTableEvent event) {
        logWithHeader(event.getTable());
    }

    // Listen for table alteration
    @Override
    public void onAlterTable(AlterTableEvent event) {
        logWithHeader(event.getOldTable());
        logWithHeader(event.getNewTable());
    }

    private void logWithHeader(Object obj) {
        LOGGER.info("[CustomListener][Thread: " + Thread.currentThread().getName() + "] | " + objToStr(obj));
    }

    private String objToStr(Object obj) {
        try {
            return objMapper.writeValueAsString(obj);
        } catch (IOException e) {
            LOGGER.error("Error on conversion", e);
        }
        return null;
    }
}
How to use it
Usage differs slightly from hooks: a Hive hook interacts with HiveServer, while a listener interacts with the metastore, that is, the listener runs inside the metastore process. Concretely:
First place the jar under $HIVE_HOME/lib, then configure hive-site.xml with:
<property>
    <name>hive.metastore.event.listeners</name>
    <value>com.jmx.hooks.CustomListener</value>
    <description/>
</property>
After configuring, restart the metastore service:
bin/hive --service metastore &
Creating a table
CREATE TABLE testlistener(
id int COMMENT "id",
name string COMMENT "姓名"
)COMMENT "建表_測試Hive Listener"
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
LOCATION '/user/hive/warehouse/';
Observe hive.log:
{
"tableName":"testlistener",
"dbName":"default",
"owner":"anonymous",
"createTime":1597989316,
"lastAccessTime":0,
"retention":0,
"sd":{
"cols":[
{
"name":"id",
"type":"int",
"comment":"id",
"setComment":true,
"setType":true,
"setName":true
},
{
"name":"name",
"type":"string",
"comment":"姓名",
"setComment":true,
"setType":true,
"setName":true
}
],
"location":"hdfs://kms-1.apache.com:8020/user/hive/warehouse",
"inputFormat":"org.apache.hadoop.mapred.TextInputFormat",
"outputFormat":"org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat",
"compressed":false,
"numBuckets":-1,
"serdeInfo":{
"name":null,
"serializationLib":"org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe",
"parameters":{
"serialization.format":" ",
"field.delim":" "
},
"setSerializationLib":true,
"setParameters":true,
"parametersSize":2,
"setName":false
},
"bucketCols":[
],
"sortCols":[
],
"parameters":{
},
"skewedInfo":{
"skewedColNames":[
],
"skewedColValues":[
],
"skewedColValueLocationMaps":{
},
"setSkewedColNames":true,
"setSkewedColValues":true,
"setSkewedColValueLocationMaps":true,
"skewedColNamesSize":0,
"skewedColNamesIterator":[
],
"skewedColValuesSize":0,
"skewedColValuesIterator":[
],
"skewedColValueLocationMapsSize":0
},
"storedAsSubDirectories":false,
"setCols":true,
"setOutputFormat":true,
"setSerdeInfo":true,
"setBucketCols":true,
"setSortCols":true,
"colsSize":2,
"colsIterator":[
{
"name":"id",
"type":"int",
"comment":"id",
"setComment":true,
"setType":true,
"setName":true
},
{
"name":"name",
"type":"string",
"comment":"姓名",
"setComment":true,
"setType":true,
"setName":true
}
],
"setCompressed":true,
"setNumBuckets":true,
"bucketColsSize":0,
"bucketColsIterator":[
],
"sortColsSize":0,
"sortColsIterator":[
],
"setStoredAsSubDirectories":true,
"setParameters":true,
"setLocation":true,
"setInputFormat":true,
"parametersSize":0,
"setSkewedInfo":true
},
"partitionKeys":[
],
"parameters":{
"transient_lastDdlTime":"1597989316",
"comment":"建表_測試Hive Listener",
"totalSize":"0",
"numFiles":"0"
},
"viewOriginalText":null,
"viewExpandedText":null,
"tableType":"MANAGED_TABLE",
"privileges":{
"userPrivileges":{
"anonymous":[
{
"privilege":"INSERT",
"createTime":-1,
"grantor":"anonymous",
"grantorType":"USER",
"grantOption":true,
"setGrantOption":true,
"setCreateTime":true,
"setGrantor":true,
"setGrantorType":true,
"setPrivilege":true
},
{
"privilege":"SELECT",
"createTime":-1,
"grantor":"anonymous",
"grantorType":"USER",
"grantOption":true,
"setGrantOption":true,
"setCreateTime":true,
"setGrantor":true,
"setGrantorType":true,
"setPrivilege":true
},
{
"privilege":"UPDATE",
"createTime":-1,
"grantor":"anonymous",
"grantorType":"USER",
"grantOption":true,
"setGrantOption":true,
"setCreateTime":true,
"setGrantor":true,
"setGrantorType":true,
"setPrivilege":true
},
{
"privilege":"DELETE",
"createTime":-1,
"grantor":"anonymous",
"grantorType":"USER",
"grantOption":true,
"setGrantOption":true,
"setCreateTime":true,
"setGrantor":true,
"setGrantorType":true,
"setPrivilege":true
}
]
},
"groupPrivileges":null,
"rolePrivileges":null,
"setUserPrivileges":true,
"setGroupPrivileges":false,
"setRolePrivileges":false,
"userPrivilegesSize":1,
"groupPrivilegesSize":0,
"rolePrivilegesSize":0
},
"temporary":false,
"rewriteEnabled":false,
"setParameters":true,
"setPartitionKeys":true,
"partitionKeysSize":0,
"setSd":true,
"setLastAccessTime":true,
"setRetention":true,
"partitionKeysIterator":[
],
"parametersSize":4,
"setTemporary":true,
"setRewriteEnabled":false,
"setTableName":true,
"setDbName":true,
"setOwner":true,
"setViewOriginalText":false,
"setViewExpandedText":false,
"setTableType":true,
"setPrivileges":true,
"setCreateTime":true
}
When we then alter the table:
ALTER TABLE testlistener
ADD COLUMNS (age int COMMENT '年齡');
and check the log again, there are two records: the first is the old table's metadata, the second is the table's metadata after the change.
- old table
{
"tableName":"testlistener",
"dbName":"default",
"owner":"anonymous",
"createTime":1597989316,
"lastAccessTime":0,
"retention":0,
"sd":{
"cols":[
{
"name":"id",
"type":"int",
"comment":"id",
"setComment":true,
"setType":true,
"setName":true
},
{
"name":"name",
"type":"string",
"comment":"姓名",
"setComment":true,
"setType":true,
"setName":true
}
],
"location":"hdfs://kms-1.apache.com:8020/user/hive/warehouse",
"inputFormat":"org.apache.hadoop.mapred.TextInputFormat",
"outputFormat":"org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat",
"compressed":false,
"numBuckets":-1,
"serdeInfo":{
"name":null,
"serializationLib":"org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe",
"parameters":{
"serialization.format":" ",
"field.delim":" "
},
"setSerializationLib":true,
"setParameters":true,
"parametersSize":2,
"setName":false
},
"bucketCols":[
],
"sortCols":[
],
"parameters":{
},
"skewedInfo":{
"skewedColNames":[
],
"skewedColValues":[
],
"skewedColValueLocationMaps":{
},
"setSkewedColNames":true,
"setSkewedColValues":true,
"setSkewedColValueLocationMaps":true,
"skewedColNamesSize":0,
"skewedColNamesIterator":[
],
"skewedColValuesSize":0,
"skewedColValuesIterator":[
],
"skewedColValueLocationMapsSize":0
},
"storedAsSubDirectories":false,
"setCols":true,
"setOutputFormat":true,
"setSerdeInfo":true,
"setBucketCols":true,
"setSortCols":true,
"colsSize":2,
"colsIterator":[
{
"name":"id",
"type":"int",
"comment":"id",
"setComment":true,
"setType":true,
"setName":true
},
{
"name":"name",
"type":"string",
"comment":"姓名",
"setComment":true,
"setType":true,
"setName":true
}
],
"setCompressed":true,
"setNumBuckets":true,
"bucketColsSize":0,
"bucketColsIterator":[
],
"sortColsSize":0,
"sortColsIterator":[
],
"setStoredAsSubDirectories":true,
"setParameters":true,
"setLocation":true,
"setInputFormat":true,
"parametersSize":0,
"setSkewedInfo":true
},
"partitionKeys":[
],
"parameters":{
"totalSize":"0",
"numFiles":"0",
"transient_lastDdlTime":"1597989316",
"comment":"建表_測試Hive Listener"
},
"viewOriginalText":null,
"viewExpandedText":null,
"tableType":"MANAGED_TABLE",
"privileges":null,
"temporary":false,
"rewriteEnabled":false,
"setParameters":true,
"setPartitionKeys":true,
"partitionKeysSize":0,
"setSd":true,
"setLastAccessTime":true,
"setRetention":true,
"partitionKeysIterator":[
],
"parametersSize":4,
"setTemporary":false,
"setRewriteEnabled":true,
"setTableName":true,
"setDbName":true,
"setOwner":true,
"setViewOriginalText":false,
"setViewExpandedText":false,
"setTableType":true,
"setPrivileges":false,
"setCreateTime":true
}
- new table
{
"tableName":"testlistener",
"dbName":"default",
"owner":"anonymous",
"createTime":1597989316,
"lastAccessTime":0,
"retention":0,
"sd":{
"cols":[
{
"name":"id",
"type":"int",
"comment":"id",
"setComment":true,
"setType":true,
"setName":true
},
{
"name":"name",
"type":"string",
"comment":"姓名",
"setComment":true,
"setType":true,
"setName":true
},
{
"name":"age",
"type":"int",
"comment":"年齡",
"setComment":true,
"setType":true,
"setName":true
}
],
"location":"hdfs://kms-1.apache.com:8020/user/hive/warehouse",
"inputFormat":"org.apache.hadoop.mapred.TextInputFormat",
"outputFormat":"org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat",
"compressed":false,
"numBuckets":-1,
"serdeInfo":{
"name":null,
"serializationLib":"org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe",
"parameters":{
"serialization.format":" ",
"field.delim":" "
},
"setSerializationLib":true,
"setParameters":true,
"parametersSize":2,
"setName":false
},
"bucketCols":[
],
"sortCols":[
],
"parameters":{
},
"skewedInfo":{
"skewedColNames":[
],
"skewedColValues":[
],
"skewedColValueLocationMaps":{
},
"setSkewedColNames":true,
"setSkewedColValues":true,
"setSkewedColValueLocationMaps":true,
"skewedColNamesSize":0,
"skewedColNamesIterator":[
],
"skewedColValuesSize":0,
"skewedColValuesIterator":[
],
"skewedColValueLocationMapsSize":0
},
"storedAsSubDirectories":false,
"setCols":true,
"setOutputFormat":true,
"setSerdeInfo":true,
"setBucketCols":true,
"setSortCols":true,
"colsSize":3,
"colsIterator":[
{
"name":"id",
"type":"int",
"comment":"id",
"setComment":true,
"setType":true,
"setName":true
},
{
"name":"name",
"type":"string",
"comment":"姓名",
"setComment":true,
"setType":true,
"setName":true
},
{
"name":"age",
"type":"int",
"comment":"年齡",
"setComment":true,
"setType":true,
"setName":true
}
],
"setCompressed":true,
"setNumBuckets":true,
"bucketColsSize":0,
"bucketColsIterator":[
],
"sortColsSize":0,
"sortColsIterator":[
],
"setStoredAsSubDirectories":true,
"setParameters":true,
"setLocation":true,
"setInputFormat":true,
"parametersSize":0,
"setSkewedInfo":true
},
"partitionKeys":[
],
"parameters":{
"totalSize":"0",
"last_modified_time":"1597989660",
"numFiles":"0",
"transient_lastDdlTime":"1597989660",
"comment":"建表_測試Hive Listener",
"last_modified_by":"anonymous"
},
"viewOriginalText":null,
"viewExpandedText":null,
"tableType":"MANAGED_TABLE",
"privileges":null,
"temporary":false,
"rewriteEnabled":false,
"setParameters":true,
"setPartitionKeys":true,
"partitionKeysSize":0,
"setSd":true,
"setLastAccessTime":true,
"setRetention":true,
"partitionKeysIterator":[
],
"parametersSize":6,
"setTemporary":false,
"setRewriteEnabled":true,
"setTableName":true,
"setDbName":true,
"setOwner":true,
"setViewOriginalText":false,
"setViewExpandedText":false,
"setTableType":true,
"setPrivileges":false,
"setCreateTime":true
}
As expected, the altered table's metadata now includes the newly added column age.
Summary
This article showed how to tap into Hive's metadata so that it can be managed automatically, covering the basic usage of Hive Hooks and Metastore Listeners. The captured metadata can also be pushed to Kafka to build your own metadata-management system.
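As a minimal sketch of that last idea, the handoff from a listener to a message queue might look like the following. An in-memory queue stands in for Kafka so the example stays self-contained; MetadataPipeline is a hypothetical class, not part of Hive or the Kafka client:

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical sketch of a metadata pipeline: a listener callback serializes
// each event and hands it to a queue (Kafka in a real deployment; a plain
// in-memory queue here), and a consumer drains events into a metadata store.
class MetadataPipeline {
    private final Queue<String> queue = new ArrayDeque<>();

    // Producer side: what an onCreateTable/onAlterTable callback would do
    // with the JSON produced by ObjectMapper.
    void publish(String eventType, String tableJson) {
        queue.add(eventType + "|" + tableJson);
    }

    // Consumer side: take the next event, or null if none is pending.
    String consume() {
        return queue.poll();
    }
}
```

In a real deployment the publish step would use a Kafka producer and the consume step a consumer group feeding the metadata store, decoupling the metastore process from the metadata system.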