spark SQL之Catalog API使用


Catalog API簡介

    Spark中的DataSet和Dataframe API支持結構化分析。結構化分析的一個重要的方面是管理元數據。這些元數據可能是一些臨時元數據(比如臨時表)、SQLContext上注冊的UDF以及持久化的元數據(比如Hivemeta store或者HCatalog)。

Spark的早期版本是沒有標准的API來訪問這些元數據的。用戶通常使用查詢語句(比如show tables)來查詢這些元數據。這些查詢通常需要操作原始的字符串,而且不同元數據類型的操作也是不一樣的。

這種情況在Spark 2.0中得到改變。Spark 2.0中添加了標准的API(稱為catalog)來訪問Spark SQL中的元數據。這個API既可以操作Spark SQL,也可以操作Hive元數據。

訪問Catalog

Catalog可以通過SparkSession獲取,下面代碼展示如何獲取Catalog:

import org.apache.spark.sql.SparkSession

val sparkSession = SparkSession.builder.appName("example").enableHiveSupport().getOrCreate()

val catalog = sparkSession.catalog

訪問databases

我們一旦創建好catalog對象之后,我們可以使用它來查詢元數據中的數據庫,catalog上的API返回的結果全部都是dataset。

scala> catalog.listDatabases().show(false)

+----------+--------------------+--------------------+
|      name|   description      |      locationUri   |
+----------+--------------------+--------------------+
|data_clean|                    |hdfs://asiainfo-1...|
|default   |Default Hive data...|hdfs://asiainfo-1...|
+----------+--------------------+--------------------+

scala> catalog.listDatabases().select("name").show(false)
+-----------------------+
|name                   |
+-----------------------+
|iteblog                |
|default                |
+-----------------------+

listDatabases返回元數據中所有的數據庫。

默認情況下,元數據僅僅只有名為default的數據庫。如果是Hive元數據,那么它會從Hive元數據中獲取所有的數據庫。listDatabases返回的類型是dataset,所以我們可以使用Dataset上的所有操作來查詢元數據。

使用createTempView注冊Dataframe

在Spark的早期版本,我們使用registerTempTable來注冊Dataframe。然而在Spark 2.0中,這個API已經被遺棄了。registerTempTable名字很讓人誤解,因為用戶會認為這個函數會將Dataframe持久化並且保證這個臨時表,但是實際上並不是這樣的,所以社區才有意將它替換成createTempViewcreateTempView的使用方法如下:

df.createTempView("temp")

查詢表

正如我們可以展示出元數據中的所有數據庫一樣,我們也可以展示出元數據中某個數據庫中的表。它會展示出Spark SQL中所有注冊的臨時表。同時可以展示出Hive中默認數據庫(也就是default)中的表。如下:

scala> catalog.listTables().select("name").show(false)
+----------------------------------------+
|name                                    |
+----------------------------------------+
|city_to_level                           |
|table2                                  |
|test                                    |
|ticket_order                            |
|tmp1_result                             |
+----------------------------------------+

判斷某個表是否緩存

我們可以使用Catalog提供的API來檢查某個表是否緩存。如下:

scala> println(catalog.isCached("temp"))
false

上面判斷temp表是否緩存,結果輸出false。默認情況下表是不會被緩存的,我們可以手動緩存某個表,如下:

scala>  df.cache()
res4: df.type = [_c0: string, _c1: string ... 2 more fields]
 
scala> println(catalog.isCached("temp"))
true

現在iteblog表已經被緩存了,所有現在的輸出結構是true。

刪除view

我們可以使用catalog提供的API來刪除view。如果是Spark SQL情況,那么它會刪除事先注冊好的view;如果是hive情況,那么它會從元數據中刪除表。

 scala> catalog.dropTempView("iteblog"

查詢已經注冊的函數

我們不僅可以使用Catalog API操作表,還可以用它操作UDF。下面代碼片段展示SparkSession上所有已經注冊號的函數,當然也包括了Spark內置的函數。

scala> catalog.listFunctions().select("name","className","isTemporary").show(100, false)

+---------------------+-----------------------------------------------------------------------+-----------+
|name                 |className                                                              |isTemporary|
+---------------------+-----------------------------------------------------------------------+-----------+
|!                    |org.apache.spark.sql.catalyst.expressions.Not                          |true       |
|%                    |org.apache.spark.sql.catalyst.expressions.Remainder                    |true       |
|&                    |org.apache.spark.sql.catalyst.expressions.BitwiseAnd                   |true       |
|*                    |org.apache.spark.sql.catalyst.expressions.Multiply                     |true       |
|+                    |org.apache.spark.sql.catalyst.expressions.Add                          |true       |
+---------------------+-----------------------------------------------------------------------+-----------+

 

參考:https://blog.csdn.net/pengzonglu7292/article/details/81044857


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM