Flink任务开发简单流程


作业开发流程

 
  1.  
    创建输入流
     
  2.  
    创建输出流
     
  3.  
    检查输出数据类型是否和结果表一致,如不一致进行格式转换
     
  4.  
    创建中间流
     
  5.  
    保存、语法检查
     
  6.  
    测试发布
     
  7.  
    生产发布申请
FlinkSQL语法参考

创建输入流
语法格式
CREATE TABLE [catalog_name_][db_name_]table_name
  (
    { <column_definition> | <computed_column_definition> }[ , ...n]
    [ <watermark_definition> ]
  )
  [COMMENT table_comment]
  [PARTITIONED BY (partition_column_name1, partition_column_name2, ...)]
  WITH (key1=val1, key2=val2, ...)
-- 定义表字段
<column_definition>:
  column_name column_type [COMMENT column_comment]
-- 定义计算列
<computed_column_definition>:
  column_name AS computed_column_expression [COMMENT column_comment]
-- 定义水位线
<watermark_definition>:
  WATERMARK FOR rowtime_column_name AS watermark_strategy_expression

COMPUTED COLUMN(计算列)
计算列是一个通过column_name AS computed_column_expression生成的虚拟列,产生的计算列不是物理存储在数据源表中。一个计算列可以通过原有数据源表中的某个字段、运算符及内置函数生成。比如,定义一个消费金额的计算列(cost),可以使用表的价格(price)*数量(quantity)计算得到。计算列常常被用在定义时间属性(见另一篇文章Flink Table API&SQL编程指南之时间属性(3),可以通过PROCTIME()函数定义处理时间属性,语法为proc AS PROCTIME()。除此之外,计算列可以被用作提取事件时间列,因为原始的事件时间可能不是TIMESTAMP(3)类型或者是存在JSON串中。
水位线
水位线定义了表的事件时间属性,其语法为:
WATERMARK FOR rowtime_column_name AS watermark_strategy_expression
其中rowtime_column_name表示表中已经存在的事件时间字段,值得注意的是,该事件时间字段必须是TIMESTAMP(3)类型,即形如yyyy-MM-dd HH:mm:ss,如果不是这种形式的数据类型,需要通过定义计算列进行转换。watermark_strategy_expression定义了水位线生成的策略,该表达式的返回数据类型必须是TIMESTAMP(3)类型。
Flink提供了许多常用的水位线生成策略:
严格单调递增的水位线:语法为WATERMARK FOR rowtime_column AS rowtime_column
即直接使用时间时间戳作为水位线
递增水位线:语法为WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL '0.001' SECOND
乱序水位线:语法为WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL 'string' timeUnit-- 比如,允许5秒的乱序WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL '5' SECOND
WITH 选项
创建Table source或者Table sink需要指定表的属性,属性是以key/value的形式配置的,具体参考其相对应的connector
定义kafka source
CREATE TABLE KafkaTable (
  ...
) WITH (
  'connector.type' = 'kafka', -- 连接类型       
  'connector.version' = '0.11',-- 必选: 可选的kafka版本有:0.8/0.9/0.10/0.11/universal
  'connector.topic' = 'topic_name', -- 必选: 主题名称
  'connector.properties.zookeeper.connect' = 'localhost:2181', -- 必选: zk连接地址
  'connector.properties.bootstrap.servers' = 'localhost:9092', -- 必选: Kafka连接地址
  'connector.properties.group.id' = 'testGroup', --可选: 消费者组
   -- 可选:偏移量, earliest-offset/latest-offset/group-offsets/specific-offsets
  'connector.startup-mode' = 'earliest-offset',                                          
  -- 可选: 当偏移量指定为specific offsets,为指定每个分区指定具体位置
  'connector.specific-offsets' = 'partition:0,offset:42;partition:1,offset:300',
  'connector.sink-partitioner' = '...',-- 可选: sink分区器,fixed/round-robin/custom
  -- 可选: 当自定义分区器时,指定分区器的类名
  'connector.sink-partitioner-class' = 'org.mycompany.MyPartitioner',
  'format.type' = '...',-- 必选: 指定格式,支持csv/json/avro
/dts-avro
  'update-mode' = 'append',
-- 指定update-mode,支持append/retract/upsert
  'dts-avro.table-name' = 'tablename'
)

定义mysql source
CREATE TABLE MySQLTable (
  ...
) WITH (
  'connector.type' = 'jdbc', -- 必选: jdbc方式
  'connector.url' = 'jdbc:mysql://localhost:3306/flink-test', -- 必选: JDBC url
  'connector.table' = 'jdbc_table_name',  -- 必选: 表名
   -- 可选: JDBC driver,如果不配置,会自动通过url提取 
  'connector.driver' = 'com.mysql.jdbc.Driver',                                           
  'connector.username' = 'name', -- 可选: 数据库用户名
  'connector.password' = 'password',-- 可选: 数据库密码
    -- 可选, 将输入进行分区的字段名.
  'connector.read.partition.column' = 'column_name',
    -- 可选, 分区数量.
  'connector.read.partition.num' = '50', 
    -- 可选, 第一个分区的最小值.
  'connector.read.partition.lower-bound' = '500',
    -- 可选, 最后一个分区的最大值
  'connector.read.partition.upper-bound' = '1000', 
    -- 可选, 一次提取数据的行数,默认为0,表示忽略此配置
  'connector.read.fetch-size' = '100', 
   -- 可选, lookup缓存数据的最大行数,如果超过改配置,老的数据会被清除
  'connector.lookup.cache.max-rows' = '5000', 
   -- 可选,lookup缓存存活的最大时间,超过该时间旧数据会过时,注意cache.max-rows与cache.ttl必须同时配置
  'connector.lookup.cache.ttl' = '10s', 
   -- 可选, 查询数据最大重试次数
  'connector.lookup.max-retries' = '3', 
   -- 可选,写数据最大的flush行数,默认5000,超过改配置,会触发刷数据 
  'connector.write.flush.max-rows' = '5000', 
   --可选,flush数据的间隔时间,超过该时间,会通过一个异步线程flush数据,默认是0s 
  'connector.write.flush.interval' = '2s', 
  -- 可选, 写数据失败最大重试次数
  'connector.write.max-retries' = '3' 
)
示例
//交易日志源表
CREATE TABLE datav_nrts_prod_trans_log (
    id    BIGINT COMMENT '自增主键',
    charge_id    STRING COMMENT '支付唯一标识',
    trace_id    STRING COMMENT '全局流水号',
    app_id    STRING COMMENT '商户appId',
    mer_cust_id    STRING COMMENT '商户客户号',
    order_no    STRING COMMENT '商户请求订单号',
    trans_id    STRING COMMENT '通道返回订单号',
    cust_id    STRING COMMENT '客户号',
    acct_id    STRING COMMENT '账户号',
    pay_channel    STRING COMMENT '支付渠道',
    channel    STRING COMMENT '支付通道:hf-汇付',
    trans_stat    STRING COMMENT '状态:I-初始;R-待支付;P-处理中;S-成功;F-失败',
    time_expire    STRING COMMENT '订单失效时间',
    trans_amt    DECIMAL(14,2) COMMENT '交易金额',
    fee_amt    DECIMAL(14,2) COMMENT '手续费金额',
    ts AS (CASE WHEN create_time IS NULL THEN TO_TIMESTAMP('1970-01-01 00:00:00') ELSE create_time END),
    WATERMARK FOR ts AS ts - INTERVAL '5' SECOND,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'kafka',
    'topic' = '${kafka.topic}',
    'properties.bootstrap.servers' = '${kafka.brokers}',
    'properties.group.id' = '${kafka.group}',
    'scan.startup.mode' = '${kafka.startup}',
    'format' = 'dts-avro',
    'dts-avro.table-name' = 'trans_log'
);
创建输出流
示例
// 在这里编写代码
-- 统计指标目标表
CREATE TABLE all_trans_5m (
    product_name               STRING,
    window_start               STRING,
    window_end                 STRING,
    trans_amt                  DECIMAL(24,2),
    trans_cnt                  BIGINT,
    bus_name                   STRING,
    PRIMARY KEY (product_name) NOT ENFORCED
) WITH (
    'connector'= 'jdbc',
    'url' = '${mysql.url}',
    'username' = '${mysql.username}',
    'password' = '${mysql.password}',
    'table-name' = '${mysql.tablename1}'
);
创建中间流
示例
// 在这里编写代码
CREATE VIEW v_ads_trans_detail AS
SELECT id
     ,create_time
     ,mer_cust_id
     ,order_no
     ,pay_channel
     ,trans_amt
     ,fee_amt
     ,ts
     ,trans_date
     ,trans_time
     ,trans_type_desc
     ,product_name
     ,data_type
     ,window_start
     ,window_end
FROM (
         SELECT c.id
              ,c.create_time
              ,c.mer_cust_id
              ,c.order_no
              ,c.pay_channel
              ,c.trans_amt
              ,c.fee_amt
              ,c.ts
              ,DATE_FORMAT(c.create_time, 'yyyyMMdd') AS trans_date
              ,DATE_FORMAT(c.create_time,'HHmmss')  trans_time
              ,(CASE
                  WHEN c.pay_channel LIKE 'alipay%' THEN 'alipay'
                  WHEN c.pay_channel LIKE 'wx%' THEN 'WeChat'
                  WHEN c.pay_channel LIKE 'union%' THEN 'Union'
                  else 'other'
                END) AS trans_type_desc
              ,'ADAPAY' AS product_name
              ,'ORDER' data_type
         FROM trans_log c
         WHERE c.trans_stat = 'S'
) dat
, LATERAL TABLE(WINDOW_IN_MINUTES(ts, 5)) AS T(window_start, window_end);

创建离线维表
示例
//phoenix离线商户信息
CREATE TABLE dim_mer_sales_pd_info( 
  mer_id VARCHAR COMMENT '商户id',
  mer_name VARCHAR COMMENT '商户名称',
  type  VARCHAR COMMENT '商户来源',
  sales_name    VARCHAR COMMENT '销售',
  domain_account    VARCHAR COMMENT '销售域账户',
  is_adamall    VARCHAR COMMENT '业务类别是否adamall,1是,0否',
  channel_parent_mer_cust_id VARCHAR  COMMENT '上级代理商ID',
  channel_parent_mer_cust_name VARCHAR  COMMENT '上级代理商名称',
  channel_top_mer_cust_id    VARCHAR  COMMENT '顶级渠道商id',
  channel_top_mer_cust_name    VARCHAR  COMMENT '顶级渠道商',
  bus_level    VARCHAR  COMMENT '渠道商层级',
  PRIMARY KEY (mer_id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = '${phoenix.url}',
    'table-name' = '${phoenix.table}',
    'lookup.cache.ttl' = '1d', -- lookup cache 中每一行记录的最大存活时间,若超过该时间,则最老的行记录将会过期。
    'lookup.cache.max-rows' = '150000', -- lookup cache 的最大行数,若超过该值,则最老的行记录将会过期。
    'lookup.max-retries' = '3' -- 查询数据库失败的最大重试时间。
);
创建实时维表
示例
//实时商户信息
CREATE TABLE mer_user_info (
  id bigint COMMENT '物理主键,自增',
  login_id varchar  COMMENT '商户注册电话',
  mer_id varchar COMMENT 'pa商户号',
  product_id varchar  COMMENT '产品号',
  mer_cust_id varchar  COMMENT '商户客户号',
  mer_name varchar   COMMENT '商户名',
  mer_short_name varchar  COMMENT '商户名简称',
  mer_provice varchar COMMENT '商户所属省份',
  mer_area varchar COMMENT '商户所属地区',
  upd_user varchar  COMMENT '更新用户',
  upd_ts TIMESTAMP  COMMENT '更新时间',
  entry_mer_type varchar COMMENT '进件商户类型:1-企业;2-小微',
  PRIMARY KEY (login_id) NOT ENFORCED
) WITH (
    'connector' = 'kafka',
    'topic' = '${kafka.topic}',
    'properties.bootstrap.servers' = '${kafka.brokers}',
    'properties.group.id' = '${kafka.group}',
    'scan.startup.mode' = '${kafka.startup}',
    'format' = 'dts-avro',
    'dts-avro.table-name' = 'mer_user_info'
);
数据操作语句DML
INSERT INTO语句
本文为您介绍如何在一个作业中写入一个Sink或多个Sink。
源表
CREATE TABLE datagen_source (
  name VARCHAR,
  score BIGINT
) WITH (
  'connector' = 'datagen' 
);
结果表
create table blackhole_sink(
    name VARCHAR,
    score BIGINT
) WITH (
  'connector' = 'blackhole' 
);
计算 Sink
INSERT INTO blackhole_sink 
SELECT UPPER(name), score FROM datagen_source;
写入多个Sink示例

BEGIN STATEMENT SET;      --写入多个Sink时,必填。
INSERT INTO blackhole_sinkA 
  SELECT UPPER(name), sum(score) 
  FROM datagen_source 
  GROUP BY UPPER(name);
INSERT INTO blackhole_sinkB 
  SELECT LOWER(name), max(score) 
  FROM datagen_source 
  GROUP BY LOWER(name);
END;      --写入多个Sink时,必填。
说明:写入多个Sink语句时,需要以**BEGIN STATEMENT SET;开头,以END;**结尾。
窗口函数
滚动窗口
本文为您介绍如何使用Flink滚动窗口函数。
定义
滚动窗口(TUMBLE)将每个元素分配到一个指定大小的窗口中。通常,滚动窗口有一个固定的大小,并且不会出现重叠。例如,如果指定了一个5分钟大小的滚动窗口,无限流的数据会根据时间划分为[0:00, 0:05)、[0:05, 0:10)、[0:10, 0:15)等窗口。
语法
TUMBLE函数用在GROUP BY子句中,用来定义滚动窗口。
TUMBLE(<time-attr>, <size-interval>)
<size-interval>: INTERVAL 'string' timeUnit
说明 <time-attr>参数必须是时间流中的一个合法的时间属性字段,指定为Processing Time或Event Time,请参见概述,了解如何定义时间属性。
标识函数
使用标识函数选出窗口的起始时间或者结束时间,窗口的时间属性用于下级Window的聚合。
窗口标识函数
返回类型
描述
TUMBLE_START(time-attr, size-interval)
TIMESTAMP
返回窗口的起始时间(包含边界)。例如[00:10,00:15)窗口,返回00:10
TUMBLE_END(time-attr, size-interval)
TIMESTAMP
返回窗口的结束时间(包含边界)。例如[00:00, 00:15]窗口,返回00:15
TUMBLE_ROWTIME(time-attr, size-interval)
TIMESTAMP(rowtime-attr)
返回窗口的结束时间(不包含边界)。例如[00:00, 00:15]窗口,返回
00:14:59.999。返回值是一个rowtime attribute,即可以基于该字段进行时间属性的操作,例如,级联窗口只能用在基于Event Time的Window上
TUMBLE_PROCTIME(time-attr, size-interval)
TIMESTAMP(rowtime-attr)
返回窗口的结束时间(不包含边界)。例如[00:00, 00:15]窗口,返回00:14:59.999。返回值是一个Proctime Attribute,即可以基于该字段进行时间属性的操作。例如,级联窗口只能用在基于Processing Time的Window上
示例
INSERT INTO tumble_output
SELECT
TUMBLE_START(ts, INTERVAL '1' MINUTE) as window_start,
TUMBLE_END(ts, INTERVAL '1' MINUTE) as window_end,
username,
COUNT(click_url)
FROM user_clicks
GROUP BY TUMBLE(ts, INTERVAL '1' MINUTE),username;

 


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM