Flink 1.10 SQL 讀寫Kafka


最近因為疫情的原因,偷了好長時間的懶,現在終於開始繼續看Flink 的SQL 了 

————————————————

電腦上的Flink 項目早就升級到了 1.10了,最近還在看官網新的文檔,趁着周末,體驗一下新版本的SQL API(踩一下坑)。

直接從之前的 雲邪大佬的Flink 的 SQL 樣例開始(pom 已經提前整理好了)。

簡單回憶一下內容,就是從kafka 接收 用戶行為,根據時間分組,求PV 和UV ,然后輸出到 mysql 中。

先看下加的 依賴:

    <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table</artifactId>
    <version>${flink.version}</version>
    <type>pom</type>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
<!-- or... -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-common</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-scala_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-jdbc_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-csv</artifactId>
    <version>${flink.version}</version>
</dependency>

table 相關的有這些,注意幾個新的依賴,如:flink-jdbc_2.11-1.10.0.jar

看下對應的sql文件:

--sourceTable
CREATE TABLE user_log (
    user_id VARCHAR,
    item_id VARCHAR,
    category_id VARCHAR,
    behavior VARCHAR,
    ts TIMESTAMP(3)
) WITH (
    'connector.type' = 'kafka',
    'connector.version' = 'universal',
    'connector.topic' = 'user_behavior',
    'connector.startup-mode' = 'earliest-offset',
    'connector.properties.0.key' = 'zookeeper.connect',
    'connector.properties.0.value' = 'venn:2181',
    'connector.properties.1.key' = 'bootstrap.servers',
    'connector.properties.1.value' = 'venn:9092',
    'update-mode' = 'append',
    'format.type' = 'json',
    'format.derive-schema' = 'true'
);

--sinkTable
CREATE TABLE pvuv_sink (
    dt VARCHAR,
    pv BIGINT,
    uv BIGINT
) WITH (
    'connector.type' = 'jdbc',
    'connector.url' = 'jdbc:mysql://venn:3306/venn',
    'connector.table' = 'pvuv_sink',
    'connector.username' = 'root',
    'connector.password' = '123456',
    'connector.write.flush.max-rows' = '1'
);

--insert
INSERT INTO pvuv_sink(dt, pv, uv)
SELECT
  DATE_FORMAT(ts, 'yyyy-MM-dd HH:00') dt,
  COUNT(*) AS pv,
  COUNT(DISTINCT user_id) AS uv
FROM user_log
GROUP BY DATE_FORMAT(ts, 'yyyy-MM-dd HH:00');

執行

遇到的第一個問題就是: "Type TIMESTAMP(6) of table field 'ts' does not match with the physical type TIMESTAMP(3) of the 'ts' field of the TableSource return type"

看起來 TIMESTAMP 默認的是 TIMESTAMP(6) ,與 source 中的 TIMESTAMP("ts": "2017-11-26T01:00:01Z")不匹配,直接將 ts 的數據類型改為 : TIMESTAMP(3),搞定。

好像沒有其他坑了,直接可以執行,數據也輸出到myql 中了

 

 

 之后,從sql 的 connector 開始,先看了下 kafak的,Flink 1.10 SQL 中,kafka 只支持 csv、json 和 avro 三種類型。(試了下  json 和 csv)

兩個sql程序,包含讀寫 json、csn。

直接將上面的table sink 的sql 修改成寫kafak:

--sinkTable
CREATE TABLE user_log_sink (
    dt VARCHAR,
    pv BIGINT,
    uv BIGINT
) WITH (
    'connector.type' = 'kafka',
    'connector.version' = 'universal',
    'connector.topic' = 'user_behavior_sink',
    'connector.properties.zookeeper.connect' = 'venn:2181',
    'connector.properties.bootstrap.servers' = 'venn:9092',
    'update-mode' = 'append', 'format.type' = 'json'
);

然而,並不能執行。

報了如下的錯:

AppendStreamTableSink requires that Table has only insert changes.

WTF,上面的 'update-mode' 明明寫的是  'append

然后,我就開始了一段,並沒有什么鳥用的操作:看官網文檔、修改sql 的配置。。

---------------此處花了不少時間-----------------

直到最后,突發奇想,直接將source 的內容輸出呢,不做任何轉換:

--insert
INSERT INTO user_log_sink(dt, pv, uv)
SELECT user_id, item_id, category_id, behavior, ts
FROM user_log;

2020-03-18 改: 突然發現 表后面加了 字段,是忘了刪的,但是也沒報錯,5個字段都寫到kafka了”

sink 部分也隨之修改:

--sinkTable
CREATE TABLE user_log_sink (
    user_id VARCHAR,
    item_id VARCHAR,
    category_id VARCHAR,
    behavior VARCHAR,
    ts TIMESTAMP(3)
) WITH (
    'connector.type' = 'kafka',
    'connector.version' = 'universal',
    'connector.topic' = 'user_behavior_sink_1',
    'connector.properties.zookeeper.connect' = 'venn:2181',
    'connector.properties.bootstrap.servers' = 'venn:9092',
    'update-mode' = 'append',
    'format.type' = 'json'
);

就好了,好了,了。。

哎,等官網文檔,看完了,應該就知道是為什么了(注:等知道后,來加上)

然后就開始了最后一個坑。

在寫csv 的時候,遇到了最后一個坑,之前的版本里,“flink-shaded-jackso”我一直用的 “2.7.9-3.0”,但是里面並沒有 CsvSchame,所以又有了這個報錯:

Caused by: java.lang.ClassNotFoundException: org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvSchema$Builder

將 flink-shaded-jackso 的版本換成了flink 代碼里的版本 “2.9.8-7.0” 

基本上,就順暢的完成了kafka connector 讀寫 json 和 csv 了。

最后貼上完整的SQL:

--sourceTable
CREATE TABLE user_log(
    user_id VARCHAR,
    item_id VARCHAR,
    category_id VARCHAR,
    behavior VARCHAR,
    ts TIMESTAMP(3)
) WITH (
    'connector.type' = 'kafka',
    'connector.version' = 'universal',
    'connector.topic' = 'user_behavior',
    'connector.properties.zookeeper.connect' = 'venn:2181',
    'connector.properties.bootstrap.servers' = 'venn:9092',
    'connector.startup-mode' = 'earliest-offset',
    'format.type' = 'json'
#    'format.type' = 'csv'
);

--sinkTable
CREATE TABLE user_log_sink (
    user_id VARCHAR,
    item_id VARCHAR,
    category_id VARCHAR,
    behavior VARCHAR,
    ts TIMESTAMP(3)
) WITH (
    'connector.type' = 'kafka',
    'connector.version' = 'universal',
    'connector.topic' = 'user_behavior_sink',
    'connector.properties.zookeeper.connect' = 'venn:2181',
    'connector.properties.bootstrap.servers' = 'venn:9092',
    'update-mode' = 'append',
#    'format.type' = 'json'
     'format.type' = 'csv'
);

--insert
INSERT INTO user_log_sink(dt, pv, uv)
SELECT user_id, item_id, category_id, behavior, ts
FROM user_log;

相關的SQL文件上傳到github 上了:  flink-rookic  ,pom.xml 內的依賴也有更新。

 

好久沒寫了,最近會簡單嘗試一遍SQL 的 kafak/mysql/hbase/es/file/hdfs 等 connector,然后再嘗試 SQL 的其他內容

 

歡迎關注Flink菜鳥公眾號,會不定期更新Flink(開發技術)相關的推文


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM