前提:
1.配置flink的元數據到hive,不然每次重啟flink-cli,之前建的表都丟失了
在這個默認的sql-client-defaults.yaml修改
catalogs: - name: catalog_1 type: hive hive-conf-dir: /opt/module/hive/conf default-database: mydb execution: type: streaming result-mode: tableau #查詢出來的數據表格化,更好看
2.需要導入包:(我使用的flink1.11,自帶 flink-json-1.11.3.jar,flink-sql-connector-kafka_2.11-1.11.3.jar)
flink-connector-hive_2.11-1.11.0.jar
flink-connector-jdbc_2.12-1.11.3.jar
flink-table-planner_2.11-1.11.3.jar
flink-table-planner-blink_2.11-1.11.3.jar
hive-exec-3.1.2.jar
mysql-connector-java-8.0.15.jar
有了這些包分發到flink集群,flink就可以和mysql,hive,kafka互通
一、Flink 與 Mysql
1.mysql庫mydw建維度表
CREATE TABLE dim_province ( province_id BIGINT, province_name VARCHAR, region_name VARCHAR) ;
並插入數據:
insert into dim_province (province_id,province_name,region_name) values (1,'廣東','華南'); insert into dim_province (province_id,province_name,region_name) values (2,'湖北','華中'); insert into dim_province (province_id,province_name,region_name) values (3,'山東','華北');
2.Flink Source mysql
建維度表:維度數據存儲在mysql
CREATE TABLE dim_province ( province_id BIGINT, province_name VARCHAR, region_name VARCHAR ) WITH ( 'connector.type' = 'jdbc', 'connector.url' = 'jdbc:mysql://192.168.9.103:3306/mydw?useUnicode=true&characterEncoding=utf-8', 'connector.table' = 'dim_province', 'connector.driver' = 'com.mysql.jdbc.Driver', 'connector.username' = 'root', 'connector.password' = '000000', 'connector.lookup.cache.max-rows' = '5000', 'connector.lookup.cache.ttl' = '10min' );
校驗是否成功:
Flink-sql>select * from dim_province;
Mysql的數據可以在這查出來;
3.Flink sink mysql
建表:
CREATE TABLE region_sales_sink ( region_name VARCHAR(30), buy_cnt BIGINT ) WITH ( 'connector.type' = 'jdbc', 'connector.url' = 'jdbc:mysql://192.168.9.103:3306/mydw?useUnicode=true&characterEncoding=utf-8', 'connector.table' = 'top_region', -- MySQL中的待插入數據的表 'connector.driver' = 'com.mysql.jdbc.Driver', 'connector.username' = 'root', 'connector.password' = '000000', 'connector.write.flush.interval' = '1s' );
數據寫到mysql過程可能會出現中文亂碼,把mysql服務器配置文件設置為urf-8,
首先修改MySQL的配置文件/etc/mysql/my.cnf:
在[mysqld]下追加:
character-set-server=utf8
Mysql>status;
可以查看服務器編碼狀態都改為utf-8;
二、Flink 與 Kafka
建事實行為表:行為數據存儲在kafka
CREATE TABLE user_behavior ( user_id BIGINT, -- 用戶id item_id BIGINT, -- 商品id cat_id BIGINT, -- 品類id action STRING, -- 用戶行為 province INT, -- 用戶所在的省份 ts BIGINT, -- 用戶行為發生的時間戳 proctime as PROCTIME(), -- 通過計算列產生一個處理時間列 eventTime AS TO_TIMESTAMP(FROM_UNIXTIME(ts, 'yyyy-MM-dd HH:mm:ss')), -- 事件時間 WATERMARK FOR eventTime as eventTime - INTERVAL '5' SECOND ) WITH ( 'connector.type' = 'kafka', -- 使用 kafka connector 'connector.version' = 'universal', -- kafka 版本,universal 支持 0.11 以上的版本 'connector.topic' = 'user_behavior', -- kafka主題 'connector.startup-mode' = 'earliest-offset', -- 偏移量,從起始 offset 開始讀取 'connector.properties.group.id' = 'group1', -- 消費者組 'connector.properties.zookeeper.connect' = 'hadoop101:2181', -- zookeeper 地址 'connector.properties.bootstrap.servers' = 'hadoop101:9092', -- kafka broker 地址 'format.type' = 'json' -- 數據源格式為 json );
kafka生產者生產數據,檢驗數據:
Flink-sql>select * from user_behavior ;
打印數據
三、Flink + Hive
配置好catalog之后,Flink-Sql建的表都會在Hive落庫