【並發】5、多線程並發解析單文件大數據量入庫,1800萬數據8線程5分鍾入庫


1.首先機器要求8核,不然可能會慢點

2.數據庫建表的時候,最好建那種nologging類型的表,不然歸檔日志滿了,數據庫入庫會很慢,甚至丟數據;因為數據量很大,我們不可能一次性提交所有數據,只能分批提交

package com.ztesoft.interfaces.predeal.util;

import com.ztesoft.interfaces.predeal.bl.IHandle;

import java.io.ByteArrayOutputStream;
import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Worker that parses one byte range of a large GBK-encoded text file line by line.
 *
 * <p>The range {@code [start, start + totalSize)} is memory-mapped read-only, scanned in
 * chunks of {@code buffSize} bytes and split on {@code '\n'} / {@code '\r'}. Every non-empty
 * line is handed to the {@link IHandle} callback and counted in the shared {@link AtomicLong}.
 * Once the range is exhausted the callback is invoked one final time with
 * {@code lastLine == true}, and the task then waits on the shared {@link CyclicBarrier}.
 *
 * <p>The barrier is awaited even when parsing fails, so a failing task cannot leave the
 * other parties blocked forever.
 *
 * @author xiaof
 * @version 1.0
 */
public class CsvBigTask implements Runnable {

    // Barrier shared with the other parties; awaited once this task is done.
    private final CyclicBarrier cyclicBarrier;
    // Shared counter of lines passed to the callback.
    private final AtomicLong atomicLong;
    // Offset in the file where this task's range begins.
    private final long start;
    // Number of bytes in this task's range.
    private final long totalSize;
    // Number of bytes copied out of the mapping per iteration.
    private final int buffSize;
    // Scratch buffer of buffSize bytes.
    private final byte[] buff;
    // File to map; the channel is obtained from it in run().
    private final RandomAccessFile randomAccessFile;
    // Business callback invoked for every line.
    private final IHandle iHandle;
    // Opaque list handed to the callback unchanged; may be null.
    private final List tempData;

    /**
     * Creates a task for an explicit offset/size pair; the callback receives a {@code null} list.
     *
     * @param cyclicBarrier    barrier to await when the task finishes
     * @param atomicLong       shared line counter
     * @param start            file offset of the first byte to read
     * @param totalSize        number of bytes to read
     * @param buffSize         chunk size in bytes, must be positive
     * @param randomAccessFile file to read from
     * @param iHandle          callback receiving each line
     */
    public CsvBigTask(CyclicBarrier cyclicBarrier, AtomicLong atomicLong, long start, long totalSize, int buffSize, RandomAccessFile randomAccessFile, IHandle iHandle) {
        this(cyclicBarrier, atomicLong, start, totalSize, buffSize, randomAccessFile, null, iHandle);
    }

    /**
     * Creates a task for an inclusive {@link PartitionPair} range; the callback receives a
     * {@code null} list.
     *
     * @param cyclicBarrier    barrier to await when the task finishes
     * @param atomicLong       shared line counter
     * @param partitionPair    inclusive start/end offsets of the range
     * @param buffSize         chunk size in bytes, must be positive
     * @param randomAccessFile file to read from
     * @param iHandle          callback receiving each line
     */
    public CsvBigTask(CyclicBarrier cyclicBarrier, AtomicLong atomicLong, PartitionPair partitionPair, int buffSize, RandomAccessFile randomAccessFile, IHandle iHandle) {
        this(cyclicBarrier, atomicLong, partitionPair.getStart(),
                partitionPair.getEnd() - partitionPair.getStart() + 1,
                buffSize, randomAccessFile, null, iHandle);
    }

    /**
     * Creates a task for an inclusive {@link PartitionPair} range and a caller-supplied list
     * that is forwarded to every callback invocation.
     *
     * @param cyclicBarrier    barrier to await when the task finishes
     * @param atomicLong       shared line counter
     * @param partitionPair    inclusive start/end offsets of the range
     * @param buffSize         chunk size in bytes, must be positive
     * @param randomAccessFile file to read from
     * @param tempData         list passed through to the callback
     * @param iHandle          callback receiving each line
     */
    public CsvBigTask(CyclicBarrier cyclicBarrier, AtomicLong atomicLong, PartitionPair partitionPair, int buffSize, RandomAccessFile randomAccessFile, List tempData, IHandle iHandle) {
        this(cyclicBarrier, atomicLong, partitionPair.getStart(),
                partitionPair.getEnd() - partitionPair.getStart() + 1,
                buffSize, randomAccessFile, tempData, iHandle);
    }

    // Single place where all fields are assigned; the public constructors delegate here.
    private CsvBigTask(CyclicBarrier cyclicBarrier, AtomicLong atomicLong, long start, long totalSize, int buffSize, RandomAccessFile randomAccessFile, List tempData, IHandle iHandle) {
        if (buffSize <= 0) {
            // A zero chunk size would make the read loop in run() spin forever.
            throw new IllegalArgumentException("buffSize must be positive: " + buffSize);
        }
        this.cyclicBarrier = cyclicBarrier;
        this.atomicLong = atomicLong;
        this.start = start;
        this.totalSize = totalSize;
        this.buffSize = buffSize;
        this.buff = new byte[buffSize];
        this.randomAccessFile = randomAccessFile;
        this.iHandle = iHandle;
        this.tempData = tempData;
    }

    /**
     * Parses the range and then waits on the barrier. Failures are printed to stderr and do
     * not propagate; the barrier is awaited in every case.
     */
    @Override
    public void run() {
        try {
            parseRange();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            awaitBarrier();
        }
    }

    // Maps the range, splits it into lines and feeds them to the callback.
    private void parseRange() throws Exception {
        // Read-only mapping of this task's own byte range; no locking is done here.
        MappedByteBuffer mappedByteBuffer =
                randomAccessFile.getChannel().map(FileChannel.MapMode.READ_ONLY, start, totalSize);
        // Accumulates the bytes of the line currently being assembled.
        ByteArrayOutputStream lineBytes = new ByteArrayOutputStream();

        // long offset: an int index could overflow when compared against the long totalSize.
        for (long offset = 0; offset < totalSize; offset += buffSize) {
            // The last chunk may be shorter than buffSize.
            int chunkLength = (int) Math.min(buffSize, totalSize - offset);
            // Relative get: the buffer's position advances by chunkLength on every call.
            mappedByteBuffer.get(buff, 0, chunkLength);

            for (int j = 0; j < chunkLength; ++j) {
                byte current = buff[j];
                if (current == '\n' || current == '\r') {
                    // End of line: emit it unless it is empty (this also collapses "\r\n").
                    String line = lineBytes.toString("gbk");
                    if (!line.isEmpty()) {
                        iHandle.handle(line, false, tempData);
                        atomicLong.incrementAndGet();
                    }
                    lineBytes.reset();
                } else if (current == 0) {
                    // NUL byte: stop scanning the rest of this chunk; later chunks are still read.
                    break;
                } else {
                    lineBytes.write(current);
                }
            }
        }

        // Whatever is left over is the final line of the range (no trailing line break).
        if (lineBytes.size() > 0) {
            String line = lineBytes.toString("gbk");
            if (!line.isEmpty()) {
                iHandle.handle(line, true, tempData);
                atomicLong.incrementAndGet();
            }
            lineBytes.reset();
        } else {
            // Nothing left over: still notify the callback that the range is finished.
            iHandle.handle("", true, tempData);
        }
    }

    // Waits on the shared barrier; restores the interrupt flag if the wait is interrupted.
    private void awaitBarrier() {
        try {
            cyclicBarrier.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

業務邏輯實現接口類

package com.ztesoft.interfaces.predeal.bl;

import java.util.List;

/**
 * Callback that receives the lines produced while a file is being parsed.
 *
 * @program:
 * @description: business-logic hook invoked by the parsing task for each line
 * @auther: xiaof
 * @date: 2019/3/1 18:08
 */
public interface IHandle {

    /**
     * Handles one parsed line.
     *
     * @param line     the line text; {@code CsvBigTask} passes an empty string when it only
     *                 signals the end of its range
     * @param lastLine {@code true} on the final call made for a range, {@code false} otherwise
     * @param list     list supplied by the caller and forwarded unchanged; {@code CsvBigTask}
     *                 passes its {@code tempData}, which may be {@code null}
     *                 (presumably a per-task batch buffer - confirm with implementations)
     */
    void handle(String line, boolean lastLine, List list);

}

 

一些輔助類,可要可不要,看業務邏輯

package com.ztesoft.interfaces.predeal.util;

import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * Date parsing/formatting helpers backed by one {@link DateFormat} instance per thread
 * and per pattern, so the formatters are never shared between threads.
 *
 * @author xiaof
 * @version 1.0
 */
public class ConcurrentDateUtil {

    // Pattern "yyyyMMdd".
    private static final ThreadLocal<DateFormat> COMPACT_DAY = perThread("yyyyMMdd");

    // Pattern "yyyy/MM/dd", used for date-named directories.
    private static final ThreadLocal<DateFormat> SLASHED_DAY = perThread("yyyy/MM/dd");

    // Pattern "yyyyMMddHHmmss".
    private static final ThreadLocal<DateFormat> DATABASE_STAMP = perThread("yyyyMMddHHmmss");

    // Pattern "yyyyMMdd000000": the day followed by six literal zeros.
    private static final ThreadLocal<DateFormat> RESOURCE_FILE_STAMP = perThread("yyyyMMdd000000");

    // Builds a thread-local whose initial value is a fresh SimpleDateFormat for the pattern.
    private static ThreadLocal<DateFormat> perThread(final String pattern) {
        return new ThreadLocal<DateFormat>() {
            @Override
            protected DateFormat initialValue() {
                return new SimpleDateFormat(pattern);
            }
        };
    }

    /** Parses a {@code yyyyMMdd} string. */
    public static Date parse(String dateStr) throws ParseException {
        DateFormat format = COMPACT_DAY.get();
        return format.parse(dateStr);
    }

    /** Parses a {@code yyyyMMddHHmmss} string. */
    public static Date parseDatabase(String dateStr) throws ParseException {
        DateFormat format = DATABASE_STAMP.get();
        return format.parse(dateStr);
    }

    /** Formats a date as {@code yyyyMMdd}. */
    public static String format(Date date) {
        DateFormat format = COMPACT_DAY.get();
        return format.format(date);
    }

    /** Formats a date as {@code yyyy/MM/dd}. */
    public static String formatDateDir(Date date) {
        DateFormat format = SLASHED_DAY.get();
        return format.format(date);
    }

    /** Parses a {@code yyyy/MM/dd} string. */
    public static Date parseDateDir(String dateStr) throws ParseException {
        DateFormat format = SLASHED_DAY.get();
        return format.parse(dateStr);
    }

    /** Formats a date as {@code yyyyMMdd} followed by {@code 000000}. */
    public static String formatResourceFile(Date date) {
        DateFormat format = RESOURCE_FILE_STAMP.get();
        return format.format(date);
    }
}

 

package com.ztesoft.interfaces.predeal.util;

import java.io.*;
import java.nio.charset.Charset;
import java.util.Enumeration;
import java.util.zip.ZipEntry;
import java.util.zip.ZipFile;

/**
 * File helpers: create a file together with its parent directories, and extract a zip
 * archive into the directory that contains it.
 *
 * @author xiaof
 * @version 1.0
 */
public class CreateFileUtil {

    /**
     * Creates {@code file}, first creating every missing parent directory.
     *
     * @param file the file to create
     * @return {@code true} if the file was created, {@code false} if it already existed
     * @throws IOException if a parent directory cannot be created or the file cannot be created
     */
    public static boolean createFile(File file) throws IOException {
        // Resolve against the working directory so a bare file name still has a parent.
        File parent = file.getAbsoluteFile().getParentFile();
        // mkdirs() creates all missing levels; the second isDirectory() check covers the case
        // where another thread or process created the directory in the meantime.
        if (parent != null && !parent.isDirectory() && !parent.mkdirs() && !parent.isDirectory()) {
            throw new IOException("Unable to create parent directory: " + parent);
        }
        return file.createNewFile();
    }

    /** Placeholder; currently does nothing. */
    public static void unZipFile() {

    }

    /**
     * Extracts every entry of {@code zipFile} into the directory that contains the archive.
     *
     * <p>Entries whose resolved path would fall outside that directory are rejected, and the
     * archive and all streams are closed even when extraction fails.
     *
     * @param zipFile the archive to extract
     * @return the name of the last file entry extracted, or an empty string if there was none
     * @throws IOException if the archive cannot be read, an entry cannot be written, or an
     *                     entry would be written outside the target directory
     */
    public static String unZipSingleFileCurrentDir(File zipFile) throws IOException {
        String fileName = "";
        try (ZipFile archive = new ZipFile(zipFile)) {
            File targetDir = zipFile.getAbsoluteFile().getParentFile();
            if (!targetDir.exists()) {
                targetDir.mkdirs();
            }
            String outPath = targetDir.getPath();
            String canonicalDir = targetDir.getCanonicalPath();
            String allowedPrefix = canonicalDir.endsWith(File.separator)
                    ? canonicalDir
                    : canonicalDir + File.separator;

            for (Enumeration<? extends ZipEntry> entries = archive.entries(); entries.hasMoreElements();) {
                ZipEntry entry = entries.nextElement();
                // NOTE(review): this pattern replaces literal '*' characters; it may have been
                // meant to normalise backslashes - kept as-is, confirm with the author.
                File target = new File((outPath + "/" + entry.getName()).replaceAll("\\*", "/"));

                // Zip-slip guard: the entry must resolve to a path inside the target directory.
                String canonicalTarget = target.getCanonicalPath();
                boolean inside = canonicalTarget.startsWith(allowedPrefix)
                        || (entry.isDirectory() && canonicalTarget.equals(canonicalDir));
                if (!inside) {
                    throw new IOException("Zip entry escapes target directory: " + entry.getName());
                }

                if (entry.isDirectory()) {
                    // Directory entries carry no data; just make sure the directory exists.
                    target.mkdirs();
                    continue;
                }

                fileName = entry.getName();
                File targetParent = target.getParentFile();
                if (targetParent != null && !targetParent.exists()) {
                    targetParent.mkdirs();
                }

                try (InputStream in = archive.getInputStream(entry);
                     OutputStream out = new FileOutputStream(target)) {
                    byte[] buffer = new byte[1024];
                    int len;
                    while ((len = in.read(buffer)) != -1) {
                        out.write(buffer, 0, len);
                    }
                }
            }
        }

        return fileName;
    }


}

 

package com.ztesoft.interfaces.predeal.util;

import org.apache.log4j.Logger;

import java.io.*;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.sql.Date;
import java.text.ParseException;
import java.util.HashSet;
import java.util.Set;

/**
 * Helpers for splitting a file into line-aligned byte ranges and for copying and deleting
 * files.
 *
 * @author xiaof
 * @version 1.0
 */
public class DataUtil {

    private final static Logger logger = Logger.getLogger(DataUtil.class);

    /**
     * Splits the byte range {@code [start, totalSize - 1]} into consecutive partitions of at
     * least {@code length + 1} bytes each, extending every partition forward until it ends on
     * a {@code '\n'} or {@code '\r'} byte (or on the last byte), and adds one
     * {@link PartitionPair} per partition to {@code partitionPairs}.
     *
     * <p>Does nothing when {@code start} is beyond the last byte.
     *
     * @param start            offset of the first byte to partition
     * @param length           minimum distance between a partition's start and its end
     * @param totalSize        total number of bytes available
     * @param randomAccessFile file used to look for line breaks; its file pointer is moved
     * @param partitionPairs   set receiving the resulting pairs
     * @throws IOException              if reading the file fails
     * @throws IllegalArgumentException if {@code length} is negative
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    public static void partition(long start, long length, long totalSize, RandomAccessFile randomAccessFile, Set partitionPairs) throws IOException {
        if (length < 0) {
            // A negative length could make the cursor stop advancing.
            throw new IllegalArgumentException("length must not be negative: " + length);
        }

        long lastIndex = totalSize - 1;
        long cursor = start;
        // Iterative form of the original recursion: one loop pass per partition.
        while (cursor <= lastIndex) {
            PartitionPair partitionPair = new PartitionPair();
            partitionPair.setStart(cursor);

            long index = cursor + length;
            if (index > lastIndex) {
                // The remainder is shorter than one partition: it becomes the final one.
                partitionPair.setEnd(lastIndex);
                partitionPairs.add(partitionPair);
                return;
            }

            // Advance from the tentative end until a line break (or the last byte) is reached.
            randomAccessFile.seek(index);
            byte oneByte = randomAccessFile.readByte();
            while (oneByte != '\n' && oneByte != '\r') {
                if (++index > lastIndex) {
                    index = lastIndex;
                    break;
                }
                // readByte() already moved the file pointer to index, so no extra seek is needed.
                oneByte = randomAccessFile.readByte();
            }

            partitionPair.setEnd(index);
            partitionPairs.add(partitionPair);
            cursor = index + 1;
        }
    }


    /**
     * Same partitioning as the five-argument overload, returning the pairs in a new set.
     *
     * @param start            offset of the first byte to partition
     * @param length           minimum distance between a partition's start and its end
     * @param totalSize        total number of bytes available
     * @param randomAccessFile file used to look for line breaks; its file pointer is moved
     * @return the set of {@link PartitionPair}s, or {@code null} when {@code start} is beyond
     *         the last byte
     * @throws IOException if reading the file fails
     */
    @SuppressWarnings("rawtypes")
    public static Set partition(long start, long length, long totalSize, RandomAccessFile randomAccessFile) throws IOException {
        if (start > totalSize - 1) {
            return null;
        }

        // Delegating avoids the addAll(null) failure the recursive version hit whenever a
        // partition ended exactly on the last byte.
        Set partitionPairs = new HashSet();
        partition(start, length, totalSize, randomAccessFile, partitionPairs);
        return partitionPairs;
    }

    /**
     * Parses a {@code yyyyMMdd} string via {@link ConcurrentDateUtil#parse(String)} and wraps
     * the result in a {@link java.sql.Date}.
     *
     * @param dateStr date text to parse
     * @return the parsed date
     * @throws ParseException if the text cannot be parsed
     */
    public static Date getSQLDateThreadSafe(String dateStr) throws ParseException {
        return new Date(ConcurrentDateUtil.parse(dateStr).getTime());
    }

    /**
     * Copies a single file while holding an exclusive lock on the destination. Does nothing
     * when the source does not exist.
     *
     * <p>Failures are logged and rethrown so that callers do not go on to delete the source
     * after an incomplete copy.
     *
     * @param oldPath source path, e.g. c:/fqf.txt
     * @param newPath destination path, e.g. f:/fqf.txt
     * @throws Exception if opening, locking, reading or writing fails
     */
    public static void copyFile(String  oldPath,  String  newPath) throws Exception {
        File oldfile = new File(oldPath);
        if (!oldfile.exists()) {
            return;
        }

        // lock() blocks until the lock is granted and never returns null, so no retry loop is
        // needed. Resources are closed in reverse order: lock, channel, output, input.
        try (InputStream inStream = new FileInputStream(oldPath);
             FileOutputStream fs = new FileOutputStream(newPath);
             FileChannel fileChannel = fs.getChannel();
             FileLock fileLock = fileChannel.lock()) {
            byte[] buf = new byte[2048];
            int len;
            while ((len = inStream.read(buf)) != -1) {
                fs.write(buf, 0, len);
            }
            fs.flush();
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
            throw e;
        }
    }

    /**
     * Deletes the file at {@code filePath}; a failed deletion of an existing file is logged.
     *
     * @param filePath path of the file to delete
     */
    public static void deletFile(String filePath) {
        File file = new File(filePath);
        if (!file.delete() && file.exists()) {
            logger.warn("Failed to delete file: " + filePath);
        }
    }

}

 

package com.ztesoft.interfaces.predeal.util;

/**
 * @ProjectName: cutter-point
 * @Package: io.util
 * @ClassName: PartitionPair
 * @Author: xiaof
 * @Description: ${description}
 * @Date: 2019/2/21 14:36
 * @Version: 1.0
 */
public class PartitionPair {
    private long start;
    private long end;

    @Override
    public String toString() {
        return "start="+start+";end="+end;
    }

    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + (int) (end ^ (end >>> 32));
        result = prime * result + (int) (start ^ (start >>> 32));
        return result;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj)
            return true;
        if (obj == null)
            return false;
        if (getClass() != obj.getClass())
            return false;
        PartitionPair other = (PartitionPair) obj;
        if (end != other.end)
            return false;
        return start == other.start;
    }

    public long getStart() {
        return start;
    }

    public void setStart(long start) {
        this.start = start;
    }

    public long getEnd() {
        return end;
    }

    public void setEnd(long end) {
        this.end = end;
    }
}

 

這里開始,我們實戰使用這個方法解析入庫

package com.ztesoft.interfaces.predeal.bl;

import com.ztesoft.interfaces.common.bll.CommonQueuePool;
import com.ztesoft.interfaces.common.vo.CommonQueueVo;
import com.ztesoft.interfaces.predeal.constrant.PreDealResourceConstrant;
import com.ztesoft.interfaces.predeal.dao.PreDealResourceDao;
import com.ztesoft.isa.service.common.util.SpringContextUtil;
import org.apache.commons.collections.MapUtils;
import org.apache.log4j.Logger;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * Bootstrap thread of the resource FTP pre-processing sync: reads its settings through the
 * DAO, builds the scheduled thread pool and the shared queue, then schedules one producer
 * and a configurable number of consumers.
 *
 * @author xiaof
 * @version 1.0
 */
public class PreDealResourceServer extends Thread {

    private Logger logger = Logger.getLogger(PreDealResourceServer.class);

    // Scheduled futures keyed by thread-group name, kept so they can be looked up later.
    private static Map<String, Object> config = new HashMap<>();

    // DAO looked up from the Spring context.
    private PreDealResourceDao preDealResourceDao = (PreDealResourceDao) SpringContextUtil.getBean("preDealResourceDao");

    // Queue shared by the producer and the consumers.
    private CommonQueuePool commonQueuePool;

    // Pool that runs the producer and consumer tasks.
    private ScheduledExecutorService service;

    /**
     * Loads the server settings (CODEA = pool size, default 8; CODEB = queue length,
     * default 3000), then starts the producer followed by the consumers.
     */
    @Override
    public void run() {
        Map serverSettings = loadSettings(PreDealResourceConstrant.SERVER_KEY_INFO);
        Integer poolSize = MapUtils.getInteger(serverSettings, "CODEA", 8);
        service = Executors.newScheduledThreadPool(poolSize);
        Integer queueLength = MapUtils.getInteger(serverSettings, "CODEB", 3000);
        commonQueuePool = new CommonQueuePool<CommonQueueVo>(queueLength);

        this.startResourceProducer();
        this.startResourceConsum();
    }

    // Queries the configuration row identified by the shared config type and the given key.
    private Map loadSettings(Object pkey) {
        Map query = new HashMap();
        query.put("stype", PreDealResourceConstrant.CONFIG_KEY_TYPE);
        query.put("pkey", pkey);
        return preDealResourceDao.qryPreDealResourceConfig(query);
    }

    // Schedules the producer at a fixed rate (CODEA = initial delay, CODEB = period, seconds).
    private void startResourceProducer() {
        Map producerSettings = loadSettings(PreDealResourceConstrant.PRODUCER_RESOURCE_THREAD_KEY_INFO);
        int initialDelay = MapUtils.getInteger(producerSettings, "CODEA", 30);
        int period = MapUtils.getInteger(producerSettings, "CODEB", 86400);

        PreDealResourceProducer producer = new PreDealResourceProducer(commonQueuePool);
        Future producerFuture = service.scheduleAtFixedRate(producer, initialDelay, period, TimeUnit.SECONDS);

        // Keep the future so the schedule can be found again later.
        List futures = new ArrayList();
        futures.add(producerFuture);
        String groupName = MapUtils.getString(producerSettings, "CODEC", "ProducerResourceThread");
        config.put(groupName, futures);
    }

    // Schedules the consumers once each (CODEA = delay in seconds, CODEB = consumer count).
    private void startResourceConsum() {
        Map consumerSettings = loadSettings(PreDealResourceConstrant.CONSUM_RESOURCE_THREAD_KEY_INFO);
        int initialDelay = MapUtils.getInteger(consumerSettings, "CODEA", 30);
        int threadNum = MapUtils.getInteger(consumerSettings, "CODEB", 3);

        List futures = new ArrayList();
        int scheduled = 0;
        while (scheduled < threadNum) {
            PreDealResourceConsum consumer = new PreDealResourceConsum(commonQueuePool);
            futures.add(service.schedule(consumer, initialDelay, TimeUnit.SECONDS));
            scheduled++;
        }
        String groupName = MapUtils.getString(consumerSettings, "CODEC", "ConsumResourceThread");
        config.put(groupName, futures);
    }

    /**
     * Overload of {@link #start()} that ignores its arguments and simply starts the thread.
     *
     * @param args ignored
     */
    public void start(String... args) {
        this.start();
    }

    public static Map<String, Object> getConfig() {
        return config;
    }

    public static void setConfig(Map<String, Object> config) {
        PreDealResourceServer.config = config;
    }

}

 

package com.ztesoft.interfaces.predeal.bl;

import com.ztesoft.interfaces.common.bll.CommonQueuePool;
import com.ztesoft.interfaces.common.invoke.FtpInvoke;
import com.ztesoft.interfaces.common.util.FtpTemplate;
import com.ztesoft.interfaces.predeal.constrant.PreDealResourceConstrant;
import com.ztesoft.interfaces.predeal.dao.PreDealResourceDao;
import com.ztesoft.interfaces.predeal.dto.OmPredealSyncLogDto;
import com.ztesoft.interfaces.predeal.util.ConcurrentDateUtil;
import com.ztesoft.interfaces.predeal.util.DataUtil;
import com.ztesoft.interfaces.predeal.vo.PreDealResourceVo;
import com.ztesoft.isa.service.common.util.SpringContextUtil;
import com.ztesoft.services.common.CommonHelper;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.net.ftp.FTPClient;
import org.apache.commons.net.ftp.FTPFile;
import org.apache.log4j.Logger;

import java.io.File;
import java.io.FileOutputStream;
import java.io.OutputStream;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.util.*;

/**
 * Scheduled producer of the resource sync: downloads matching files from the configured FTP
 * directory into a local directory, then, for every regular file found there, writes a sync
 * log record, moves the file into a date-named consumer directory and puts a
 * {@link PreDealResourceVo} on the shared queue.
 *
 * @author xiaof
 * @version 1.0
 */
public class PreDealResourceProducer implements Runnable {


    private final static Logger logger = Logger.getLogger(PreDealResourceProducer.class);

    // Queue shared with the consumer threads.
    private final CommonQueuePool commonQueuePool;

    // DAO looked up from the Spring context.
    private PreDealResourceDao preDealResourceDao = (PreDealResourceDao) SpringContextUtil.getBean("preDealResourceDao");

    /**
     * @param commonQueuePool queue that receives one entry per file to be consumed
     */
    public PreDealResourceProducer(CommonQueuePool commonQueuePool) {
        this.commonQueuePool = commonQueuePool;
    }

    /**
     * Runs one download-and-enqueue pass. Any failure is logged and ends the pass; files
     * already present locally are picked up even when the FTP step fails.
     */
    @Override
    public void run() {
        try {
            // 1. Load the FTP connection settings, falling back to the built-in defaults.
            Map paramMap = new HashMap();
            paramMap.put("stype", PreDealResourceConstrant.CONFIG_KEY_TYPE);
            paramMap.put("pkey", PreDealResourceConstrant.PRODUCER_RESOURCE_FTP_KEY);
            paramMap = preDealResourceDao.qryPreDealResourceConfig(paramMap);
            String resourceIp = MapUtils.getString(paramMap, "CODEA", PreDealResourceConstrant.PRODUCER_RESOURCE_FTP_IP);
            int resourcePort = MapUtils.getInteger(paramMap, "CODEB", PreDealResourceConstrant.PRODUCER_RESOURCE_FTP_PORT);
            String resourceUserName = MapUtils.getString(paramMap, "CODEC", PreDealResourceConstrant.PRODUCER_RESOURCE_FTP_USERNAME);
            String resourcePasswd = MapUtils.getString(paramMap, "CODED", PreDealResourceConstrant.PRODUCER_RESOURCE_FTP_PASSWD);
            String resourceRemoteDir = MapUtils.getString(paramMap, "CODEE", PreDealResourceConstrant.PRODUCER_RESOURCE_FTP_REMOTEDIR);
            String resourceLocalDir = MapUtils.getString(paramMap, "CODEH", PreDealResourceConstrant.PRODUCER_RESOURCE_FTP_LOCALDIR);
            // Delete switch read from configuration; not used in this method yet.
            String resourceDeleteMark = MapUtils.getString(paramMap, "CODEG", "0");
            String resourceConsumLocalDir = MapUtils.getString(paramMap, "CODEI", PreDealResourceConstrant.PRODUCER_RESOURCE_CONSUM_LOCALDIR);

            // Load the file filter: only names matching these rules are downloaded.
            Map paramMapFile = new HashMap();
            paramMapFile.put("stype", PreDealResourceConstrant.CONFIG_KEY_TYPE);
            paramMapFile.put("pkey", PreDealResourceConstrant.PRODUCER_RESOURCE_FTP_RULE);
            paramMapFile = preDealResourceDao.qryPreDealResourceConfig(paramMapFile);
            String fileNameContain = MapUtils.getString(paramMapFile, "CODEA", "");
            String endWith = MapUtils.getString(paramMapFile, "CODEB", "");

            // Make sure the download directory exists before anything is written into it.
            File localDirectory = new File(resourceLocalDir);
            if(!localDirectory.exists()) {
                localDirectory.mkdirs();
            }

            FtpTemplate ftpTemplate = new FtpTemplate(resourceIp, resourcePort, resourceUserName, resourcePasswd);
            // 2. Download the matching remote files into the local directory.
            try {
                ftpTemplate.operatorByPathAndName(resourceRemoteDir, resourceLocalDir, new FtpInvoke() {
                    @Override
                    public void doOperator(FTPClient ftp, String remoteDir, String localDir) throws Exception {
                        // Passive mode: the server opens the data port for us.
                        ftp.enterLocalPassiveMode();
                        FTPFile[] ftpFile = ftp.listFiles(remoteDir);

                        for(int i = 0; i < ftpFile.length; ++i) {
                            String ftpFileName = ftpFile[i].getName();
                            String localFilePath = localDir + "/" + ftpFileName;
                            File file = new File(localFilePath);

                            // Names without '_' cannot match the prefix rule; skip them instead
                            // of letting substring() throw and abort the whole download pass.
                            int separatorIndex = ftpFileName.lastIndexOf("_");
                            if(separatorIndex < 0) {
                                continue;
                            }

                            // Download only files that match the rules and are not present locally.
                            if(fileNameContain.contains(ftpFileName.substring(0, separatorIndex))
                                    && !file.exists() && ftpFileName.contains(endWith)) {
                                String filePath = remoteDir + "/" + ftpFileName;
                                boolean downloaded = false;
                                try (OutputStream outputStream = new FileOutputStream(file)) {
                                    downloaded = ftp.retrieveFile(filePath, outputStream);
                                } finally {
                                    // A failed transfer leaves a truncated file behind; remove it so it
                                    // is neither processed as data nor blocks the next download attempt.
                                    if(!downloaded && file.exists() && !file.delete()) {
                                        logger.warn("Unable to remove incomplete download: " + localFilePath);
                                    }
                                }
                                if(downloaded) {
                                    logger.info("下載文件:" + ftpFileName + " 完成");
                                } else {
                                    logger.warn("Download failed: " + filePath);
                                }
                            }
                        }
                    }
                });
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
            }

            // 3. Walk the local directory; listFiles() returns null if it cannot be read.
            File[] recFiles = localDirectory.listFiles();
            if(recFiles == null) {
                logger.warn("Unable to list local directory: " + resourceLocalDir);
                recFiles = new File[0];
            }
            // 4. For every regular file: log it, move it, and hand it to the consumers.
            for(int i = 0; i < recFiles.length; ++i) {
                if(recFiles[i].isDirectory()) {
                    continue; // directories are not processed
                }
                // Target directory is named after today's date.
                String fileName = recFiles[i].getName();
                String filePath = recFiles[i].getPath();
                Calendar calendar = Calendar.getInstance();
                String consumLocalDir = resourceConsumLocalDir + "/" + ConcurrentDateUtil.formatDateDir(calendar.getTime());
                File file = new File(consumLocalDir);
                if(!file.exists()) {
                    file.mkdirs();
                }
                // 5. Insert one sync-log record for this file.
                PreDealResourceVo preDealResourceVo = new PreDealResourceVo();
                String desPath = consumLocalDir + "/" + fileName;
                long omPredealSyncLogSeq = CommonHelper.getCommonDAO().getSeqNextVal(PreDealResourceConstrant.OM_PREDEAL_SYNC_LOG_SEQ);
                OmPredealSyncLogDto omPredealSyncLogDto = new OmPredealSyncLogDto();
                omPredealSyncLogDto.setId(String.valueOf(omPredealSyncLogSeq));
                omPredealSyncLogDto.setCreateTime(new Date());
                omPredealSyncLogDto.setUpdateTime(new Date());
                omPredealSyncLogDto.setState("0");
                omPredealSyncLogDto.setTimes("0");
                omPredealSyncLogDto.setLogName(recFiles[i].getName() + "資源信息同步");
                omPredealSyncLogDto.setLogType(PreDealResourceConstrant.PRE_DEAL_RESOURCE_LOG_TYPE);
                omPredealSyncLogDto.setRemark(filePath);
                omPredealSyncLogDto.add();

                preDealResourceVo.setOmPredealSyncLogDto(omPredealSyncLogDto);
                preDealResourceVo.setFileName(recFiles[i].getName());
                preDealResourceVo.setFileFullPath(desPath);

                // Move the file into the consumer directory (copy, then delete the source).
                DataUtil.copyFile(filePath, desPath);
                DataUtil.deletFile(filePath);

                commonQueuePool.put(preDealResourceVo);
            }
            // Record today's date in the rule configuration.
            paramMapFile.put("stype", PreDealResourceConstrant.CONFIG_KEY_TYPE);
            paramMapFile.put("pkey", PreDealResourceConstrant.PRODUCER_RESOURCE_FTP_RULE);
            paramMapFile.put("codeb", ConcurrentDateUtil.formatResourceFile(new Date()));
            preDealResourceDao.updateConfig(paramMapFile);
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
    }
}

 

package com.ztesoft.interfaces.predeal.bl;

import com.ztesoft.interfaces.common.bll.CommonQueuePool;
import com.ztesoft.interfaces.common.bll.CommonQueueWork;
import com.ztesoft.interfaces.common.vo.CommonQueueVo;
import com.ztesoft.interfaces.predeal.vo.PreDealResourceVo;
import org.apache.log4j.Logger;

/**
 * Queue consumer: repeatedly takes an item from the shared queue and dispatches it to the
 * worker that matches its type. Runs until the thread is interrupted.
 *
 * @author xiaof
 * @version 1.0
 */
public class PreDealResourceConsum implements Runnable {

    private Logger logger = Logger.getLogger(PreDealResourceConsum.class);

    // Queue shared with the producer.
    private final CommonQueuePool commonQueuePool;

    /**
     * @param commonQueuePool queue to take work items from
     */
    public PreDealResourceConsum(CommonQueuePool commonQueuePool) {
        this.commonQueuePool = commonQueuePool;
    }

    /**
     * Consumes items until interrupted. A runtime failure while handling one item is logged
     * and the loop continues with the next item.
     */
    @Override
    public void run() {
        while(true) {
            try {
                CommonQueueVo commonQueueVo = (CommonQueueVo) commonQueuePool.take();
                CommonQueueWork commonQueueWork = null;
                // Pick the worker by item type; unknown types are dropped.
                if(commonQueueVo instanceof PreDealResourceVo) {
                    commonQueueWork = new PreDealResourceWork();
                }

                if(commonQueueWork != null) {
                    commonQueueWork.doWork(commonQueueVo);
                }
            } catch (InterruptedException e) {
                logger.error(e.getMessage(), e);
                // Restore the flag and stop, so cancelling the scheduled future or shutting
                // down the pool actually ends this consumer.
                Thread.currentThread().interrupt();
                return;
            } catch (RuntimeException e) {
                // One bad item must not silently kill the consumer thread.
                logger.error(e.getMessage(), e);
            }

        }
    }
}

 

最後是核心解析類的實現:

package com.ztesoft.interfaces.predeal.bl;

import com.ztesoft.interfaces.common.bll.CommonQueueWork;
import com.ztesoft.interfaces.common.vo.CommonQueueVo;
import com.ztesoft.interfaces.predeal.constrant.PreDealResourceConstrant;
import com.ztesoft.interfaces.predeal.dao.PreDealResourceDao;
import com.ztesoft.interfaces.predeal.dto.OmPredealSyncLogDto;
import com.ztesoft.interfaces.predeal.util.CreateFileUtil;
import com.ztesoft.interfaces.predeal.util.CsvBigTask;
import com.ztesoft.interfaces.predeal.util.DataUtil;
import com.ztesoft.interfaces.predeal.util.PartitionPair;
import com.ztesoft.interfaces.predeal.vo.PreDealResourceVo;
import com.ztesoft.isa.service.common.util.SpringContextUtil;
import org.apache.commons.collections.MapUtils;
import org.apache.log4j.Logger;

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.*;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicLong;

/**
 * @ProjectName: Hubei Mobile smart installation and maintenance support system
 * @Package: com.ztesoft.interfaces.predeal.bl
 * @ClassName: PreDealResourceWork
 * @Author: xiaof
 * @Description: Parses a resource file received over FTP and loads it into the database.
 * @Date: 2019/3/10 15:00
 * @Version: 1.0
 *
 * <p>Flow of {@link #doWork(CommonQueueVo)}: unzip the archive, read the header line,
 * split the rest of the file into partitions, truncate the target table, then start one
 * {@link CsvBigTask} thread per partition. Each thread hands its lines to a callback that
 * batches them into {@code preDealResourceDao.addResourceBatch}. A {@link CyclicBarrier}
 * action records the final status once every worker has reached the barrier.
 */
public class PreDealResourceWork implements CommonQueueWork {

    private final static Logger logger = Logger.getLogger(PreDealResourceWork.class);

    private PreDealResourceDao preDealResourceDao = (PreDealResourceDao) SpringContextUtil.getBean("preDealResourceDao");

    /**
     * Imports the file described by the queued item.
     *
     * <p>This method returns as soon as the worker threads have been started; the import
     * itself completes asynchronously and its result is written to the sync-log record by
     * the barrier action. The sync-log record is also updated once before returning.
     *
     * @param commonQueueVo queued item; must be a {@link PreDealResourceVo}
     */
    @Override
    public void doWork(CommonQueueVo commonQueueVo) {

        PreDealResourceVo preDealResourceVo = (PreDealResourceVo) commonQueueVo;
        OmPredealSyncLogDto omPredealSyncLogDto = preDealResourceVo.getOmPredealSyncLogDto();
        // 1. Locate the archive.
        logger.info("===========1開始解壓文件=======" + System.currentTimeMillis());
        File zipFile = new File(preDealResourceVo.getFileFullPath());
        try {
            // Unzip next to the archive; the target table name is derived from the file name.
            String fileName = CreateFileUtil.unZipSingleFileCurrentDir(zipFile);
            String tableName = this.getTableName(fileName);
            // 2. Make sure the extracted file is really there.
            logger.info("===========2判斷文件是否存在=======" + System.currentTimeMillis());
            File resourceFile = new File(zipFile.getParent() + "/" + fileName);
            if (!resourceFile.exists()) {
                markFailed(omPredealSyncLogDto, resourceFile.getPath() +  " 文件不存在");
                return;
            }
            // 3. Open the file for random access.
            logger.info("===========3獲取隨機文件索引=======" + System.currentTimeMillis());
            RandomAccessFile randomAccessFile = new RandomAccessFile(resourceFile, "r");
            // Until the workers own the file, this method is responsible for closing it.
            boolean workersLaunched = false;
            try {
                long length = resourceFile.length();
                // One partition per available processor.
                int availProcessors = Runtime.getRuntime().availableProcessors();
                logger.info("===========3。1可使用線程=======" + availProcessors);
                long blockLength = length / availProcessors;

                // Read the header line. `index` counts every byte consumed, including the
                // terminator, because it is the offset partitioning starts from. The
                // terminator itself is kept out of the header text so the last column
                // name does not end in '\r' or '\n'.
                ByteArrayOutputStream headerBytes = new ByteArrayOutputStream();
                int index = 0;
                while (true) {
                    byte b = randomAccessFile.readByte();
                    ++index;
                    if (b == '\r' || b == '\n') {
                        break;
                    }
                    headerBytes.write(b);
                }

                // Split the remainder of the file into partitions.
                Set pairSets = DataUtil.partition(index, blockLength, length, randomAccessFile);
                // 4. Partitions are known; prepare the worker threads.
                logger.info("===========4數據分片之后,分別啟動處理線程=======" + pairSets + " 時間:" + System.currentTimeMillis());
                final long startTime = System.currentTimeMillis();
                AtomicLong atomicLong = new AtomicLong(0);
                // 5. Load the import settings: batch size, read-buffer size (MB), delimiter.
                logger.info("===========5數據入庫=======" + System.currentTimeMillis());
                Map workConfigMap = new HashMap();
                workConfigMap.put("stype", PreDealResourceConstrant.CONFIG_KEY_TYPE);
                workConfigMap.put("pkey", PreDealResourceConstrant.PRE_DEAL_RESOURCE_WORK);
                workConfigMap = preDealResourceDao.qryPreDealResourceConfig(workConfigMap);
                int mapSize = MapUtils.getInteger(workConfigMap, "CODEA", 1000);
                int bufSize = MapUtils.getInteger(workConfigMap, "CODEB", 3) * 1024 * 1024;
                String splitMark = MapUtils.getString(workConfigMap, "CODEC", "\t");

                // Column names: split the header with the same delimiter as the data rows.
                String[] firstLine = headerBytes.toString().split(splitMark);

                // Empty the target table before loading.
                initTable(tableName, omPredealSyncLogDto);

                CyclicBarrier cyclicBarrier = new CyclicBarrier(pairSets.size(), new Runnable() {
                    @Override
                    public void run() {
                        try {
                            // Every worker has reached the barrier: record the result.
                            logger.info("===========5數據入庫結束=======" + System.currentTimeMillis());
                            omPredealSyncLogDto.setUpdateTime(new Date());
                            omPredealSyncLogDto.setTimes(nextTimes(omPredealSyncLogDto));
                            omPredealSyncLogDto.setRemark(omPredealSyncLogDto.getRemark() + "\n" + resourceFile.getPath() +  " 入庫完畢 use time: "+(System.currentTimeMillis()-startTime));
                            omPredealSyncLogDto.setState("1");
                            omPredealSyncLogDto.setNewTotal(String.valueOf(atomicLong.get()));
                            omPredealSyncLogDto.update();
                        } finally {
                            // NOTE(review): assumes CsvBigTask does not touch the file after
                            // cyclicBarrier.await() returns - confirm against CsvBigTask.run().
                            closeQuietly(randomAccessFile);
                        }
                    }
                });

                // From here on the barrier action is responsible for closing the file.
                workersLaunched = true;
                for (Object pair : pairSets) {
                    PartitionPair partitionPair = (PartitionPair) pair;
                    CsvBigTask csvBigTask = new CsvBigTask(cyclicBarrier, atomicLong, partitionPair, bufSize, randomAccessFile, new ArrayList(), new IHandle() {
                        @Override
                        public void handle(String line, boolean lastLine, List tempData) {
                            try {
                                // Buffer the parsed row; empty lines are skipped.
                                if (!line.equals("")) {
                                    String[] elements = line.split(splitMark);
                                    tempData.add(elements);
                                }

                                // Flush when the buffer reaches a multiple of the batch
                                // size, and once more on the final line of the partition.
                                if ((tempData.size() % mapSize == 0 && tempData.size() >= mapSize) || lastLine) {
                                    preDealResourceDao.addResourceBatch(tableName, firstLine, tempData);
                                    tempData.clear();
                                }
                            } catch (Exception e) {
                                logger.error(line + e.getMessage(), e);
                            }
                        }
                    });
                    Thread thread = new Thread(csvBigTask);
                    thread.start();
                }
            } finally {
                if (!workersLaunched) {
                    // Setup failed before any worker could use the file: release it here.
                    closeQuietly(randomAccessFile);
                }
            }

        } catch (IOException e) {
            markFailed(omPredealSyncLogDto, preDealResourceVo.getFileFullPath() + e.getMessage());
            logger.error(e.getMessage(), e);
        } finally {
            // 7. Persist whatever state the log record has at this point.
            logger.info("===========7最后更新日志=======" + System.currentTimeMillis());
            omPredealSyncLogDto.update();
        }
    }

    /** Marks the sync-log record as failed (state "0") and appends {@code detail} to its remark. */
    private static void markFailed(OmPredealSyncLogDto omPredealSyncLogDto, String detail) {
        omPredealSyncLogDto.setState("0");
        omPredealSyncLogDto.setUpdateTime(new Date());
        omPredealSyncLogDto.setTimes(nextTimes(omPredealSyncLogDto));
        omPredealSyncLogDto.setRemark(omPredealSyncLogDto.getRemark() + "\n" + detail);
    }

    /** Returns the record's attempt counter incremented by one, as text. */
    private static String nextTimes(OmPredealSyncLogDto omPredealSyncLogDto) {
        return Integer.valueOf(omPredealSyncLogDto.getTimes()) + 1 + "";
    }

    /** Closes the file, logging instead of propagating a failure to close. */
    private static void closeQuietly(RandomAccessFile file) {
        try {
            file.close();
        } catch (IOException e) {
            logger.error(e.getMessage(), e);
        }
    }

    /**
     * Builds the target table name: the configured prefix, an underscore, and the part of
     * the file name before its last underscore.
     */
    private String getTableName(String fileName) {
        String tableName = "";
        tableName = PreDealResourceConstrant.PRE_RESOURCE_TABLE + "_" + fileName.substring(0, fileName.lastIndexOf("_"));
        return tableName;
    }

    /** Truncates the target table through the DAO before a fresh load. */
    private void initTable(String tableName, OmPredealSyncLogDto omPredealSyncLogDto) {
        preDealResourceDao.truncateResourceTable(omPredealSyncLogDto, tableName);
    }

}

 

 

最後公布一小段入庫代碼(完整實現就不全部貼出了):

PreDealResourceDaoImpl

 

/**
 * Inserts one batch of rows into {@code tableName}.
 *
 * <p>The statement lists every entry of {@code fields} as a column with a bind
 * placeholder, followed by {@code create_date} and {@code update_date}, which are both
 * set to {@code sysdate}. A {@link DataAccessException} is logged and not rethrown.
 *
 * @param tableName target table
 * @param fields    column names, in the same order as the values of each row
 * @param params    rows to insert; each element is an {@code Object[]} of column values.
 *                  A row shorter than {@code fields} is padded with SQL NULL.
 */
@Override
    public void addResourceBatch(String tableName, String[] fields, List params) {
        try {
            // Assemble the column list and the matching "?" placeholders side by side.
            StringBuilder columnList = new StringBuilder();
            StringBuilder placeholderList = new StringBuilder();
            for (String field : fields) {
                columnList.append(field).append(",");
                placeholderList.append("?,");
            }

            // The audit timestamps are filled in by the database itself.
            String sql = " insert /*+ append */ into " + tableName + " ("
                    + columnList + "create_date, update_date ) "
                    + " values (" + placeholderList + "sysdate,sysdate)";

            super.getJdbcOperations().batchUpdate(sql, new BatchPreparedStatementSetter() {
                @Override
                public void setValues(PreparedStatement ps, int i) throws SQLException {
                    Object[] row = (Object[]) params.get(i);
                    int provided = Math.min(row.length, fields.length);
                    int column = 0;
                    // Values present in the row are bound as strings ...
                    for (; column < provided; ++column) {
                        ps.setString(column + 1, String.valueOf(row[column]));
                    }
                    // ... and any remaining columns are bound as NULL.
                    for (; column < fields.length; ++column) {
                        ps.setObject(column + 1, null);
                    }
                }

                @Override
                public int getBatchSize() {
                    return params.size();
                }
            });

            logger.info("====" + tableName + "入庫 " + params.size() + " 條");

        } catch (DataAccessException e) {
            logger.error(e.getMessage(), e);
        }
    }

 


免責聲明!

本站轉載的文章僅供個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯繫本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM