1.首先機器要求8核,不然可能會慢點
2.數據庫建表的時候,最好建成 nologging 類型的表,否則歸檔日志寫滿之后,數據庫入庫會很慢,甚至丟數據;因為數據量很大,我們不可能一次性提交所有數據,只能分批提交
package com.ztesoft.interfaces.predeal.util;

import com.ztesoft.interfaces.predeal.bl.IHandle;

import java.io.ByteArrayOutputStream;
import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Worker that reads one byte range of a file through a read-only memory mapping,
 * splits it into lines on {@code '\n'} / {@code '\r'}, decodes each line as GBK and
 * passes every non-empty line to an {@link IHandle} callback.
 *
 * <p>After the range has been consumed the task waits on the shared
 * {@link CyclicBarrier}. Each delivered line increments the shared counter.
 *
 * @author xiaof
 * @since 2019/3/8
 */
public class CsvBigTask implements Runnable {

    /** Barrier shared by all workers of one file. */
    private CyclicBarrier cyclicBarrier;
    /** Shared counter of lines handed to the callback. */
    private AtomicLong atomicLong;
    /** Absolute file offset where this worker's range starts. */
    private long start;
    /** Number of bytes in this worker's range. */
    private long totalSize;
    /** Size in bytes of the chunk copied out of the mapping per iteration. */
    private int buffSize;
    /** Reusable chunk buffer of {@code buffSize} bytes. */
    private byte[] buff;
    /** File the range is mapped from. */
    private RandomAccessFile randomAccessFile;
    /** Business callback invoked per line. */
    private IHandle iHandle;
    /** Scratch list passed through to the callback on every call; may be null. */
    private List tempData;

    /**
     * Creates a worker for an explicit offset and byte count.
     *
     * @param cyclicBarrier    barrier awaited once the range is processed
     * @param atomicLong       counter incremented per delivered line
     * @param start            absolute offset of the first byte to read
     * @param totalSize        number of bytes to read
     * @param buffSize         chunk size in bytes
     * @param randomAccessFile file to map
     * @param iHandle          per-line callback
     */
    public CsvBigTask(CyclicBarrier cyclicBarrier, AtomicLong atomicLong, long start, long totalSize,
                      int buffSize, RandomAccessFile randomAccessFile, IHandle iHandle) {
        this.cyclicBarrier = cyclicBarrier;
        this.atomicLong = atomicLong;
        this.start = start;
        this.totalSize = totalSize;
        this.buffSize = buffSize;
        this.buff = new byte[buffSize];
        this.randomAccessFile = randomAccessFile;
        this.iHandle = iHandle;
    }

    /**
     * Creates a worker for an inclusive {@link PartitionPair} range, without a scratch list.
     *
     * @param partitionPair inclusive start/end offsets; the byte count is {@code end - start + 1}
     */
    public CsvBigTask(CyclicBarrier cyclicBarrier, AtomicLong atomicLong, PartitionPair partitionPair,
                      int buffSize, RandomAccessFile randomAccessFile, IHandle iHandle) {
        this(cyclicBarrier, atomicLong, partitionPair.getStart(),
                partitionPair.getEnd() - partitionPair.getStart() + 1,
                buffSize, randomAccessFile, iHandle);
    }

    /**
     * Creates a worker for an inclusive {@link PartitionPair} range with a scratch list
     * that is handed to the callback on every invocation.
     *
     * @param tempData list passed unchanged to {@link IHandle#handle}
     */
    public CsvBigTask(CyclicBarrier cyclicBarrier, AtomicLong atomicLong, PartitionPair partitionPair,
                      int buffSize, RandomAccessFile randomAccessFile, List tempData, IHandle iHandle) {
        this(cyclicBarrier, atomicLong, partitionPair.getStart(),
                partitionPair.getEnd() - partitionPair.getStart() + 1,
                buffSize, randomAccessFile, iHandle);
        this.tempData = tempData;
    }

    /**
     * Maps the range, streams it chunk by chunk, dispatches lines, then awaits the barrier.
     * Any exception is printed and ends the task.
     */
    @Override
    public void run() {
        try {
            // 1. Map this worker's range read-only.
            MappedByteBuffer mappedByteBuffer = randomAccessFile.getChannel()
                    .map(FileChannel.MapMode.READ_ONLY, start, this.totalSize);

            // 2. Accumulates the bytes of the line currently being assembled.
            ByteArrayOutputStream pendingLine = new ByteArrayOutputStream();

            // A long offset avoids int overflow in "offset + buffSize" for ranges close to 2 GiB.
            for (long offset = 0; offset < totalSize; offset += buffSize) {
                int realReadLength = (int) Math.min((long) buffSize, totalSize - offset);

                // Relative bulk get: the buffer's own position advances by realReadLength.
                mappedByteBuffer.get(buff, 0, realReadLength);

                for (int j = 0; j < realReadLength; ++j) {
                    byte temp = buff[j];
                    if (temp == '\n' || temp == '\r') {
                        // 3. Line terminator: flush the pending line if it is non-empty.
                        String result = pendingLine.toString("gbk");
                        if (result != null && !result.equals("")) {
                            iHandle.handle(result, false, tempData);
                            atomicLong.incrementAndGet();
                        }
                        pendingLine.reset();
                    } else if (temp == 0) {
                        // A NUL byte abandons the rest of the current chunk only.
                        break;
                    } else {
                        pendingLine.write(temp);
                    }
                }
            }

            // 4. Final flush: whatever is still pending is the last line of the range.
            if (pendingLine.size() > 0) {
                String result = pendingLine.toString("gbk");
                if (result != null && !result.equals("")) {
                    iHandle.handle(result, true, tempData);
                    atomicLong.incrementAndGet();
                }
                pendingLine.reset();
            } else {
                // Nothing pending: still signal "last line" so the callback can flush its batch.
                iHandle.handle("", true, tempData);
            }

            // 5. Rendezvous with the other workers.
            // NOTE(review): if this task fails before reaching this point it never arrives at the
            // barrier, so the remaining workers keep waiting — confirm whether that is intended.
            cyclicBarrier.await();
        } catch (InterruptedException e) {
            // Preserve the interrupt for whoever owns this thread.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
業務邏輯實現接口類
package com.ztesoft.interfaces.predeal.bl;

import java.util.List;

/**
 * Per-line callback used by the file-reading worker to hand decoded text lines to
 * business logic.
 *
 * @author xiaof
 * @since 2019/3/1 18:08
 */
public interface IHandle {

    /**
     * Handles one line of input.
     *
     * @param line     the decoded line; the reading worker passes an empty string when it only
     *                 wants to signal the end of its range
     * @param lastLine {@code true} on the final call for a range, {@code false} otherwise
     * @param list     scratch list supplied by the caller and passed through unchanged on
     *                 every call
     */
    void handle(String line, boolean lastLine, List list);
}
一些輔助類,可要可不要,看業務邏輯
package com.ztesoft.interfaces.predeal.util;

import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * Date parsing/formatting helpers backed by per-thread {@link SimpleDateFormat} instances,
 * so no formatter object is ever shared between threads.
 *
 * @author xiaof
 * @since 2019/2/27 11:30
 */
public class ConcurrentDateUtil {

    /** {@code yyyyMMdd} formatter, one per thread. */
    private static final ThreadLocal<DateFormat> COMPACT_DAY = perThread("yyyyMMdd");

    /** {@code yyyy/MM/dd} formatter used for date-named directories, one per thread. */
    private static final ThreadLocal<DateFormat> DAY_DIRECTORY = perThread("yyyy/MM/dd");

    /** {@code yyyyMMddHHmmss} formatter, one per thread. */
    private static final ThreadLocal<DateFormat> DATABASE_STAMP = perThread("yyyyMMddHHmmss");

    /** {@code yyyyMMdd} followed by six literal zeros, one per thread. */
    private static final ThreadLocal<DateFormat> RESOURCE_FILE_STAMP = perThread("yyyyMMdd000000");

    /**
     * Builds a thread-local whose initial value is a new {@link SimpleDateFormat}
     * for the given pattern.
     */
    private static ThreadLocal<DateFormat> perThread(final String pattern) {
        return new ThreadLocal<DateFormat>() {
            @Override
            protected DateFormat initialValue() {
                return new SimpleDateFormat(pattern);
            }
        };
    }

    /** Parses a {@code yyyyMMdd} string. */
    public static Date parse(String dateStr) throws ParseException {
        DateFormat formatter = COMPACT_DAY.get();
        return formatter.parse(dateStr);
    }

    /** Parses a {@code yyyyMMddHHmmss} string. */
    public static Date parseDatabase(String dateStr) throws ParseException {
        DateFormat formatter = DATABASE_STAMP.get();
        return formatter.parse(dateStr);
    }

    /** Formats a date as {@code yyyyMMdd}. */
    public static String format(Date date) {
        DateFormat formatter = COMPACT_DAY.get();
        return formatter.format(date);
    }

    /** Formats a date as {@code yyyy/MM/dd}. */
    public static String formatDateDir(Date date) {
        DateFormat formatter = DAY_DIRECTORY.get();
        return formatter.format(date);
    }

    /** Parses a {@code yyyy/MM/dd} string. */
    public static Date parseDateDir(String dateStr) throws ParseException {
        DateFormat formatter = DAY_DIRECTORY.get();
        return formatter.parse(dateStr);
    }

    /** Formats a date as {@code yyyyMMdd000000}. */
    public static String formatResourceFile(Date date) {
        DateFormat formatter = RESOURCE_FILE_STAMP.get();
        return formatter.format(date);
    }
}
package com.ztesoft.interfaces.predeal.util;

import java.io.*;
import java.nio.charset.Charset;
import java.util.Enumeration;
import java.util.zip.ZipEntry;
import java.util.zip.ZipFile;

/**
 * File creation and zip extraction helpers.
 *
 * @author xiaof
 * @since 2019/3/1 17:23
 */
public class CreateFileUtil {

    /**
     * Creates {@code file}, creating any missing parent directories first.
     *
     * @param file the file to create
     * @return {@code true} if the file was created, {@code false} if it already existed
     * @throws IOException if the parent directories or the file cannot be created
     */
    public static boolean createFile(File file) throws IOException {
        // getAbsoluteFile() guarantees a parent even for a bare relative name.
        File parent = file.getAbsoluteFile().getParentFile();
        // mkdirs() builds the whole chain at once; the second isDirectory() check tolerates
        // another thread creating the directory concurrently.
        if (parent != null && !parent.isDirectory() && !parent.mkdirs() && !parent.isDirectory()) {
            throw new IOException("Unable to create directory " + parent);
        }
        return file.createNewFile();
    }

    /** Placeholder; does nothing. */
    public static void unZipFile() {
    }

    /**
     * Extracts every entry of {@code zipFile} into the directory that contains the archive.
     *
     * @param zipFile the archive to extract
     * @return the name of the last file entry extracted, or an empty string if there was none
     * @throws IOException if the archive cannot be read, an entry cannot be written, or an
     *                     entry name would resolve outside the target directory
     */
    public static String unZipSingleFileCurrentDir(File zipFile) throws IOException {
        File fileDir = zipFile.getParentFile();
        if (fileDir == null) {
            // Bare relative name: resolve against the working directory instead of failing.
            fileDir = zipFile.getAbsoluteFile().getParentFile();
        }
        if (!fileDir.exists()) {
            fileDir.mkdirs();
        }
        String outPath = fileDir.getPath();

        // Canonical prefix every extracted path must start with (zip-slip guard).
        String canonicalDir = fileDir.getCanonicalPath();
        String allowedPrefix = canonicalDir.endsWith(File.separator)
                ? canonicalDir : canonicalDir + File.separator;

        String fileName = "";
        try (ZipFile archive = new ZipFile(zipFile)) {
            for (Enumeration<? extends ZipEntry> entries = archive.entries(); entries.hasMoreElements();) {
                ZipEntry entry = entries.nextElement();
                File target = new File((outPath + "/" + entry.getName()).replaceAll("\\*", "/"));

                // Entry names come from the archive and may contain "../"; refuse to escape.
                if (!target.getCanonicalPath().startsWith(allowedPrefix)) {
                    throw new IOException("Zip entry escapes target directory: " + entry.getName());
                }

                if (entry.isDirectory()) {
                    target.mkdirs();
                    continue;
                }

                File targetParent = target.getParentFile();
                if (targetParent != null && !targetParent.exists()) {
                    targetParent.mkdirs();
                }

                fileName = entry.getName();
                try (InputStream in = archive.getInputStream(entry);
                     OutputStream out = new FileOutputStream(target)) {
                    byte[] buf = new byte[1024];
                    int len;
                    while ((len = in.read(buf)) > 0) {
                        out.write(buf, 0, len);
                    }
                }
            }
        }
        return fileName;
    }
}
package com.ztesoft.interfaces.predeal.util;

import org.apache.log4j.Logger;

import java.io.*;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.sql.Date;
import java.text.ParseException;
import java.util.HashSet;
import java.util.Set;

/**
 * Helpers for splitting a file into line-aligned byte ranges and for basic file handling.
 *
 * @author xiaof
 * @since 2019/2/21 14:38
 */
public class DataUtil {

    private final static Logger logger = Logger.getLogger(DataUtil.class);

    /**
     * Splits the byte range {@code [start, totalSize - 1]} into consecutive partitions of
     * roughly {@code length} bytes and adds them to {@code partitionPairs}. Each partition end
     * is moved forward to the next {@code '\n'} or {@code '\r'} (or to the last byte).
     *
     * <p>Implemented iteratively so the number of partitions is not limited by stack depth.
     * Nothing is added when {@code start} is already past the last byte.
     *
     * @param start            offset of the first byte to partition
     * @param length           nominal partition length in bytes; must not be negative
     * @param totalSize        total number of bytes in the file
     * @param randomAccessFile file used to probe for line terminators; its position is moved
     * @param partitionPairs   receives the resulting {@link PartitionPair} objects
     * @throws IOException              if reading the file fails
     * @throws IllegalArgumentException if {@code length} is negative
     */
    public static void partition(long start, long length, long totalSize,
                                 RandomAccessFile randomAccessFile, Set partitionPairs) throws IOException {
        if (length < 0) {
            // A negative step could never make progress.
            throw new IllegalArgumentException("length must not be negative: " + length);
        }
        final long lastIndex = totalSize - 1;
        long chunkStart = start;

        while (chunkStart <= lastIndex) {
            PartitionPair partitionPair = new PartitionPair();
            partitionPair.setStart(chunkStart);

            long index = chunkStart + length;
            if (index > lastIndex) {
                // Remaining bytes fit into one final partition.
                partitionPair.setEnd(lastIndex);
                partitionPairs.add(partitionPair);
                return;
            }

            // Probe forward from the nominal end until a line terminator is found.
            randomAccessFile.seek(index);
            byte oneByte = randomAccessFile.readByte();
            while (oneByte != '\n' && oneByte != '\r') {
                if (++index > lastIndex) {
                    index = lastIndex;
                    break;
                }
                // readByte() already advanced the file pointer to 'index'; no re-seek needed.
                oneByte = randomAccessFile.readByte();
            }

            partitionPair.setEnd(index);
            partitionPairs.add(partitionPair);
            chunkStart = index + 1;
        }
    }

    /**
     * Same as {@link #partition(long, long, long, RandomAccessFile, Set)} but returns a new set.
     *
     * @return the partitions; an empty set (never {@code null}) when {@code start} is past the
     *         last byte
     * @throws IOException if reading the file fails
     */
    public static Set partition(long start, long length, long totalSize,
                                RandomAccessFile randomAccessFile) throws IOException {
        Set partitionPairs = new HashSet();
        partition(start, length, totalSize, randomAccessFile, partitionPairs);
        return partitionPairs;
    }

    /**
     * Parses {@code dateStr} with {@link ConcurrentDateUtil#parse(String)} and wraps the
     * result in a {@link java.sql.Date}.
     */
    public static Date getSQLDateThreadSafe(String dateStr) throws ParseException {
        return new Date(ConcurrentDateUtil.parse(dateStr).getTime());
    }

    /**
     * Copies a single file while holding an exclusive lock on the destination channel.
     * Does nothing when the source file does not exist.
     *
     * @param oldPath source path, e.g. {@code c:/fqf.txt}
     * @param newPath destination path, e.g. {@code f:/fqf.txt}
     * @throws Exception if locking or copying fails; the failure is logged and rethrown so
     *                   that callers do not treat a failed copy as successful
     */
    public static void copyFile(String oldPath, String newPath) throws Exception {
        File oldfile = new File(oldPath);
        if (!oldfile.exists()) {
            return;
        }
        // Resources close in reverse order: lock released, channel and streams closed.
        try (InputStream inStream = new FileInputStream(oldPath);
             FileOutputStream fs = new FileOutputStream(newPath);
             FileChannel fileChannel = fs.getChannel();
             FileLock fileLock = fileChannel.lock()) {
            byte[] buf = new byte[2048];
            int len;
            while ((len = inStream.read(buf)) != -1) {
                fs.write(buf, 0, len);
            }
            fs.flush();
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
            throw e;
        }
    }

    /**
     * Deletes the file at {@code filePath}; logs a warning if it still exists afterwards.
     */
    public static void deletFile(String filePath) {
        File file = new File(filePath);
        if (!file.delete() && file.exists()) {
            logger.warn("Unable to delete file " + filePath);
        }
    }
}
package com.ztesoft.interfaces.predeal.util; /** * @ProjectName: cutter-point * @Package: io.util * @ClassName: PartitionPair * @Author: xiaof * @Description: ${description} * @Date: 2019/2/21 14:36 * @Version: 1.0 */ public class PartitionPair { private long start; private long end; @Override public String toString() { return "start="+start+";end="+end; } @Override public int hashCode() { final int prime = 31; int result = 1; result = prime * result + (int) (end ^ (end >>> 32)); result = prime * result + (int) (start ^ (start >>> 32)); return result; } @Override public boolean equals(Object obj) { if (this == obj) return true; if (obj == null) return false; if (getClass() != obj.getClass()) return false; PartitionPair other = (PartitionPair) obj; if (end != other.end) return false; return start == other.start; } public long getStart() { return start; } public void setStart(long start) { this.start = start; } public long getEnd() { return end; } public void setEnd(long end) { this.end = end; } }
這里開始,我們實戰使用這個方法解析入庫
package com.ztesoft.interfaces.predeal.bl;

import com.ztesoft.interfaces.common.bll.CommonQueuePool;
import com.ztesoft.interfaces.common.vo.CommonQueueVo;
import com.ztesoft.interfaces.predeal.constrant.PreDealResourceConstrant;
import com.ztesoft.interfaces.predeal.dao.PreDealResourceDao;
import com.ztesoft.isa.service.common.util.SpringContextUtil;
import org.apache.commons.collections.MapUtils;
import org.apache.log4j.Logger;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * Bootstrap thread for the resource-file synchronisation: reads its settings through the DAO,
 * creates the scheduler and the shared queue, then schedules one producer and a configurable
 * number of consumers.
 *
 * @author xiaof
 * @since 2019/3/8 16:21
 */
public class PreDealResourceServer extends Thread {

    private Logger logger = Logger.getLogger(PreDealResourceServer.class);

    /** Scheduled futures keyed by a configured name, kept so they can be looked up later. */
    private static Map<String, Object> config = new HashMap<>();

    /** DAO used to read the configuration rows. */
    private PreDealResourceDao preDealResourceDao = (PreDealResourceDao) SpringContextUtil.getBean("preDealResourceDao");

    private CommonQueuePool commonQueuePool;

    /** Scheduler that runs the producer and the consumers. */
    private ScheduledExecutorService service;

    /**
     * Reads the server settings (CODEA = scheduler pool size, default 8; CODEB = queue length,
     * default 3000), then starts the producer and the consumers.
     */
    @Override
    public void run() {
        Map serverSettings = queryConfig(PreDealResourceConstrant.SERVER_KEY_INFO);
        Integer poolSize = MapUtils.getInteger(serverSettings, "CODEA", 8);
        Integer queueLength = MapUtils.getInteger(serverSettings, "CODEB", 3000);

        service = Executors.newScheduledThreadPool(poolSize);
        commonQueuePool = new CommonQueuePool<CommonQueueVo>(queueLength);

        this.startResourceProducer();
        this.startResourceConsum();
    }

    /** Queries the configuration row identified by {@code pkey} under the common config type. */
    private Map queryConfig(Object pkey) {
        Map query = new HashMap();
        query.put("stype", PreDealResourceConstrant.CONFIG_KEY_TYPE);
        query.put("pkey", pkey);
        return preDealResourceDao.qryPreDealResourceConfig(query);
    }

    /**
     * Schedules the producer at a fixed rate (CODEA = initial delay in seconds, default 30;
     * CODEB = period in seconds, default 86400) and records its future under the name in CODEC.
     */
    private void startResourceProducer() {
        Map producerSettings = queryConfig(PreDealResourceConstrant.PRODUCER_RESOURCE_THREAD_KEY_INFO);
        int initialDelay = MapUtils.getInteger(producerSettings, "CODEA", 30);
        int period = MapUtils.getInteger(producerSettings, "CODEB", 86400);

        Future producerFuture = service.scheduleAtFixedRate(
                new PreDealResourceProducer(commonQueuePool), initialDelay, period, TimeUnit.SECONDS);

        List futures = new ArrayList();
        futures.add(producerFuture);
        String key = MapUtils.getString(producerSettings, "CODEC", "ProducerResourceThread");
        config.put(key, futures);
    }

    /**
     * Schedules the consumers once each (CODEA = delay in seconds, default 30; CODEB = number
     * of consumers, default 3) and records their futures under the name in CODEC.
     */
    private void startResourceConsum() {
        Map consumerSettings = queryConfig(PreDealResourceConstrant.CONSUM_RESOURCE_THREAD_KEY_INFO);
        int initialDelay = MapUtils.getInteger(consumerSettings, "CODEA", 30);
        int threadNum = MapUtils.getInteger(consumerSettings, "CODEB", 3);

        List futures = new ArrayList();
        int started = 0;
        while (started < threadNum) {
            PreDealResourceConsum consumer = new PreDealResourceConsum(commonQueuePool);
            futures.add(service.schedule(consumer, initialDelay, TimeUnit.SECONDS));
            ++started;
        }
        String key = MapUtils.getString(consumerSettings, "CODEC", "ConsumResourceThread");
        config.put(key, futures);
    }

    /**
     * Overload that ignores its arguments and delegates to {@link Thread#start()}.
     */
    public void start(String... args) {
        this.start();
    }

    public static Map<String, Object> getConfig() {
        return config;
    }

    public static void setConfig(Map<String, Object> config) {
        PreDealResourceServer.config = config;
    }
}
package com.ztesoft.interfaces.predeal.bl;

import com.ztesoft.interfaces.common.bll.CommonQueuePool;
import com.ztesoft.interfaces.common.invoke.FtpInvoke;
import com.ztesoft.interfaces.common.util.FtpTemplate;
import com.ztesoft.interfaces.predeal.constrant.PreDealResourceConstrant;
import com.ztesoft.interfaces.predeal.dao.PreDealResourceDao;
import com.ztesoft.interfaces.predeal.dto.OmPredealSyncLogDto;
import com.ztesoft.interfaces.predeal.util.ConcurrentDateUtil;
import com.ztesoft.interfaces.predeal.util.DataUtil;
import com.ztesoft.interfaces.predeal.vo.PreDealResourceVo;
import com.ztesoft.isa.service.common.util.SpringContextUtil;
import com.ztesoft.services.common.CommonHelper;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.net.ftp.FTPClient;
import org.apache.commons.net.ftp.FTPFile;
import org.apache.log4j.Logger;

import java.io.File;
import java.io.FileOutputStream;
import java.io.OutputStream;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.util.*;

/**
 * Producer: downloads matching files from the configured FTP directory into a local
 * directory, then moves every local file into a dated consume directory, writes a sync-log
 * row for it and puts a {@link PreDealResourceVo} on the shared queue.
 *
 * @author xiaof
 * @since 2019/3/10 15:03
 */
public class PreDealResourceProducer implements Runnable {

    private final static Logger logger = Logger.getLogger(PreDealResourceProducer.class);

    /** Queue shared with the consumers. */
    private final CommonQueuePool commonQueuePool;

    /** DAO used to read and update configuration rows. */
    private PreDealResourceDao preDealResourceDao = (PreDealResourceDao) SpringContextUtil.getBean("preDealResourceDao");

    public PreDealResourceProducer(CommonQueuePool commonQueuePool) {
        this.commonQueuePool = commonQueuePool;
    }

    /**
     * Runs one download-and-enqueue pass. A failure during the FTP phase is logged and the
     * local phase still runs, so files placed in the local directory by hand are picked up.
     * Any other exception is logged and ends the pass.
     */
    @Override
    public void run() {
        try {
            // 1. FTP connection settings.
            Map paramMap = new HashMap();
            paramMap.put("stype", PreDealResourceConstrant.CONFIG_KEY_TYPE);
            paramMap.put("pkey", PreDealResourceConstrant.PRODUCER_RESOURCE_FTP_KEY);
            paramMap = preDealResourceDao.qryPreDealResourceConfig(paramMap);
            String resourceIp = MapUtils.getString(paramMap, "CODEA", PreDealResourceConstrant.PRODUCER_RESOURCE_FTP_IP);
            int resourcePort = MapUtils.getInteger(paramMap, "CODEB", PreDealResourceConstrant.PRODUCER_RESOURCE_FTP_PORT);
            String resourceUserName = MapUtils.getString(paramMap, "CODEC", PreDealResourceConstrant.PRODUCER_RESOURCE_FTP_USERNAME);
            String resourcePasswd = MapUtils.getString(paramMap, "CODED", PreDealResourceConstrant.PRODUCER_RESOURCE_FTP_PASSWD);
            String resourceRemoteDir = MapUtils.getString(paramMap, "CODEE", PreDealResourceConstrant.PRODUCER_RESOURCE_FTP_REMOTEDIR);
            String resourceLocalDir = MapUtils.getString(paramMap, "CODEH", PreDealResourceConstrant.PRODUCER_RESOURCE_FTP_LOCALDIR);
            // Read but not used anywhere in this method.
            String resourceDeleteMark = MapUtils.getString(paramMap, "CODEG", "0");
            String resourceConsumLocalDir = MapUtils.getString(paramMap, "CODEI", PreDealResourceConstrant.PRODUCER_RESOURCE_CONSUM_LOCALDIR);

            // Download filter: CODEA lists the accepted name prefixes, CODEB must occur in the name.
            Map paramMapFile = new HashMap();
            paramMapFile.put("stype", PreDealResourceConstrant.CONFIG_KEY_TYPE);
            paramMapFile.put("pkey", PreDealResourceConstrant.PRODUCER_RESOURCE_FTP_RULE);
            paramMapFile = preDealResourceDao.qryPreDealResourceConfig(paramMapFile);
            final String fileNameContain = MapUtils.getString(paramMapFile, "CODEA", "");
            final String endWith = MapUtils.getString(paramMapFile, "CODEB", "");

            FtpTemplate ftpTemplate = new FtpTemplate(resourceIp, resourcePort, resourceUserName, resourcePasswd);

            // 2. Download matching remote files that are not present locally yet.
            try {
                ftpTemplate.operatorByPathAndName(resourceRemoteDir, resourceLocalDir, new FtpInvoke() {
                    @Override
                    public void doOperator(FTPClient ftp, String remoteDir, String localDir) throws Exception {
                        // Passive mode: the server opens the data port.
                        ftp.enterLocalPassiveMode();
                        FTPFile[] ftpFile = ftp.listFiles(remoteDir);
                        if (ftpFile == null) {
                            return;
                        }
                        for (FTPFile remote : ftpFile) {
                            if (remote == null) {
                                continue;
                            }
                            String ftpFileName = remote.getName();
                            int underscore = ftpFileName.lastIndexOf("_");
                            if (underscore < 0) {
                                // No "_": the prefix rule cannot apply. Skip instead of letting
                                // substring() throw and abort the remaining downloads.
                                continue;
                            }
                            String localFilePath = localDir + "/" + ftpFileName;
                            File file = new File(localFilePath);
                            if (fileNameContain.contains(ftpFileName.substring(0, underscore))
                                    && !file.exists()
                                    && ftpFileName.contains(endWith)) {
                                String filePath = remoteDir + "/" + ftpFileName;
                                boolean retrieved;
                                // try-with-resources closes the stream even if the transfer throws.
                                try (OutputStream outputStream = new FileOutputStream(localFilePath)) {
                                    retrieved = ftp.retrieveFile(filePath, outputStream);
                                    outputStream.flush();
                                }
                                if (retrieved) {
                                    logger.info("下載文件:" + ftpFileName + " 完成");
                                } else {
                                    // Do not leave a partial file behind for the local phase.
                                    logger.warn("FTP retrieve failed for " + filePath + ", reply: " + ftp.getReplyString());
                                    file.delete();
                                }
                            }
                        }
                    }
                });
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
            }

            // 3. Walk the local download directory.
            File localDir = new File(resourceLocalDir);
            if (!localDir.exists()) {
                localDir.mkdirs();
            }

            // 4. listFiles() returns null on I/O error or when the path is not a directory.
            File[] recFiles = localDir.listFiles();
            if (recFiles == null) {
                recFiles = new File[0];
            }
            for (int i = 0; i < recFiles.length; ++i) {
                if (recFiles[i].isDirectory()) {
                    continue;
                }
                String fileName = recFiles[i].getName();
                String filePath = recFiles[i].getPath();
                long sourceLength = recFiles[i].length();

                // Target directory is named after today's date.
                Calendar calendar = Calendar.getInstance();
                String consumLocalDir = resourceConsumLocalDir + "/" + ConcurrentDateUtil.formatDateDir(calendar.getTime());
                File file = new File(consumLocalDir);
                if (!file.exists()) {
                    file.mkdirs();
                }

                // 5. Sync-log row for this file.
                PreDealResourceVo preDealResourceVo = new PreDealResourceVo();
                String desPath = consumLocalDir + "/" + fileName;
                long omPredealSyncLogSeq = CommonHelper.getCommonDAO().getSeqNextVal(PreDealResourceConstrant.OM_PREDEAL_SYNC_LOG_SEQ);
                OmPredealSyncLogDto omPredealSyncLogDto = new OmPredealSyncLogDto();
                omPredealSyncLogDto.setId(String.valueOf(omPredealSyncLogSeq));
                omPredealSyncLogDto.setCreateTime(new Date());
                omPredealSyncLogDto.setUpdateTime(new Date());
                omPredealSyncLogDto.setState("0");
                omPredealSyncLogDto.setTimes("0");
                omPredealSyncLogDto.setLogName(recFiles[i].getName() + "資源信息同步");
                omPredealSyncLogDto.setLogType(PreDealResourceConstrant.PRE_DEAL_RESOURCE_LOG_TYPE);
                omPredealSyncLogDto.setRemark(filePath);
                omPredealSyncLogDto.add();

                preDealResourceVo.setOmPredealSyncLogDto(omPredealSyncLogDto);
                preDealResourceVo.setFileName(recFiles[i].getName());
                preDealResourceVo.setFileFullPath(desPath);

                // Move = copy + delete. Only delete the source and enqueue once the copy is
                // verifiably complete, otherwise a failed copy would lose the file.
                DataUtil.copyFile(filePath, desPath);
                File copied = new File(desPath);
                if (!copied.isFile() || copied.length() != sourceLength) {
                    logger.error("Copy of " + filePath + " to " + desPath + " is incomplete; source kept");
                    continue;
                }
                DataUtil.deletFile(filePath);
                commonQueuePool.put(preDealResourceVo);
            }

            // Record today's date stamp in the rule row.
            paramMapFile.put("stype", PreDealResourceConstrant.CONFIG_KEY_TYPE);
            paramMapFile.put("pkey", PreDealResourceConstrant.PRODUCER_RESOURCE_FTP_RULE);
            paramMapFile.put("codeb", ConcurrentDateUtil.formatResourceFile(new Date()));
            preDealResourceDao.updateConfig(paramMapFile);
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
    }
}
package com.ztesoft.interfaces.predeal.bl; import com.ztesoft.interfaces.common.bll.CommonQueuePool; import com.ztesoft.interfaces.common.bll.CommonQueueWork; import com.ztesoft.interfaces.common.vo.CommonQueueVo; import com.ztesoft.interfaces.predeal.vo.PreDealResourceVo; import org.apache.log4j.Logger; /** * @ProjectName: 湖北移動智慧裝維支撐系統 * @Package: com.ztesoft.interfaces.predeal.bl * @ClassName: PreDealResourceConsum * @Author: xiaof * @Description: ${description} * @Date: 2019/3/10 14:44 * @Version: 1.0 */ public class PreDealResourceConsum implements Runnable { private Logger logger = Logger.getLogger(PreDealResourceConsum.class); //這個是要用來控制多線的隊列 private final CommonQueuePool commonQueuePool; public PreDealResourceConsum(CommonQueuePool commonQueuePool) { this.commonQueuePool = commonQueuePool; } @Override public void run() { //只要不為空,那么就可以一直取 while(true) { //3.刪除遠端路徑文件(考慮延后處理,比如解析完成之后刪除),那么這里我們放消費者中處理 try { CommonQueueVo commonQueueVo = (CommonQueueVo) commonQueuePool.take(); CommonQueueWork commonQueueWork = null; //這里進行區分類型 if(commonQueueVo instanceof PreDealResourceVo) { commonQueueWork = new PreDealResourceWork(); } if(commonQueueWork != null) { commonQueueWork.doWork(commonQueueVo); } } catch (InterruptedException e) { // e.printStackTrace(); logger.error(e.getMessage(), e); } } } }
最后核心解析類實現
package com.ztesoft.interfaces.predeal.bl;

import com.ztesoft.interfaces.common.bll.CommonQueueWork;
import com.ztesoft.interfaces.common.vo.CommonQueueVo;
import com.ztesoft.interfaces.predeal.constrant.PreDealResourceConstrant;
import com.ztesoft.interfaces.predeal.dao.PreDealResourceDao;
import com.ztesoft.interfaces.predeal.dto.OmPredealSyncLogDto;
import com.ztesoft.interfaces.predeal.util.CreateFileUtil;
import com.ztesoft.interfaces.predeal.util.CsvBigTask;
import com.ztesoft.interfaces.predeal.util.DataUtil;
import com.ztesoft.interfaces.predeal.util.PartitionPair;
import com.ztesoft.interfaces.predeal.vo.PreDealResourceVo;
import com.ztesoft.isa.service.common.util.SpringContextUtil;
import org.apache.commons.collections.MapUtils;
import org.apache.log4j.Logger;

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.*;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Unzips one resource file, reads its header line, splits the remainder into line-aligned
 * partitions and starts one {@link CsvBigTask} thread per partition. Each task batches rows
 * into the target table through the DAO.
 *
 * @author xiaof
 * @since 2019/3/10 15:00
 */
public class PreDealResourceWork implements CommonQueueWork {

    private final static Logger logger = Logger.getLogger(PreDealResourceWork.class);

    private PreDealResourceDao preDealResourceDao = (PreDealResourceDao) SpringContextUtil.getBean("preDealResourceDao");

    private static ThreadLocal<List> tempDataThreadLocal = new ThreadLocal<List>() {
        @Override
        protected List initialValue() {
            return new ArrayList();
        }
    };

    /**
     * Processes one queued file. This method returns as soon as the worker threads have been
     * started; the final log update is done by the barrier action once all workers arrive.
     *
     * @param commonQueueVo the queue item; must be a {@link PreDealResourceVo}
     */
    @Override
    public void doWork(CommonQueueVo commonQueueVo) {
        PreDealResourceVo preDealResourceVo = (PreDealResourceVo) commonQueueVo;
        final OmPredealSyncLogDto omPredealSyncLogDto = preDealResourceVo.getOmPredealSyncLogDto();

        // 1. Unzip.
        logger.info("===========1開始解壓文件=======" + System.currentTimeMillis());
        File zipFile = new File(preDealResourceVo.getFileFullPath());

        // File handle this method must still close itself; set to null once the barrier
        // action has taken over responsibility for closing it.
        RandomAccessFile closeOnExit = null;
        try {
            String fileName = CreateFileUtil.unZipSingleFileCurrentDir(zipFile);
            final String tableName = this.getTableName(fileName);

            // 2. The extracted file must exist.
            logger.info("===========2判斷文件是否存在=======" + System.currentTimeMillis());
            final File resourceFile = new File(zipFile.getParent() + "/" + fileName);
            if (!resourceFile.exists()) {
                omPredealSyncLogDto.setState("0");
                omPredealSyncLogDto.setUpdateTime(new Date());
                omPredealSyncLogDto.setTimes(Integer.valueOf(omPredealSyncLogDto.getTimes()) + 1 + "");
                omPredealSyncLogDto.setRemark(omPredealSyncLogDto.getRemark() + "\n" + resourceFile.getPath() + " 文件不存在");
                return;
            }

            // 3. Open for random access and size the partitions by processor count.
            logger.info("===========3獲取隨機文件索引=======" + System.currentTimeMillis());
            final RandomAccessFile randomAccessFile = new RandomAccessFile(resourceFile, "r");
            closeOnExit = randomAccessFile;
            long length = resourceFile.length();
            int availProcessors = Runtime.getRuntime().availableProcessors();
            logger.info("===========3。1可使用線程=======" + availProcessors);
            long blockLength = length / availProcessors;

            // Header line: tab-separated column names. Data partitioning starts right after
            // the first terminator byte, which is where the file pointer now is.
            final String[] firstLine = readHeaderLine(randomAccessFile).split("\t");
            long index = randomAccessFile.getFilePointer();

            Set pairSets = DataUtil.partition(index, blockLength, length, randomAccessFile);
            if (pairSets == null || pairSets.isEmpty()) {
                // Header-only file: nothing to load, and a barrier cannot have zero parties.
                omPredealSyncLogDto.setUpdateTime(new Date());
                omPredealSyncLogDto.setTimes(Integer.valueOf(omPredealSyncLogDto.getTimes()) + 1 + "");
                omPredealSyncLogDto.setRemark(omPredealSyncLogDto.getRemark() + "\n" + resourceFile.getPath() + " no data rows after header");
                return;
            }

            // 4. One worker per partition.
            logger.info("===========4數據分片之后,分別啟動處理線程=======" + pairSets + " 時間:" + System.currentTimeMillis());
            final long startTime = System.currentTimeMillis();
            final AtomicLong atomicLong = new AtomicLong(0);

            // 5. Batch settings: CODEA = rows per batch, CODEB = buffer MiB, CODEC = separator.
            logger.info("===========5數據入庫=======" + System.currentTimeMillis());
            Map workConfigMap = new HashMap();
            workConfigMap.put("stype", PreDealResourceConstrant.CONFIG_KEY_TYPE);
            workConfigMap.put("pkey", PreDealResourceConstrant.PRE_DEAL_RESOURCE_WORK);
            workConfigMap = preDealResourceDao.qryPreDealResourceConfig(workConfigMap);
            final int mapSize = MapUtils.getInteger(workConfigMap, "CODEA", 1000);
            int bufSize = MapUtils.getInteger(workConfigMap, "CODEB", 3) * 1024 * 1024;
            final String splitMark = MapUtils.getString(workConfigMap, "CODEC", "\t");

            // Empty the target table before loading.
            initTable(tableName, omPredealSyncLogDto);

            CyclicBarrier cyclicBarrier = new CyclicBarrier(pairSets.size(), new Runnable() {
                @Override
                public void run() {
                    try {
                        logger.info("===========5數據入庫結束=======" + System.currentTimeMillis());
                        omPredealSyncLogDto.setUpdateTime(new Date());
                        omPredealSyncLogDto.setTimes(Integer.valueOf(omPredealSyncLogDto.getTimes()) + 1 + "");
                        omPredealSyncLogDto.setRemark(omPredealSyncLogDto.getRemark() + "\n" + resourceFile.getPath() + " 入庫完畢 use time: " + (System.currentTimeMillis() - startTime));
                        omPredealSyncLogDto.setState("1");
                        omPredealSyncLogDto.setNewTotal(String.valueOf(atomicLong.get()));
                        omPredealSyncLogDto.update();
                    } finally {
                        // All workers have arrived, so nobody reads the file any more.
                        closeQuietly(randomAccessFile);
                    }
                }
            });

            // From here on the barrier action closes the file.
            closeOnExit = null;

            for (Object pair : pairSets) {
                PartitionPair partitionPair = (PartitionPair) pair;
                CsvBigTask csvBigTask = new CsvBigTask(cyclicBarrier, atomicLong, partitionPair, bufSize, randomAccessFile, new ArrayList(), new IHandle() {
                    @Override
                    public void handle(String line, boolean lastLine, List tempData) {
                        try {
                            if (!line.equals("")) {
                                String[] elements = line.split(splitMark);
                                tempData.add(elements);
                            }
                            // Flush when a full batch is collected, or at the end of the partition.
                            if ((tempData.size() % mapSize == 0 && tempData.size() >= mapSize) || lastLine) {
                                preDealResourceDao.addResourceBatch(tableName, firstLine, tempData);
                                tempData.clear();
                            }
                        } catch (Exception e) {
                            logger.error(line + e.getMessage(), e);
                        }
                    }
                });
                Thread thread = new Thread(csvBigTask);
                thread.start();
            }
        } catch (IOException e) {
            omPredealSyncLogDto.setState("0");
            omPredealSyncLogDto.setUpdateTime(new Date());
            omPredealSyncLogDto.setTimes(Integer.valueOf(omPredealSyncLogDto.getTimes()) + 1 + "");
            omPredealSyncLogDto.setRemark(omPredealSyncLogDto.getRemark() + "\n" + preDealResourceVo.getFileFullPath() + e.getMessage());
            logger.error(e.getMessage(), e);
        } finally {
            closeQuietly(closeOnExit);
            // 7. Persist whatever state was reached synchronously.
            logger.info("===========7最后更新日志=======" + System.currentTimeMillis());
            omPredealSyncLogDto.update();
        }
    }

    /**
     * Reads bytes from the current position up to and including the first {@code '\r'} or
     * {@code '\n'} and returns them, without the terminator, decoded as GBK (the same charset
     * {@link CsvBigTask} uses for data lines).
     *
     * @throws IOException if the end of file is reached before a terminator, or on read failure
     */
    private String readHeaderLine(RandomAccessFile file) throws IOException {
        ByteArrayOutputStream header = new ByteArrayOutputStream();
        byte b = file.readByte();
        while (b != '\r' && b != '\n') {
            header.write(b);
            b = file.readByte();
        }
        return header.toString("gbk");
    }

    /** Closes the file if non-null; a close failure is logged, not propagated. */
    private static void closeQuietly(RandomAccessFile file) {
        if (file == null) {
            return;
        }
        try {
            file.close();
        } catch (IOException e) {
            logger.warn(e.getMessage(), e);
        }
    }

    /** Table name = configured prefix + "_" + the file name up to its last underscore. */
    private String getTableName(String fileName) {
        String tableName = "";
        tableName = PreDealResourceConstrant.PRE_RESOURCE_TABLE + "_" + fileName.substring(0, fileName.lastIndexOf("_"));
        return tableName;
    }

    /** Delegates to the DAO to truncate the target table. */
    private void initTable(String tableName, OmPredealSyncLogDto omPredealSyncLogDto) {
        preDealResourceDao.truncateResourceTable(omPredealSyncLogDto, tableName);
    }
}
最后公布一小段入庫代碼,這個就不全給了
PreDealResourceDaoImpl
@Override public void addResourceBatch(String tableName, String[] fields, List params) { try { // String sql = "insert into PRE_AAA_STAFF (domain_name,internet_account,brasid,band_info,registTime,FirstUseTime,lastLoginTime,status,bandwidth_M,broadband_account,CREATE_DATE,UPDATE_DATE)\n" + // "values(?, ?, ?, ?, ?, ?, ?, ?, ?, ?,sysdate,sysdate) "; //組裝sql StringBuffer stringBuffer = new StringBuffer(" insert /*+ append */ into " + tableName + " ("); StringBuffer stringValue = new StringBuffer(" values ("); // Set<String> keySet = tableMap.keySet(); for(int i = 0; i < fields.length; ++i) { stringBuffer.append(fields[i] + ","); stringValue.append("?,"); } //添加時間 stringBuffer.append("create_date, update_date,"); stringValue.append("sysdate,sysdate,"); stringBuffer = stringBuffer.deleteCharAt(stringBuffer.length() - 1).append(" ) "); stringValue = stringValue.deleteCharAt(stringValue.length() - 1).append(")"); String sql = stringBuffer.append(stringValue).toString(); super.getJdbcOperations().batchUpdate(sql, new BatchPreparedStatementSetter() { @Override public void setValues(PreparedStatement ps, int i) throws SQLException { Object[] values = (Object[]) params.get(i); for(int j = 0; j < fields.length; ++j) { if(j < values.length) { ps.setString(j + 1, String.valueOf(values[j])); } else { ps.setObject(j + 1, null); } } } @Override public int getBatchSize() { return params.size(); } }); logger.info("====" + tableName + "入庫 " + params.size() + " 條"); } catch (DataAccessException e) { logger.error(e.getMessage(), e); } }