大數據開發實戰:Stream SQL實時開發一


  1、流計算SQL原理和架構

    流計算SQL通常是一個類SQL的聲明式語言,主要用於對流式數據(Streams)的持續性查詢,目的是在常見流計算平台和框架(如Storm、Spark Streaming、Flink、Beam等)的底層API上,

  通過使用簡易通用的的SQL語言構建SQL抽象層,降低實時開發的門檻。

    流計算SQL的原理其實很簡單,就是在SQL和底層的流計算引擎之間架起一座橋梁---流計算SQL被用戶提交,被SQL引擎層翻譯為底層的API並在底層的流計算引擎上執行。比如對Storm

  來說,會自動翻譯成Storm的任務拓撲並在Storm集群上運行。

    流計算SQL引擎是流計算SQL的核心,主要負責對用戶SQL輸入進行語法分析、語義分析、邏輯計划生成、邏輯計划執行、物理執行計划生成等操作。而真正執行計算的是底層的流計算平台。

    不同於離線任務,實時的數據是不斷流入的,所以為了使用SQL來對流處理進行抽象,流計算SQL也引入了“表”的概念,不過這里的表是動態表。

    流計算SQL的架構如下:

    

    SQL層:流計算SQL給用戶的接口,它提供過濾、轉換、關聯、聚合、窗口、select、union、split等各種功能。

    SQL引擎層:負責SQL解析/校驗、邏輯計划生成優化和物理計划執行等。

    流計算引擎層:具體執行SQL引擎層生成的執行計划。

  2、流計算SQL:未來主要的實時開發技術

    目前流計算SQL在各個計算框架的進度和支持力度不一。

    Storm SQL還只是一個實驗性的功能。Flink SQL是Flink大力推廣的核心API。Flink是一個原生的開源流計算引擎,而且目前還沒有其它開源流計算引擎能提供比Flink 更優秀的流

    計算SQL框架和語法等,所以Flink SQL實際上在定義流計算SQL的標注。

    阿里雲Stream SQL 的底層就是Flink引擎(實際是Blink,也就是Alibaba Flink),可以認為Blink是Flink的企業版本,

  3、Stream SQL

    阿里雲提供了Stream SQL 開發的完整環境,包括Stream SQL語法、IDE開發工具、調試及運維等。下面具體介紹概念和語法

    3.1、Stream SQL 源表

      Stream SQL 通常將源頭數據抽象為源表,就像一個Storm任務必須至少定義一個spout,一個Stream SQL 任務必須至少定義一個源表。

      定義Stream SQL 源表的語法如下:

      CREATE TABLE tablename

      (columnName dataType [,columnName dataType]*)

      [WITH (propertyName=propertyValue [,propertyName=propertyValue ] * )];

      如下面的例子創建了一個datahub類型的源表

      create table datahub_stream(

        name varchar,

        age BIGINT,

        birthday BIGINT)

        with (

          type ='datahub',

          endPoint =‘http://dh-et2.aliyun-inc.com’,

          project='blink-datahub_test',

          topic ='test_topic_1',

          accessId =0i70RRFJD1OBAWAs',

          accessKey ='yF60EwURseo1UAn4NinvQPJ2zhCfHU',

          startTime='2018-08-20 00:00:00'

          );

      其中的type表示流式數據的源頭類型,可以為datahub,也可以為日志或消息中間件等,type下面的各個參數類型的不同而不同,它們共同確定了此type的某個源頭類型。

      此外,阿里雲Stream SQL底層流計算引擎是Flink/Blink,因此其支持水位線機制。

      定義水位線的語法如下:

      WATERMARK  [watermarkName] FOR <rowtime_field>

      AS withOffset(<rowtime_field>,offset)

      比如WATERMARK FOR rowtime AS withOffset(rowtime,4000)就對源頭數據列rowtime定義了固定延遲4s的水位線。

 

    3.2 、Stream SQL 結果表

      有源表,就是結果表,Stream SQL定義結果表的語法如下:

      CREATE TABLE tablename

      (columnName dataType [,columnName dataType]*)

      [WITH (propertyName=propertyValue [,propertyName=propertyValue ] * )];

      Stream SQL的結果表支持各種類型,包括類似MySQL的RDS、類似HBase的TableStore、類似消息隊列的MessageQueue的,下面以RDS來介紹Stream SQL 結果表的具體語法:

      create table rds_output(

        id int,

        len int,

        content varchar,

        primary key(id,len)

      ) with (

         type ='rds',

         url='jdbc:mysql:XXXXXX',

         tableName='test4',

         userName='test',

         password='xxxx'

      );

      在上述代碼中,結果表的type不同,相應后面的其它參數也不一樣,具體可用參考阿里雲幫助文檔。

     3.3、Stream SQL維度表

        流計算SQL的維度表數據一類特殊的外部數據,相對流數據來說,他比較穩定且變化緩慢,是靜態或准靜態數據,作為join / left outer join的右表使用。需要特別注意的是,

      維度表在流計算中不允許作為from 后面的數據存儲。流計算中對於from子句后對接的數據存儲一定是流式數據存儲,即 select * from dim_table是不被允許的。

        阿里雲Stream SQL中沒有專門為維度表設計的DDL語法,使用標准的create table語法即可,但是需要額外增加一行PERIOD FOR SYSTEM_TIME的聲明,這行聲明定義了

      維度表的變化周期,即表明該表是一張會變化的表。

        一個簡單的維度表定義實例如下,type后面的語法類似源表定義,

        CREATE TABLE white_list (

          id varchar,

          name varchar,

          age int,

          PRIMARY key(id),  --用作維度表,必須有聲明的主鍵

          PERIOD FOR SYSTEM_TIME ---定義了維度表的變化周期

          ) with (

            type = 'xxx',

            。。。

          );

      

      3.4、Stream SQL 臨時表

      在實際的實時開發中,經常發現業務邏輯的復雜性使得只用一個Stream SQL來完成所有的業務邏輯基本是不可能的,而必須拆分為多個SQL共同完成,此時就需要定義中間臨時表(

      在阿里雲Stream SQL 中也叫view,即視圖)。在Stream SQL中定義臨時表的語法如下:

      CREATE VIEW viewName

      [ (columnName[,columnName]*])]

      AS queryStatement;

      但需要注意的是,Stream SQL臨時表僅用於輔助計算邏輯表達的內存邏輯中間狀態,其物理是並不存在,也不會產生數據的物理存儲。當然,臨時表也不占用系統空間。一個臨時表的例子

      如下:

        CREATE VIEW largeOrders(r, t, c, u) AS

        SELECT rowtime, productId, c, units

        FROM Orders;

      

    3.5、Stream SQL DML

       Stream SQL語法和SQL標准語法絕大部分都是相同的,下面僅着重介紹insert操作

      insert操作的語法:

      INSERT INTO tableName

      [ ( columnName[,columnName]* )]

      queryStatement;

      流計算不支持單獨SELECT操作,當前在執行SELECT查詢之前必須執行INSERT操作將結果保存起來。同時,需要注意的是,一個SQL文件支持多個源表輸入和多個結果表輸出。

      只有result表和tmp表可以執行INSERT操作,且每張表只能執行一次INSERT操作,dim 表和stream表不能執行insert操作。

      普通的select操作是從幾張表中讀數據,但查詢的對象也可以是另一個select操作,也就是子查詢,但要注意子查詢必須加別名,實例如下:

      insert into result_table

      select * from (

            select t.a,   sum(t.b) AS sum_b,   from t1 t   

            group by t.a

            ) t1

      where t1.sum_b>100;

 

   參考資料:《離線和實時大數據開發實戰》


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM