oracleWriter in DataX


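When using DataX's oraclewriter, I went through quite a bit of trouble because I was unfamiliar with both Oracle and C++ compilation. I'm writing the process down here as a reference for anyone who needs it later.

1. oracleWriter: Oracle provides the OCCI interface for loading data directly into Oracle, but it is a C++ interface, so DataX's oracleWriter wraps the C++ code and calls it through JNI.

2. oracleJdbcWriter is much simpler to use. Its code is attached at the end, so I won't go into detail here.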

 

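Preparation: install the Oracle client and compile liboraclewriter.so.

Part 1: Installing the Oracle client. This article uses the Oracle 11g (11.2) Instant Client on 64-bit Red Hat; the detailed installation steps follow.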

  1. Download the rpm packages from the Oracle website and install them.

       Official site: http://www.oracle.com/technetwork/topics/linuxx86-64soft-092277.html

        Packages:

      oracle-instantclient11.2-basic-11.2.0.1.0-1.x86_64.rpm
      oracle-instantclient11.2-devel-11.2.0.1.0-1.x86_64.rpm
      oracle-instantclient11.2-sqlplus-11.2.0.1.0-1.x86_64.rpm

        Install commands:

      rpm -ivh oracle-instantclient11.2-basic-11.2.0.1.0-1.x86_64.rpm 
      rpm -ivh oracle-instantclient11.2-sqlplus-11.2.0.1.0-1.x86_64.rpm
      rpm -ivh oracle-instantclient11.2-devel-11.2.0.1.0-1.x86_64.rpm  

   Environment variables:

      Append the following to /etc/profile:

      export ORACLE_HOME=/usr/lib/oracle/11.2/client64
      export TNS_ADMIN=$ORACLE_HOME/network/admin
      export NLS_LANG='AMERICAN_AMERICA.UTF8'
      export LD_LIBRARY_PATH=$ORACLE_HOME/lib
      export PATH=$ORACLE_HOME/bin:$PATH

      Save the file, then run source /etc/profile to apply it.

    This completes the Oracle client installation.
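To confirm that the variables above are actually visible to the process that will run DataX (it must be started from a shell that has sourced /etc/profile), a small check like the following can help. It is a sketch, not part of DataX: the class name OracleEnvCheck is hypothetical, and it only checks that ORACLE_HOME is set, that $ORACLE_HOME/lib is one of the LD_LIBRARY_PATH entries, and that NLS_LANG is set.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

public class OracleEnvCheck {
    // Returns the problems found in the given environment map (empty list = looks OK).
    static List<String> check(Map<String, String> env) {
        List<String> problems = new ArrayList<>();
        String home = env.get("ORACLE_HOME");
        if (home == null || home.isEmpty()) {
            problems.add("ORACLE_HOME is not set");
            return problems; // the remaining checks depend on ORACLE_HOME
        }
        // Exact match: "<ORACLE_HOME>/lib" must be one of the ':'-separated entries.
        String lib = home + "/lib";
        String ldPath = env.getOrDefault("LD_LIBRARY_PATH", "");
        if (!Arrays.asList(ldPath.split(":")).contains(lib)) {
            problems.add("LD_LIBRARY_PATH does not contain " + lib);
        }
        if (env.get("NLS_LANG") == null) {
            problems.add("NLS_LANG is not set");
        }
        return problems;
    }

    public static void main(String[] args) {
        List<String> problems = check(System.getenv());
        if (problems.isEmpty()) {
            System.out.println("Oracle client environment looks OK");
        } else {
            for (String p : problems) {
                System.out.println("WARN: " + p);
            }
        }
    }
}
```

Because the entry match is exact, a value written with a trailing slash (".../lib/") is reported as missing; relax the comparison if your profile uses that form.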

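/**
Since I was unfamiliar with Oracle, here are a few extra notes to save you time hunting for configuration.
Many online guides say that sqlplus requires configuring tnsnames.ora under $TNS_ADMIN. In fact, the way the oracleWriter code connects through OCCI does not need any such configuration.
Connection strings that do not need tnsnames.ora:
  Java code:    logon = username + "/" + password + "@//" + ip + ":" + port + "/" + dbname;
  Command line: sqlplus user/pass@//ip:port/db
If the format is wrong, the following error is reported:
  ERROR:
  ORA-12541: TNS:no listener
*/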
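The two connection formats above can be sketched as helper methods. The class and method names are hypothetical; `db` is the database service name (the `dbname` parameter), and the host, port and credentials in `main` are placeholders (1521 is Oracle's default listener port). The JDBC thin driver also accepts this `//host:port/service` form, so it needs no tnsnames.ora either.

```java
public class OracleConnectStrings {
    // OCCI / sqlplus logon string in EZConnect form: user/pass@//host:port/service
    static String occiLogon(String user, String pass, String host, String port, String db) {
        return user + "/" + pass + "@//" + host + ":" + port + "/" + db;
    }

    // JDBC thin URL using the same //host:port/service form.
    static String jdbcThinUrl(String host, String port, String db) {
        return "jdbc:oracle:thin:@//" + host + ":" + port + "/" + db;
    }

    public static void main(String[] args) {
        // Placeholder values; replace with your own host, port and service name.
        System.out.println(occiLogon("scott", "tiger", "192.168.1.10", "1521", "orcl"));
        System.out.println(jdbcThinUrl("192.168.1.10", "1521", "orcl"));
    }
}
```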

  

Part 2: Compiling and using liboraclewriter.so; the detailed steps follow.

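1. Copy the code under oracledumper in DataX to the Linux machine where the Oracle client was installed earlier, to prepare for compilation.
2. If your local code uses a different package name from the source, modify the corresponding files.

Files to modify: in include, xx_OracleWriterJni.h (rename the file and change the package path in the method names inside OracleWriterJni.h);

     in src, xx_OracleWriterJni.cpp (rename the file).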
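The `xx_` prefix in those file names, and the function names declared in the header, come from the package and class name of `OracleWriterJni` under the standard JNI naming convention: `Java_`, then the fully qualified class name with `.` replaced by `_`, then `_` and the method name, with a literal `_` escaped as `_1`. The sketch below computes the expected function name so you can compare it with the header after changing the package. It is illustrative only: `load` is a hypothetical method name, and the signature suffixes used for overloaded methods are not handled. Regenerating the header with `javac -h` (JDK 8+) or `javah` (up to JDK 9) yields the same names.

```java
public class JniNames {
    // Escapes one class or method name per the JNI short-name mangling rules.
    static String mangle(String s) {
        StringBuilder sb = new StringBuilder();
        for (char c : s.toCharArray()) {
            if (c == '.' || c == '/') {
                sb.append('_');                          // package separator
            } else if (c == '_') {
                sb.append("_1");                         // literal underscore
            } else if (c < 128 && Character.isLetterOrDigit(c)) {
                sb.append(c);                            // plain ASCII alphanumeric
            } else {
                sb.append(String.format("_0%04x", (int) c)); // other chars, e.g. '$' -> _00024
            }
        }
        return sb.toString();
    }

    // Name of the C function the .h/.cpp must declare for a native method.
    static String nativeFunction(String className, String method) {
        return "Java_" + mangle(className) + "_" + mangle(method);
    }

    public static void main(String[] args) {
        String cls = "com.suning.dc.cybertron.datax.plugins.writer.oraclewriter.OracleWriterJni";
        System.out.println(nativeFunction(cls, "load")); // "load" is a hypothetical method name
    }
}
```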

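3. Modify the Makefile under src (essential if you are not familiar with C++ compilation).

  Notes:

   Following the compiler's error message, one variable needs to be declared const.

-Wl,-rpath sets the path where the .so looks for its dependent libraries at run time. Make sure this path is accurate: after installing Oracle, confirm that it matches the actual client library directory.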
INCLUDE=-I../include -I$$JAVA_HOME/include/linux -I$$JAVA_HOME/include/
LIBS=-lclntsh -liconv -L../lib -L${ORACLE_HOME}/lib -L../../../../libs/  -L/usr/lib/oracle/11.2/client64/lib/
CC=g++
OBJS=liboraclewriter.so
CFLAGS=-shared -fPIC  -Wl,-rpath=/usr/lib/oracle/11.2/client64/lib/
CPP=common.cpp dumper.cpp oradumper.cpp strsplit.cpp com_suning_dc_cybertron_datax_plugins_writer_oraclewriter_OracleWriterJni.cpp

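# Use the variable as the target so make rebuilds only when a source file changes.
# Recipe lines must begin with a tab character.
$(OBJS): $(CPP)
	$(CC) $(INCLUDE) -o $(OBJS) $(CPP) $(CFLAGS) $(LIBS)
.PHONY: clean
clean:
	rm -rf $(OBJS)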
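Because a wrong `-rpath` or `-L` path only shows up later as a link or load error, it can be worth checking up front that the directory really contains the Oracle client library. A minimal sketch, assuming the library files follow the Instant Client naming `libclntsh.so*` (the unversioned `libclntsh.so` link that `-lclntsh` needs typically comes from the devel rpm); the class name is hypothetical and the default path is the one used in the Makefile.

```java
import java.io.File;

public class RpathCheck {
    // True if any file name looks like the Oracle client library (libclntsh.so or libclntsh.so.N.N).
    static boolean containsClntsh(String[] names) {
        if (names == null) return false;
        for (String n : names) {
            if (n.startsWith("libclntsh.so")) return true;
        }
        return false;
    }

    // File.list() returns null when dir does not exist or is not a directory.
    static boolean hasClntsh(File dir) {
        return containsClntsh(dir.list());
    }

    public static void main(String[] args) {
        // Default mirrors the -rpath value in the Makefile; pass another path as args[0].
        String dir = args.length > 0 ? args[0] : "/usr/lib/oracle/11.2/client64/lib";
        System.out.println(dir + (hasClntsh(new File(dir))
                ? ": libclntsh found"
                : ": libclntsh NOT found, check -rpath / -L in the Makefile"));
    }
}
```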

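4. Run make to generate liboraclewriter.so. (If the build environment lacks g++ or similar tools, install them yourself; not covered here.)

At this point you can happily use, in your code, the OCCI-based oraclewriter built for your local environment. For the other .so libraries, the ones bundled with the source work fine. They are loaded as follows: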

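// System.load requires absolute paths; this assumes getJETFIRE_HOME() ends with "/".
// The bundled helper libraries are loaded first, liboraclewriter.so last.
System.load(PropertyReader.getJETFIRE_HOME() + "libs"
                    + "/libcommon.so");
System.load(PropertyReader.getJETFIRE_HOME() + "libs"
                    + "/libcharset.so");
System.load(PropertyReader.getJETFIRE_HOME() + "libs"
                    + "/libiconv.so.2");
System.load(PropertyReader.getJETFIRE_HOME() + "libs"
                    + "/liboraclewriter.so");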
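The snippet above builds each path by string concatenation, which only works if `getJETFIRE_HOME()` ends with `/`, and a missing file surfaces as an `UnsatisfiedLinkError`. A hedged alternative sketch: it keeps the same load order, builds absolute paths with `java.io.File`, and reports missing files by name before loading anything. `NativeLibLoader` and its methods are hypothetical; in the writer, the directory would be `PropertyReader.getJETFIRE_HOME() + "libs"`.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.List;

public class NativeLibLoader {
    // Same order as the System.load calls above: helpers first, liboraclewriter.so last.
    static final String[] LIBS = {"libcommon.so", "libcharset.so", "libiconv.so.2", "liboraclewriter.so"};

    // Returns the libraries that are not present as regular files in libDir.
    static List<String> missing(File libDir, String[] libs) {
        List<String> out = new ArrayList<>();
        for (String lib : libs) {
            if (!new File(libDir, lib).isFile()) out.add(lib);
        }
        return out;
    }

    static void loadAll(File libDir) {
        List<String> absent = missing(libDir, LIBS);
        if (!absent.isEmpty()) {
            throw new IllegalStateException("Missing native libraries in " + libDir + ": " + absent);
        }
        for (String lib : LIBS) {
            // File handles the separator; getAbsolutePath satisfies System.load.
            System.load(new File(libDir, lib).getAbsolutePath());
        }
    }

    public static void main(String[] args) {
        // Hypothetical default directory; pass the real libs directory as args[0].
        loadAll(new File(args.length > 0 ? args[0] : "libs"));
        System.out.println("native libraries loaded");
    }
}
```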

 

Performance comparison of the OCCI writer and the JDBC writer (5,000,000 records each):

occi :
Total time costs          :                142s
Average byte speed        :               1MB/s
Average line speed        :            35211L/s
Total transferred records :             5000000
Total discarded records   :                   0
jdbc:
Total time costs          :               2229s
Average byte speed        :              68KB/s
Average line speed        :             2243L/s
Total transferred records :             5000000
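The reported line speeds are simply transferred records divided by total time, so the two runs can be compared directly. A small sketch reproducing the figures above; integer division happens to match the reported values, though how DataX itself rounds is not shown here, and the class name is hypothetical.

```java
public class SpeedCompare {
    // Average line speed in lines per second (integer division).
    static long lineSpeed(long records, long seconds) {
        return records / seconds;
    }

    public static void main(String[] args) {
        long records = 5_000_000L;
        long occiSec = 142, jdbcSec = 2229; // "Total time costs" of the two runs above
        System.out.println("OCCI lines/s: " + lineSpeed(records, occiSec));
        System.out.println("JDBC lines/s: " + lineSpeed(records, jdbcSec));
        System.out.printf("OCCI speedup: %.1fx%n", (double) jdbcSec / occiSec);
    }
}
```

On these numbers OCCI is roughly 15.7 times faster (2229 s vs 142 s).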

  

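Problems and exceptions:

1. OCI loads data with direct-path insert (direct-insert), which needs to lock the table. Sometimes it fails with the following error:

Symptom: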
OCI Error -1 occurred at File oradumper.cpp:1605
Error[32661] - ORA-00604: error occurred at recursive SQL level 1
ORA-01031: insufficient privileges

status: -1, 1605, oradumper.cpp, init_load
1605, oradumper.cpp, init_load

Fix:
Besides granting the user INSERT and SELECT privileges in Oracle, also run:
grant lock any table to <your_user>;
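The fix can be written out as the statements a DBA would run. This sketch only assembles the SQL text: the class name, table and user names are placeholders, names are concatenated without quoting or validation (so use it only with trusted input), and granting the system privilege LOCK ANY TABLE itself requires a sufficiently privileged account, such as a DBA.

```java
import java.util.Arrays;
import java.util.List;

public class GrantStatements {
    // Grants described above: table-level INSERT/SELECT plus the LOCK ANY TABLE system privilege.
    static List<String> grantsFor(String table, String user) {
        return Arrays.asList(
                "GRANT INSERT, SELECT ON " + table + " TO " + user,
                "GRANT LOCK ANY TABLE TO " + user);
    }

    public static void main(String[] args) {
        // Placeholder names; replace with the target table and the DataX user.
        for (String sql : grantsFor("etl.target_table", "datax_user")) {
            System.out.println(sql + ";");
        }
    }
}
```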

 

 

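Appendix: jdbcWriter code

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.List;
import java.util.Properties;

import org.apache.commons.lang.StringUtils;
import org.apache.log4j.Logger;

// DataX framework classes (Writer, PluginParam, ParamKey, PluginStatus, DBSource, Line,
// LineReceiver, DataExchangeException, OracleJdbcWriterSplitter) come from your DataX
// distribution; their packages, and the commons-lang/log4j versions, depend on it.

public class OracleJdbcWriter extends Writer {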

    private Logger logger = Logger.getLogger(OracleJdbcWriter.class);

    private String password;

    private String username;

    private String dbname;

    private String table;

    private String pre;

    private String post;

    private String encoding;

    private int limit;

    private int failCount;// count error lines

    private long concurrency;

    private int batchSize;

    private String sourceUniqKey = "";

    private String port;

    private String insert;

    private String host;

    private String DRIVER_NAME = "oracle.jdbc.driver.OracleDriver";

    private Connection connection;

    private String writeColumns;

    @Override
    public int init() {
        password = param.getValue(ParamKey.password, "");
        username = param.getValue(ParamKey.username, "");
        host = param.getValue(ParamKey.ip);
        port = param.getValue(ParamKey.port, "1521"); // 1521 is Oracle's default listener port
        dbname = param.getValue(ParamKey.dbname, "");
        table = param.getValue(ParamKey.table, "");
        pre = param.getValue(ParamKey.pre, "");
        post = param.getValue(ParamKey.post, "");
        insert = param.getValue(ParamKey.insert, "");
        encoding = param.getValue(ParamKey.encoding, "UTF-8");
        limit = param.getIntValue(ParamKey.limit, 1000);
        concurrency = param.getIntValue(ParamKey.concurrency, 1);
        writeColumns = param.getValue(ParamKey.writeColumns, "");
        batchSize = param.getIntValue(ParamKey.batchSize, 50000);
        // host, port and dbname are already set above, so the key is built from them directly.
        this.sourceUniqKey = DBSource.genKey(this.getClass(), host, port,
                dbname);

        return PluginStatus.SUCCESS.value();
    }

    @Override
    public int prepare(PluginParam param) {
        // Set the parameters first: init() reads them through this.param.
        this.setParam(param);
        this.init();

        DBSource.register(this.sourceUniqKey, this.genProperties());

        if (StringUtils.isBlank(this.pre))
            return PluginStatus.SUCCESS.value();

        Statement stmt = null;
        try {
            this.connection = DBSource.getConnection(this.sourceUniqKey);

            stmt = this.connection.createStatement();

            for (String subSql : this.pre.split(";")) {
                if (StringUtils.isBlank(subSql))
                    continue; // skip empty fragments, e.g. after a trailing ';'
                this.logger.info(String.format("Execute prepare sql %s .",
                        subSql));
                stmt.execute(subSql);
            }
            if (!this.connection.getAutoCommit())
                this.connection.commit();
            return PluginStatus.SUCCESS.value();
        } catch (Exception e) {
            // Wrap e itself: e.getCause() may be null and would drop the original error.
            throw new DataExchangeException(e);
        } finally {
            try {
                if (null != stmt) {
                    stmt.close();
                }
                if (null != this.connection) {
                    this.connection.close();
                    this.connection = null;
                }
            } catch (SQLException e) {
                // ignore failures while releasing resources
            }
        }
    }

    @Override
    public int connect() {
        return PluginStatus.SUCCESS.value();
    }

    @Override
    public int startWrite(LineReceiver receiver) {
        PreparedStatement ps = null;
        try {
            this.connection = DBSource.getConnection(this.sourceUniqKey);
            int lines = 0;
            Line line = receiver.getFromReader();
            if (line == null) { // the reader produced no data
                return PluginStatus.SUCCESS.value();
            }
            this.insert = buildInsertStr(line);
            ps = this.connection.prepareStatement(this.insert);
            this.connection.setAutoCommit(false);
            while (line != null) {
                try {
                    for (int i = 0; i < line.getFieldNum(); i++) {
                        ps.setObject(i + 1, line.getField(i));
                    }
                    ps.execute();
                } catch (SQLException e) {
                    logger.error("Invalid Data: " + line.toString(','));
                    logger.error(e.getMessage());
                    failCount++;
                    if (failCount >= this.limit) {
                        logger.error("Failed line count (" + failCount
                                + ") exceeds the limit.", e);
                        throw new DataExchangeException(e);
                    }
                    // Move to the next line; without this, 'continue' would retry the same bad line.
                    line = receiver.getFromReader();
                    continue;
                }
                // Pre-increment so that exactly batchSize rows go into each commit.
                if (++lines >= this.batchSize) {
                    logger.info(lines + " committed by worker "
                            + Thread.currentThread().getName() + " .");
                    lines = 0;
                    this.connection.commit();
                }
                line = receiver.getFromReader();
            }
            this.connection.commit();
            this.connection.setAutoCommit(true);
            this.getMonitor().setFailedLines(this.failCount);
            return PluginStatus.SUCCESS.value();
        } catch (Exception e2) {
            throw new DataExchangeException(e2);
        } finally {
            try {
                if (null != ps)
                    ps.close();
                if (null != this.connection) {
                    this.connection.close(); // return the connection to the pool
                    this.connection = null;
                }
            } catch (SQLException e3) {
                // ignore failures while releasing resources
            }
        }
    }

    @Override
    public int post(PluginParam param) {
        if (StringUtils.isBlank(this.post))
            return PluginStatus.SUCCESS.value();

        Statement stmt = null;
        try {
            this.connection = DBSource.getConnection(this.sourceUniqKey);

            stmt = this.connection.createStatement();

            for (String subSql : this.post.split(";")) {
                if (StringUtils.isBlank(subSql))
                    continue; // skip empty fragments, e.g. after a trailing ';'
                this.logger.info(String.format("Execute post sql %s .",
                        subSql));
                stmt.execute(subSql);
            }
            // Commit as prepare() does, in case the pooled connection is not auto-commit.
            if (!this.connection.getAutoCommit())
                this.connection.commit();
            return PluginStatus.SUCCESS.value();
        } catch (Exception e) {
            logger.error("Post sql failed.", e);
            throw new DataExchangeException(e);
        } finally {
            try {
                if (null != stmt) {
                    stmt.close();
                }
                if (null != this.connection) {
                    this.connection.close();
                    this.connection = null;
                }
            } catch (Exception e2) {
                // ignore failures while releasing resources
            }
        }
    }

    @Override
    public List<PluginParam> split(PluginParam param) {
        OracleJdbcWriterSplitter splitter = new OracleJdbcWriterSplitter();
        splitter.setParam(param);
        splitter.init();
        return splitter.split();
    }

    @Override
    public int commit() {
        return PluginStatus.SUCCESS.value();
    }

    @Override
    public int finish() {
        return PluginStatus.SUCCESS.value();
    }

    private Properties genProperties() {
        Properties p = new Properties();
        p.setProperty("driverClassName", this.DRIVER_NAME);
        String url = "jdbc:oracle:thin:@" + this.host + ":" + this.port + "/"
                + this.dbname;
        p.setProperty("url", url);
        p.setProperty("username", this.username);
        p.setProperty("password", this.password);
        p.setProperty("maxActive", String.valueOf(this.concurrency + 2));

        return p;
    }

    private String buildInsertStr(Line line) {
        // Decide whether to insert into the listed columns only or into all columns.
        int placeholders;
        StringBuilder sql = new StringBuilder("insert into ").append(table);
        if (!StringUtils.isBlank(writeColumns)) {
            // Partial insert: only the columns listed in writeColumns.
            placeholders = writeColumns.split(",").length;
            sql.append("(").append(writeColumns).append(")");
        } else {
            // Full insert: one placeholder per field of the first line.
            placeholders = line.getFieldNum();
        }
        sql.append(" values(");
        for (int i = 0; i < placeholders; i++) {
            sql.append(i == 0 ? "?" : ",?");
        }
        return sql.append(")").toString();
    }

}

 

