OGG 從Oracle備庫同步數據至kafka


 

 

 

OGG 從Oracle備庫同步數據至kafka

1 目的

測試物理standby 作為ogg 源端的可行性,效率及安全性。

2 環境及規划

以下數據庫及OGG版本是實際目的的最低版本要求。

  • 環境

    服務器ip 作用
    10.10.100.91 Oracle_primary (zookeeper kafka)
    10.10.100.92 Oracle_standby ogg (zookeeper kafka)
    10.10.100.98 zookeeper kafka ogg
  • 版本及路徑

    軟件 版本 路徑
    oracle 11.2.0.4 /u01/app/oracle
    zookeeper 3.4.13 /opt/zookeeper-3.4.13
    kafka 2.12-2.1.1 /opt/kafka_2.12-2.1.1
    ogg for bigdata 12.3.2.1.1 /u01/app/ogg
    ogg for oracle 12.3.0.1.4 /u01/app/ogg
    jdk 1.8u181 /opt/jdk1.8.0_181

3 安裝配置JDK

在所有節點安裝。由於OGG 12以上的版本都要求jdk1.8以上。因此需要單獨安裝jdk。

3.1 安裝jdk

tar -xzvf /opt/jdk-8u181-linux-x64.tar.gz -C /opt/jdk1.8.0_181

3.2 配置環境變量

根據規則,現在我們將每個環境中的環境變量進行配置。 按如下說明修改 $HOME/.bash_profile:

#添加一行
export JAVA_HOME=/opt/jdk1.8.0_181
# 修改PATH變量
PATH=$JAVA_HOME/bin:$PATH

4 安裝Dataguard

 

4.1 安裝備庫軟件

  • 上傳軟件並解壓 上傳文件請使用ftp或者類ftp方式等。解壓安裝包示例:

    unzip p13390677_112040_Linux-x86-64_1of7.zip
    unzip p13390677_112040_Linux-x86-64_2of7.zip
    
  • 編輯響應文件 由於參數過多,此處只列出需要調整的內容. :

    oracle.install.option=INSTALL_DB_SWONLY
    ORACLE_HOSTNAME=pmo2
    UNIX_GROUP_NAME=oinstall
    INVENTORY_LOCATION=/u01/app/oraInventory
    ORACLE_HOME=/u01/app/oracle/product/11.2.0/dbhome_1
    ORACLE_BASE=/u01/app/oracle/
    oracle.install.db.DBA_GROUP=dba
    oracle.install.db.OPER_GROUP=oper
    oracle.install.db.config.starterdb.globalDBName=orcl
    oracle.install.db.config.starterdb.SID=orcl
    oracle.install.db.config.starterdb.characterSet=AL32UTF8
    oracle.install.db.config.starterdb.memoryOption=true
    oracle.install.db.config.starterdb.memoryLimit=1384
    oracle.install.db.config.starterdb.password.ALL=Sys123passwd
    oracle.install.db.config.starterdb.storageType=FILE_SYSTEM_STORAGE
    oracle.install.db.config.starterdb.fileSystemStorage.dataLocation=/u01/app/oracle/oradata
    oracle.install.db.config.starterdb.fileSystemStorage.recoveryLocation=/u01/app/oracle/arch
    
  • 安裝軟件

    cd ~/database/
    ./runInstaller -silent -force -ignoreSysprereqs -ignorePrereq -showProgress -responseFile /home/oracle/database/response/db_install.rsp
    

4.2 配置dataguard

 

4.2.1 主庫

 
  1. 主庫開啟強制日志
    alter database force logging;
    
  2. 主庫調整參數
    alter system set db_unique_name='orcl' scope=spfile;
    alter system set log_archive_config='DG_CONFIG=(primary,standby)' scope=both;
    alter system set log_archive_dest_1='LOCATION=/u01/app/oracle/arch VALID_FOR=(ALL_LOGFILES,ALL_ROLES) DB_UNIQUE_NAME=orcl' scope=both;
    alter system set log_archive_dest_2='SERVICE=standby LGWR ASYNC VALID_FOR=(ONLINE_LOGFILES,PRIMARY_ROLE) DB_UNIQUE_NAME=orcl' scope=both;
    alter system set log_archive_format='%t_%s_%r.arc' scope=spfile;
    

    重啟主庫, 使參數生效。

  3. 主庫添加standby 日志

    日志添加規則為原有日志組個數為N, 添加的standby 日志組個數為N+1. 默認數據庫的日志為3個,大小為50M.

    alter database add standby logfile group 4('/u01/app/oracle/oradata/redo04.log') size 50m;
    alter database add standby logfile group 5('/u01/app/oracle/oradata/redo05.log') size 50m;
    alter database add standby logfile group 6('/u01/app/oracle/oradata/redo06.log') size 50m;
    alter database add standby logfile group 7('/u01/app/oracle/oradata/redo07.log') size 50m;
    
  4. 主庫創建密碼文件
    orapwd file=$ORACLE_HOME/dbs/orapworcl entries=5 force=y ignorecase=y password=oracle
    

    創建完密碼文件后,記得在主庫修改下sys用戶密碼。此時的密碼,以簡單便於使用為主。后期再調整為符合密碼規則的密碼。

    sqlplus / as sysdba
    alter user sys identified by oracle;
    exit
    
  5. TNS配置

    TNS 文件: $ORACLE_HOME/network/admin/tnsnames.ora
    文件內容如下:

    primary =
      (DESCRIPTION =
        (ADDRESS = (PROTOCOL = TCP)(HOST = 10.10.100.91)(PORT = 1521))
        (CONNECT_DATA =
          (SERVER = DEDICATED)
          (SERVICE_NAME = orcl)
        )
      )
    
    standby =
      (DESCRIPTION =
        (ADDRESS = (PROTOCOL = TCP)(HOST = 10.10.100.92)(PORT = 1521))
        (CONNECT_DATA =
          (SERVER = DEDICATED)
          (SERVICE_NAME = orcl)
          (UA=R)
        )
      )
    

4.2.2 備庫

 
  1. 參數

    備庫參數文件為$ORACLE_HOME/dbs/initorcl.ora. 內容如下:

    *.db_name='orcl'
    *.db_unique_name='orcls'
    *.compatible='11.2.0.4.0'
    
    *.fal_server='primary'
    *.log_archive_config='DG_CONFIG=(orcls,orcl)'
    *.log_archive_dest_1='LOCATION=/u01/app/oracle/oradata/arch VALID_FOR=(ALL_LOGFILES,ALL_ROLES) DB_UNIQUE_NAME=orcls'
    *.log_archive_dest_state_1='ENABLE'
    *.db_file_name_convert='/u01/app/oracle/oradata/orcl/','/u01/app/oracle/oradata/'
    *.log_file_name_convert='/u01/app/oracle/oradata/orcl/','/u01/app/oracle/oradata/redo'
    *.standby_file_management='AUTO'
    
  2. 啟動實例

    請注意,備庫此時需要啟動至nomount狀態。

    sqlplus / as sysdba
    startup nomount;
    exit
    
  3. 監聽配置並啟動
    • 監聽配置

    監聽文件: $ORACLE_HOME/network/admin/listener.ora

    sid_list_listener =
    (sid_list =
      (sid_desc =
       (global_dbname=orcl )
       (sid_name=orcl )
       (oracle_home=/u01/app/oracle/product/11.2.0/dbhome_1)
      )
     )
    
    LISTENER =
      (DESCRIPTION_LIST =
        (DESCRIPTION =
          (ADDRESS = (PROTOCOL = TCP)(HOST = 10.10.100.92)(PORT = 1521))
        )
      )
    
    
    ADR_BASE_LISTENER = /u01/app/oracle
    
    • 啟動監聽

      lsnrctl start
      
  4. 密碼文件

    將主庫的密碼文件拷貝至備庫的$ORACLE_HOME/dbs路徑中。

    scp 10.10.100.91:$ORACLE_HOME/dbs/orapworcl $ORACLE_HOME/dbs/
    
  5. TNS配置

    將主庫的密碼文件拷貝至備庫的$ORACLE_HOME/network/admin路徑中。 使用如下命令,或者通過復制粘貼待方式將文件內容復制到備庫環境。

    scp 10.10.100.91:$ORACLE_HOME/network/admin/tnsnames.ora $ORACLE_HOME/network/admin/
    

4.3 完成操作

在主庫或者備庫均可操作。

  • 連接 rman 環境

    rman target sys/oracle@primary auxiliary sys/oracle@setandby
    
  • 執行 rman 命令

    run
    {
    allocate channel cl1 type disk;
    allocate channel cl2 type disk;
    allocate channel cl3 type disk;
    allocate auxiliary channel c1 type disk;
    allocate auxiliary channel c2 type disk;
    allocate auxiliary channel c3 type disk;
    duplicate target database for standby from active database dorecover ;
    release channel c1;
    release channel c2;
    release channel c3;
    release channel cl1;
    release channel cl2;
    release channel cl3;
    }
    

4.4 啟動實時復制

alter database recover managed standby database using current logfile disconnect from session;

5 zookeeper集群

 

5.1 上傳並解壓

可使用ftp 等工具或者其他工具上傳。上傳后解壓. 注意,所有節點都需要安裝。所以要上傳至所有服務器並解壓。

tar -xzvf ~/zookeeper-3.4.13.tar.gz -C /opt/

5.2 配置

zookeeper 集群中所有節點都需要進行配置。

  • 規則相關路徑

    # 日志文件路徑
    mkdir -p /opt/zookeeper/zkdatalog
    # 數據存放路徑
    mkdir -p /opt/zookeeper/zkdata
    
  • 配置內容 進入到目錄中, 查看配置文件.相關的配置文件存儲於<path>/zookeeper-<version>/conf 路徑中。

    # cd /opt/zookeeper-3.4.13/conf
    # ls
    configuration.xsl  log4j.properties  zoo_sample.cfg
    # 說明:
    # zoo_sample.cfg  這個文件是官方給我們的zookeeper的樣板文件,給他復制一份命名為zoo.cfg,zoo.cfg是官方指定的文件命名規則。\\
    

    文件內容如下:

    tickTime=2000
    initLimit=10
    syncLimit=5
    dataDir=/opt/zookeeper/zkdata
    dataLogDir=/opt/zookeeper/zkdatalog
    # 3.4 and later
    autopurge.snapRetainCount=60
    autopurge.purgeInterval=24
    clientPort=12181
    server.1=10.10.100.91:12888:13888
    server.2=10.10.100.92:12888:13888
    server.3=10.10.100.98:12888:13888
    

5.3 創建myid文件

在每個服務器上,將server.N 中的N 寫入dataDir 路徑中的myid文件。

# server1
echo "1" > /opt/zookeeper/zkdata/myid
#server2
echo "2" > /opt/zookeeper/zkdata/myid
#server3
echo "3" > /opt/zookeeper/zkdata/myid

5.4 配置環境變量

為了日后管理方便,建議將zookeeper的位置信息配置到用戶的環境變量中去。此處使用 .bash_profile ,在文件中添加以下內容。

export ZK_HOME=/opt/zookeeper-3.4.13
PATH=$JAVA_HOME:$ZK_HOME/bin:$PATH

通過 source ~/.bash_profile/ 使修改生效.

5.5 啟動和查看服務

#cd $ZK_HOME/bin
zkServer.sh start
zkServer.sh status

經查看,zk 集中,server.2 為leader,server1,server3 為follower。 不同環境這個結果是不一樣的。
如果沒啟動,可以使用./zkServer.sh start-foreground啟動,屏幕上會顯示日志信息,能看出哪塊出了問題。

6 kafka集群

 

6.1 上傳並解壓

可使用ftp 等工具或者其他工具上傳,然后用解壓。所有節點都需要安裝,所以要上傳至所有服務器。

tar -xzvf kafka_2.12-2.1.1.tgz

6.2 配置

kafka集群所有節點都需要配置。

  • 規則相關路徑

    #消息目錄
    mkdir -p /opt/kafka_2.12-2.1.1/logs
    
  • 修改配置文件 kafka 的配置文件有很多,存放在<path>/kafka-version/config中。
    對於本環境來講,路徑為:/opt/kafka_2.12-2.1.1/config。
    需要調整的配置文件為:server.properties

    #  每台服務器的broker.id都不能相同
    broker.id=0
    
    #hostname和port參數,在2.12 2.1.1 版本的server.properties 中已經找不到該參數配置了。
    #取而代之的是 listeners參數. 其中的IP 地址, 設置為本主機的IP。
    #host.name=10.10.100.91
    #port = 9092
    listeners=PLAINTEXT://10.10.100.92:9092
    # 日志路徑
    log.dirs=/opt/kafka_2.12-2.1.1/logs
    #在log.retention.hours=168 下面新增下面三項
    message.max.byte=5242880
    default.replication.factor=2
    replica.fetch.max.bytes=5242880
    
    #設置zookeeper的連接端口,集群中所有節點都需要包含。
    zookeeper.connect=10.10.100.91:12181,10.10.100.92:12181,10.10.100.98:12181
    
    

6.3 配置環境變量

為了方便日后管理,需要配置kafka的環境變量。在$HOME/.bash_profile 文件中加入以下 兩行:

export KFK_HOME=/opt/kafka_2.12-2.1.1/config
export PATCH=$KFK_HOME/bin:$PATH

或者直接修改為以下格式,包含zookeeper與kafka的環境變量:

export ZK_HOME=/opt/zookeeper-3.4.13
export KFK_HOME=/opt/kafka_2.12-2.1.1
export PATH=$JAVA_HOME:$ZK_HOME/bin:$KFK_HOME/bin:$PATH

配置完成后, 使用source 命令使配置生效。

6.4 服務管理

  • 啟動服務

    kafka-server-start.sh -daemon $KFK_HOME/config/server.properties
    
  • 查看服務

    # 使用jps查看kafka進程是否有啟動, 直接執行jps,正常如下:
    22707 QuorumPeerMain
    28439 Jps
    28314 Kafka
    

6.5 TOPIC

  • 創建Topic

    kafka-topics.sh --create --zookeeper 10.10.100.92:12181,10.10.100.91:12181,10.10.100.98:12181 --replication-factor 3 -partitions 3 --topic testogg
    

    成功時提示:Created topic "testogg".

  • 查看Topic

    #列出所有可用的topics
    kafka-topics.sh --list --zookeeper 10.10.100.91:12181
    # 查看指定topic 信息
    kafka-topics.sh --describe --zookeeper 10.10.100.91:12181,10.10.100.92:12181,10.10.100.98:12181 --topic testogg
    
  • 創建consumer 這里創建一個consumer,以便在安裝配置完ogg 后,進行驗證。

    kafka-console-consumer.sh -–zookeeper 10.10.100.92:12181,10.10.100.91:12181,10.10.100.98:12181 -–from-beginning –-topic oggtest
    

6.6 測試集群

  1. 發布消息 利用kafka自身提供的發布消息的腳本, 來創建發布消息.

    kafka-console-producer.sh --broker-list 10.10.100.91:9092,10.10.100.92:9092,10.10.100.98:9092 --topic testogg
    
  2. 接收消息

    kafka-console-consumer.sh --topic testogg --bootstrap-server 10.10.100.92:9092,10.10.100.91:9092,10.10.100.98:9092 --group testogg1 --from-beginning
    
  3. 測試結果
    • 消息發送端

      [root@pmo02 config]#    kafka-console-producer.sh --broker-list 10.10.100.91:9092,10.10.100.92:9092,10.10.100.98:9092 --topic testogg
      >1,'This is a test message', `date`
      >2,'Second test message',boooooooo
      >一人
      
    • 消息接收端

      [root@app-01 ~]# kafka-console-consumer.sh --topic testogg --bootstrap-server 10.10.100.92:9092,10.10.100.91:9092,10.10.100.98:9092 --group testogg1 --from-beginning
      2,'Second test message',boooooooo
      1�,'This is a test message', `date`
      一人
      

7 OGG 安裝配置

OGG針對不同的軟件有不同的版本,我們需要安裝針對Oracle 版本和針對大數據的版本。 本次測試的是, oracle 從備庫同步數據至kafka集群。因此我們需要將ogg for oracle 安裝至備庫(10.10.100.92)。

7.1 源端

 

7.1.1 准備

 
  1. 准備OGG軟件

    將文件上傳至服務器oracle用戶的家目錄. 並保證Oracle用戶對安裝包有操作權限:

    chown -R oracle:oinstall /home/oracle/123014_fbo_ggs_Linux_x64_shiphome.zip
    unzip 123014_fbo_ggs_Linux_x64_shiphome.zip
    # 軟件解壓后的路徑為fbo_ggs_Linux_x64_shiphome
    
  2. 修改OGG安裝的配置文件

    軟件安裝的配置文件存放於 =<path>/fbo_ggs_Linux_x64_shiphome/Disk1/response/= 中. 名為oggcore.rsp. 請按下面要求進行設置:

    # 保持不變
    oracle.install.responseFileVersion=/oracle/install/rspfmt_ogginstall_response_schema_v12_1_2
    # 與數據庫版本保持一致,分為ORA12c與ORA11g
    INSTALL_OPTION=ORA11g
    # 配置ogg 的安裝路徑
    SOFTWARE_LOCATION=/u01/app/ogg
    # 如果需要啟動mgr 需要配置以下 三項,如果不需要啟動,則不需要配置,此處不進行配置。
    START_MANAGER=
    MANAGER_PORT=
    DATABASE_LOCATION=
    # windows 平台以下兩項不需要配置。其他平台需要配置。
    INVENTORY_LOCATION=/u01/app/oraInventory
    UNIX_GROUP_NAME=oinstall
    
  3. 創建OGG安裝路徑
    # 用Oracle用戶創建
    mkdir -p /u01/app/ogg
    
  4. 靜默安裝OGG
    # 進入相關路徑
    cd ~/fbo_ggs_Linux_x64_shiphome/Disk1/
    # 執行靜默安裝
    ./runInstaller -silent -responseFile `pwd`/response/oggcore.rsp
    # 安裝過程中會提示安裝日志, 可通過日志查看安裝過程是否正常。
    
  5. 配置環境變量
    #添加OGG_HOME
    export OGG_HOME=/u01/app/ogg
    # 將OGG_HOME路徑添加至PATH
    export PATH=$PATH:$OGG_HOME
    
  6. 創建相關路徑

    ogg 提供了一個命令,用於創建相關的子目錄 ,比如數據目錄,配置文件存放目錄等。

    ggsci<<EOF
    create subdirs
    EOF
    

7.1.2 配置數據庫

 
  1. 開啟日志

    通過以下SQL檢查確認,要求結果都是yes:

    SQL> select force_logging,supplemental_log_data_min, supplemental_log_data_pk,supplemental_log_data_ui from v$database;
    
    FOR SUPPLEME SUP SUP
    --- -------- --- ---
    YES YES      YES YES
    

    如果不是YES,即為NO,根據上面SQL的查詢順序,對應的調整SQL為:

    alter database force logging;
    alter database add supplemental log data;
    alter database add supplemental log data (primary key) columns;
    alter database add supplemental log data (unique) columns;
    -- 后面兩個列可以一起處理,語句如下:
    alter database add supplemental log data (primary key,unique,foreign key) columns;
    -- 一般建議把foreign key 也加上,以防萬一。
    
  2. 創建表空間及用戶
    create tablespace ogg datafile '&absolute_path' size 1G;
    create user ogg identified by ogg123 default tablespace ogg;
    --如果不做ddl trigger,dba權限可以不給
    grant connect,resource,dba to ogg;
    GRANT alter session TO ogg;
    grant select any transaction, SELECT ANY DICTIONARY,SELECT ANY TABLE  TO ogg;
    
    --用戶配置表級追加日志
    GRANT ALTER ANY TABLE , FLASHBACK ANY TABLE   TO ogg;
    GRANT EXECUTE on DBMS_FLASHBACK TO ogg;
    GRANT EXECUTE ON utl_file TO oggtest;
    grant execute on sys.dbms_lob to ogg;
    
    --如下pl/sql塊是在oracle 11g之上版本用的,10g版本不需要執行
    BEGIN
    DBMS_GOLDENGATE_AUTH.GRANT_ADMIN_PRIVILEGE(
    Grantee                 => 'OGG',
    privilege_type          => 'CAPTURE',
    grant_select_privileges => TRUE,
    do_grants               => TRUE);
    END;
    /
    
  3. 配置ddl同步
    cd $GGATE
    sqlplus / as sysdba
    @marker_setup.sql;
    @ddl_setup.sql;
    @role_setup.sql;
    grant GGS_GGSUSER_ROLE to ogg;
    @ddl_enable.sql;
    
  4. 性能優化
    cd $OGG_HOME
    sqlplus / as sysdba
    @?/rdbms/admin/dbmspool
    -- ddl_pin將觸發器用到的plsql包放進內存中
    sqlplus / as sysdba
    @ddl_pin ogg;
    exit
    

7.1.3 配置OGG

 
  1. 配置MGR
    edit params mgr
    # 輸入如下內容,第三行,一般不配置.
    PORT 7809
    DYNAMICPORTLIST 7810-7860
    # AUTORESTART EXTRACT *, RETRIES 5, WAITMINUTES 3, RESETMINUTES 60
    PURGEOLDEXTRACTS ./dirdat/*, usecheckpoints, minkeepdays 1
    LAGREPORTHOURS 1
    LAGINFOMINUTES 30
    LAGCRITICALMINUTES 45
    
  2. 數據初始化配置

    數據初始化,指的是從源端Oracle 數據庫將已存在的需要的數據同步至目標端.
    此節中,操作都可在OGG 環境中進行。對OGG的內部邏輯了解的,也可以在shell環境中配置.

    1. 配置初始化進程
      add extract init01, sourceistable
      EDIT PARAMS init01
      # 編輯參數內容如下:
      EXTRACT init01
      SETENV (NLS_LANG=AMERICAN_AMERICA.ZHS16GBK)
      USERID ogg,PASSWORD ogg123
      #USERID c##ggadmin,PASSWORD ggadmin # for oracle 12C,注意用戶名格式
      RMTHOST 10.10.100.98, MGRPORT 7809
      RMTFILE ./dirdat/in,maxfiles 999, megabytes 500,format release 12.3
      # SOURCECATALOG PDBNAME # FOR ORACLE 12C,11g does not need.
      table oggtest.*;
      
    2. 生成表結構

      GoldenGate 提供了一個名為 DEFGEN 的專用工具,用於生成數據定義,當源表和目標表中 的定義不同時,Oracle GoldenGate 進程將引用該專用工具。在運行 DEFGEN 之前,需要 為其創建一個參數文件:

      edit param defgen
      USERID ogg,PASSWORD ogg123
      defsfile ./dirdef/defgen.def,format release 12.2 # format release 指的是OGG的版本
      # SOURCECATALOG orclpdb # for oracle 12C 。11g及之前版本不需要
      table oggtest.*;
      
      note
      配置中每個 table 行尾都要加上分號。不然會報錯。如上面配置:table oggtest.*;

      生成表結構文件,需要執行shell命令,如果配置中的文件已經存在,執行下面命令會報錯,所以 在執行前需要先刪除:

      rm -f $OGG_HOME/dirdef/defgen.def
      defgen paramfile dirprm/defgen.prm
      

      將生成的定義文件傳送到目標端, 目標端的replicate進程會使用這個文件。

      scp $OGG_HOME/dirdef/defgen.def 10.10.100.98:/u01/app/ogg/dirdef/
      
  3. 實時同步配置
    1. 關於集成模式

      standby 實例 不支持集成模式 集成模式關鍵的三個命令:

      add extract ext_kfk,integrated tranlog,threads 1,begin now
      DBLOGIN USERID <username>,PASSWORD <password>
      # for 12C
      register extract ext_kfk database container (orclpdb)
      # for 11g/ 10G
      register extract ext_kfk
      
    2. 配置抽取進程

      添加抽取進程,OGG命令行:

      add extract ext_kfk,tranlog,threads 1,begin now
      add exttrail ./dirdat/kf, extract ext_kfk, megabytes 500
      

      配置抽取進程參數,OGG命令行:

      edit params ext_kfk
      

      以下內容為參數明細

      EXTRACT ext_kfk
      USERID ogg, PASSWORD ogg123
      Setenv (NLS_LANG="AMERICAN_AMERICA.ZHS16GBK")
      gettruncates
      DISCARDFILE ./dirrpt/ext_kfk.dsc, APPEND, MEGABYTES 1024
      # RMTHOST 10.10.100.98, MGRPORT 9178
      # RMTFILE ./dirdat/kf,maxfiles 999, megabytes 500,format release 12.2
      DBOPTIONS  ALLOWUNUSEDCOLUMN
      REPORTCOUNT EVERY 1 MINUTES, RATE
      WARNLONGTRANS 2h,CHECKINTERVAL 300
      FETCHOPTIONS NOUSESNAPSHOT
      EXTTRAIL ./dirdat/kf
      TRANLOGOPTIONS MINEFROMACTIVEDG
      # TRANLOGOPTIONS altarchivelogdest primary instance orcl /u01/app/oracle/oradata/arch 配置此行時,將讀取歸檔日志
      GETUPDATEBEFORES
      NOCOMPRESSUPDATES
      NOCOMPRESSDELETES
      table oggtest.*;
      

      若配置rmthost rmtfile 參數, 可不配置分發進程(pump).

      • TABLE 關鍵詞,必須以分號結束。
      • TRANLOGOPTIONS altarchivelogdest 是standby 端作為ogg 源端, ogg 讀取歸檔的關鍵。也適用於RAC環境,不過RAC環境不建議配置。
      • 一般情況下,ogg 同步數據有5秒左右的延遲. 為了解決這個問題,有 的同學可能會配置 EOFDELAY或者EOFDELAYCSECS,以及FLUSHCSECS. 有 些情況,配置確實可以減少延遲,但不是所有情況都有用。而且配置 以后,很容易產生control file sequence read 等待。
    3. 配置分發進程

      OGG 環境添加分發進程:

      add extract pmp_kfk,exttrailsource ./dirdat/kf
      add rmttrail ./dirdat/kf,EXTRACT PMP_kfk,MEGABYTES 500
      

      編輯分發進程參數:

      edit param PMP_KAF1
      #內容如下:
      extract pmp_kfk
      USERID ogg, password ogg123
      PASSTHRU
      RMTHOST 10.10.100.98, MGRPORT 7809
      RMTTRAIL ./dirdat/kf,format release 12.3
      # 下面一行適用於12C
      # SOURCECATALOG orclpdb
      table oggtest.*;
      
 

7.2 目標端(kafka)

 

7.2.1 安裝

oracle 軟件統一安裝到/u01/app/路徑中,方便日后統一管理.OGG解壓到路徑/u01/app/ogg.

unzip OGG_BigData_Linux_x64_12.3.2.1.1.zip
tar -xvf OGG_BigData_Linux_x64_12.3.2.1.1.tar -C /u01/app/ogg/

7.2.2 配置環境變量

#su - oracle
#vi .bash_profile
# 內容如下:
JAVA_HOME=/opt/jdk1.8.0_181/
OGG_HOME=/u01/app/ogg
LD_LIBRARY_PATH=$JAVA_HOME/jre/lib/amd64/server/:$JAVA_HOME/lib:$LD_LIBRARY_PATH
PATH=$JAVA_HOME/bin:$PATH:$HOME/bin:$OGG_HOME

export PATH JAVA_HOME ORACLE_HOME OGG_HOME LD_LIBRARY_PATH

修改完環境變量執行: source .bash_profile

7.2.3 目標端OGG配置

 
  1. 創建相關路徑
    ggsci
    create subdirs
    

    示例:

    $ ggsci
    
    Oracle GoldenGate Command Interpreter
    Version 12.3.0.1.2 OGGCORE_OGGADP.12.3.0.1.2_PLATFORMS_180712.2305
    Linux, x64, 64bit (optimized), Generic on Jul 13 2018 00:46:09
    Operating system character set identified as UTF-8.
    
    Copyright (C) 1995, 2018, Oracle and/or its affiliates. All rights reserved.
    
    
    GGSCI (app-01) 1> create subdirs
    
    Creating subdirectories under current directory /home/oracle
    
    Parameter file                 /u01/app/ogg/dirprm: created.
    Report file                    /u01/app/ogg/dirrpt: created.
    Checkpoint file                /u01/app/ogg/dirchk: created.
    Process status files           /u01/app/ogg/dirpcs: created.
    SQL script files               /u01/app/ogg/dirsql: created.
    Database definitions files     /u01/app/ogg/dirdef: created.
    Extract data files             /u01/app/ogg/dirdat: created.
    Temporary files                /u01/app/ogg/dirtmp: created.
    Credential store files         /u01/app/ogg/dircrd: created.
    Masterkey wallet files         /u01/app/ogg/dirwlt: created.
    Dump files                     /u01/app/ogg/dirdmp: created.
    
  2. 復制參數文件
    cd $OGG_HOME/AdapterExamples/big-data/kafka
    cp * $OGG_HOME/dirprm
    
  3. 參數配置
    1. custom_kafka_producer.properties
      bootstrap.servers=10.1.1.246:9200,10.1.1.247:9200 --只需要改動這一行就行,指定kafka的地址和端口號
      acks=1
      reconnect.backoff.ms=1000
      value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
      key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
      batch.size=16384
      linger.ms=10000
      
    2. kafka.props

      該配置文件主要控制着消息輸出格式。由參數: gg.handler.kafkahandler.format 控制。

      • text mode

        gg.handlerlist = kafkahandler
        gg.handler.kafkahandler.type=kafka
        gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties
        #The following resolves the topic name using the short table name
        gg.handler.kafkahandler.topicMappingTemplate= testogg -- 此處指定為要同步到的目標testogg名字
        gg.handler.kafkahandler.format=delimitedtext
        gg.handler.kafkahandler.format.fieldDelimiter=|
        gg.handler.kafkahandler.format.insertOpKey=I
        gg.handler.kafkahandler.format.updateOpKey=U
        gg.handler.kafkahandler.format.deleteOpKey=D
        gg.handler.kafkahandler.format.truncateOpKey=T
        gg.handler.kafkahandler.SchemaTopicName= testogg --此處指定為要同步到的目標topic名字
        gg.handler.kafkahandler.BlockingSend =false
        gg.handler.kafkahandler.includeTokens=false
        gg.handler.kafkahandler.mode=op
        goldengate.userexit.timestamp=utc
        goldengate.userexit.writers=javawriter
        javawriter.stats.display=TRUE
        javawriter.stats.full=TRUE
        gg.log=log4j
        gg.log.level=INFO
        gg.report.time=30sec
        gg.classpath=dirprm/:/opt/cloudera/parcels/KAFKA/lib/kafka/libs/ --指定classpath,這里很重要,必須有kafka安裝文件的類庫。
        javawriter.bootoptions=-Xmx512m -Xms32m -Djava.class.path=ggjava/ggjava.jar
        
      • json mode

          gg.handlerlist = kafkahandler
        gg.handler.kafkahandler.type=kafka
        gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties
        #The following resolves the topic name using the short table name
        gg.handler.kafkahandler.topicMappingTemplate=xiamen_dev
        #The following selects the message key using the concatenated primary keys
        gg.handler.kafkahandler.keyMappingTemplate=xiamen_dev
        gg.handler.kafkahandler.format=json
        gg.handler.hdfs.format.jsonDelimiter=CDATA[]
        #gg.handler.kafkahandler.format.fieldDelimiter=
        #gg.handler.name.format.pkUpdateHandling=delete-insert
        gg.handler.kafkahandler.format.insertOpKey=I
        gg.handler.kafkahandler.format.updateOpKey=U
        gg.handler.kafkahandler.format.deleteOpKey=D
        gg.handler.kafkahandler.format.truncateOpKey=T
        gg.handler.kafkahandler.SchemaTopicName=xiamen_dev
        gg.handler.kafkahandler.BlockingSend =false
        gg.handler.kafkahandler.includeTokens=false
        gg.handler.kafkahandler.mode=op
        
        
        
        goldengate.userexit.timestamp=utc
        goldengate.userexit.writers=javawriter
        javawriter.stats.display=TRUE
        javawriter.stats.full=TRUE
        
        gg.log=log4j
        gg.log.level=INFO
        
        gg.report.time=30sec
        
        #Sample gg.classpath for Apache Kafka
        gg.classpath=dirprm/:/opt/kafka_2.12-2.1.1/libs/*:/u01/app/ogg/*:/u01/app/ogg/lib/*
        #Sample gg.classpath for HDP
        #gg.classpath=/etc/kafka/conf:/usr/hdp/current/kafka-broker/libs/*
        
        javawriter.bootoptions=-Xmx512m -Xms32m -Djava.class.path=ggjava/ggjava.jar
        
  4. 配置MGR
    edit params mgr
    # 內容如下(第三行在實際配置時已刪除,一般不配置成自動啟動):
    PORT 7809
    DYNAMICPORTLIST 7810-7860
    -- AUTORESTART REPLICAT *, RETRIES 5, WAITMINUTES 3, RESETMINUTES 60
    PURGEOLDEXTRACTS ./dirdat/*, usecheckpoints, minkeepdays 1
    LAGREPORTHOURS 1
    LAGINFOMINUTES 30
    LAGCRITICALMINUTES 45
    
  5. 配置初始化進程
    • 添加並配置初始化進程

      # 添加進程
      ADD replicat init01, specialrun
      # 配置進程
      edit params init01
      

      參數內容:

      SPECIALRUN
      end runtime
      setenv(NLS_LANG="AMERICAN_AMERICA.ZHS16GBK")
      targetdb libfile libggjava.so set property=./dirprm/kafka.props
      SOURCEDEFS ./dirdef/defgen.def   --- 如果有多個map ,則需要配置sourcedefs,如果只有一個則可以不配置。
      EXTFILE ./dirdat/in
      reportcount every 1 minutes, rate
      grouptransops 10000
      map oggtest.*,target oggtest.*;
      

      如果是12C 數據庫, map 后面的格式為pdb.schema.table , target schema.table

    • 示例

      GGSCI (app-01) 1> ADD replicat init01, specialrun
      REPLICAT added.
      GGSIC (app-01) 2> exit
      cd $OGG_HOME/dirprm
      cat >> init01.prm <<EOF
      SPECIALRUN
      end runtime
      setenv(NLS_LANG="AMERICAN_AMERICA.ZHS16GBK")
      targetdb libfile libggjava.so set property=./dirprm/kafka.props
      SOURCEDEFS ./dirdef/defgen.def
      EXTFILE ./dirdat/in
      reportcount every 1 minutes, rate
      grouptransops 10000
      map orclpdb.oggtest.*,target oggtest.*;
      EOF
      

    示例中並沒有通過 eidt params 命令來配置初始化進程的參數,而是通過shell的方式。 這是因為 edit params 方式的實際操作就是將配置內容輸出到操作系統 dirprm/中的 文件。文件名是在進程名后加 rpm 后綴。

  6. 配置數據恢復進程

    恢復進程,用於讀取源庫傳送來的數據,解析成ddl/dml,並發送給KAFKA。

    # GGSCI 環境
    add replicat rep1,exttrail ./dirdat/kf
    # shell 環境
    cd $OGG_HOME/dirprm/
    cat >> rep1.prm <<EOF
    REPLICAT rep1
    setenv(NLS_LANG="AMERICAN_AMERICA.ZHS16GBK")
    HANDLECOLLISIONS
    targetdb libfile libggjava.so set property=./dirprm/kafka.props
    SOURCEDEFS ./dirdef/defgen.def
    reportcount every 1 minutes, rate
    grouptransops 10000
    MAP oggtest.*,target oggtest.*;
    EOF
    

7.3 同步數據

  • 源端

    源端
    start ext_kfk
    start pmp_kfk
    start init01
    
  • 目標端初始化數據

    ./replicat paramfile ./dirprm/init01.prm reportfile ./dirrpt/init01.rpt -p INITIALDATALOAD
    

    kafka console consumer text-mode示例:

    I|OGGTEST.T_TEST_LOB|2019-03-22 06:38:33.026733|2019-03-22T15:55:07.326000|00000000000000001803|1|adbx123

    I|OGGTEST.T_TEST_LOB|2019-03-22 06:38:33.026733|2019-03-22T15:55:08.183000|00000000000000001886|2|adbx123lkjn

  • 目標端追加數據

    I|OGGTEST.T_TEST_LOB|2019-03-22 11:47:11.477910|2019-03-22T19:47:16.932000|00000000010000002197|6|biubiubiu|2019-03-22 20:25:49
    
    D|OGGTEST.T_TEST_LOB|2019-03-22 11:55:12.461842|2019-03-22T19:55:18.378000|00000000010000002363|5||2019-03-22 19:37:51
    
    U|OGGTEST.T_TEST_LOB|2019-03-22 11:55:12.461842|2019-03-22T19:55:18.385000|00000000010000002617|4|123|2019-03-22 16:48:03
    

8 錯誤

 

8.1 OGG-01044

錯誤信息:

ERROR   OGG-01044  Oracle GoldenGate Capture for Oracle, ext_kfk.prm:  The trail './dirdat/kf' is not assigned to extract 'EXT_KFK'. Assign the trail to the extract with the command "ADD EXTTRAIL/RMTTRAIL ./dirdat/kf, EXTRACT EXT_KFK".

該錯誤出現,是由於 在添加extrace 時,沒有將exttrail 與extract name 進行綁定。按照提示操作,即可。

8.2 OGG-01389

錯誤信息:OGG-01389 File header failed to parse tokens. 出現此錯誤信息,原因是生成的表定義文件的頭信息有固定的格式,不同的OGG版本,格式不一樣。 因此,要想讓目標端的OGG 可以正常識別表定義文件,需要指定目標端OGG的版本。 在defsfile 或者rmtfile 后加上format release <version> 即可。

8.3 OGG-06439

這個錯誤是一個警告,並不是錯誤類型。出現這個錯誤,說明表上沒有主鍵。可以給表添加一個主鍵,或者完全不處理。

8.4 OGG-00060

錯誤信息:

ERROR   OGG-00060  Oracle GoldenGate Capture for Oracle, ext_kfk.prm:  Extract requires a value specified for parameter ALTARCHIVELOGDEST when in archived log only mode.

8.5 OGG-00868

單機環境中
standby 日志thread 號與checkpoint的日志線程不匹配。重新添加standby redo log,指定thread 與其他日志組一致即可。
RAC 環境

重新添加抽取進程, 添加時指定thread 編號

add extract ymsextr,tranlog,threads 1,begin now

Author: halberd

Created: 2019-06-13 Thu 19:15

Validate


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM