kafka监控oracle归档日志并进行后续flink处理(单节点模式)


一.软件包准备

1.操作系统centos7
2.jdk-8u301-linux-x64.tar.gz (安装请自行百度,简单的丫皮~)
3.apache-zookeeper-3.6.3-bin.tar.gz (单节点的安装没啥说的-->解压-->修改zoo.cfg-->运行)
4.kafka_2.11-0.11.0.2.tgz (单节点的安装没啥说的-->解压-->修改server.properties-->运行)(但是过程中会有添加jar的操作)
5.kafka-connect-oracle-master.zip (下载地址:https://codeload.github.com/erdemcer/kafka-connect-oracle/zip/refs/heads/master)(详细讲解)
6.apache-maven-3.6.3-bin.tar.gz (类比于jdk安装,解压,配置path,完事)
7.flink-1.10.1-bin-scala_2.11.tgz
8.oracel11g的归档日志模型请自行开启 (不懂的话尝试询问dba吧,我寻思这不是重点)

二. 安装组件

1.此处默认了大佬您的归档模式已经开启,jdk已经完成安装,zk已经安装,kafka已经安装,maven已经安装,那么接下来我们继续嗨~
2.关于kafka-connect-oracle-master.zip的处理,可以理解为这里是在搞一个kafka监控oracle日志的玩意儿.

步骤1:上传服务器目录,此处本人的为:/opt/module,随你便记住便好.
步骤2:执行 unzip kafka-connect-oracle-master.zip
步骤3:进入 kafka-connect-oracle-master/config/ 并且 vi OracleSourceConnector.properties ,修改内容如下:
name=oracle-logminer-connector
connector.class=com.ecer.kafka.connect.oracle.OracleSourceConnector
db.name.alias= #oracle实例名称:select instance_name from v$instance
tasks.max=1
topic=cdctest
db.name= #oracle服务器:select name from v|database;|替换成美元符
db.hostname= #oracle服务器地址
db.port=1521
db.user= #数据库用户名
db.user.password= ***
db.fetch.size=1
table.whitelist=xx.tablename
table.blacklist= #必须要有这一行 否则报错 值可以为空
parse.dml.data=true
reset.offset=true
start.scn=
multitenant=false
步骤4:进入 kafka-connect-oracle-master 执行:mvn clean package.小bug之前使用maven3.8出现错误,换回3.6可以成功,编译成功会有提示,并生成target文件夹
步骤5.复制target中以及kafka-connect-oracle-master/config/的配置文件到kafka目录(!请根据自己的文件路径来!)
cp /kafka-connect-oracle-master/target/kafka-connect-oracle-1.0.71.jar /kafka/libs/
cp /kafka-connect-oracle-master/lib/ojdbc7.jar /kafka/libs/
cp /kafka-connect-oracle-master/config/OracleSourceConnector.properties /kafka/config/

三. 运行测试

1.启动zookeeper,bin目录下执行
./zookeeper-server-start.sh ../config/zookeeper.properties
2.启动kafka服务,bin目录下执行
./kafka-server-start.sh ../config/server.properties
建立topic-cdctest,即配置文件中自己命名的topic
./kafka-topics.sh --create --zookeeper 你的ip:2181 --replication-factor 1 --partitions 1 --topic cdctest
启动kafka监听oracle的服务,bin目录执行
./connect-standalone.sh ../config/connect-standalone.properties ../config/OracleSourceConnector.properties
启动消费端测试查看
./kafka-console-consumer.sh --bootstrap-server 你的ip:9092 --from-beginning --topic cdctest
本人方法:开启消费者后,直接在数据库管理工具中,向自己所监听的表中insert一条语句,看消费者是否能现实,如下:
[root@flink-kafka bin]# ./kafka-console-consumer.sh --bootstrap-server xxxxxxxx:9092 --from-beginning --topic cdctest
{"schema":{"type":"struct","fields":[{"type":"int64","optional":false,"field":"SCN"},{"type":"string","optional":false,"field":"SEG_OWNER"},{"type":"string","optional":false,"field":"TABLE_NAME"},{"type":"int64","optional":false,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"TIMESTAMP"},{"type":"string","optional":false,"field":"SQL_REDO"},{"type":"string","optional":false,"field":"OPERATION"},{"type":"struct","fields":[{"type":"double","optional":true,"field":"ID"},{"type":"string","optional":true,"field":"NAME"},{"type":"string","optional":true,"field":"SEX"},{"type":"double","optional":true,"field":"AGE"}],"optional":true,"name":"value","field":"data"},{"type":"struct","fields":[{"type":"double","optional":true,"field":"ID"},{"type":"string","optional":true,"field":"NAME"},{"type":"string","optional":true,"field":"SEX"},{"type":"double","optional":true,"field":"AGE"}],"optional":true,"name":"value","field":"before"}],"optional":false,"name":"helowin.zytk35.flink_kafka_0824.row"},"payload":{"SCN":3880591880,"SEG_OWNER":"ZYTK35","TABLE_NAME":"FLINK_KAFKA_0824","TIMESTAMP":1629772528000,"SQL_REDO":"insert into "ZYTK35"."FLINK_KAFKA_0824"("ID","NAME","SEX","AGE") values (3,'ldza','man',95)","OPERATION":"INSERT","data":{"ID":3.0,"NAME":"ldza","SEX":"man","AGE":95.0},"before":null}}

四. 结束,后续进行flink分析,研究透了再分享.


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM