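While using DataX's oraclewriter, my unfamiliarity with Oracle and with C++ compilation cost me quite a few detours. I am recording the steps here for anyone who needs to do this again.
1. oracleWriter: Oracle provides the OCCI interface, which makes it easy to load data directly into Oracle. Because it is a C++ interface, DataX's oracleWriter wraps the C++ code and calls it through JNI.
2. oracleJdbcWriter is much simpler to use. Its code is attached at the end, so I will not go into detail here.
Preparation consists of installing the Oracle client and compiling liboraclewriter.so.
I. Installing the Oracle client. This article uses Oracle 11g on 64-bit Red Hat. The detailed installation steps follow.
1. Download the RPM packages from the Oracle website and install them.
Download page: http://www.oracle.com/technetwork/topics/linuxx86-64soft-092277.html
Packages to download: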
oracle-instantclient11.2-basic-11.2.0.1.0-1.x86_64.rpm
oracle-instantclient11.2-devel-11.2.0.1.0-1.x86_64.rpm
oracle-instantclient11.2-sqlplus-11.2.0.1.0-1.x86_64.rpm
Install commands:
rpm -ivh oracle-instantclient11.2-basic-11.2.0.1.0-1.x86_64.rpm
rpm -ivh oracle-instantclient11.2-sqlplus-11.2.0.1.0-1.x86_64.rpm
rpm -ivh oracle-instantclient11.2-devel-11.2.0.1.0-1.x86_64.rpm
Environment variables:
Append the following to /etc/profile:
export ORACLE_HOME=/usr/lib/oracle/11.2/client64
export TNS_ADMIN=$ORACLE_HOME/network/admin
export NLS_LANG='AMERICAN_AMERICA.UTF8'
export LD_LIBRARY_PATH=$ORACLE_HOME/lib
export PATH=$ORACLE_HOME/bin:$PATH
Save the file, then run source /etc/profile to apply the changes.
The Oracle client installation is now complete.
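/**
A few extra notes, since I was unfamiliar with Oracle, to save you time hunting for configuration.
Many online guides for sqlplus say you must configure tnsnames.ora under $TNS_ADMIN.
In fact, the oracleWriter code connects through OCCI in the form shown here, which needs no such configuration.
Connecting without tnsnames.ora:
In Java code: logon = username + "/" + password + "@//" + ip + ":" + port + "/" + dbname;
On the command line: sqlplus user/pass@//ip:port/db
If the format is wrong, the following error is reported:
ERROR: ORA-12541: TNS:no listener
*/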
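The two connection formats mentioned in this post can be sketched as small helpers. This is only an illustration: the class name OracleConnectStrings and the sample user, host, and service values are hypothetical, and the JDBC URL simply mirrors the string concatenation used in the jdbcWriter code in the appendix.

```java
final class OracleConnectStrings {
    private OracleConnectStrings() {}

    /** Builds user/password@//host:port/service, the logon form that needs no tnsnames.ora. */
    static String logon(String user, String password, String host, int port, String service) {
        return user + "/" + password + "@//" + host + ":" + port + "/" + service;
    }

    /** Builds the thin-driver URL the same way the appendix code does. */
    static String jdbcUrl(String host, int port, String service) {
        return "jdbc:oracle:thin:@" + host + ":" + port + "/" + service;
    }

    public static void main(String[] args) {
        // Sample values are placeholders, not taken from a real environment.
        System.out.println(logon("scott", "tiger", "10.0.0.5", 1521, "orcl"));
        System.out.println(jdbcUrl("10.0.0.5", 1521, "orcl"));
    }
}
```

Printing the logon string and pasting it after sqlplus is a quick way to check the format before wiring it into the writer.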
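II. Compiling and using liboraclewriter.so. The detailed steps follow.
1. Download the code under oracledumper in DataX to a Linux machine (the one where the Oracle client was installed above) and get ready to compile.
2. If your local code uses a package name different from the one in the source, modify the corresponding files.
Files to modify: in include: xx_OracleWriterJni.h (rename the file and change the package path in the method names inside OracleWriterJni.h).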
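In src: xx_OracleWriterJni.cpp (rename the file; the #include of the renamed header and the JNI function names defined inside presumably need the same package-path change so that they match the header).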
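3. Edit the Makefile under src (do not skip this if you are unfamiliar with C++ compilation).
Points to note:
Following the compiler's error message, one variable needs a const qualifier added.
-Wl,-rpath sets the path where the .so looks for the libraries it depends on at run time. Make sure this path is accurate: after installing Oracle, confirm that it matches the actual lib directory.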
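# $$JAVA_HOME is passed to the shell as $JAVA_HOME, so JAVA_HOME must be exported.
INCLUDE=-I../include -I$$JAVA_HOME/include/linux -I$$JAVA_HOME/include/
LIBS=-lclntsh -liconv -L../lib -L${ORACLE_HOME}/lib -L../../../../libs/ -L/usr/lib/oracle/11.2/client64/lib/
CC=g++
OBJS=liboraclewriter.so
CFLAGS=-shared -fPIC -Wl,-rpath=/usr/lib/oracle/11.2/client64/lib/
CPP=common.cpp dumper.cpp oradumper.cpp strsplit.cpp com_suning_dc_cybertron_datax_plugins_writer_oraclewriter_OracleWriterJni.cpp

# The target is written as $(OBJS) rather than the literal name OBJS,
# so make can skip the build when the .so is newer than the sources.
# Recipe lines must start with a tab character.
$(OBJS): $(CPP)
	$(CC) $(INCLUDE) -o $(OBJS) $(CPP) $(CFLAGS) $(LIBS)

clean:
	rm -rf $(OBJS)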
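4. Run make to generate liboraclewriter.so. (If the build environment lacks g++ or other tools, install them yourself. That is not covered here.)
At this point you can happily use an OCCI-based oraclewriter built for your local environment in your code. For the other .so files, the ones shipped with the source are fine.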
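Why the package name matters in step 2: JNI resolves a native method by a C symbol derived from the fully qualified Java class name, so a changed package changes both the header file name and every function name in it. The sketch below applies the standard JNI name-mangling rules. The class JniNames and the method name init are hypothetical and used only for illustration; the package is the one that appears in the Makefile above.

```java
final class JniNames {
    private JniNames() {}

    /** Applies JNI escaping: '.' or '/' becomes '_', '_' becomes "_1", ';' "_2", '[' "_3". */
    static String mangle(String name) {
        StringBuilder sb = new StringBuilder();
        for (char c : name.toCharArray()) {
            if (c == '.' || c == '/') {
                sb.append('_');
            } else if (c == '_') {
                sb.append("_1");
            } else if (c == ';') {
                sb.append("_2");
            } else if (c == '[') {
                sb.append("_3");
            } else if ((c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z') || (c >= '0' && c <= '9')) {
                sb.append(c);
            } else {
                // Any other character is written as _0 followed by four lowercase hex digits.
                sb.append(String.format("_0%04x", (int) c));
            }
        }
        return sb.toString();
    }

    /** C symbol that the JVM looks up for a (non-overloaded) native method. */
    static String nativeSymbol(String className, String methodName) {
        return "Java_" + mangle(className) + "_" + mangle(methodName);
    }

    /** Header file name in the style used by this project: dots replaced by underscores. */
    static String headerFileName(String className) {
        return className.replace('.', '_') + ".h";
    }

    public static void main(String[] args) {
        String cls = "com.suning.dc.cybertron.datax.plugins.writer.oraclewriter.OracleWriterJni";
        System.out.println(headerFileName(cls));
        System.out.println(nativeSymbol(cls, "init")); // "init" is a made-up method name
    }
}
```

If the symbol printed for your package does not appear in the renamed header and .cpp file, the JVM will most likely fail with an UnsatisfiedLinkError when the native method is first called.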
System.load(PropertyReader.getJETFIRE_HOME() + "libs" + "/libcommon.so");
System.load(PropertyReader.getJETFIRE_HOME() + "libs" + "/libcharset.so");
System.load(PropertyReader.getJETFIRE_HOME() + "libs" + "/libiconv.so.2");
System.load(PropertyReader.getJETFIRE_HOME() + "libs" + "/liboraclewriter.so");
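System.load only accepts absolute paths, and the plain string concatenation above silently depends on the home directory ending with a slash. A minimal sketch of a more defensive loader follows. The class NativeLibs is hypothetical, the home directory is passed in rather than read from PropertyReader, and the load order is simply the order used in this post.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

final class NativeLibs {
    private NativeLibs() {}

    // Same four libraries, in the same order as the System.load calls in this post.
    static final String[] LOAD_ORDER = {
        "libcommon.so", "libcharset.so", "libiconv.so.2", "liboraclewriter.so"
    };

    /** Resolves <home>/libs/<name> for each library, with or without a trailing slash on home. */
    static List<Path> resolve(String home) {
        List<Path> paths = new ArrayList<>();
        for (String name : LOAD_ORDER) {
            paths.add(Paths.get(home, "libs", name).toAbsolutePath());
        }
        return paths;
    }

    /** Loads every library, failing early with a readable message if a file is missing. */
    static void loadAll(String home) {
        for (Path p : resolve(home)) {
            if (!Files.isRegularFile(p)) {
                throw new IllegalStateException("Missing native library: " + p);
            }
            System.load(p.toString());
        }
    }
}
```

Calling NativeLibs.loadAll with the home directory once at startup would replace the four explicit calls. A missing file then shows up as a clear message instead of a bare UnsatisfiedLinkError.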
Performance comparison of the OCCI and JDBC writers:
OCCI:
Total time costs : 142s
Average byte speed : 1MB/s
Average line speed : 35211L/s
Total transferred records : 5000000
Total discarded records : 0
JDBC:
Total time costs : 2229s
Average byte speed : 68KB/s
Average line speed : 2243L/s
Total transferred records : 5000000
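The reported line speeds follow directly from the record count and the elapsed time, and the two runs differ by a factor of roughly 15.7. A small check of that arithmetic, using only the figures reported above (the class name ThroughputCheck is hypothetical):

```java
import java.util.Locale;

final class ThroughputCheck {
    private ThroughputCheck() {}

    /** Whole lines per second, truncated the way the job report shows it. */
    static long linesPerSecond(long records, long seconds) {
        return records / seconds;
    }

    /** How many times longer the slow run took than the fast run. */
    static double speedup(long slowSeconds, long fastSeconds) {
        return (double) slowSeconds / fastSeconds;
    }

    public static void main(String[] args) {
        long records = 5_000_000L;
        System.out.println(linesPerSecond(records, 142));   // 35211
        System.out.println(linesPerSecond(records, 2229));  // 2243
        System.out.println(String.format(Locale.ROOT, "%.1f", speedup(2229, 142))); // 15.7
    }
}
```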
Problems and exceptions:
1. OCI uses direct insert, which needs to lock the table, and it sometimes fails as follows:
Symptom:
OCI Error -1 occurred at File oradumper.cpp:1605
Error[32661] - ORA-00604: error occurred at recursive SQL level 1
ORA-01031: insufficient privileges
status: -1, 1605, oradumper.cpp, init_load 1605, oradumper.cpp, init_load
Fix: in Oracle, in addition to granting the user INSERT and SELECT privileges, also run: grant lock any table to <your user>.
Appendix: jdbcWriter code
// JDK imports. The DataX classes (Writer, PluginParam, ParamKey, DBSource, Line,
// LineReceiver, PluginStatus, DataExchangeException, OracleJdbcWriterSplitter),
// Logger and StringUtils must be imported from wherever they live in your build.
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.List;
import java.util.Properties;

public class OracleJdbcWriter extends Writer {
    private Logger logger = Logger.getLogger(OracleJdbcWriter.class);
    private String password;
    private String username;
    private String dbname;
    private String table;
    private String pre;
    private String post;
    private String encoding;
    private int limit;
    private int failCount; // count error lines
    private long concurrency;
    private int batchSize;
    private String sourceUniqKey = "";
    private String port;
    private String insert;
    private String host;
    private String DRIVER_NAME = "oracle.jdbc.driver.OracleDriver";
    private Connection connection;
    private String writeColumns;

    @Override
    public int init() {
        password = param.getValue(ParamKey.password, "");
        username = param.getValue(ParamKey.username, "");
        host = param.getValue(ParamKey.ip);
        // The original default was "3306" (MySQL); 1521 is the usual Oracle listener port.
        port = param.getValue(ParamKey.port, "1521");
        dbname = param.getValue(ParamKey.dbname);
        table = param.getValue(ParamKey.table, "");
        pre = param.getValue(ParamKey.pre, "");
        post = param.getValue(ParamKey.post, "");
        insert = param.getValue(ParamKey.insert, "");
        encoding = param.getValue(ParamKey.encoding, "UTF-8");
        limit = param.getIntValue(ParamKey.limit, 1000);
        concurrency = param.getIntValue(ParamKey.concurrency, 1);
        writeColumns = param.getValue(ParamKey.writeColumns, "");
        batchSize = param.getIntValue(ParamKey.batchSize, 50000);
        this.sourceUniqKey = DBSource.genKey(this.getClass(), host, port, dbname);
        return PluginStatus.SUCCESS.value();
    }

    @Override
    public int prepare(PluginParam param) {
        this.init();
        this.setParam(param);
        DBSource.register(this.sourceUniqKey, this.genProperties());
        if (StringUtils.isBlank(this.pre))
            return PluginStatus.SUCCESS.value();
        Statement stmt = null;
        try {
            this.connection = DBSource.getConnection(this.sourceUniqKey);
            stmt = this.connection.createStatement(
                    ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_UPDATABLE);
            // Run each statement of the "pre" SQL in turn.
            for (String subSql : this.pre.split(";")) {
                this.logger.info(String.format("Execute prepare sql %s .", subSql));
                stmt.execute(subSql);
            }
            this.connection.commit();
            return PluginStatus.SUCCESS.value();
        } catch (Exception e) {
            // Wrap the exception itself; e.getCause() is often null and would hide the error.
            throw new DataExchangeException(e);
        } finally {
            try {
                if (null != stmt) {
                    stmt.close();
                }
                if (null != this.connection) {
                    this.connection.close();
                    this.connection = null;
                }
            } catch (SQLException e) {
                // Ignored: nothing useful can be done if closing fails.
            }
        }
    }

    @Override
    public int connect() {
        return PluginStatus.SUCCESS.value();
    }

    @Override
    public int startWrite(LineReceiver receiver) {
        PreparedStatement ps = null;
        try {
            this.connection = DBSource.getConnection(this.sourceUniqKey);
            Line line = null;
            int lines = 0;
            line = receiver.getFromReader();
            if (line == null) { // The reader returned no data.
                return PluginStatus.SUCCESS.value();
            }
            this.insert = buildInsertStr(line);
            ps = this.connection.prepareStatement(this.insert,
                    ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_UPDATABLE);
            this.connection.setAutoCommit(false);
            while (line != null) {
                try {
                    for (int i = 0; i < line.getFieldNum(); i++) {
                        ps.setObject(i + 1, line.getField(i));
                    }
                    ps.execute();
                } catch (SQLException e) {
                    logger.error("Invalid Data: " + line.toString(','));
                    logger.error(e.getMessage());
                    failCount++;
                    if (failCount >= this.limit) {
                        logger.error("Error count (" + failCount + ") exceeds the limit.");
                        e.printStackTrace();
                        throw new DataExchangeException(e);
                    }
                    // Skip the bad row. A bare `continue` would retry the same row
                    // until the error limit was reached.
                    line = receiver.getFromReader();
                    continue;
                }
                // Commit once every batchSize successfully written rows.
                if (++lines >= this.batchSize) {
                    logger.info(lines + " committed by worker "
                            + Thread.currentThread().getName() + " .");
                    lines = 0;
                    this.connection.commit();
                }
                line = receiver.getFromReader();
            }
            this.connection.commit();
            this.connection.setAutoCommit(true);
            this.getMonitor().setFailedLines(this.failCount);
            return PluginStatus.SUCCESS.value();
        } catch (Exception e2) {
            throw new DataExchangeException(e2);
        } finally {
            if (null != ps)
                try {
                    ps.close();
                } catch (SQLException e3) {
                    // Ignored: nothing useful can be done if closing fails.
                }
        }
    }

    @Override
    public int post(PluginParam param) {
        if (StringUtils.isBlank(this.post))
            return PluginStatus.SUCCESS.value();
        Statement stmt = null;
        try {
            this.connection = DBSource.getConnection(this.sourceUniqKey);
            stmt = this.connection.createStatement(
                    ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_UPDATABLE);
            // Run each statement of the "post" SQL in turn.
            for (String subSql : this.post.split(";")) {
                this.logger.info(String.format("Execute post sql %s .", subSql));
                stmt.execute(subSql);
            }
            return PluginStatus.SUCCESS.value();
        } catch (Exception e) {
            e.printStackTrace();
            throw new DataExchangeException(e);
        } finally {
            try {
                if (null != stmt) {
                    stmt.close();
                }
                if (null != this.connection) {
                    this.connection.close();
                    this.connection = null;
                }
            } catch (Exception e2) {
                // Ignored: nothing useful can be done if closing fails.
            }
        }
    }

    @Override
    public List<PluginParam> split(PluginParam param) {
        OracleJdbcWriterSplitter splitter = new OracleJdbcWriterSplitter();
        splitter.setParam(param);
        splitter.init();
        return splitter.split();
    }

    @Override
    public int commit() {
        return PluginStatus.SUCCESS.value();
    }

    @Override
    public int finish() {
        return PluginStatus.SUCCESS.value();
    }

    // Connection-pool properties handed to DBSource.
    private Properties genProperties() {
        Properties p = new Properties();
        p.setProperty("driverClassName", this.DRIVER_NAME);
        String url = "jdbc:oracle:thin:@" + this.host + ":" + this.port + "/" + this.dbname;
        p.setProperty("url", url);
        p.setProperty("username", this.username);
        p.setProperty("password", this.password);
        p.setProperty("maxActive", String.valueOf(this.concurrency + 2));
        return p;
    }

    private String buildInsertStr(Line line) {
        String sql = "";
        String[] subcos = null;
        // Decide whether to insert into all columns or only some, then build the SQL.
        if (!writeColumns.equals("")) {
            // Partial insert: one placeholder per listed column.
            sql = "insert into " + table + "(";
            subcos = writeColumns.split(",");
            sql += writeColumns;
            sql += ") values(";
            for (int i = 0; i < subcos.length; i++) {
                sql += "?";
                sql += ",";
            }
            sql = sql.substring(0, sql.length() - 1);
            sql += ")";
        } else {
            // Full insert: one placeholder per field in the line.
            sql = "insert into " + table + " values(";
            for (int i = 0; i < line.getFieldNum(); i++) {
                sql += "?";
                sql += ",";
            }
            sql = sql.substring(0, sql.length() - 1);
            sql += ")";
        }
        return sql;
    }
}
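The writer above sends one statement per row with ps.execute(), which may account for part of its gap to the OCCI writer. A common alternative is standard JDBC batching with addBatch and executeBatch. The sketch below is an assumption-laden illustration, not a measured improvement: the class BatchInsertSketch and both method names are hypothetical, rows are plain Object arrays rather than DataX Line objects, and no timing was done for this variant.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Collections;
import java.util.List;

final class BatchInsertSketch {
    private BatchInsertSketch() {}

    /** Same SQL shape as buildInsertStr: listed columns if any, else one '?' per field. */
    static String buildInsertSql(String table, List<String> columns, int fieldCount) {
        int n = columns.isEmpty() ? fieldCount : columns.size();
        String placeholders = String.join(",", Collections.nCopies(n, "?"));
        String cols = columns.isEmpty() ? "" : "(" + String.join(",", columns) + ")";
        return "insert into " + table + cols + " values(" + placeholders + ")";
    }

    /** Queues rows with addBatch and flushes every batchSize rows; returns rows sent. */
    static long insertAll(Connection conn, String sql, Iterable<Object[]> rows, int batchSize)
            throws SQLException {
        boolean oldAutoCommit = conn.getAutoCommit();
        conn.setAutoCommit(false);
        long total = 0;
        int pending = 0;
        try (PreparedStatement ps = conn.prepareStatement(sql)) {
            for (Object[] row : rows) {
                for (int i = 0; i < row.length; i++) {
                    ps.setObject(i + 1, row[i]);
                }
                ps.addBatch();
                total++;
                if (++pending >= batchSize) {
                    ps.executeBatch(); // one round trip for the whole batch
                    conn.commit();
                    pending = 0;
                }
            }
            if (pending > 0) {
                ps.executeBatch();
            }
            conn.commit();
        } catch (SQLException e) {
            conn.rollback(); // discard the uncommitted part of the failed batch
            throw e;
        } finally {
            conn.setAutoCommit(oldAutoCommit);
        }
        return total;
    }
}
```

The trade-off is error handling: with batching, one bad row fails its whole batch, so the per-row failCount and limit logic of the writer above would need a different design.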