實驗環境
本次測試環境都是基於單機環境:
- Oracle:docker oracle12c
- 日志模式:歸檔日志模式 archivelog
- 用戶:scott/tiger 具有dba權限
- 大數據組件:kafka(默認創建好topic:flink_topic),zookeeper,flink1.12
- Kafka-Oracle連接器:kafka-connect-oracle-1.0.jar
下載地址
1. Oralce環境准備
- 如歸檔日志沒開啟,則需要提前開啟
# sqlplus / as sysdba -- dba登錄
SQL> shutdown immediate --關閉數據庫
SQL> startup mount --數據庫啟動到mount狀態
SQL> alter database archivelog; --打開歸檔日志
SQL> alter database open; --打開數據庫
SQL> alter database add supplemental log data (all) columns; --開啟行級日志模式
SQL> conn username/password --連接數據庫
- 創建測試表
--創建表
CREATE TABLE YAOWENTAO_20210104
(
ID INT
,NAME VARCHAR2(10)
,SALE_AMOUNT NUMBER(30,8)
);
--插入數據
INSERT INTO YAOWENTAO_20210104 VALUES(1,'one',1.1);
INSERT INTO YAOWENTAO_20210104 VALUES(2,'two',2.2);
2. Kafka相關配置
- 准備相關Jar包,並放入對應得目錄
- 從https://github.com/erdemcer/kafka-connect-oracle下載整個項目,把整個項目mvn clean package成kafa-connect-oracle-1.0.jar
- 下載一個oracle的jdbc驅動jar—ojdbc7.jar
- 將kafa-connect-oracle-1.0.jar and ojdbc7.jar放在kafka的安裝包下的lib目錄下
- 將github項目里面的config/OracleSourceConnector.properties文件拷貝到kafak/config
不願意自己打包的,可直接下載:下載地址-kafka-connect-oracle-1.0.jar
- 配置Kafka
# vi /opt/cloudera/parcels/KAFKA/lib/kafka/config/OracleSourceConnector.properties
name=oracle-logminer-connector
connector.class=com.ecer.kafka.connect.oracle.OracleSourceConnector
db.name.alias=test
tasks.max=1
topic=cdctest --topic名稱
db.name=orcl --oracleSID
db.hostname=10.20.60.44 --IP地址
db.port=1521
db.user=scott
db.user.password=tiger
db.fetch.size=1
table.whitelist=SCOTT.* --白名單,必須是大寫
parse.dml.data=true
reset.offset=true
start.scn=
multitenant=false
table.blacklist=SCOTT.A --黑名單,必須是大寫
- Kafka配置文件
3. 運行Oracle-Connect
# bin/connect-standalone.sh config/connect-standalone.properties config/OracleSourceConnector.properties
4. 啟動Kakfa消費者進行,觀察結果
bin/kafka-console-consumer.sh --bootstrap-server 192.168.58.177:9092 --from-beginning --topic flink_topic
5. Flink環境准備並進行計算
- 5.11 下載相應Jar包
根據自身flink版本進行下載
1. 【mysql-connector-java】
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connect.html#jdbc-connector
2. 【flink-connector-jdbc_2.11】
https://repo.maven.apache.org/maven2/org/apache/flink/flink-connector-jdbc_2.11/1.12.0/flink-connector-jdbc_2.11-1.12.0.jar
- 5.12 放到Flink得lib目錄下
- 5.3 在MySql中創建sink需要得表
CREATE TABLE test_sink
(
NAME VARCHAR(20),
SALE_AMOUNT decimal(30,8)
);
- 5.4 啟動FlinkSql客戶端
# cd /opt/flink1.12/flink-1.12.1/
# ./bin/sql-client.sh embedded
- 5.5 在Flink中創建sourse,sink。在MySql中創建對應得表,表名與sink保持一致
-- -- 開啟 mini-batch
-- SET table.exec.mini-batch.enabled=true;
-- -- mini-batch的時間間隔,即作業需要額外忍受的延遲
-- SET table.exec.mini-batch.allow-latency=1s;
-- -- 一個 mini-batch 中允許最多緩存的數據
-- SET table.exec.mini-batch.size=1000;
-- -- 開啟 local-global 優化
-- SET table.optimizer.agg-phase-strategy=TWO_PHASE;
--
-- -- 開啟 distinct agg 切分
-- SET table.optimizer.distinct-agg.split.enabled=true;
-- source
drop table user_log1;
CREATE TABLE user_log1 (
payload ROW(SCN string,SEG_OWNER string,TABLE_NAME string,data ROW(ID string,NAME string,SALE_AMOUNT string))
) WITH (
'connector.type' = 'kafka',
'connector.version' = 'universal', --version
'connector.topic' = 'cdctest', --Kafka topic name
'connector.startup-mode' = 'earliest-offset', --optional: valid modes are "earliest-offset","latest-offset", "group-offsets",or "specific-offsets"
'connector.properties.group.id' = 'test',
'connector.properties.zookeeper.connect' = '192.168.58.171:2181,192.168.58.177:2181,192.168.58.178:2181',
'connector.properties.bootstrap.servers' = '192.168.58.177:9092',
'format.type' = 'json',
'format.json-schema' = --json format
'{
"type": "object",
"properties":
{
"payload":
{type: "object",
"properties" :
{
"SCN" : {type:"string"},
"SEG_OWNER" : {type:"string"},
"TABLE_NAME" : {type:"string"},
"data":
{type : "object",
"properties":
{
"ID" : {type : "string"},
"NAME" : {type : "string"},
"SALE_AMOUNT" : {type : "string"}
}
}
}
}
}
}'
);
-- sink
CREATE TABLE test_sink
(
--dt VARCHAR,
NAME VARCHAR,
SALE_AMOUNT decimal(30,8)
) WITH (
'connector.type' = 'jdbc', -- 使用 jdbc connector
'connector.url' = 'jdbc:mysql://10.20.60.44:3306/shuixing-uat', -- jdbc url
'connector.table' = 'test_sink', -- 表名
'connector.username' = 'root', -- 用戶名
'connector.password' = 'Sx202101', -- 密碼
'connector.write.flush.max-rows' = '1' -- 默認5000條,為了演示改為1條
);
-- insert
insert into test_sink
select payload.data.NAME,sum(cast(payload.data.SALE_AMOUNT as decimal(30,8)))
from user_log1
group by payload.data.NAME;
- 5.6 在前台界面查看結果,地址http://ip:8081