一.軟件包准備
1.操作系統centos7
2.jdk-8u301-linux-x64.tar.gz (安裝請自行百度,簡單的丫皮~)
3.apache-zookeeper-3.6.3-bin.tar.gz (單節點的安裝沒啥說的-->解壓-->修改zoo.cfg-->運行)
4.kafka_2.11-0.11.0.2.tgz (單節點的安裝沒啥說的-->解壓-->修改server.properties-->運行)(但是過程中會有添加jar的操作)
5.kafka-connect-oracle-master.zip (下載地址:https://codeload.github.com/erdemcer/kafka-connect-oracle/zip/refs/heads/master)(詳細講解)
6.apache-maven-3.6.3-bin.tar.gz (類比於jdk安裝,解壓,配置path,完事)
7.flink-1.10.1-bin-scala_2.11.tgz
8.oracel11g的歸檔日志模型請自行開啟 (不懂的話嘗試詢問dba吧,我尋思這不是重點)
二. 安裝組件
1.此處默認了大佬您的歸檔模式已經開啟,jdk已經完成安裝,zk已經安裝,kafka已經安裝,maven已經安裝,那么接下來我們繼續嗨~
2.關於kafka-connect-oracle-master.zip的處理,可以理解為這里是在搞一個kafka監控oracle日志的玩意兒.
步驟1:上傳服務器目錄,此處本人的為:/opt/module,隨你便記住便好.
步驟2:執行 unzip kafka-connect-oracle-master.zip
步驟3:進入 kafka-connect-oracle-master/config/ 並且 vi OracleSourceConnector.properties ,修改內容如下:
name=oracle-logminer-connector
connector.class=com.ecer.kafka.connect.oracle.OracleSourceConnector
db.name.alias= #oracle實例名稱:select instance_name from v$instance
tasks.max=1
topic=cdctest
db.name= #oracle服務器:select name from v|database;|替換成美元符
db.hostname= #oracle服務器地址
db.port=1521
db.user= #數據庫用戶名
db.user.password= ***
db.fetch.size=1
table.whitelist=xx.tablename
table.blacklist= #必須要有這一行 否則報錯 值可以為空
parse.dml.data=true
reset.offset=true
start.scn=
multitenant=false
步驟4:進入 kafka-connect-oracle-master 執行:mvn clean package.小bug之前使用maven3.8出現錯誤,換回3.6可以成功,編譯成功會有提示,並生成target文件夾
步驟5.復制target中以及kafka-connect-oracle-master/config/的配置文件到kafka目錄(!請根據自己的文件路徑來!)
cp /kafka-connect-oracle-master/target/kafka-connect-oracle-1.0.71.jar /kafka/libs/
cp /kafka-connect-oracle-master/lib/ojdbc7.jar /kafka/libs/
cp /kafka-connect-oracle-master/config/OracleSourceConnector.properties /kafka/config/
三. 運行測試
1.啟動zookeeper,bin目錄下執行
./zookeeper-server-start.sh ../config/zookeeper.properties
2.啟動kafka服務,bin目錄下執行
./kafka-server-start.sh ../config/server.properties
建立topic-cdctest,即配置文件中自己命名的topic
./kafka-topics.sh --create --zookeeper 你的ip:2181 --replication-factor 1 --partitions 1 --topic cdctest
啟動kafka監聽oracle的服務,bin目錄執行
./connect-standalone.sh ../config/connect-standalone.properties ../config/OracleSourceConnector.properties
啟動消費端測試查看
./kafka-console-consumer.sh --bootstrap-server 你的ip:9092 --from-beginning --topic cdctest
本人方法:開啟消費者后,直接在數據庫管理工具中,向自己所監聽的表中insert一條語句,看消費者是否能現實,如下:
[root@flink-kafka bin]# ./kafka-console-consumer.sh --bootstrap-server xxxxxxxx:9092 --from-beginning --topic cdctest
{"schema":{"type":"struct","fields":[{"type":"int64","optional":false,"field":"SCN"},{"type":"string","optional":false,"field":"SEG_OWNER"},{"type":"string","optional":false,"field":"TABLE_NAME"},{"type":"int64","optional":false,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"TIMESTAMP"},{"type":"string","optional":false,"field":"SQL_REDO"},{"type":"string","optional":false,"field":"OPERATION"},{"type":"struct","fields":[{"type":"double","optional":true,"field":"ID"},{"type":"string","optional":true,"field":"NAME"},{"type":"string","optional":true,"field":"SEX"},{"type":"double","optional":true,"field":"AGE"}],"optional":true,"name":"value","field":"data"},{"type":"struct","fields":[{"type":"double","optional":true,"field":"ID"},{"type":"string","optional":true,"field":"NAME"},{"type":"string","optional":true,"field":"SEX"},{"type":"double","optional":true,"field":"AGE"}],"optional":true,"name":"value","field":"before"}],"optional":false,"name":"helowin.zytk35.flink_kafka_0824.row"},"payload":{"SCN":3880591880,"SEG_OWNER":"ZYTK35","TABLE_NAME":"FLINK_KAFKA_0824","TIMESTAMP":1629772528000,"SQL_REDO":"insert into "ZYTK35"."FLINK_KAFKA_0824"("ID","NAME","SEX","AGE") values (3,'ldza','man',95)","OPERATION":"INSERT","data":{"ID":3.0,"NAME":"ldza","SEX":"man","AGE":95.0},"before":null}}
