Spark SQL之Catalog API介紹和使用


Catalog API                                                                                                     

  1. Spark中的DataSet和Dataframe API支持結構化分析。結構化分析的一個重要的方面就是管理元數據。這些元數據可能是一些臨時元數據(比如臨時表)、SQLContext上注冊的UDF以及持久化的元數據(比如Hivemeta store或者HCatalog)。
  2. Spark的早期版本是沒有標准的API來訪問這些元數據的用戶通常使用查詢語句(比如show tables)來查詢這些元數據。這些查詢通常需要操作原始的字符串,而且不同的元數據類型的操作也是不一樣的。
  3. 這種情況在Spark2.0中得到改變。Spark2.0中添加了標准的API(稱為catalog)來訪問Spark SQL中的元數據。這個API既可以操作Spark SQL,也可以操作Hive元數據
  4. 接下來將介紹如何使用catalog API。

 

 

訪問Catalog                                                                                                     

  • Catalog可以通過SparkSession獲取,下面代碼展示如何獲取Catalog:
/**
 * User: 過往記憶
 * Date: 2016年07月05日
 * Time: 下午23:16
 * bolg: https://www.iteblog.com
 * 本文地址:https://www.iteblog.com/archives/1701.html
 * 過往記憶博客,專注於hadoop、hive、spark、shark、flume的技術博客,大量的干貨
 * 過往記憶博客微信公共帳號:iteblog_hadoop
 */
 
scala> import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SparkSession
scala> val sparkSession = SparkSession.builder.appName("spark session example").enableHiveSupport().getOrCreate()
sparkSession: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@5d50ea49
 
scala> val catalog = sparkSession.catalog
catalog: org.apache.spark.sql.catalog.Catalog = org.apache.spark.sql.internal.CatalogImpl@17308af1

 

 

Querying the databases                                                                                 

我們一旦創建好catalog對象之后,我們可以使用它來查詢元數據中的數據庫,catalog上的API返回的結果全部都是dataset

scala> catalog.listDatabases().select("name").show(false)
19/07/17 14:21:54 ERROR metastore.ObjectStore: Version information found in metastore differs 1.1.0 from expected schema version 1.2.0. Schema verififcation is disabled hive.metastore.schema.verification so setting version.
+---------+
|name     |
+---------+
|default  |
|hadoop_g6|
|ruoze_d6 |
+---------+

 

  • listDatabases返回元數據中所有的數據庫。默認情況下,元數據僅僅只有名為default的數據庫。如果是Hive元數據,那么他會從Hive元數據中獲取所有的數據庫。listDatabases返回的類型是dataset,所以我們可以使用Dataset上的所有操作來查詢元數據

 

使用createTempView注冊Dataframe                                                              

  • 在Spark的早期版本,我們使用registerTemTable來注冊Dataframe。然而在Spark2.0中,這個API已經被遺棄了。registerTempTable名字很讓人誤解,因為用戶會認為這個函數會將Dataframe持久化並且保證這個臨時表,但實際上並不是這樣的,所以社區才有意將它替換成CreateTempView。createTempView的使用方法如下:

scala> val df = spark.format("json").read("/home/hadoop/data/people.json")

scala> df.createTempView("iteblog")

  • 我們注冊完一個view之后,然后就可以使用listTables函數來查詢它

查詢表                                                                                                               

  • 正如我們可以展示出元數據中的所有數據庫一樣,我們也可以展示出元數據中某個數據庫中的表。它會展示出SparkSQL中所有注冊的臨時表。同時可以展示出Hive中默認數據庫(也就是default)中的表。如下:

scala> catalog.listTables().select("name").show(false)
+-------------------+
|name |
+-------------------+
|customer |
|dual |
|g6_access |
|g6_access_lzo |
|g6_access_lzo_split|
|g6_access_orc |
|g6_access_orc_none |
|g6_access_par |
|g6_access_par_zip |
|g6_access_rc |
|g6_access_seq |
|makedata_job |
|order |
|traffic_info |
|tv_info |
|iteblog |
+-------------------+

 

 

show源碼

// scalastyle:off println
def show(numRows: Int, truncate: Boolean): Unit = if (truncate) {
println(showString(numRows, truncate = 20))
} else {
println(showString(numRows, truncate = 0))
}

* @param numRows Number of rows to show
* @param truncate If set to more than 0, truncates strings to `truncate` characters and
* all cells will be aligned right.

 

show(numRows,truncate ),參數numRows代表展示的條數,參數truncate=true代表展示字段中的數據只能展示20 的長度,大於20的位置會被截掉,相當於隱藏,默認是true

 

scala> catalog.listTables().select($"name").show(2,false)
+--------+
|name |
+--------+
|customer|
|dual |
+--------+
only showing top 2 rows


scala> catalog.listTables().select($"name").show(2,true)
+--------+
| name|
+--------+
|customer|
| dual|
+--------+
only showing top 2 rows


scala> catalog.listTables().select($"name").show(2)
+--------+
| name|
+--------+
|customer|
| dual|
+--------+
only showing top 2 rows

 

判斷某個表是否緩存                                                                                          

  • 我們可以使用Catalog提供API來檢查某個表是否緩存。如下:
scala> println(catalog.isCached("iteblog"))
false

上面判斷iteblog表是否緩存,結果輸出false。默認情況下表是不會被緩存的,我們可以手動緩存某個表,如下:

scala> df.cache()
res12: df.type = [_corrupt_record: string]

scala> println(catalog.isCached("iteblog"))
true 
  •  現在iteblog表已經被緩存了,所有現在的輸出結構是true。

刪除view                                                                                                           

  • 我們可以使用catalog提供的API來刪除view。如果是Spark SQL情況,那么它會刪除事先注冊好的view;如果是hive情況,那么他會從元數據中刪除表。
scala>  catalog.dropTempView("iteblog")
res16: Boolean = true

查詢已經注冊的函                                                                                              

+------------+----------------------------------------+------------------+
|name        |className                               |  isTemporary|
+-------------+----------------------------------------+-----------------+
|sayhello    |com.ruozedata.UDF.HelloUDF |   false           |
|    %          | org.apache.spark.sql.catalyst.expressions.Remainder |   true|
+-------------+------------------------------------------+----------------+

 

 

 上面展示了10個函數及其實現類。

 

參考原文鏈接:https://www.iteblog.com/archives/1701.html#Catalog_API


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM