作業開發流程
-
創建輸入流
-
創建輸出流
-
檢查輸出數據類型是否和結果表一致,如不一致進行格式轉換
-
創建中間流
-
保存、語法檢查
-
測試發布
-
生產發布申請
FlinkSQL語法參考 創建輸入流 語法格式 CREATE TABLE [catalog_name_][db_name_]table_name ( { <column_definition> | <computed_column_definition> }[ , ...n] [ <watermark_definition> ] ) [COMMENT table_comment] [PARTITIONED BY (partition_column_name1, partition_column_name2, ...)] WITH (key1=val1, key2=val2, ...) -- 定義表字段 <column_definition>: column_name column_type [COMMENT column_comment] -- 定義計算列 <computed_column_definition>: column_name AS computed_column_expression [COMMENT column_comment] -- 定義水位線 <watermark_definition>: WATERMARK FOR rowtime_column_name AS watermark_strategy_expression COMPUTED COLUMN(計算列) 計算列是一個通過column_name AS computed_column_expression生成的虛擬列,產生的計算列不是物理存儲在數據源表中。一個計算列可以通過原有數據源表中的某個字段、運算符及內置函數生成。比如,定義一個消費金額的計算列(cost),可以使用表的價格(price)*數量(quantity)計算得到。計算列常常被用在定義時間屬性(見另一篇文章Flink Table API&SQL編程指南之時間屬性(3),可以通過PROCTIME()函數定義處理時間屬性,語法為proc AS PROCTIME()。除此之外,計算列可以被用作提取事件時間列,因為原始的事件時間可能不是TIMESTAMP(3)類型或者是存在JSON串中。 水位線 水位線定義了表的事件時間屬性,其語法為: WATERMARK FOR rowtime_column_name AS watermark_strategy_expression 其中rowtime_column_name表示表中已經存在的事件時間字段,值得注意的是,該事件時間字段必須是TIMESTAMP(3)類型,即形如yyyy-MM-dd HH:mm:ss,如果不是這種形式的數據類型,需要通過定義計算列進行轉換。watermark_strategy_expression定義了水位線生成的策略,該表達式的返回數據類型必須是TIMESTAMP(3)類型。 Flink提供了許多常用的水位線生成策略: 嚴格單調遞增的水位線:語法為WATERMARK FOR rowtime_column AS rowtime_column 即直接使用時間時間戳作為水位線 遞增水位線:語法為WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL '0.001' SECOND 亂序水位線:語法為WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL 'string' timeUnit-- 比如,允許5秒的亂序WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL '5' SECOND WITH 選項 創建Table source或者Table sink需要指定表的屬性,屬性是以key/value的形式配置的,具體參考其相對應的connector 定義kafka source CREATE TABLE KafkaTable ( ... ) WITH ( 'connector.type' = 'kafka', -- 連接類型 'connector.version' = '0.11',-- 必選: 可選的kafka版本有:0.8/0.9/0.10/0.11/universal 'connector.topic' = 'topic_name', -- 必選: 主題名稱 'connector.properties.zookeeper.connect' = 'localhost:2181', -- 必選: zk連接地址 'connector.properties.bootstrap.servers' = 'localhost:9092', -- 必選: Kafka連接地址 'connector.properties.group.id' = 'testGroup', --可選: 消費者組 -- 可選:偏移量, earliest-offset/latest-offset/group-offsets/specific-offsets 'connector.startup-mode' = 'earliest-offset', -- 可選: 當偏移量指定為specific offsets,為指定每個分區指定具體位置 'connector.specific-offsets' = 'partition:0,offset:42;partition:1,offset:300', 'connector.sink-partitioner' = '...',-- 可選: sink分區器,fixed/round-robin/custom -- 可選: 當自定義分區器時,指定分區器的類名 'connector.sink-partitioner-class' = 'org.mycompany.MyPartitioner', 'format.type' = '...',-- 必選: 指定格式,支持csv/json/avro /dts-avro 'update-mode' = 'append', -- 指定update-mode,支持append/retract/upsert 'dts-avro.table-name' = 'tablename' ) 定義mysql source CREATE TABLE MySQLTable ( ... ) WITH ( 'connector.type' = 'jdbc', -- 必選: jdbc方式 'connector.url' = 'jdbc:mysql://localhost:3306/flink-test', -- 必選: JDBC url 'connector.table' = 'jdbc_table_name', -- 必選: 表名 -- 可選: JDBC driver,如果不配置,會自動通過url提取 'connector.driver' = 'com.mysql.jdbc.Driver', 'connector.username' = 'name', -- 可選: 數據庫用戶名 'connector.password' = 'password',-- 可選: 數據庫密碼 -- 可選, 將輸入進行分區的字段名. 'connector.read.partition.column' = 'column_name', -- 可選, 分區數量. 'connector.read.partition.num' = '50', -- 可選, 第一個分區的最小值. 'connector.read.partition.lower-bound' = '500', -- 可選, 最后一個分區的最大值 'connector.read.partition.upper-bound' = '1000', -- 可選, 一次提取數據的行數,默認為0,表示忽略此配置 'connector.read.fetch-size' = '100', -- 可選, lookup緩存數據的最大行數,如果超過改配置,老的數據會被清除 'connector.lookup.cache.max-rows' = '5000', -- 可選,lookup緩存存活的最大時間,超過該時間舊數據會過時,注意cache.max-rows與cache.ttl必須同時配置 'connector.lookup.cache.ttl' = '10s', -- 可選, 查詢數據最大重試次數 'connector.lookup.max-retries' = '3', -- 可選,寫數據最大的flush行數,默認5000,超過改配置,會觸發刷數據 'connector.write.flush.max-rows' = '5000', --可選,flush數據的間隔時間,超過該時間,會通過一個異步線程flush數據,默認是0s 'connector.write.flush.interval' = '2s', -- 可選, 寫數據失敗最大重試次數 'connector.write.max-retries' = '3' ) 示例 //交易日志源表 CREATE TABLE datav_nrts_prod_trans_log ( id BIGINT COMMENT '自增主鍵', charge_id STRING COMMENT '支付唯一標識', trace_id STRING COMMENT '全局流水號', app_id STRING COMMENT '商戶appId', mer_cust_id STRING COMMENT '商戶客戶號', order_no STRING COMMENT '商戶請求訂單號', trans_id STRING COMMENT '通道返回訂單號', cust_id STRING COMMENT '客戶號', acct_id STRING COMMENT '賬戶號', pay_channel STRING COMMENT '支付渠道', channel STRING COMMENT '支付通道:hf-匯付', trans_stat STRING COMMENT '狀態:I-初始;R-待支付;P-處理中;S-成功;F-失敗', time_expire STRING COMMENT '訂單失效時間', trans_amt DECIMAL(14,2) COMMENT '交易金額', fee_amt DECIMAL(14,2) COMMENT '手續費金額', ts AS (CASE WHEN create_time IS NULL THEN TO_TIMESTAMP('1970-01-01 00:00:00') ELSE create_time END), WATERMARK FOR ts AS ts - INTERVAL '5' SECOND, PRIMARY KEY (id) NOT ENFORCED ) WITH ( 'connector' = 'kafka', 'topic' = '${kafka.topic}', 'properties.bootstrap.servers' = '${kafka.brokers}', 'properties.group.id' = '${kafka.group}', 'scan.startup.mode' = '${kafka.startup}', 'format' = 'dts-avro', 'dts-avro.table-name' = 'trans_log' ); 創建輸出流 示例 // 在這里編寫代碼 -- 統計指標目標表 CREATE TABLE all_trans_5m ( product_name STRING, window_start STRING, window_end STRING, trans_amt DECIMAL(24,2), trans_cnt BIGINT, bus_name STRING, PRIMARY KEY (product_name) NOT ENFORCED ) WITH ( 'connector'= 'jdbc', 'url' = '${mysql.url}', 'username' = '${mysql.username}', 'password' = '${mysql.password}', 'table-name' = '${mysql.tablename1}' ); 創建中間流 示例 // 在這里編寫代碼 CREATE VIEW v_ads_trans_detail AS SELECT id ,create_time ,mer_cust_id ,order_no ,pay_channel ,trans_amt ,fee_amt ,ts ,trans_date ,trans_time ,trans_type_desc ,product_name ,data_type ,window_start ,window_end FROM ( SELECT c.id ,c.create_time ,c.mer_cust_id ,c.order_no ,c.pay_channel ,c.trans_amt ,c.fee_amt ,c.ts ,DATE_FORMAT(c.create_time, 'yyyyMMdd') AS trans_date ,DATE_FORMAT(c.create_time,'HHmmss') trans_time ,(CASE WHEN c.pay_channel LIKE 'alipay%' THEN 'alipay' WHEN c.pay_channel LIKE 'wx%' THEN 'WeChat' WHEN c.pay_channel LIKE 'union%' THEN 'Union' else 'other' END) AS trans_type_desc ,'ADAPAY' AS product_name ,'ORDER' data_type FROM trans_log c WHERE c.trans_stat = 'S' ) dat , LATERAL TABLE(WINDOW_IN_MINUTES(ts, 5)) AS T(window_start, window_end); 創建離線維表 示例 //phoenix離線商戶信息 CREATE TABLE dim_mer_sales_pd_info( mer_id VARCHAR COMMENT '商戶id', mer_name VARCHAR COMMENT '商戶名稱', type VARCHAR COMMENT '商戶來源', sales_name VARCHAR COMMENT '銷售', domain_account VARCHAR COMMENT '銷售域賬戶', is_adamall VARCHAR COMMENT '業務類別是否adamall,1是,0否', channel_parent_mer_cust_id VARCHAR COMMENT '上級代理商ID', channel_parent_mer_cust_name VARCHAR COMMENT '上級代理商名稱', channel_top_mer_cust_id VARCHAR COMMENT '頂級渠道商id', channel_top_mer_cust_name VARCHAR COMMENT '頂級渠道商', bus_level VARCHAR COMMENT '渠道商層級', PRIMARY KEY (mer_id) NOT ENFORCED ) WITH ( 'connector' = 'jdbc', 'url' = '${phoenix.url}', 'table-name' = '${phoenix.table}', 'lookup.cache.ttl' = '1d', -- lookup cache 中每一行記錄的最大存活時間,若超過該時間,則最老的行記錄將會過期。 'lookup.cache.max-rows' = '150000', -- lookup cache 的最大行數,若超過該值,則最老的行記錄將會過期。 'lookup.max-retries' = '3' -- 查詢數據庫失敗的最大重試時間。 ); 創建實時維表 示例 //實時商戶信息 CREATE TABLE mer_user_info ( id bigint COMMENT '物理主鍵,自增', login_id varchar COMMENT '商戶注冊電話', mer_id varchar COMMENT 'pa商戶號', product_id varchar COMMENT '產品號', mer_cust_id varchar COMMENT '商戶客戶號', mer_name varchar COMMENT '商戶名', mer_short_name varchar COMMENT '商戶名簡稱', mer_provice varchar COMMENT '商戶所屬省份', mer_area varchar COMMENT '商戶所屬地區', upd_user varchar COMMENT '更新用戶', upd_ts TIMESTAMP COMMENT '更新時間', entry_mer_type varchar COMMENT '進件商戶類型:1-企業;2-小微', PRIMARY KEY (login_id) NOT ENFORCED ) WITH ( 'connector' = 'kafka', 'topic' = '${kafka.topic}', 'properties.bootstrap.servers' = '${kafka.brokers}', 'properties.group.id' = '${kafka.group}', 'scan.startup.mode' = '${kafka.startup}', 'format' = 'dts-avro', 'dts-avro.table-name' = 'mer_user_info' ); 數據操作語句DML INSERT INTO語句 本文為您介紹如何在一個作業中寫入一個Sink或多個Sink。 源表 CREATE TABLE datagen_source ( name VARCHAR, score BIGINT ) WITH ( 'connector' = 'datagen' ); 結果表 create table blackhole_sink( name VARCHAR, score BIGINT ) WITH ( 'connector' = 'blackhole' ); 計算 Sink INSERT INTO blackhole_sink SELECT UPPER(name), score FROM datagen_source; 寫入多個Sink示例 BEGIN STATEMENT SET; --寫入多個Sink時,必填。 INSERT INTO blackhole_sinkA SELECT UPPER(name), sum(score) FROM datagen_source GROUP BY UPPER(name); INSERT INTO blackhole_sinkB SELECT LOWER(name), max(score) FROM datagen_source GROUP BY LOWER(name); END; --寫入多個Sink時,必填。 說明:寫入多個Sink語句時,需要以**BEGIN STATEMENT SET;開頭,以END;**結尾。 窗口函數 滾動窗口 本文為您介紹如何使用Flink滾動窗口函數。 定義 滾動窗口(TUMBLE)將每個元素分配到一個指定大小的窗口中。通常,滾動窗口有一個固定的大小,並且不會出現重疊。例如,如果指定了一個5分鍾大小的滾動窗口,無限流的數據會根據時間划分為[0:00, 0:05)、[0:05, 0:10)、[0:10, 0:15)等窗口。 語法 TUMBLE函數用在GROUP BY子句中,用來定義滾動窗口。 TUMBLE(<time-attr>, <size-interval>) <size-interval>: INTERVAL 'string' timeUnit 說明 <time-attr>參數必須是時間流中的一個合法的時間屬性字段,指定為Processing Time或Event Time,請參見概述,了解如何定義時間屬性。 標識函數 使用標識函數選出窗口的起始時間或者結束時間,窗口的時間屬性用於下級Window的聚合。 窗口標識函數 返回類型 描述 TUMBLE_START(time-attr, size-interval) TIMESTAMP 返回窗口的起始時間(包含邊界)。例如[00:10,00:15)窗口,返回00:10 TUMBLE_END(time-attr, size-interval) TIMESTAMP 返回窗口的結束時間(包含邊界)。例如[00:00, 00:15]窗口,返回00:15 TUMBLE_ROWTIME(time-attr, size-interval) TIMESTAMP(rowtime-attr) 返回窗口的結束時間(不包含邊界)。例如[00:00, 00:15]窗口,返回 00:14:59.999。返回值是一個rowtime attribute,即可以基於該字段進行時間屬性的操作,例如,級聯窗口只能用在基於Event Time的Window上 TUMBLE_PROCTIME(time-attr, size-interval) TIMESTAMP(rowtime-attr) 返回窗口的結束時間(不包含邊界)。例如[00:00, 00:15]窗口,返回00:14:59.999。返回值是一個Proctime Attribute,即可以基於該字段進行時間屬性的操作。例如,級聯窗口只能用在基於Processing Time的Window上 示例 INSERT INTO tumble_output SELECT TUMBLE_START(ts, INTERVAL '1' MINUTE) as window_start, TUMBLE_END(ts, INTERVAL '1' MINUTE) as window_end, username, COUNT(click_url) FROM user_clicks GROUP BY TUMBLE(ts, INTERVAL '1' MINUTE),username;