Spark SQL 表的命名方式是db_name.table_name,只有數據庫名稱和數據表名稱。如果沒有指定db_name而直接引用table_name,實際上是引用default 數據庫下的表。在Spark SQL中,數據庫只是指定表文件存儲的路徑,每個表都可以使用不同的文件格式來存儲數據,從這個角度來看,可以把database看作是Databricks 表的上層目錄,用於組織數據表及其文件。
在python語言環境中,可以使用 %sql 切換到SQL命令模式:
%sql
一,管理數據庫
常用的數據庫命令,切換當前的數據庫、顯示數據庫列表、表列表、視圖列表和列信息:
use db_name show databases show tables [in db_name] show views [in db_name] show columns in db_name.table_name
1,創建數據庫
創建數據庫,通過LOCATION 指定數據庫文件存儲的位置:
CREATE { DATABASE | SCHEMA } [ IF NOT EXISTS ] database_name [ LOCATION database_directory ]
LOCATION database_directory:指定存儲數據庫文件系統的路徑,如果底層的文件系統中不存在該路徑,那么需要先創建該目錄。如果未指定LOCATION參數,那么使用默認的數據倉庫目錄來創建數據庫,默認的數據倉庫目錄是由靜態配置參數spark.sql.warehouse.dir指定的。
2,查看數據庫的描述
{ DESC | DESCRIBE } DATABASE [ EXTENDED ] db_name
extended 選項表示查看數據庫的擴展屬性。
3,刪除數據庫
DROP { DATABASE | SCHEMA } [ IF EXISTS ] dbname [ RESTRICT | CASCADE ]
IF EXISTS:該選項表示在數據庫不存在時,DROP操作不會引發異常。
RESTRICT:該選項表示不能刪除非空數據庫,並在默認情況下啟用。
CASCADE:該選項表示刪除數據庫中所有關聯的表和函數。
二,創建數據表
表有兩種作用域:全局和本地,全局表可以在所有的Cluster中引用,而本地表只能在本地的Cluster中引用,被稱作臨時視圖。用戶可以從DBFS中的文件或存儲在任何受支持數據源中的數據來填充表。
在創建表時,需要指定存儲表數據的文件格式,以及表數據文件存儲的位置。
1,使用數據源創建表(標准的CREATE TABLE命令)
創建表的語法,注意:如果數據庫中已存在同名的表,則會引發異常。
CREATE TABLE [ IF NOT EXISTS ] [db_name].table_name [ ( col_name1 col_type1, ... ) ] USING data_source [ OPTIONS ( key1=val1, key2=val2, ... ) ] [ PARTITIONED BY ( col_name1, col_name2, ... ) ] [ CLUSTERED BY ( col_name3, col_name4, ... ) [ SORTED BY ( col_name [ ASC | DESC ], ... ) ] INTO num_buckets BUCKETS ] [ LOCATION path ] [ AS select_statement ]
參數注釋:
- IF NOT EXISTS:如果數據庫中已存在同名的表,則不會執行任何操作。
- USING data_source:用於表的文件格式,data_source 必須是 TEXT、CSV、JSON、JDBC、PARQUET、ORC、HIVE、DELTA 或 LIBSVM 中的一個,或 org.apache.spark.sql.sources.DataSourceRegister 的自定義實現的完全限定的類名。支持使用 HIVE 創建 Hive SerDe 表。 你可以使用 OPTIONS 子句指定 Hive 特定的 file_format 和 row_format,這是不區分大小寫的字符串映射。選項鍵為 FILEFORMAT、INPUTFORMAT、OUTPUTFORMAT、SERDE、FIELDDELIM、ESCAPEDELIM、MAPKEYDELIM 和 LINEDELIM。
- OPTIONS:用於優化表的行為或配置 HIVE 表的表選項。
- PARTITIONED BY (col_name1, col_name2, ...):按指定的列對創建的表進行分區,將為每個分區創建一個目錄。
- CLUSTERED BY col_name3, col_name4, ...):按照指定的列,把表中的分區分割到固定數目的 Bucket中,該選項通常與分區操作配合使用。delta格式的文件不支持該子句。
- SORTED BY:數據在buckets中的排序方式,默認是升序ASC。
- INTO num_buckets BUCKETS:bucket是一個優化技術,使用bucket(和bucket 列)來確定數據的分區,並避免數據洗牌(data shuffle),使數據變得有序。
- LOCATION path:用於存儲表數據的目錄,可以指定分布式存儲上的路徑。
- AS select_statement:使用來自 SELECT 語句的輸出數據填充該表。
2,使用Delta Lake(增量Lake)創建表
用戶可以使用標准的CREATE TABLE命令來創建存儲在delta lake中的表,除了標准的創建delta table的命令之外,還可以使用以下的語法來創建delta表:
CREATE [OR REPLACE] TABLE table_identifier[(col_name1 col_type1 [NOT NULL], ...)] USING DELTA [LOCATION <path-to-delta-files>]
table_identifier 有兩種格式:
[database_name.] table_name
: 表的名稱delta.`delta_file_path`
LOCATION <path-to-delta-files> :如果指定的 LOCATION 已包含增量 lake 中存儲的數據,Delta lake 會執行以下操作:
如果僅指定了表名稱和位置,例如:
CREATE TABLE events USING DELTA LOCATION '/mnt/delta/events'
Hive 元存儲中的表會自動繼承現有數據的架構、分區和表屬性,此功能可用於把數據“導入”到元存儲(metastore)中。
如果你指定了任何配置(架構、分區或表屬性),那么 Delta Lake 會驗證指定的內容是否與現有數據的配置完全匹配。如果指定的配置與數據的配置並非完全匹配,則 Delta Lake 會引發一個描述差異的異常。
3,創建表的示例
--Use data source CREATE TABLE student (id INT, name STRING, age INT) USING PARQUET; --Use data from another table CREATE TABLE student_copy USING PARQUET AS SELECT * FROM student; --Omit the USING clause, which uses the default data source (parquet by default) CREATE TABLE student (id INT, name STRING, age INT);
--Create partitioned and bucketed table CREATE TABLE student (id INT, name STRING, age INT) USING PARQUET PARTITIONED BY (age) CLUSTERED BY (Id) INTO 4 buckets;
三,和數據源的交互
數據源表的作用類似於指向基礎數據源的指針,例如,您可以使用JDBC數據源在Azure Databricks中創建表foo,該表指向MySQL中的表bar。當讀寫表foo時,實際上就是讀寫表bar。
通常,CREATE TABLE會創建一個“指針”,並且必須確保它指向的對象是存在的,一個例外是文件源,例如Parquet,JSON,如果您未指定LOCATION選項,那么Azure Databricks會創建一個默認表位置。
對於CREATE TABLE AS SELECT,Azure Databricks使用select查詢的輸出數據來覆蓋(overwrite)底層的數據源,以確保創建的表包含與輸入查詢完全相同的數據。
四,向表插入數據
用戶可以向表種插入數據,也可以向Spark支持的文件中插入數據。
1,向表中插入數據
使用INSERT INTO 命令向表中追加數據,不會影響表中的現有數據;使用INSERT OVERWRITE 命令,會覆蓋表中的現有數據。
INSERT INTO [ TABLE ] table_identifier [ partition_spec ] { VALUES ( { value | NULL } [ , ... ] ) [ , ( ... ) ] | query } INSERT OVERWRITE [ TABLE ] table_identifier [ partition_spec [ IF NOT EXISTS ] ] { VALUES ( { value | NULL } [ , ... ] ) [ , ( ... ) ] | query }
參數注釋:
- table_identifier:[database_name.] table_name:表名稱,可選擇使用數據庫名稱進行限定。delta.<路徑到表> :現有增量表的位置。
- partition_spec:一個可選參數,用於指定分區的鍵/值對的逗號分隔列表。語法:PARTITION ( partition_col_name = partition_col_val [ , ... ] )
- 值 ( {value |NULL} [,...] ) [, ( ... ) ]:要插入的值。 顯式指定的值或 NULL 。 使用逗號分隔子句中的每個值。 您可以指定多個值集來插入多個行。
- query:生成要插入的行的查詢,可用的查詢格式:SELECT語句、TABLE語句、FROM語句
舉個例子,創建表之后,通過VALUES子句向表中插入少量的值,也可以通過 SELECT 子句、TABLE和FROM向表中批量插入數據。
CREATE TABLE students (name VARCHAR(64), address VARCHAR(64), student_id INT) USING PARQUET PARTITIONED BY (student_id); -- VALUES INSERT INTO students VALUES ('Bob Brown', '456 Taylor St, Cupertino', 222222), ('Cathy Johnson', '789 Race Ave, Palo Alto', 333333); -- SELECT INSERT INTO students PARTITION (student_id = 444444) SELECT name, address FROM persons WHERE name = "Dora Williams"; -- TABLE INSERT INTO students TABLE visiting_students; -- FROM INSERT INTO students FROM applicants SELECT name, address, id applicants WHERE qualified = true;
2,向文件中插入數據
使用給定的Spark文件格式用新值覆蓋目錄中的現有數據,也就是說,向目錄中插入數據時,只能用新數據覆蓋現有的數據:
INSERT OVERWRITE [ LOCAL ] DIRECTORY [ directory_path ] USING file_format [ OPTIONS ( key = val [ , ... ] ) ] { VALUES ( { value | NULL } [ , ... ] ) [ , ( ... ) ] | query }
參數注釋:
- directory_path:目標目錄,還可以使用在中指定 OPTIONS path 。 LOCAL關鍵字用於指定目錄位於本地文件系統中。
- file_format:要用於插入的文件格式。 有效選項包括 TEXT 、 CSV 、 JSON 、 JDBC 、 PARQUET 、ORC、HIVE、LIBSVM,或者自定義實現的完全限定類名 org.apache.spark.sql.execution.datasources.FileFormat 。
- OPTIONS ( key = val [,...] ):指定用於寫入文件格式的一個或多個選項。
示例,使用新數據覆蓋目錄中的數據:
INSERT OVERWRITE DIRECTORY '/tmp/destination' USING parquet OPTIONS (col1 1, col2 2, col3 'test') SELECT * FROM test_table; INSERT OVERWRITE DIRECTORY USING parquet OPTIONS ('path' '/tmp/destination', col1 1, col2 2, col3 'test') SELECT * FROM test_table;
參考文檔: