【流数据处理】Oracle日志 推送到Kafka 并进行实时计算


实验环境

本次测试环境都是基于单机环境:

  • Oracle:docker oracle12c
  • 日志模式:归档日志模式 archivelog
  • 用户:scott/tiger 具有dba权限
  • 大数据组件:kafka(默认创建好topic:flink_topic),zookeeper,flink1.12
  • Kafka-Oracle连接器:kafka-connect-oracle-1.0.jar
    下载地址

1. Oralce环境准备

  • 如归档日志没开启,则需要提前开启
# sqlplus / as sysdba -- dba登录
SQL> shutdown immediate --关闭数据库
SQL> startup mount          --数据库启动到mount状态
SQL> alter database archivelog; --打开归档日志
SQL> alter database open;       --打开数据库
SQL> alter database add supplemental log data (all) columns;    --开启行级日志模式
SQL> conn username/password                                           --连接数据库
  • 创建测试表
--创建表
CREATE TABLE YAOWENTAO_20210104
(
    ID INT
    ,NAME VARCHAR2(10)
    ,SALE_AMOUNT NUMBER(30,8)
);

--插入数据
INSERT INTO YAOWENTAO_20210104 VALUES(1,'one',1.1);
INSERT INTO YAOWENTAO_20210104 VALUES(2,'two',2.2);

2. Kafka相关配置

  • 准备相关Jar包,并放入对应得目录
    • 从https://github.com/erdemcer/kafka-connect-oracle下载整个项目,把整个项目mvn clean package成kafa-connect-oracle-1.0.jar
    • 下载一个oracle的jdbc驱动jar—ojdbc7.jar
    • 将kafa-connect-oracle-1.0.jar and ojdbc7.jar放在kafka的安装包下的lib目录下
    • 将github项目里面的config/OracleSourceConnector.properties文件拷贝到kafak/config

不愿意自己打包的,可直接下载:下载地址-kafka-connect-oracle-1.0.jar

  • 配置Kafka
# vi /opt/cloudera/parcels/KAFKA/lib/kafka/config/OracleSourceConnector.properties
name=oracle-logminer-connector  
connector.class=com.ecer.kafka.connect.oracle.OracleSourceConnector
db.name.alias=test
tasks.max=1
topic=cdctest        --topic名称
db.name=orcl        --oracleSID
db.hostname=10.20.60.44  --IP地址
db.port=1521                 
db.user=scott
db.user.password=tiger
db.fetch.size=1
table.whitelist=SCOTT.* --白名单,必须是大写
parse.dml.data=true
reset.offset=true
start.scn=
multitenant=false
table.blacklist=SCOTT.A --黑名单,必须是大写

示例图1

  • Kafka配置文件

示例图2

3. 运行Oracle-Connect

# bin/connect-standalone.sh config/connect-standalone.properties config/OracleSourceConnector.properties

4. 启动Kakfa消费者进行,观察结果

bin/kafka-console-consumer.sh --bootstrap-server 192.168.58.177:9092 --from-beginning --topic flink_topic

示例图3

5. Flink环境准备并进行计算

  • 5.11 下载相应Jar包
根据自身flink版本进行下载
1. 【mysql-connector-java】
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connect.html#jdbc-connector
2. 【flink-connector-jdbc_2.11】
https://repo.maven.apache.org/maven2/org/apache/flink/flink-connector-jdbc_2.11/1.12.0/flink-connector-jdbc_2.11-1.12.0.jar
  • 5.12 放到Flink得lib目录下

示例图8

  • 5.3 在MySql中创建sink需要得表
CREATE TABLE test_sink
(
    NAME  VARCHAR(20),
    SALE_AMOUNT decimal(30,8)
);
  • 5.4 启动FlinkSql客户端
# cd /opt/flink1.12/flink-1.12.1/
# ./bin/sql-client.sh embedded
  • 5.5 在Flink中创建sourse,sink。在MySql中创建对应得表,表名与sink保持一致
-- -- 开启 mini-batch
-- SET table.exec.mini-batch.enabled=true;
-- -- mini-batch的时间间隔,即作业需要额外忍受的延迟
-- SET table.exec.mini-batch.allow-latency=1s;
-- -- 一个 mini-batch 中允许最多缓存的数据
-- SET table.exec.mini-batch.size=1000;
-- -- 开启 local-global 优化
-- SET table.optimizer.agg-phase-strategy=TWO_PHASE;
--
-- -- 开启 distinct agg 切分
-- SET table.optimizer.distinct-agg.split.enabled=true;

-- source
drop table user_log1;
CREATE TABLE user_log1 (
    payload ROW(SCN string,SEG_OWNER string,TABLE_NAME string,data ROW(ID string,NAME string,SALE_AMOUNT string))
) WITH (
    'connector.type' = 'kafka',
    'connector.version' = 'universal',      --version
    'connector.topic' = 'cdctest',          --Kafka topic name
    'connector.startup-mode' = 'earliest-offset',       --optional: valid modes are "earliest-offset","latest-offset", "group-offsets",or "specific-offsets"
    'connector.properties.group.id' = 'test',
    'connector.properties.zookeeper.connect' = '192.168.58.171:2181,192.168.58.177:2181,192.168.58.178:2181',
    'connector.properties.bootstrap.servers' = '192.168.58.177:9092',
    'format.type' = 'json',
    'format.json-schema' =      --json format
    '{
        "type": "object",
        "properties": 
        {
           "payload":
           {type: "object",
                   "properties" : 
                   {
						"SCN" 		 : {type:"string"},
						"SEG_OWNER"  : {type:"string"},
						"TABLE_NAME" : {type:"string"},
						"data": 
						{type : "object", 
                               "properties": 
                               {
                               	"ID"          : {type : "string"},
                                "NAME"        : {type : "string"},
                                "SALE_AMOUNT" : {type : "string"}
                               }
                   		}
           		   }
           }
        }
    }'
);

-- sink
CREATE TABLE test_sink 
(
    --dt	  VARCHAR,
    NAME  VARCHAR,
    SALE_AMOUNT decimal(30,8)
) WITH (
    'connector.type' = 'jdbc', -- 使用 jdbc connector
    'connector.url' = 'jdbc:mysql://10.20.60.44:3306/shuixing-uat', -- jdbc url
    'connector.table' = 'test_sink', -- 表名
    'connector.username' = 'root', -- 用户名
    'connector.password' = 'Sx202101', -- 密码
    'connector.write.flush.max-rows' = '1' -- 默认5000条,为了演示改为1条
);

-- insert
insert into test_sink
select payload.data.NAME,sum(cast(payload.data.SALE_AMOUNT as decimal(30,8)))
from user_log1
group by payload.data.NAME;

示例图4
示例图5
示例图6

  • 5.6 在前台界面查看结果,地址http://ip:8081
    示例图4
    示例图9


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM