Flink任務開發簡單流程


作業開發流程

 
  1.  
    創建輸入流
     
  2.  
    創建輸出流
     
  3.  
    檢查輸出數據類型是否和結果表一致,如不一致進行格式轉換
     
  4.  
    創建中間流
     
  5.  
    保存、語法檢查
     
  6.  
    測試發布
     
  7.  
    生產發布申請
FlinkSQL語法參考

創建輸入流
語法格式
CREATE TABLE [catalog_name_][db_name_]table_name
  (
    { <column_definition> | <computed_column_definition> }[ , ...n]
    [ <watermark_definition> ]
  )
  [COMMENT table_comment]
  [PARTITIONED BY (partition_column_name1, partition_column_name2, ...)]
  WITH (key1=val1, key2=val2, ...)
-- 定義表字段
<column_definition>:
  column_name column_type [COMMENT column_comment]
-- 定義計算列
<computed_column_definition>:
  column_name AS computed_column_expression [COMMENT column_comment]
-- 定義水位線
<watermark_definition>:
  WATERMARK FOR rowtime_column_name AS watermark_strategy_expression

COMPUTED COLUMN(計算列)
計算列是一個通過column_name AS computed_column_expression生成的虛擬列,產生的計算列不是物理存儲在數據源表中。一個計算列可以通過原有數據源表中的某個字段、運算符及內置函數生成。比如,定義一個消費金額的計算列(cost),可以使用表的價格(price)*數量(quantity)計算得到。計算列常常被用在定義時間屬性(見另一篇文章Flink Table API&SQL編程指南之時間屬性(3),可以通過PROCTIME()函數定義處理時間屬性,語法為proc AS PROCTIME()。除此之外,計算列可以被用作提取事件時間列,因為原始的事件時間可能不是TIMESTAMP(3)類型或者是存在JSON串中。
水位線
水位線定義了表的事件時間屬性,其語法為:
WATERMARK FOR rowtime_column_name AS watermark_strategy_expression
其中rowtime_column_name表示表中已經存在的事件時間字段,值得注意的是,該事件時間字段必須是TIMESTAMP(3)類型,即形如yyyy-MM-dd HH:mm:ss,如果不是這種形式的數據類型,需要通過定義計算列進行轉換。watermark_strategy_expression定義了水位線生成的策略,該表達式的返回數據類型必須是TIMESTAMP(3)類型。
Flink提供了許多常用的水位線生成策略:
嚴格單調遞增的水位線:語法為WATERMARK FOR rowtime_column AS rowtime_column
即直接使用時間時間戳作為水位線
遞增水位線:語法為WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL '0.001' SECOND
亂序水位線:語法為WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL 'string' timeUnit-- 比如,允許5秒的亂序WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL '5' SECOND
WITH 選項
創建Table source或者Table sink需要指定表的屬性,屬性是以key/value的形式配置的,具體參考其相對應的connector
定義kafka source
CREATE TABLE KafkaTable (
  ...
) WITH (
  'connector.type' = 'kafka', -- 連接類型       
  'connector.version' = '0.11',-- 必選: 可選的kafka版本有:0.8/0.9/0.10/0.11/universal
  'connector.topic' = 'topic_name', -- 必選: 主題名稱
  'connector.properties.zookeeper.connect' = 'localhost:2181', -- 必選: zk連接地址
  'connector.properties.bootstrap.servers' = 'localhost:9092', -- 必選: Kafka連接地址
  'connector.properties.group.id' = 'testGroup', --可選: 消費者組
   -- 可選:偏移量, earliest-offset/latest-offset/group-offsets/specific-offsets
  'connector.startup-mode' = 'earliest-offset',                                          
  -- 可選: 當偏移量指定為specific offsets,為指定每個分區指定具體位置
  'connector.specific-offsets' = 'partition:0,offset:42;partition:1,offset:300',
  'connector.sink-partitioner' = '...',-- 可選: sink分區器,fixed/round-robin/custom
  -- 可選: 當自定義分區器時,指定分區器的類名
  'connector.sink-partitioner-class' = 'org.mycompany.MyPartitioner',
  'format.type' = '...',-- 必選: 指定格式,支持csv/json/avro
/dts-avro
  'update-mode' = 'append',
-- 指定update-mode,支持append/retract/upsert
  'dts-avro.table-name' = 'tablename'
)

定義mysql source
CREATE TABLE MySQLTable (
  ...
) WITH (
  'connector.type' = 'jdbc', -- 必選: jdbc方式
  'connector.url' = 'jdbc:mysql://localhost:3306/flink-test', -- 必選: JDBC url
  'connector.table' = 'jdbc_table_name',  -- 必選: 表名
   -- 可選: JDBC driver,如果不配置,會自動通過url提取 
  'connector.driver' = 'com.mysql.jdbc.Driver',                                           
  'connector.username' = 'name', -- 可選: 數據庫用戶名
  'connector.password' = 'password',-- 可選: 數據庫密碼
    -- 可選, 將輸入進行分區的字段名.
  'connector.read.partition.column' = 'column_name',
    -- 可選, 分區數量.
  'connector.read.partition.num' = '50', 
    -- 可選, 第一個分區的最小值.
  'connector.read.partition.lower-bound' = '500',
    -- 可選, 最后一個分區的最大值
  'connector.read.partition.upper-bound' = '1000', 
    -- 可選, 一次提取數據的行數,默認為0,表示忽略此配置
  'connector.read.fetch-size' = '100', 
   -- 可選, lookup緩存數據的最大行數,如果超過改配置,老的數據會被清除
  'connector.lookup.cache.max-rows' = '5000', 
   -- 可選,lookup緩存存活的最大時間,超過該時間舊數據會過時,注意cache.max-rows與cache.ttl必須同時配置
  'connector.lookup.cache.ttl' = '10s', 
   -- 可選, 查詢數據最大重試次數
  'connector.lookup.max-retries' = '3', 
   -- 可選,寫數據最大的flush行數,默認5000,超過改配置,會觸發刷數據 
  'connector.write.flush.max-rows' = '5000', 
   --可選,flush數據的間隔時間,超過該時間,會通過一個異步線程flush數據,默認是0s 
  'connector.write.flush.interval' = '2s', 
  -- 可選, 寫數據失敗最大重試次數
  'connector.write.max-retries' = '3' 
)
示例
//交易日志源表
CREATE TABLE datav_nrts_prod_trans_log (
    id    BIGINT COMMENT '自增主鍵',
    charge_id    STRING COMMENT '支付唯一標識',
    trace_id    STRING COMMENT '全局流水號',
    app_id    STRING COMMENT '商戶appId',
    mer_cust_id    STRING COMMENT '商戶客戶號',
    order_no    STRING COMMENT '商戶請求訂單號',
    trans_id    STRING COMMENT '通道返回訂單號',
    cust_id    STRING COMMENT '客戶號',
    acct_id    STRING COMMENT '賬戶號',
    pay_channel    STRING COMMENT '支付渠道',
    channel    STRING COMMENT '支付通道:hf-匯付',
    trans_stat    STRING COMMENT '狀態:I-初始;R-待支付;P-處理中;S-成功;F-失敗',
    time_expire    STRING COMMENT '訂單失效時間',
    trans_amt    DECIMAL(14,2) COMMENT '交易金額',
    fee_amt    DECIMAL(14,2) COMMENT '手續費金額',
    ts AS (CASE WHEN create_time IS NULL THEN TO_TIMESTAMP('1970-01-01 00:00:00') ELSE create_time END),
    WATERMARK FOR ts AS ts - INTERVAL '5' SECOND,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'kafka',
    'topic' = '${kafka.topic}',
    'properties.bootstrap.servers' = '${kafka.brokers}',
    'properties.group.id' = '${kafka.group}',
    'scan.startup.mode' = '${kafka.startup}',
    'format' = 'dts-avro',
    'dts-avro.table-name' = 'trans_log'
);
創建輸出流
示例
// 在這里編寫代碼
-- 統計指標目標表
CREATE TABLE all_trans_5m (
    product_name               STRING,
    window_start               STRING,
    window_end                 STRING,
    trans_amt                  DECIMAL(24,2),
    trans_cnt                  BIGINT,
    bus_name                   STRING,
    PRIMARY KEY (product_name) NOT ENFORCED
) WITH (
    'connector'= 'jdbc',
    'url' = '${mysql.url}',
    'username' = '${mysql.username}',
    'password' = '${mysql.password}',
    'table-name' = '${mysql.tablename1}'
);
創建中間流
示例
// 在這里編寫代碼
CREATE VIEW v_ads_trans_detail AS
SELECT id
     ,create_time
     ,mer_cust_id
     ,order_no
     ,pay_channel
     ,trans_amt
     ,fee_amt
     ,ts
     ,trans_date
     ,trans_time
     ,trans_type_desc
     ,product_name
     ,data_type
     ,window_start
     ,window_end
FROM (
         SELECT c.id
              ,c.create_time
              ,c.mer_cust_id
              ,c.order_no
              ,c.pay_channel
              ,c.trans_amt
              ,c.fee_amt
              ,c.ts
              ,DATE_FORMAT(c.create_time, 'yyyyMMdd') AS trans_date
              ,DATE_FORMAT(c.create_time,'HHmmss')  trans_time
              ,(CASE
                  WHEN c.pay_channel LIKE 'alipay%' THEN 'alipay'
                  WHEN c.pay_channel LIKE 'wx%' THEN 'WeChat'
                  WHEN c.pay_channel LIKE 'union%' THEN 'Union'
                  else 'other'
                END) AS trans_type_desc
              ,'ADAPAY' AS product_name
              ,'ORDER' data_type
         FROM trans_log c
         WHERE c.trans_stat = 'S'
) dat
, LATERAL TABLE(WINDOW_IN_MINUTES(ts, 5)) AS T(window_start, window_end);

創建離線維表
示例
//phoenix離線商戶信息
CREATE TABLE dim_mer_sales_pd_info( 
  mer_id VARCHAR COMMENT '商戶id',
  mer_name VARCHAR COMMENT '商戶名稱',
  type  VARCHAR COMMENT '商戶來源',
  sales_name    VARCHAR COMMENT '銷售',
  domain_account    VARCHAR COMMENT '銷售域賬戶',
  is_adamall    VARCHAR COMMENT '業務類別是否adamall,1是,0否',
  channel_parent_mer_cust_id VARCHAR  COMMENT '上級代理商ID',
  channel_parent_mer_cust_name VARCHAR  COMMENT '上級代理商名稱',
  channel_top_mer_cust_id    VARCHAR  COMMENT '頂級渠道商id',
  channel_top_mer_cust_name    VARCHAR  COMMENT '頂級渠道商',
  bus_level    VARCHAR  COMMENT '渠道商層級',
  PRIMARY KEY (mer_id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = '${phoenix.url}',
    'table-name' = '${phoenix.table}',
    'lookup.cache.ttl' = '1d', -- lookup cache 中每一行記錄的最大存活時間,若超過該時間,則最老的行記錄將會過期。
    'lookup.cache.max-rows' = '150000', -- lookup cache 的最大行數,若超過該值,則最老的行記錄將會過期。
    'lookup.max-retries' = '3' -- 查詢數據庫失敗的最大重試時間。
);
創建實時維表
示例
//實時商戶信息
CREATE TABLE mer_user_info (
  id bigint COMMENT '物理主鍵,自增',
  login_id varchar  COMMENT '商戶注冊電話',
  mer_id varchar COMMENT 'pa商戶號',
  product_id varchar  COMMENT '產品號',
  mer_cust_id varchar  COMMENT '商戶客戶號',
  mer_name varchar   COMMENT '商戶名',
  mer_short_name varchar  COMMENT '商戶名簡稱',
  mer_provice varchar COMMENT '商戶所屬省份',
  mer_area varchar COMMENT '商戶所屬地區',
  upd_user varchar  COMMENT '更新用戶',
  upd_ts TIMESTAMP  COMMENT '更新時間',
  entry_mer_type varchar COMMENT '進件商戶類型:1-企業;2-小微',
  PRIMARY KEY (login_id) NOT ENFORCED
) WITH (
    'connector' = 'kafka',
    'topic' = '${kafka.topic}',
    'properties.bootstrap.servers' = '${kafka.brokers}',
    'properties.group.id' = '${kafka.group}',
    'scan.startup.mode' = '${kafka.startup}',
    'format' = 'dts-avro',
    'dts-avro.table-name' = 'mer_user_info'
);
數據操作語句DML
INSERT INTO語句
本文為您介紹如何在一個作業中寫入一個Sink或多個Sink。
源表
CREATE TABLE datagen_source (
  name VARCHAR,
  score BIGINT
) WITH (
  'connector' = 'datagen' 
);
結果表
create table blackhole_sink(
    name VARCHAR,
    score BIGINT
) WITH (
  'connector' = 'blackhole' 
);
計算 Sink
INSERT INTO blackhole_sink 
SELECT UPPER(name), score FROM datagen_source;
寫入多個Sink示例

BEGIN STATEMENT SET;      --寫入多個Sink時,必填。
INSERT INTO blackhole_sinkA 
  SELECT UPPER(name), sum(score) 
  FROM datagen_source 
  GROUP BY UPPER(name);
INSERT INTO blackhole_sinkB 
  SELECT LOWER(name), max(score) 
  FROM datagen_source 
  GROUP BY LOWER(name);
END;      --寫入多個Sink時,必填。
說明:寫入多個Sink語句時,需要以**BEGIN STATEMENT SET;開頭,以END;**結尾。
窗口函數
滾動窗口
本文為您介紹如何使用Flink滾動窗口函數。
定義
滾動窗口(TUMBLE)將每個元素分配到一個指定大小的窗口中。通常,滾動窗口有一個固定的大小,並且不會出現重疊。例如,如果指定了一個5分鍾大小的滾動窗口,無限流的數據會根據時間划分為[0:00, 0:05)、[0:05, 0:10)、[0:10, 0:15)等窗口。
語法
TUMBLE函數用在GROUP BY子句中,用來定義滾動窗口。
TUMBLE(<time-attr>, <size-interval>)
<size-interval>: INTERVAL 'string' timeUnit
說明 <time-attr>參數必須是時間流中的一個合法的時間屬性字段,指定為Processing Time或Event Time,請參見概述,了解如何定義時間屬性。
標識函數
使用標識函數選出窗口的起始時間或者結束時間,窗口的時間屬性用於下級Window的聚合。
窗口標識函數
返回類型
描述
TUMBLE_START(time-attr, size-interval)
TIMESTAMP
返回窗口的起始時間(包含邊界)。例如[00:10,00:15)窗口,返回00:10
TUMBLE_END(time-attr, size-interval)
TIMESTAMP
返回窗口的結束時間(包含邊界)。例如[00:00, 00:15]窗口,返回00:15
TUMBLE_ROWTIME(time-attr, size-interval)
TIMESTAMP(rowtime-attr)
返回窗口的結束時間(不包含邊界)。例如[00:00, 00:15]窗口,返回
00:14:59.999。返回值是一個rowtime attribute,即可以基於該字段進行時間屬性的操作,例如,級聯窗口只能用在基於Event Time的Window上
TUMBLE_PROCTIME(time-attr, size-interval)
TIMESTAMP(rowtime-attr)
返回窗口的結束時間(不包含邊界)。例如[00:00, 00:15]窗口,返回00:14:59.999。返回值是一個Proctime Attribute,即可以基於該字段進行時間屬性的操作。例如,級聯窗口只能用在基於Processing Time的Window上
示例
INSERT INTO tumble_output
SELECT
TUMBLE_START(ts, INTERVAL '1' MINUTE) as window_start,
TUMBLE_END(ts, INTERVAL '1' MINUTE) as window_end,
username,
COUNT(click_url)
FROM user_clicks
GROUP BY TUMBLE(ts, INTERVAL '1' MINUTE),username;

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM