Flink基礎(三十一):FLINK-SQL語法(七)DML(一)INSERT 語句


INSERT 語句用來向表中添加行。

1 執行 INSERT 語句

單條 INSERT 語句,可以使用 TableEnvironment 中的 executeSql() 方法執行,也可以在 SQL CLI 中執行 INSERT 語句。executeSql() 方法執行 INSERT 語句時會立即提交一個 Flink 作業,並且返回一個 TableResult 對象,通過該對象可以獲取 JobClient 方便的操作提交的作業。 多條 INSERT 語句,使用 TableEnvironment 中的 createStatementSet 創建一個 StatementSet 對象,然后使用 StatementSet 中的 addInsertSql() 方法添加多條 INSERT 語句,最后通過 StatementSet 中的 execute() 方法來執行。

以下的例子展示了如何在 TableEnvironment 和 SQL CLI 中執行一條 INSERT 語句,或者通過 StatementSet 執行多條 INSERT 語句。

val settings = EnvironmentSettings.newInstance()...
val tEnv = TableEnvironment.create(settings)

// 注冊一個 "Orders" 源表,和 "RubberOrders" 結果表
tEnv.executeSql("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)")
tEnv.executeSql("CREATE TABLE RubberOrders(product STRING, amount INT) WITH (...)")

// 運行一個 INSERT 語句,將源表的數據輸出到結果表中
val tableResult1 = tEnv.executeSql(
  "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
// 通過 TableResult 來獲取作業狀態
println(tableResult1.getJobClient().get().getJobStatus())

//----------------------------------------------------------------------------
// 注冊一個 "GlassOrders" 結果表用於運行多 INSERT 語句
tEnv.executeSql("CREATE TABLE GlassOrders(product VARCHAR, amount INT) WITH (...)");

// 運行多個 INSERT 語句,將原表數據輸出到多個結果表中
val stmtSet = tEnv.createStatementSet()
// `addInsertSql` 方法每次只接收單條 INSERT 語句
stmtSet.addInsertSql(
  "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
stmtSet.addInsertSql(
  "INSERT INTO GlassOrders SELECT product, amount FROM Orders WHERE product LIKE '%Glass%'")
// 執行剛剛添加的所有 INSERT 語句
val tableResult2 = stmtSet.execute()
// 通過 TableResult 來獲取作業狀態
println(tableResult1.getJobClient().get().getJobStatus())

2 將 SELECT 查詢數據插入表中

通過 INSERT 語句,可以將查詢的結果插入到表中,

語法

INSERT { INTO | OVERWRITE } [catalog_name.][db_name.]table_name [PARTITION part_spec] select_statement

part_spec:
  (part_col_name1=val1 [, part_col_name2=val2, ...])

OVERWRITE

INSERT OVERWRITE 將會覆蓋表中或分區中的任何已存在的數據。否則,新數據會追加到表中或分區中。

PARTITION

PARTITION 語句應該包含需要插入的靜態分區列與值。

示例

-- 創建一個分區表
CREATE TABLE country_page_view (user STRING, cnt INT, date STRING, country STRING)
PARTITIONED BY (date, country)
WITH (...)

-- 追加行到該靜態分區中 (date='2019-8-30', country='China')
INSERT INTO country_page_view PARTITION (date='2019-8-30', country='China')
  SELECT user, cnt FROM page_view_source;

-- 追加行到分區 (date, country) 中,其中 date 是靜態分區 '2019-8-30';country 是動態分區,其值由每一行動態決定
INSERT INTO country_page_view PARTITION (date='2019-8-30')
  SELECT user, cnt, country FROM page_view_source;

-- 覆蓋行到靜態分區 (date='2019-8-30', country='China')
INSERT OVERWRITE country_page_view PARTITION (date='2019-8-30', country='China')
  SELECT user, cnt FROM page_view_source;

-- 覆蓋行到分區 (date, country) 中,其中 date 是靜態分區 '2019-8-30';country 是動態分區,其值由每一行動態決定
INSERT OVERWRITE country_page_view PARTITION (date='2019-8-30')
  SELECT user, cnt, country FROM page_view_source;

3 將值插入表中

通過 INSERT 語句,也可以直接將值插入到表中,

語法

INSERT { INTO | OVERWRITE } [catalog_name.][db_name.]table_name VALUES values_row [, values_row ...] values_row: : (val1 [, val2, ...])

OVERWRITE

INSERT OVERWRITE 將會覆蓋表中的任何已存在的數據。否則,新數據會追加到表中。

示例

CREATE TABLE students (name STRING, age INT, gpa DECIMAL(3, 2)) WITH (...); INSERT INTO students VALUES ('fred flintstone', 35, 1.28), ('barney rubble', 32, 2.32);

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM