作业开发流程
-
创建输入流
-
创建输出流
-
检查输出数据类型是否和结果表一致,如不一致进行格式转换
-
创建中间流
-
保存、语法检查
-
测试发布
-
生产发布申请
FlinkSQL语法参考 创建输入流 语法格式 CREATE TABLE [catalog_name_][db_name_]table_name ( { <column_definition> | <computed_column_definition> }[ , ...n] [ <watermark_definition> ] ) [COMMENT table_comment] [PARTITIONED BY (partition_column_name1, partition_column_name2, ...)] WITH (key1=val1, key2=val2, ...) -- 定义表字段 <column_definition>: column_name column_type [COMMENT column_comment] -- 定义计算列 <computed_column_definition>: column_name AS computed_column_expression [COMMENT column_comment] -- 定义水位线 <watermark_definition>: WATERMARK FOR rowtime_column_name AS watermark_strategy_expression COMPUTED COLUMN(计算列) 计算列是一个通过column_name AS computed_column_expression生成的虚拟列,产生的计算列不是物理存储在数据源表中。一个计算列可以通过原有数据源表中的某个字段、运算符及内置函数生成。比如,定义一个消费金额的计算列(cost),可以使用表的价格(price)*数量(quantity)计算得到。计算列常常被用在定义时间属性(见另一篇文章Flink Table API&SQL编程指南之时间属性(3),可以通过PROCTIME()函数定义处理时间属性,语法为proc AS PROCTIME()。除此之外,计算列可以被用作提取事件时间列,因为原始的事件时间可能不是TIMESTAMP(3)类型或者是存在JSON串中。 水位线 水位线定义了表的事件时间属性,其语法为: WATERMARK FOR rowtime_column_name AS watermark_strategy_expression 其中rowtime_column_name表示表中已经存在的事件时间字段,值得注意的是,该事件时间字段必须是TIMESTAMP(3)类型,即形如yyyy-MM-dd HH:mm:ss,如果不是这种形式的数据类型,需要通过定义计算列进行转换。watermark_strategy_expression定义了水位线生成的策略,该表达式的返回数据类型必须是TIMESTAMP(3)类型。 Flink提供了许多常用的水位线生成策略: 严格单调递增的水位线:语法为WATERMARK FOR rowtime_column AS rowtime_column 即直接使用时间时间戳作为水位线 递增水位线:语法为WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL '0.001' SECOND 乱序水位线:语法为WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL 'string' timeUnit-- 比如,允许5秒的乱序WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL '5' SECOND WITH 选项 创建Table source或者Table sink需要指定表的属性,属性是以key/value的形式配置的,具体参考其相对应的connector 定义kafka source CREATE TABLE KafkaTable ( ... ) WITH ( 'connector.type' = 'kafka', -- 连接类型 'connector.version' = '0.11',-- 必选: 可选的kafka版本有:0.8/0.9/0.10/0.11/universal 'connector.topic' = 'topic_name', -- 必选: 主题名称 'connector.properties.zookeeper.connect' = 'localhost:2181', -- 必选: zk连接地址 'connector.properties.bootstrap.servers' = 'localhost:9092', -- 必选: Kafka连接地址 'connector.properties.group.id' = 'testGroup', --可选: 消费者组 -- 可选:偏移量, earliest-offset/latest-offset/group-offsets/specific-offsets 'connector.startup-mode' = 'earliest-offset', -- 可选: 当偏移量指定为specific offsets,为指定每个分区指定具体位置 'connector.specific-offsets' = 'partition:0,offset:42;partition:1,offset:300', 'connector.sink-partitioner' = '...',-- 可选: sink分区器,fixed/round-robin/custom -- 可选: 当自定义分区器时,指定分区器的类名 'connector.sink-partitioner-class' = 'org.mycompany.MyPartitioner', 'format.type' = '...',-- 必选: 指定格式,支持csv/json/avro /dts-avro 'update-mode' = 'append', -- 指定update-mode,支持append/retract/upsert 'dts-avro.table-name' = 'tablename' ) 定义mysql source CREATE TABLE MySQLTable ( ... ) WITH ( 'connector.type' = 'jdbc', -- 必选: jdbc方式 'connector.url' = 'jdbc:mysql://localhost:3306/flink-test', -- 必选: JDBC url 'connector.table' = 'jdbc_table_name', -- 必选: 表名 -- 可选: JDBC driver,如果不配置,会自动通过url提取 'connector.driver' = 'com.mysql.jdbc.Driver', 'connector.username' = 'name', -- 可选: 数据库用户名 'connector.password' = 'password',-- 可选: 数据库密码 -- 可选, 将输入进行分区的字段名. 'connector.read.partition.column' = 'column_name', -- 可选, 分区数量. 'connector.read.partition.num' = '50', -- 可选, 第一个分区的最小值. 'connector.read.partition.lower-bound' = '500', -- 可选, 最后一个分区的最大值 'connector.read.partition.upper-bound' = '1000', -- 可选, 一次提取数据的行数,默认为0,表示忽略此配置 'connector.read.fetch-size' = '100', -- 可选, lookup缓存数据的最大行数,如果超过改配置,老的数据会被清除 'connector.lookup.cache.max-rows' = '5000', -- 可选,lookup缓存存活的最大时间,超过该时间旧数据会过时,注意cache.max-rows与cache.ttl必须同时配置 'connector.lookup.cache.ttl' = '10s', -- 可选, 查询数据最大重试次数 'connector.lookup.max-retries' = '3', -- 可选,写数据最大的flush行数,默认5000,超过改配置,会触发刷数据 'connector.write.flush.max-rows' = '5000', --可选,flush数据的间隔时间,超过该时间,会通过一个异步线程flush数据,默认是0s 'connector.write.flush.interval' = '2s', -- 可选, 写数据失败最大重试次数 'connector.write.max-retries' = '3' ) 示例 //交易日志源表 CREATE TABLE datav_nrts_prod_trans_log ( id BIGINT COMMENT '自增主键', charge_id STRING COMMENT '支付唯一标识', trace_id STRING COMMENT '全局流水号', app_id STRING COMMENT '商户appId', mer_cust_id STRING COMMENT '商户客户号', order_no STRING COMMENT '商户请求订单号', trans_id STRING COMMENT '通道返回订单号', cust_id STRING COMMENT '客户号', acct_id STRING COMMENT '账户号', pay_channel STRING COMMENT '支付渠道', channel STRING COMMENT '支付通道:hf-汇付', trans_stat STRING COMMENT '状态:I-初始;R-待支付;P-处理中;S-成功;F-失败', time_expire STRING COMMENT '订单失效时间', trans_amt DECIMAL(14,2) COMMENT '交易金额', fee_amt DECIMAL(14,2) COMMENT '手续费金额', ts AS (CASE WHEN create_time IS NULL THEN TO_TIMESTAMP('1970-01-01 00:00:00') ELSE create_time END), WATERMARK FOR ts AS ts - INTERVAL '5' SECOND, PRIMARY KEY (id) NOT ENFORCED ) WITH ( 'connector' = 'kafka', 'topic' = '${kafka.topic}', 'properties.bootstrap.servers' = '${kafka.brokers}', 'properties.group.id' = '${kafka.group}', 'scan.startup.mode' = '${kafka.startup}', 'format' = 'dts-avro', 'dts-avro.table-name' = 'trans_log' ); 创建输出流 示例 // 在这里编写代码 -- 统计指标目标表 CREATE TABLE all_trans_5m ( product_name STRING, window_start STRING, window_end STRING, trans_amt DECIMAL(24,2), trans_cnt BIGINT, bus_name STRING, PRIMARY KEY (product_name) NOT ENFORCED ) WITH ( 'connector'= 'jdbc', 'url' = '${mysql.url}', 'username' = '${mysql.username}', 'password' = '${mysql.password}', 'table-name' = '${mysql.tablename1}' ); 创建中间流 示例 // 在这里编写代码 CREATE VIEW v_ads_trans_detail AS SELECT id ,create_time ,mer_cust_id ,order_no ,pay_channel ,trans_amt ,fee_amt ,ts ,trans_date ,trans_time ,trans_type_desc ,product_name ,data_type ,window_start ,window_end FROM ( SELECT c.id ,c.create_time ,c.mer_cust_id ,c.order_no ,c.pay_channel ,c.trans_amt ,c.fee_amt ,c.ts ,DATE_FORMAT(c.create_time, 'yyyyMMdd') AS trans_date ,DATE_FORMAT(c.create_time,'HHmmss') trans_time ,(CASE WHEN c.pay_channel LIKE 'alipay%' THEN 'alipay' WHEN c.pay_channel LIKE 'wx%' THEN 'WeChat' WHEN c.pay_channel LIKE 'union%' THEN 'Union' else 'other' END) AS trans_type_desc ,'ADAPAY' AS product_name ,'ORDER' data_type FROM trans_log c WHERE c.trans_stat = 'S' ) dat , LATERAL TABLE(WINDOW_IN_MINUTES(ts, 5)) AS T(window_start, window_end); 创建离线维表 示例 //phoenix离线商户信息 CREATE TABLE dim_mer_sales_pd_info( mer_id VARCHAR COMMENT '商户id', mer_name VARCHAR COMMENT '商户名称', type VARCHAR COMMENT '商户来源', sales_name VARCHAR COMMENT '销售', domain_account VARCHAR COMMENT '销售域账户', is_adamall VARCHAR COMMENT '业务类别是否adamall,1是,0否', channel_parent_mer_cust_id VARCHAR COMMENT '上级代理商ID', channel_parent_mer_cust_name VARCHAR COMMENT '上级代理商名称', channel_top_mer_cust_id VARCHAR COMMENT '顶级渠道商id', channel_top_mer_cust_name VARCHAR COMMENT '顶级渠道商', bus_level VARCHAR COMMENT '渠道商层级', PRIMARY KEY (mer_id) NOT ENFORCED ) WITH ( 'connector' = 'jdbc', 'url' = '${phoenix.url}', 'table-name' = '${phoenix.table}', 'lookup.cache.ttl' = '1d', -- lookup cache 中每一行记录的最大存活时间,若超过该时间,则最老的行记录将会过期。 'lookup.cache.max-rows' = '150000', -- lookup cache 的最大行数,若超过该值,则最老的行记录将会过期。 'lookup.max-retries' = '3' -- 查询数据库失败的最大重试时间。 ); 创建实时维表 示例 //实时商户信息 CREATE TABLE mer_user_info ( id bigint COMMENT '物理主键,自增', login_id varchar COMMENT '商户注册电话', mer_id varchar COMMENT 'pa商户号', product_id varchar COMMENT '产品号', mer_cust_id varchar COMMENT '商户客户号', mer_name varchar COMMENT '商户名', mer_short_name varchar COMMENT '商户名简称', mer_provice varchar COMMENT '商户所属省份', mer_area varchar COMMENT '商户所属地区', upd_user varchar COMMENT '更新用户', upd_ts TIMESTAMP COMMENT '更新时间', entry_mer_type varchar COMMENT '进件商户类型:1-企业;2-小微', PRIMARY KEY (login_id) NOT ENFORCED ) WITH ( 'connector' = 'kafka', 'topic' = '${kafka.topic}', 'properties.bootstrap.servers' = '${kafka.brokers}', 'properties.group.id' = '${kafka.group}', 'scan.startup.mode' = '${kafka.startup}', 'format' = 'dts-avro', 'dts-avro.table-name' = 'mer_user_info' ); 数据操作语句DML INSERT INTO语句 本文为您介绍如何在一个作业中写入一个Sink或多个Sink。 源表 CREATE TABLE datagen_source ( name VARCHAR, score BIGINT ) WITH ( 'connector' = 'datagen' ); 结果表 create table blackhole_sink( name VARCHAR, score BIGINT ) WITH ( 'connector' = 'blackhole' ); 计算 Sink INSERT INTO blackhole_sink SELECT UPPER(name), score FROM datagen_source; 写入多个Sink示例 BEGIN STATEMENT SET; --写入多个Sink时,必填。 INSERT INTO blackhole_sinkA SELECT UPPER(name), sum(score) FROM datagen_source GROUP BY UPPER(name); INSERT INTO blackhole_sinkB SELECT LOWER(name), max(score) FROM datagen_source GROUP BY LOWER(name); END; --写入多个Sink时,必填。 说明:写入多个Sink语句时,需要以**BEGIN STATEMENT SET;开头,以END;**结尾。 窗口函数 滚动窗口 本文为您介绍如何使用Flink滚动窗口函数。 定义 滚动窗口(TUMBLE)将每个元素分配到一个指定大小的窗口中。通常,滚动窗口有一个固定的大小,并且不会出现重叠。例如,如果指定了一个5分钟大小的滚动窗口,无限流的数据会根据时间划分为[0:00, 0:05)、[0:05, 0:10)、[0:10, 0:15)等窗口。 语法 TUMBLE函数用在GROUP BY子句中,用来定义滚动窗口。 TUMBLE(<time-attr>, <size-interval>) <size-interval>: INTERVAL 'string' timeUnit 说明 <time-attr>参数必须是时间流中的一个合法的时间属性字段,指定为Processing Time或Event Time,请参见概述,了解如何定义时间属性。 标识函数 使用标识函数选出窗口的起始时间或者结束时间,窗口的时间属性用于下级Window的聚合。 窗口标识函数 返回类型 描述 TUMBLE_START(time-attr, size-interval) TIMESTAMP 返回窗口的起始时间(包含边界)。例如[00:10,00:15)窗口,返回00:10 TUMBLE_END(time-attr, size-interval) TIMESTAMP 返回窗口的结束时间(包含边界)。例如[00:00, 00:15]窗口,返回00:15 TUMBLE_ROWTIME(time-attr, size-interval) TIMESTAMP(rowtime-attr) 返回窗口的结束时间(不包含边界)。例如[00:00, 00:15]窗口,返回 00:14:59.999。返回值是一个rowtime attribute,即可以基于该字段进行时间属性的操作,例如,级联窗口只能用在基于Event Time的Window上 TUMBLE_PROCTIME(time-attr, size-interval) TIMESTAMP(rowtime-attr) 返回窗口的结束时间(不包含边界)。例如[00:00, 00:15]窗口,返回00:14:59.999。返回值是一个Proctime Attribute,即可以基于该字段进行时间属性的操作。例如,级联窗口只能用在基于Processing Time的Window上 示例 INSERT INTO tumble_output SELECT TUMBLE_START(ts, INTERVAL '1' MINUTE) as window_start, TUMBLE_END(ts, INTERVAL '1' MINUTE) as window_end, username, COUNT(click_url) FROM user_clicks GROUP BY TUMBLE(ts, INTERVAL '1' MINUTE),username;