1、流計算SQL原理和架構
流計算SQL通常是一個類SQL的聲明式語言,主要用於對流式數據(Streams)的持續性查詢,目的是在常見流計算平台和框架(如Storm、Spark Streaming、Flink、Beam等)的底層API上,
通過使用簡易通用的的SQL語言構建SQL抽象層,降低實時開發的門檻。
流計算SQL的原理其實很簡單,就是在SQL和底層的流計算引擎之間架起一座橋梁---流計算SQL被用戶提交,被SQL引擎層翻譯為底層的API並在底層的流計算引擎上執行。比如對Storm
來說,會自動翻譯成Storm的任務拓撲並在Storm集群上運行。
流計算SQL引擎是流計算SQL的核心,主要負責對用戶SQL輸入進行語法分析、語義分析、邏輯計划生成、邏輯計划執行、物理執行計划生成等操作。而真正執行計算的是底層的流計算平台。
不同於離線任務,實時的數據是不斷流入的,所以為了使用SQL來對流處理進行抽象,流計算SQL也引入了“表”的概念,不過這里的表是動態表。
流計算SQL的架構如下:
SQL層:流計算SQL給用戶的接口,它提供過濾、轉換、關聯、聚合、窗口、select、union、split等各種功能。
SQL引擎層:負責SQL解析/校驗、邏輯計划生成優化和物理計划執行等。
流計算引擎層:具體執行SQL引擎層生成的執行計划。
2、流計算SQL:未來主要的實時開發技術
目前流計算SQL在各個計算框架的進度和支持力度不一。
Storm SQL還只是一個實驗性的功能。Flink SQL是Flink大力推廣的核心API。Flink是一個原生的開源流計算引擎,而且目前還沒有其它開源流計算引擎能提供比Flink 更優秀的流
計算SQL框架和語法等,所以Flink SQL實際上在定義流計算SQL的標注。
阿里雲Stream SQL 的底層就是Flink引擎(實際是Blink,也就是Alibaba Flink),可以認為Blink是Flink的企業版本,
3、Stream SQL
阿里雲提供了Stream SQL 開發的完整環境,包括Stream SQL語法、IDE開發工具、調試及運維等。下面具體介紹概念和語法
3.1、Stream SQL 源表
Stream SQL 通常將源頭數據抽象為源表,就像一個Storm任務必須至少定義一個spout,一個Stream SQL 任務必須至少定義一個源表。
定義Stream SQL 源表的語法如下:
CREATE TABLE tablename
(columnName dataType [,columnName dataType]*)
[WITH (propertyName=propertyValue [,propertyName=propertyValue ] * )];
如下面的例子創建了一個datahub類型的源表
create table datahub_stream(
name varchar,
age BIGINT,
birthday BIGINT)
with (
type ='datahub',
endPoint =‘http://dh-et2.aliyun-inc.com’,
project='blink-datahub_test',
topic ='test_topic_1',
accessId =0i70RRFJD1OBAWAs',
accessKey ='yF60EwURseo1UAn4NinvQPJ2zhCfHU',
startTime='2018-08-20 00:00:00'
);
其中的type表示流式數據的源頭類型,可以為datahub,也可以為日志或消息中間件等,type下面的各個參數類型的不同而不同,它們共同確定了此type的某個源頭類型。
此外,阿里雲Stream SQL底層流計算引擎是Flink/Blink,因此其支持水位線機制。
定義水位線的語法如下:
WATERMARK [watermarkName] FOR <rowtime_field>
AS withOffset(<rowtime_field>,offset)
比如WATERMARK FOR rowtime AS withOffset(rowtime,4000)就對源頭數據列rowtime定義了固定延遲4s的水位線。
3.2 、Stream SQL 結果表
有源表,就是結果表,Stream SQL定義結果表的語法如下:
CREATE TABLE tablename
(columnName dataType [,columnName dataType]*)
[WITH (propertyName=propertyValue [,propertyName=propertyValue ] * )];
Stream SQL的結果表支持各種類型,包括類似MySQL的RDS、類似HBase的TableStore、類似消息隊列的MessageQueue的,下面以RDS來介紹Stream SQL 結果表的具體語法:
create table rds_output(
id int,
len int,
content varchar,
primary key(id,len)
) with (
type ='rds',
url='jdbc:mysql:XXXXXX',
tableName='test4',
userName='test',
password='xxxx'
);
在上述代碼中,結果表的type不同,相應后面的其它參數也不一樣,具體可用參考阿里雲幫助文檔。
3.3、Stream SQL維度表
流計算SQL的維度表數據一類特殊的外部數據,相對流數據來說,他比較穩定且變化緩慢,是靜態或准靜態數據,作為join / left outer join的右表使用。需要特別注意的是,
維度表在流計算中不允許作為from 后面的數據存儲。流計算中對於from子句后對接的數據存儲一定是流式數據存儲,即 select * from dim_table是不被允許的。
阿里雲Stream SQL中沒有專門為維度表設計的DDL語法,使用標准的create table語法即可,但是需要額外增加一行PERIOD FOR SYSTEM_TIME的聲明,這行聲明定義了
維度表的變化周期,即表明該表是一張會變化的表。
一個簡單的維度表定義實例如下,type后面的語法類似源表定義,
CREATE TABLE white_list (
id varchar,
name varchar,
age int,
PRIMARY key(id), --用作維度表,必須有聲明的主鍵
PERIOD FOR SYSTEM_TIME ---定義了維度表的變化周期
) with (
type = 'xxx',
。。。
);
3.4、Stream SQL 臨時表
在實際的實時開發中,經常發現業務邏輯的復雜性使得只用一個Stream SQL來完成所有的業務邏輯基本是不可能的,而必須拆分為多個SQL共同完成,此時就需要定義中間臨時表(
在阿里雲Stream SQL 中也叫view,即視圖)。在Stream SQL中定義臨時表的語法如下:
CREATE VIEW viewName
[ (columnName[,columnName]*])]
AS queryStatement;
但需要注意的是,Stream SQL臨時表僅用於輔助計算邏輯表達的內存邏輯中間狀態,其物理是並不存在,也不會產生數據的物理存儲。當然,臨時表也不占用系統空間。一個臨時表的例子
如下:
CREATE VIEW largeOrders(r, t, c, u) AS
SELECT rowtime, productId, c, units
FROM Orders;
3.5、Stream SQL DML
Stream SQL語法和SQL標准語法絕大部分都是相同的,下面僅着重介紹insert操作
insert操作的語法:
INSERT INTO tableName
[ ( columnName[,columnName]* )]
queryStatement;
流計算不支持單獨SELECT操作,當前在執行SELECT查詢之前必須執行INSERT操作將結果保存起來。同時,需要注意的是,一個SQL文件支持多個源表輸入和多個結果表輸出。
只有result表和tmp表可以執行INSERT操作,且每張表只能執行一次INSERT操作,dim 表和stream表不能執行insert操作。
普通的select操作是從幾張表中讀數據,但查詢的對象也可以是另一個select操作,也就是子查詢,但要注意子查詢必須加別名,實例如下:
insert into result_table
select * from (
select t.a, sum(t.b) AS sum_b, from t1 t
group by t.a
) t1
where t1.sum_b>100;
參考資料:《離線和實時大數據開發實戰》