Catalog API簡介
Spark中的DataSet和Dataframe API支持結構化分析。結構化分析的一個重要的方面是管理元數據。這些元數據可能是一些臨時元數據(比如臨時表)、SQLContext上注冊的UDF以及持久化的元數據(比如Hivemeta store或者HCatalog)。
Spark的早期版本是沒有標准的API來訪問這些元數據的。用戶通常使用查詢語句(比如show tables
)來查詢這些元數據。這些查詢通常需要操作原始的字符串,而且不同元數據類型的操作也是不一樣的。
這種情況在Spark 2.0中得到改變。Spark 2.0中添加了標准的API(稱為catalog)來訪問Spark SQL中的元數據。這個API既可以操作Spark SQL,也可以操作Hive元數據。
訪問Catalog
Catalog可以通過SparkSession獲取,下面代碼展示如何獲取Catalog:
import org.apache.spark.sql.SparkSession val sparkSession = SparkSession.builder.appName("example").enableHiveSupport().getOrCreate() val catalog = sparkSession.catalog
訪問databases
我們一旦創建好catalog對象之后,我們可以使用它來查詢元數據中的數據庫,catalog上的API返回的結果全部都是dataset。
scala> catalog.listDatabases().show(false) +----------+--------------------+--------------------+ | name| description | locationUri | +----------+--------------------+--------------------+ |data_clean| |hdfs://asiainfo-1...| |default |Default Hive data...|hdfs://asiainfo-1...| +----------+--------------------+--------------------+ scala> catalog.listDatabases().select("name").show(false) +-----------------------+ |name | +-----------------------+ |iteblog | |default | +-----------------------+
listDatabases
返回元數據中所有的數據庫。
默認情況下,元數據僅僅只有名為default的數據庫。如果是Hive元數據,那么它會從Hive元數據中獲取所有的數據庫。listDatabases
返回的類型是dataset,所以我們可以使用Dataset上的所有操作來查詢元數據。
使用createTempView注冊Dataframe
在Spark的早期版本,我們使用registerTempTable
來注冊Dataframe。然而在Spark 2.0中,這個API已經被遺棄了。registerTempTable
名字很讓人誤解,因為用戶會認為這個函數會將Dataframe持久化並且保證這個臨時表,但是實際上並不是這樣的,所以社區才有意將它替換成createTempView
。createTempView
的使用方法如下:
df.createTempView("temp")
查詢表
正如我們可以展示出元數據中的所有數據庫一樣,我們也可以展示出元數據中某個數據庫中的表。它會展示出Spark SQL中所有注冊的臨時表。同時可以展示出Hive中默認數據庫(也就是default)中的表。如下:
scala> catalog.listTables().select("name").show(false) +----------------------------------------+ |name | +----------------------------------------+ |city_to_level | |table2 | |test | |ticket_order | |tmp1_result | +----------------------------------------+
判斷某個表是否緩存
我們可以使用Catalog提供的API來檢查某個表是否緩存。如下:
scala> println(catalog.isCached("temp")) false
上面判斷temp表是否緩存,結果輸出false。默認情況下表是不會被緩存的,我們可以手動緩存某個表,如下:
scala> df.cache() res4: df.type = [_c0: string, _c1: string ... 2 more fields] scala> println(catalog.isCached("temp")) true
現在iteblog表已經被緩存了,所有現在的輸出結構是true。
刪除view
我們可以使用catalog提供的API來刪除view。如果是Spark SQL情況,那么它會刪除事先注冊好的view;如果是hive情況,那么它會從元數據中刪除表。
scala> catalog.dropTempView("iteblog"
查詢已經注冊的函數
我們不僅可以使用Catalog API操作表,還可以用它操作UDF。下面代碼片段展示SparkSession上所有已經注冊號的函數,當然也包括了Spark內置的函數。
scala> catalog.listFunctions().select("name","className","isTemporary").show(100, false)
+---------------------+-----------------------------------------------------------------------+-----------+
|name |className |isTemporary|
+---------------------+-----------------------------------------------------------------------+-----------+
|! |org.apache.spark.sql.catalyst.expressions.Not |true |
|% |org.apache.spark.sql.catalyst.expressions.Remainder |true |
|& |org.apache.spark.sql.catalyst.expressions.BitwiseAnd |true |
|* |org.apache.spark.sql.catalyst.expressions.Multiply |true |
|+ |org.apache.spark.sql.catalyst.expressions.Add |true |
+---------------------+-----------------------------------------------------------------------+-----------+
參考:https://blog.csdn.net/pengzonglu7292/article/details/81044857