集團內的業務數據以前是采用 地區采集—集團清洗-分發地區的ETL流程,自從集團成立軟件公司以后,子公司需要的業務數據都必須向集團申請而來,但是業務系統底層DC也沒提供相應的數據接口,所以就有了這次需求原型:從遠程FTP服務器上定時獲取txt數據文件,並將數據導入到本地Oracle數據庫。
每天需要從FTP下載的txt文件有40-50個,除第一次全量數據文件在10G左右較大、下載耗時較長外,后續的增量文件都在500M以內。
需要使用到的技術、工具:FTPClient、Java多線程、Oracle提供的sqlldr命令、Oracle提供Merge、spring的quartz定時包、EXTJS前台框架。
功能模塊的具體流程在此不做詳細介紹,主要記錄在實現過程中遇到的問題已經解決辦法:
下載文件:1、使用多線程從FTP下載文件時,需要每個線程開啟一個新的FTPClient,否則會出現FTP拒絕連接、鏈接超時等問題。
2、使用FTPClient的storeFile(remote, local)方法時,超過一定數量的文件之后就會出現程序阻塞、假死的現象,找了很多解決方案都不行,后來采用FTPClient的retrieveFileStream(remote)方法獲取輸入流手動循環read()保存txt文件之后,不存在假死、阻塞現象。以下為下載代碼:
1 public void run() { 2 try { 3 //ftp.enterLocalPassiveMode(); 切換到本地模式 4 ftp.setFileType(FTP.ASCII_FILE_TYPE); 5 6 File dirFile = new File(dir); 7 8 if(!dirFile.exists()){ 9 dirFile.mkdirs(); 10 } 11 File localFile = new File(dir+localFileName); 12 13 OutputStream os = new FileOutputStream(localFile); 14 15 InputStream is = ftp.retrieveFileStream(ftpFileName); 16 17 BufferedReader br = new BufferedReader(new InputStreamReader(is, "UTF-8")); 18 19 BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(os, "GBK")); 20 21 int read = br.read(); 22 while(read!= -1){ 23 bw.write(read); 24 read = br.read(); 25 } 26 br.close(); 27 is.close(); 28 bw.flush(); 29 bw.close(); 30 os.close(); 31 //ftp.completePendingCommand(); 32 end = System.currentTimeMillis(); 33 34 //日志記錄 35 36 } catch (IOException e) { 37 //日志記錄 38 e.printStackTrace(); 39 } 40 }
執行sqlldr命令:
1、解決每張表的字段都存在需要格式化的轉換,所以通過每張表在文件系統中建立一個txt文件用來存取表頭字段信息,字段信息將用於生成sqlldr所需的ctl文件,表頭字段信息描述txt文件內容如下:
id,bill_no,bill_type,status,biz_type,ref_bill_no,ref_bill_type,ref_biz_type,company_no,order_unit_no,order_unit_name,sys_no,supplier_no,supplier_name,store_no,store_name,order_no,contract_no,invoice_no,send_out_date date 'YYYY-MM-DD HH24:MI:SS',tax_rate,merchandiser,create_user,create_time timestamp 'YYYY-MM-DD HH24:MI:SS',auditor,audit_time timestamp 'YYYY-MM-DD HH24:MI:SS.ff9',remark,update_time timestamp 'YYYY-MM-DD HH24:MI:SS',trans_no,send_detail_total,zone_yyyymm,sharding_flag,is_imported,yw_update_time timestamp 'YYYY-MM-DD HH24:MI:SS'
sqlldr支持的具體轉換格式可自行在網上查詢。
2、sqlldr所需ctl文件內容格式:
load data characterset ZHS16GBK infile '數據文件位置' --可以是txt、csv... append into table '需導入的表名' fields terminated by '' --字段值之間的分隔字符,此處為\u0001 trailing nullcols 空字段自動賦值了null值 ("表頭字段名稱,名稱之間用‘,’號隔開,存在格式轉換的需要在字段名后追加轉換格式....")
3、生成sqlldr命令:
sqlldr 數據庫用戶名+"/"+數據庫密碼+"@"+數據庫服務名); control='...' //控制文件絕對路徑 bad='...' //異常數據文件保存路徑 log='...' //log文件保存路徑 parallel=true //並行 errors=0 //允許的錯誤行數 rows=5000 //5000行提交一次 skip=1 //跳過數據文件的第一行 bindsize=10000000 readsize=10000000 //緩存區大小
4、開啟多線程在命令行中執行sqlldr命令:
(待續)
