Flink MySQL CDC sink to ES, Kafka, and Hudi, with the Hudi table then loaded via spark-sql


For installing hadoop, spark, flink, kafka, and zookeeper, refer to the deployment guides on this blog.

Component versions

hadoop2.7.5
spark-2.4.8-bin-hadoop2.7
flink-1.13.1-bin-scala_2.11
kafka_2.13-2.6.2
zookeeper-3.6.3

Install Maven (version >= 3.3.1)

Download Maven from the official site: http://maven.apache.org/download.cgi; here apache-maven-3.8.4-bin.tar.gz is used
# cd /opt
Unpack it
# tar -zxvf apache-maven-3.8.4-bin.tar.gz

Configure the environment variables
# vim /etc/profile
Add
export MAVEN_HOME=/opt/apache-maven-3.8.4
export PATH=$MAVEN_HOME/bin:$PATH
Reload the profile
# source /etc/profile

Check the version
# mvn -v

Install Hudi

# cd /home/hadoop/app
# git clone https://github.com/apache/hudi
# cd hudi
Adjust the component versions in the root pom to match the cluster
# vim pom.xml
<flink.version>1.13.1</flink.version>
<hadoop.version>2.7.5</hadoop.version>
<spark2.version>2.4.8</spark2.version>

Build
# mvn clean package -DskipTests


The build succeeds.
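If it did, the bundle jars that Flink and Spark load should sit under the packaging/ modules. The paths below assume the standard Hudi repo layout and a Scala 2.11 build; the exact module names and version suffixes depend on the branch you checked out:
# ls packaging/hudi-flink-bundle/target/
# ls packaging/hudi-spark-bundle/target/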

Enter the Hudi CLI to verify
# cd /home/hadoop/app/hudi/hudi-cli/
# ./hudi-cli.sh


Install flink-cdc-connectors

# cd /home/hadoop/app
# git clone https://github.com/ververica/flink-cdc-connectors
# cd flink-cdc-connectors

Edit pom.xml to match the Flink version
# vim pom.xml
<flink.version>1.13.1</flink.version>

Build and install
# mvn clean install -DskipTests
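The jars needed in the next step are produced under each module's target directory; if in doubt, locate them from the project root (the version strings depend on the checked-out branch):
# find . -name "flink-sql-connector-*.jar" -o -name "flink-format-changelog-json*.jar"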


Add the CDC jars to the Flink cluster

Copy the built jars into the cluster's flink/lib directory:

MySQL
# cd /home/hadoop/app/flink-cdc-connectors/flink-sql-connector-mysql-cdc/target
# cp flink-sql-connector-mysql-cdc-2.2-SNAPSHOT.jar /home/hadoop/app/flink/lib/

PostgreSQL
# cd /home/hadoop/app/flink-cdc-connectors/flink-sql-connector-postgres-cdc/target
# cp flink-sql-connector-postgres-cdc-2.2-SNAPSHOT.jar /home/hadoop/app/flink/lib/

Kafka (changelog-json format)
# cd /home/hadoop/app/flink-cdc-connectors/flink-format-changelog-json/target
# cp flink-format-changelog-json-2.2-SNAPSHOT.jar /home/hadoop/app/flink/lib/

Restart the Flink cluster (on the master node)
# cd /home/hadoop/app/flink/bin/
# ./stop-cluster.sh
# ./start-cluster.sh
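Before opening the SQL client, it is worth confirming the three jars actually landed in lib on this node:
# ls /home/hadoop/app/flink/lib/ | grep -E 'cdc|changelog'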

MySQL sink
Flink SQL>SET execution.checkpointing.interval = 3s;
Flink SQL>CREATE TABLE mysql_binlog (
 id INT NOT NULL,
 name STRING,
 description STRING,
 weight DECIMAL(10,3)
) WITH (
 'connector' = 'mysql-cdc',
 'hostname' = 'localhost',
 'port' = '3306',
 'username' = 'xxx',
 'password' = 'xxxx',
 'database-name' = 'just_test',
 'table-name' = 'inventory'
);


Flink SQL>select id,name,description,weight from mysql_binlog;
Error:
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.ValidationException: The primary key is necessary when enable 'Key: 'scan.incremental.snapshot.enabled' , default: true (fallback keys: [])' to 'true'
Fix: id is the primary key on the MySQL side, but the table created in the Flink SQL client did not declare one, so recreate the table with the primary key:
CREATE TABLE mysql_binlog (
 id INT NOT NULL,
 name STRING,
 description STRING,
 weight DECIMAL(10,3),
 PRIMARY KEY (id) NOT ENFORCED
) WITH (
 'connector' = 'mysql-cdc',
 'hostname' = 'localhost',
 'port' = '3306',
 'username' = 'xxx',
 'password' = 'xxxx',
 'database-name' = 'just_test',
 'table-name' = 'inventory'
);

Flink SQL>select id,name,description,weight from mysql_binlog;
Error:
Exception in thread "main" org.apache.flink.table.client.SqlClientException: Unexpected exception. This is a bug. Please consider filing an issue.
at org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:201)
at org.apache.flink.table.client.SqlClient.main(SqlClient.java:161)
Caused by: java.lang.NoClassDefFoundError: com/fasterxml/jackson/databind/ObjectMapper
Fix: the jar was copied from the wrong path. The correct source is /home/hadoop/app/flink-cdc-connectors/flink-sql-connector-mysql-cdc/target (the fat jar with shaded dependencies), not /home/hadoop/app/flink-cdc-connectors/flink-connector-mysql-cdc/target. Delete the wrong jar from flink/lib and copy the correct one.
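Concretely (a sketch; the jar names must match what your build actually produced):
# rm /home/hadoop/app/flink/lib/flink-connector-mysql-cdc-2.2-SNAPSHOT.jar
# cp /home/hadoop/app/flink-cdc-connectors/flink-sql-connector-mysql-cdc/target/flink-sql-connector-mysql-cdc-2.2-SNAPSHOT.jar /home/hadoop/app/flink/lib/
Restart the cluster afterwards so the lib change takes effect.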

Flink SQL>select id,name,description,weight from mysql_binlog;
Error:
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph
Fix: in this HA setup, the Flink job could only be submitted from the node running the active NameNode; check which node that is as shown below.
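To see which NameNode is currently active (nn1/nn2 are placeholders for the service IDs defined in your hdfs-site.xml):
# hdfs haadmin -getServiceState nn1
# hdfs haadmin -getServiceState nn2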

Flink SQL>select id,name,description,weight from mysql_binlog;
Error:
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'mysql-cdc' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.
Fix: the mysql-cdc jar must be added to flink/lib on every node of the Flink cluster, not just one; see the sketch below.
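A quick way to push the jar to the remaining nodes (the hostnames here are placeholders for your workers):
# for host in slave1 slave2; do scp /home/hadoop/app/flink/lib/flink-sql-connector-mysql-cdc-2.2-SNAPSHOT.jar $host:/home/hadoop/app/flink/lib/; done
Restart the cluster after distributing the jar.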

