【流數據處理】Oracle日志 推送到Kafka 並進行實時計算


實驗環境

本次測試環境都是基於單機環境:

  • Oracle:docker oracle12c
  • 日志模式:歸檔日志模式 archivelog
  • 用戶:scott/tiger 具有dba權限
  • 大數據組件:kafka(默認創建好topic:flink_topic),zookeeper,flink1.12
  • Kafka-Oracle連接器:kafka-connect-oracle-1.0.jar
    下載地址

1. Oralce環境准備

  • 如歸檔日志沒開啟,則需要提前開啟
# sqlplus / as sysdba -- dba登錄
SQL> shutdown immediate --關閉數據庫
SQL> startup mount          --數據庫啟動到mount狀態
SQL> alter database archivelog; --打開歸檔日志
SQL> alter database open;       --打開數據庫
SQL> alter database add supplemental log data (all) columns;    --開啟行級日志模式
SQL> conn username/password                                           --連接數據庫
  • 創建測試表
--創建表
CREATE TABLE YAOWENTAO_20210104
(
    ID INT
    ,NAME VARCHAR2(10)
    ,SALE_AMOUNT NUMBER(30,8)
);

--插入數據
INSERT INTO YAOWENTAO_20210104 VALUES(1,'one',1.1);
INSERT INTO YAOWENTAO_20210104 VALUES(2,'two',2.2);

2. Kafka相關配置

  • 准備相關Jar包,並放入對應得目錄
    • 從https://github.com/erdemcer/kafka-connect-oracle下載整個項目,把整個項目mvn clean package成kafa-connect-oracle-1.0.jar
    • 下載一個oracle的jdbc驅動jar—ojdbc7.jar
    • 將kafa-connect-oracle-1.0.jar and ojdbc7.jar放在kafka的安裝包下的lib目錄下
    • 將github項目里面的config/OracleSourceConnector.properties文件拷貝到kafak/config

不願意自己打包的,可直接下載:下載地址-kafka-connect-oracle-1.0.jar

  • 配置Kafka
# vi /opt/cloudera/parcels/KAFKA/lib/kafka/config/OracleSourceConnector.properties
name=oracle-logminer-connector  
connector.class=com.ecer.kafka.connect.oracle.OracleSourceConnector
db.name.alias=test
tasks.max=1
topic=cdctest        --topic名稱
db.name=orcl        --oracleSID
db.hostname=10.20.60.44  --IP地址
db.port=1521                 
db.user=scott
db.user.password=tiger
db.fetch.size=1
table.whitelist=SCOTT.* --白名單,必須是大寫
parse.dml.data=true
reset.offset=true
start.scn=
multitenant=false
table.blacklist=SCOTT.A --黑名單,必須是大寫

示例圖1

  • Kafka配置文件

示例圖2

3. 運行Oracle-Connect

# bin/connect-standalone.sh config/connect-standalone.properties config/OracleSourceConnector.properties

4. 啟動Kakfa消費者進行,觀察結果

bin/kafka-console-consumer.sh --bootstrap-server 192.168.58.177:9092 --from-beginning --topic flink_topic

示例圖3

5. Flink環境准備並進行計算

  • 5.11 下載相應Jar包
根據自身flink版本進行下載
1. 【mysql-connector-java】
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connect.html#jdbc-connector
2. 【flink-connector-jdbc_2.11】
https://repo.maven.apache.org/maven2/org/apache/flink/flink-connector-jdbc_2.11/1.12.0/flink-connector-jdbc_2.11-1.12.0.jar
  • 5.12 放到Flink得lib目錄下

示例圖8

  • 5.3 在MySql中創建sink需要得表
CREATE TABLE test_sink
(
    NAME  VARCHAR(20),
    SALE_AMOUNT decimal(30,8)
);
  • 5.4 啟動FlinkSql客戶端
# cd /opt/flink1.12/flink-1.12.1/
# ./bin/sql-client.sh embedded
  • 5.5 在Flink中創建sourse,sink。在MySql中創建對應得表,表名與sink保持一致
-- -- 開啟 mini-batch
-- SET table.exec.mini-batch.enabled=true;
-- -- mini-batch的時間間隔,即作業需要額外忍受的延遲
-- SET table.exec.mini-batch.allow-latency=1s;
-- -- 一個 mini-batch 中允許最多緩存的數據
-- SET table.exec.mini-batch.size=1000;
-- -- 開啟 local-global 優化
-- SET table.optimizer.agg-phase-strategy=TWO_PHASE;
--
-- -- 開啟 distinct agg 切分
-- SET table.optimizer.distinct-agg.split.enabled=true;

-- source
drop table user_log1;
CREATE TABLE user_log1 (
    payload ROW(SCN string,SEG_OWNER string,TABLE_NAME string,data ROW(ID string,NAME string,SALE_AMOUNT string))
) WITH (
    'connector.type' = 'kafka',
    'connector.version' = 'universal',      --version
    'connector.topic' = 'cdctest',          --Kafka topic name
    'connector.startup-mode' = 'earliest-offset',       --optional: valid modes are "earliest-offset","latest-offset", "group-offsets",or "specific-offsets"
    'connector.properties.group.id' = 'test',
    'connector.properties.zookeeper.connect' = '192.168.58.171:2181,192.168.58.177:2181,192.168.58.178:2181',
    'connector.properties.bootstrap.servers' = '192.168.58.177:9092',
    'format.type' = 'json',
    'format.json-schema' =      --json format
    '{
        "type": "object",
        "properties": 
        {
           "payload":
           {type: "object",
                   "properties" : 
                   {
						"SCN" 		 : {type:"string"},
						"SEG_OWNER"  : {type:"string"},
						"TABLE_NAME" : {type:"string"},
						"data": 
						{type : "object", 
                               "properties": 
                               {
                               	"ID"          : {type : "string"},
                                "NAME"        : {type : "string"},
                                "SALE_AMOUNT" : {type : "string"}
                               }
                   		}
           		   }
           }
        }
    }'
);

-- sink
CREATE TABLE test_sink 
(
    --dt	  VARCHAR,
    NAME  VARCHAR,
    SALE_AMOUNT decimal(30,8)
) WITH (
    'connector.type' = 'jdbc', -- 使用 jdbc connector
    'connector.url' = 'jdbc:mysql://10.20.60.44:3306/shuixing-uat', -- jdbc url
    'connector.table' = 'test_sink', -- 表名
    'connector.username' = 'root', -- 用戶名
    'connector.password' = 'Sx202101', -- 密碼
    'connector.write.flush.max-rows' = '1' -- 默認5000條,為了演示改為1條
);

-- insert
insert into test_sink
select payload.data.NAME,sum(cast(payload.data.SALE_AMOUNT as decimal(30,8)))
from user_log1
group by payload.data.NAME;

示例圖4
示例圖5
示例圖6

  • 5.6 在前台界面查看結果,地址http://ip:8081
    示例圖4
    示例圖9


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM