INSERT 語句用來向表中添加行。
1 執行 INSERT 語句
單條 INSERT 語句,可以使用 TableEnvironment 中的 executeSql() 方法執行,也可以在 SQL CLI 中執行 INSERT 語句。executeSql() 方法執行 INSERT 語句時會立即提交一個 Flink 作業,並且返回一個 TableResult 對象,通過該對象可以獲取 JobClient 方便的操作提交的作業。 多條 INSERT 語句,使用 TableEnvironment 中的 createStatementSet 創建一個 StatementSet 對象,然后使用 StatementSet 中的 addInsertSql() 方法添加多條 INSERT 語句,最后通過 StatementSet 中的 execute() 方法來執行。
以下的例子展示了如何在 TableEnvironment 和 SQL CLI 中執行一條 INSERT 語句,或者通過 StatementSet 執行多條 INSERT 語句。
val settings = EnvironmentSettings.newInstance()... val tEnv = TableEnvironment.create(settings) // 注冊一個 "Orders" 源表,和 "RubberOrders" 結果表 tEnv.executeSql("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)") tEnv.executeSql("CREATE TABLE RubberOrders(product STRING, amount INT) WITH (...)") // 運行一個 INSERT 語句,將源表的數據輸出到結果表中 val tableResult1 = tEnv.executeSql( "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'") // 通過 TableResult 來獲取作業狀態 println(tableResult1.getJobClient().get().getJobStatus()) //---------------------------------------------------------------------------- // 注冊一個 "GlassOrders" 結果表用於運行多 INSERT 語句 tEnv.executeSql("CREATE TABLE GlassOrders(product VARCHAR, amount INT) WITH (...)"); // 運行多個 INSERT 語句,將原表數據輸出到多個結果表中 val stmtSet = tEnv.createStatementSet() // `addInsertSql` 方法每次只接收單條 INSERT 語句 stmtSet.addInsertSql( "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'") stmtSet.addInsertSql( "INSERT INTO GlassOrders SELECT product, amount FROM Orders WHERE product LIKE '%Glass%'") // 執行剛剛添加的所有 INSERT 語句 val tableResult2 = stmtSet.execute() // 通過 TableResult 來獲取作業狀態 println(tableResult1.getJobClient().get().getJobStatus())
2 將 SELECT 查詢數據插入表中
通過 INSERT 語句,可以將查詢的結果插入到表中,
語法
INSERT { INTO | OVERWRITE } [catalog_name.][db_name.]table_name [PARTITION part_spec] select_statement
part_spec:
(part_col_name1=val1 [, part_col_name2=val2, ...])
OVERWRITE
INSERT OVERWRITE 將會覆蓋表中或分區中的任何已存在的數據。否則,新數據會追加到表中或分區中。
PARTITION
PARTITION 語句應該包含需要插入的靜態分區列與值。
示例
-- 創建一個分區表 CREATE TABLE country_page_view (user STRING, cnt INT, date STRING, country STRING) PARTITIONED BY (date, country) WITH (...) -- 追加行到該靜態分區中 (date='2019-8-30', country='China') INSERT INTO country_page_view PARTITION (date='2019-8-30', country='China') SELECT user, cnt FROM page_view_source; -- 追加行到分區 (date, country) 中,其中 date 是靜態分區 '2019-8-30';country 是動態分區,其值由每一行動態決定 INSERT INTO country_page_view PARTITION (date='2019-8-30') SELECT user, cnt, country FROM page_view_source; -- 覆蓋行到靜態分區 (date='2019-8-30', country='China') INSERT OVERWRITE country_page_view PARTITION (date='2019-8-30', country='China') SELECT user, cnt FROM page_view_source; -- 覆蓋行到分區 (date, country) 中,其中 date 是靜態分區 '2019-8-30';country 是動態分區,其值由每一行動態決定 INSERT OVERWRITE country_page_view PARTITION (date='2019-8-30') SELECT user, cnt, country FROM page_view_source;
3 將值插入表中
通過 INSERT 語句,也可以直接將值插入到表中,
語法
INSERT { INTO | OVERWRITE } [catalog_name.][db_name.]table_name VALUES values_row [, values_row ...] values_row: : (val1 [, val2, ...])
OVERWRITE
INSERT OVERWRITE 將會覆蓋表中的任何已存在的數據。否則,新數據會追加到表中。
示例
CREATE TABLE students (name STRING, age INT, gpa DECIMAL(3, 2)) WITH (...); INSERT INTO students VALUES ('fred flintstone', 35, 1.28), ('barney rubble', 32, 2.32);
