Apache Camel繼承Spring Boot 實現文件遠程復制和轉移


pom.xml

<dependency>
    <groupId>org.apache.camel</groupId>
    <artifactId>camel-ftp</artifactId>
    <version>2.18.0</version>
</dependency>

 

URI的格式

ftp://[username@]hostname[:port]/directoryname[?options]

sftp://[username@]hostname[:port]/directoryname[?options]

ftps://[username@]hostname[:port]/directoryname[?options]

 

如果未提供端口號,Camel將根據協議提供默認值(ftp = 21,sftp = 22,ftps = 2222)。

例:
ftp://173.5.206.53:2121/MHE/?username=ftpAdmin&password=123456&binary=true&passiveMode=true&delete=true&delay=60000

注:ftp的路徑必須是 當前ip:端口 + 當前賬號密碼登陸進去的目錄往下走(不能往上走,沒權限),路徑才會生效,否則不會報錯,也不會進行文件操作

配置文件信息:

#文件壓縮本地地址(備份)
ftp.local.backups.dir=file:D:/gz/backups?move=delete&delay=30s
#大數據服務器地址(備份)
ftp.bigdata.backups.adress=sftp://218.984.130.146:51268/../apps/chinamobile/bigdata/dataConsistencyRepair/backups/?username=root&password=Redoor2018@net123&delay=30s


#大數據服務器修復地址(運維存放文件夾,使用后存放到備份文件中)
ftp.bigdatarepair.adress=sftp://218.984.130.146:51268/../apps/chinamobile/bigdata/bigdatarepair/?username=root&password=Redoor2018@net123&move=/apps/chinamobile/bigdata/bigdatarepair/backups&delay=30s
#本地文件修復地址(復制到自定義文件夾)
ftp.local.repair=file:D:/localrepair


#對比大數據差異文件存放地址(通知運維)
ftp.bigdatadifference.adress=sftp://218.984.130.146:51268/../apps/chinamobile/bigdata/difference/?username=root&password=Redoor2018@net123&delay=30s

camel.springboot.main-run-controller=true

 

文件實現文件復制,轉移:

/**
 * 遠程文件監聽和拉取
 */
@Component
public class DownLoadRouteBuilder extends RouteBuilder{

    private Logger log = Logger.getLogger(DownLoadRouteBuilder.class);

    /**
     * 文件壓縮本地地址
     */
    @Value("${ftp.local.dir}")
    private String localFileDir;

    /**
     * 大數據服務器地址
     */
    @Value("${ftp.bigdata.adress}")
    private String bigDataServer;

    /**
     * 文件壓縮本地地址(上傳備份)
     */
    @Value("${ftp.local.backups.dir}")
    private String localFileDirBackups;

    /**
     * 大數據服務器地址(上傳備份)
     */
    @Value("${ftp.bigdata.backups.adress}")
    private String bigDataBackupsServer;

    /**
     * 本地文件修復地址
     */
    @Value("${ftp.local.repair}")
    private String localRepair;
    /**
     * 大數據服務器修復地址
     */
    @Value("${ftp.bigdatarepair.adress}")
    private String bigDataRepairServer;

    /**
     * 對比大數據差異文件存放地址
     */
    @Value("${ftp.bigdatadifference.adress}")
    private String bigdatadifference;

    @Override
    public void configure() throws Exception {
        //文件上傳大數據
        from(localFileDir).to(bigDataServer).process("transferDataProcessToBigData");
        //文件上傳大數據(上傳備份)  備份文件不存redis有一份就好
        from(localFileDirBackups).to(bigDataBackupsServer).process("transferDataProcessToBigData");

        //修復文件拉取
        from(bigDataRepairServer).to(localRepair).process("transferDataProcessBigDataRepairToLocal");
        //對比大數據差異文件存放地址
        from(bigdatadifference).to(bigdatadifference).process("transferDataProcessToBigDataDifference");

        System.out.println("file download ok...");
        log.info("遠程文件監聽和拉取 已啟動.......");
    }

}

文件復制獲取信息:

    文件遠程復制和移動以后,本對象中可以顯示

文件一:

/**
 * 監聽本地修復文件
 */
@Component
public class TransferDataProcessBigDataRepairToLocal implements Processor{

    private Logger log = Logger.getLogger(TransferDataProcessBigDataRepairToLocal.class);

    @Autowired
    private RepairRedisUtil repairRedisUtil;
    /**
     * 本地文件修復地址
     */
    @Value("${ftp.local.repair}")
    private String localRepair;

    /**
     * redis工具類
     */
    @Autowired
    RedisUtils redisUtils;


    /**
     * 讀取GZ壓縮文件,多線程
     */
    @Autowired
    AsyncService asyncService;

    
    @Override
    public void process(Exchange exchange) throws Exception {
        GenericFileMessage<RandomAccessFile> inFileMessage = (GenericFileMessage<RandomAccessFile>) exchange.getIn();
        String fileName = inFileMessage.getGenericFile().getFileName();//文件名
        String splitTag = File.separator;//系統文件分隔符
        String absolutePath = localRepair + splitTag + fileName;
        System.out.println(absolutePath);//文件的絕對路徑
        System.out.println("read file and push to falcon...");

        System.out.println("准備修復文件!");
        //文件的絕對路徑去掉:file:
        String filePath = absolutePath.substring(absolutePath.indexOf("file:")+"file:".length());

        //創建redisKey  項目名稱,模塊名稱,功能名稱,自定義名稱
        String bigdatarepair = redisUtils.buildRedisKey("OneLink","DataConsistencyRepair","bigdatarepair","list");

        //文件名稱解析並進行redis緩存
        DataRepairCacheModel dataRepairModel = repairRedisUtil.getFileInfo(fileName);
        //獲取緩存key
        String cacheKey = repairRedisUtil.getDataRepairCacheKey(dataRepairModel.getBusinessCode());
        //文件名稱緩存
        repairRedisUtil.CacheRepairData(cacheKey,dataRepairModel,0);

    }
}

文件二:

/**
 * 監聽大數據服務器是否上傳成功
 */
@Component
public class TransferDataProcessToBigData implements Processor{

    private Logger log = Logger.getLogger(TransferDataProcessToBigData.class);

    /**
     * redis工具類
     */
    @Autowired
    RedisUtils redisUtils;

    /**
     * 大數據服務器地址
     */
    @Value("${ftp.bigdata.adress}")
    private String bigDataServer;

    
    @Override
    public void process(Exchange exchange) throws Exception {
        GenericFileMessage<RandomAccessFile> inFileMessage = (GenericFileMessage<RandomAccessFile>) exchange.getIn();
        String fileName = inFileMessage.getGenericFile().getFileName();//文件名
        String splitTag = File.separator;//系統文件分隔符
        System.out.println(bigDataServer + splitTag + fileName);//文件的絕對路徑
        System.out.println("read file and push to falcon...");


        //添加:判斷redis是否存在,不存在就刪除,存在就不管

        //創建redisKey  項目名稱,模塊名稱,功能名稱,自定義名稱
        String redisName = redisUtils.buildRedisKey("OneLink","DataConsistencyRepair","upload","List");
        //獲取
        Map<String, DataUploadModel> stringObjectMap  = (Map<String, DataUploadModel>) JSON.parseObject((String)redisUtils.get(redisName),Map.class);

        //刪除
        stringObjectMap.remove(fileName);

        //將集合存入緩存中
        redisUtils.set(redisName, JSONObject.toJSONString(stringObjectMap));

        log.info("文件信息:"+fileName+"上傳大數據服務器成功!");
        log.info("文件信息:"+fileName+"從redis刪除標記!");
    }
}

 

文件三:

/**
 * 對比大數據差異文件存放地址監聽
 */
@Component
public class TransferDataProcessToBigDataDifference implements Processor{

    private Logger log = Logger.getLogger(TransferDataProcessToBigDataDifference.class);

    /**
     * redis工具類
     */
    @Autowired
    RedisUtils redisUtils;
@Autowired
private Task task; /** * 對比大數據差異文件存放地址 */ @Value("${ftp.bigdatadifference.adress}") private String bigdatadifference; @Override public void process(Exchange exchange) throws Exception { GenericFileMessage<RandomAccessFile> inFileMessage = (GenericFileMessage<RandomAccessFile>) exchange.getIn(); String fileName = inFileMessage.getGenericFile().getFileName();//文件名 String splitTag = File.separator;//系統文件分隔符 System.out.println(bigdatadifference + splitTag + fileName);//文件的絕對路徑 System.out.println("read file and push to falcon..."); task.send(); System.out.println("給運維發送郵件!"); } }

 

官方API文檔:http://camel.apache.org/ftp2.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM