3、使用Oracle Logminer同步Demo


使用Oracle Logminer同步Demo

1 Demo介紹

1.1 Demo設想

前面介紹了Oracle LogMiner配置使用以及使用LogMiner進行解析日志文件性能,在這篇文章中將利用LogMiner進行數據同步,實現從源目標數據庫到目標數據庫之間的數據同步。由於LogMiner支持的版本是8.1及以上,所以進行數據同步的Oracle數據庫版本也必須是8.1及以上。

當然在本文中介紹的是LogMiner進行數據同步例子,也可以利用LogMiner進行數據審計、數據操作追蹤等功能,由於這些從操作原理來說是一致,在本文不做討論。

1.2 框架圖

clip_image002[6]

1.3 流程圖

clip_image002[4]

l 配置階段

1、 控制端:指定源端、目標端數據庫信息、LOGMINER同步時間等配置信息;

l 獲取源端同步數據

2、 控制台:通過定時輪詢的方式檢測是否到達數據同步時間,如果是則進行數據同步,否則繼續進行輪詢;

3、 源數據庫:定時加載數據庫歸檔日志文件到動態表v$logmnr_contents中;

4、 源數據庫:根據條件讀取指定sql語句;

l 目標端數據入庫

5、 源數據庫:執行sql語句。

2 代碼分析

2.1 目錄及環境配置

在該Demo項目中需要引入Oracle JDBC驅動包,具體項目分為四個類:

1. Start.java:程序入口方法;

2. SyncTask.java:數據同步Demo核心,生成字典文件和讀取日志文件、目標數據庫執行SQL語句等;

3. DataBase.java:數據庫操作基礎類;

4. Constants.java:源數據庫、目標數據庫配置、字典文件和歸檔文件路徑。

clip_image006[4]

2.2 代碼分析

2.2.1 Constants.java

在該類中設置了數據同步開始SCN號、源數據庫配置、目標數據庫配置以及字典文件/日志文件路徑。需要注意的是在源數據庫配置中有兩個用戶:一個是調用LogMiner用戶,該用戶需要擁有dbms_logmnr、dbms_logmnr_d兩個過程權限,在該Demo中該用為為sync;另外一個為LogMiner讀取該用戶操作SQL語句,在該Demo中該用為為LOGMINER。

package com.constants;

/**
 * [Constants]|描述:Logminer配置參數
 * @作者: ***
 * @日期: 2013-1-15 下午01:53:57
 * @修改歷史:
 */
public class Constants {
    
    /** 上次數據同步最后SCN號 */
    public static String LAST_SCN = "0";

    /** 源數據庫配置 */
    public static String DATABASE_DRIVER="oracle.jdbc.driver.OracleDriver";
    public static String SOURCE_DATABASE_URL="jdbc:oracle:thin:@127.0.0.1:1521:practice";
    public static String SOURCE_DATABASE_USERNAME="sync";
    public static String SOURCE_DATABASE_PASSWORD="sync";
    public static String SOURCE_CLIENT_USERNAME = "LOGMINER";
    
    /** 目標數據庫配置 */
    public static String SOURCE_TARGET_URL="jdbc:oracle:thin:@127.0.0.1:1521:target";
    public static String SOURCE_TARGET_USERNAME="target";
    public static String SOURCE_TARGET_PASSWORD="target";
    
    /** 日志文件路徑 */
    public static String LOG_PATH = "D:\\oracle\\oradata\\practice";
    
    /** 數據字典路徑 */
    public static String DATA_DICTIONARY = "D:\\oracle\\oradata\\practice\\LOGMNR";
}
2.2.2 SyncTask.java

在該類中有兩個方法,第一個方法為createDictionary,作用為生成數據字典文件,另外一個是startLogmur,該方法是LogMiner分析同步方法。

/**
 * <p>方法名稱: createDictionary|描述: 調用logminer生成數據字典文件</p>
 * @param sourceConn 源數據庫連接
 * @throws Exception 異常信息
 */
public void createDictionary(Connection sourceConn) throws Exception{
    String createDictSql = "BEGIN dbms_logmnr_d.build(dictionary_filename => 'dictionary.ora', dictionary_location =>'"+Constants.DATA_DICTIONARY+"'); END;";
    CallableStatement callableStatement = sourceConn.prepareCall(createDictSql);
    callableStatement.execute();
}

 

/**
 * <p>方法名稱: startLogmur|描述:啟動logminer分析 </p>
 * @throws Exception
 */
public void startLogmur() throws Exception{
    
    Connection sourceConn = null;
    Connection targetConn = null;
    try {
        ResultSet resultSet = null;
        
        // 獲取源數據庫連接
        sourceConn = DataBase.getSourceDataBase();
        Statement statement = sourceConn.createStatement();
        
        // 添加所有日志文件,本代碼僅分析聯機日志
        StringBuffer sbSQL = new StringBuffer();
        sbSQL.append(" BEGIN");
        sbSQL.append(" dbms_logmnr.add_logfile(logfilename=>'"+Constants.LOG_PATH+"\\REDO01.LOG', options=>dbms_logmnr.NEW);");
        sbSQL.append(" dbms_logmnr.add_logfile(logfilename=>'"+Constants.LOG_PATH+"\\REDO02.LOG', options=>dbms_logmnr.ADDFILE);");
        sbSQL.append(" dbms_logmnr.add_logfile(logfilename=>'"+Constants.LOG_PATH+"\\REDO03.LOG', options=>dbms_logmnr.ADDFILE);");
        sbSQL.append(" END;");
        CallableStatement callableStatement = sourceConn.prepareCall(sbSQL+"");
        callableStatement.execute();
        
        // 打印獲分析日志文件信息
        resultSet = statement.executeQuery("SELECT db_name, thread_sqn, filename FROM v$logmnr_logs");
        while(resultSet.next()){
            System.out.println("已添加日志文件==>"+resultSet.getObject(3));
        }
        
        System.out.println("開始分析日志文件,起始scn號:"+Constants.LAST_SCN);
        callableStatement = sourceConn.prepareCall("BEGIN dbms_logmnr.start_logmnr(startScn=>'"+Constants.LAST_SCN+"',dictfilename=>'"+Constants.DATA_DICTIONARY+"\\dictionary.ora',OPTIONS =>DBMS_LOGMNR.COMMITTED_DATA_ONLY+dbms_logmnr.NO_ROWID_IN_STMT);END;");
        callableStatement.execute();
        System.out.println("完成分析日志文件");
        
        // 查詢獲取分析結果
        System.out.println("查詢分析結果");
        resultSet = statement.executeQuery("SELECT scn,operation,timestamp,status,sql_redo FROM v$logmnr_contents WHERE seg_owner='"+Constants.SOURCE_CLIENT_USERNAME+"' AND seg_type_name='TABLE' AND operation !='SELECT_FOR_UPDATE'");
        
        // 連接到目標數據庫,在目標數據庫執行redo語句
        targetConn = DataBase.getTargetDataBase();
        Statement targetStatement = targetConn.createStatement();
        
        String lastScn = Constants.LAST_SCN;
        String operation = null;
        String sql = null;
        boolean isCreateDictionary = false;
        while(resultSet.next()){
            lastScn = resultSet.getObject(1)+"";
            if( lastScn.equals(Constants.LAST_SCN) ){
                continue;
            }
            
            operation = resultSet.getObject(2)+"";
            if( "DDL".equalsIgnoreCase(operation) ){
                isCreateDictionary = true;
            }
            
            sql = resultSet.getObject(5)+"";
            
            // 替換用戶
            sql = sql.replace("\""+Constants.SOURCE_CLIENT_USERNAME+"\".", "");
            System.out.println("scn="+lastScn+",自動執行sql=="+sql+"");
            
            try {
                targetStatement.executeUpdate(sql.substring(0, sql.length()-1));
            } catch (Exception e) {
                System.out.println("測試一下,已經執行過了");
            }
        }
        
        // 更新scn
        Constants.LAST_SCN = (Integer.parseInt(lastScn))+"";
        
        // DDL發生變化,更新數據字典
        if( isCreateDictionary ){
            System.out.println("DDL發生變化,更新數據字典");
            createDictionary(sourceConn);
            System.out.println("完成更新數據字典");
            isCreateDictionary = false;
        }
        
        System.out.println("完成一個工作單元");
        
    }
    finally{
        if( null != sourceConn ){
            sourceConn.close();
        }
        if( null != targetConn ){
            targetConn.close();
        }
        
        sourceConn = null;
        targetConn = null;
    }
}

3 運行結果

3.1 源數據庫操作

1、創建AAAAA表,並插入數據

clip_image008[4]

2、創建EMP1表

clip_image010[4]

3.2 運行Demo

在控制台中輸出如下日志

clip_image012[4]

3.3 目標數據庫結果

創建AAAAA和EMP1表,並在AAAAA插入了數據

clip_image014[4]


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM