3: 讀取kafka數據寫如mysql


關於kafka的source部分請參考 上一篇: https://www.cnblogs.com/liufei1983/p/15801848.html

 

1: 首先下載兩個和jdbc和mysql相關的jar包,注意版本,我的flink是1.13.1, 所以flink-connect-jdck_2.11也用1.13.1的版本,否則會報錯誤。

 

 

2:  在MYSQL里建立一個表:

-- `sql-demo`.cdn_access_statistic definition  (這個在MYSQL里執行)

CREATE TABLE `cdn_access_statistic` (
  `province` varchar(100) DEFAULT NULL,
  `access_count` bigint DEFAULT NULL,
  `total_download` bigint DEFAULT NULL,
  `download_speed` bigint DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

在zeppelin里創建SINK job: 因為zeppeline是在docker運行,所以MYSQL的url的地址不能寫localhost, 要寫宿主機的IP

%flink.ssql

DROP table if exists cdn_access_statistic;

-- Please create this mysql table first in your mysql instance. Flink won't create mysql table for you.

CREATE TABLE cdn_access_statistic (
 province VARCHAR,
 access_count BIGINT,
 total_download BIGINT,
 download_speed DOUBLE
) WITH (
 'connector.type' = 'jdbc',
 'connector.url' = 'jdbc:mysql://192.168.3.XXX:3306/sql-demo',
 'connector.table' = 'cdn_access_statistic',
 'connector.username' = 'sql-demo',
 'connector.password' = 'demo-sql',
 'connector.write.flush.interval' = '1s'
)

3: 確定 kafak的source table和 mysql的sink table都創建了。

        

 

 4: 從kafka消費數據,存儲到mysql. 可以看到mysql 數據庫里數據在變化

%flink.ssql

insert into cdn_access_statistic 
select client_ip, request_time,request_time,request_time
from cdn_access_log

  

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM