Apache Camel之FTP組件學習


寫在最前面

哎,最近提了離職,手頭的活也基本上清理的差不多了。想着這個把月可以舒服的晃悠晃悠的離開,但是運維的小伙伴總是不架勢,走之前還是提了個新需求。

先說下需求吧,我們的系統概括的講就是一個接口系統,對外的方式無外乎三種,MQ、WEBSERVICE以及FTP了。因為FTP的業務是前人留下來東西,而它恰好一直不出問題,邏輯也比較復雜,所以一直都懶得看里面的內容,只是初步的知道是用的Apache Camel的ftp的路由。

這次運維同學提出的需求正好就是關於ftp的,想着離職還要一段時間,索性就研究這東西吧。因為我也是初步接觸,有不對的地方還請指正。具體的需求就是,我們的系統需要監控遠程ftp上的ZIP包文件,將ZIP包下載到本地解壓,然后解析導入數據庫中。ps:類似JMS監控MQ。

一、Apache Camel是什么東西?

Apache Camel 的官網是http://camel.apache.org 。它是一個通用的基於已知的開源集成框架企業集成模式。簡單的講,Apache Camel就是集成現有的一系列中間件架構,如HTTP, ActiveMQ, JMS, JBI, SCA, MINA or CXF等,實現傳輸協議和消息格式的轉換等等。看起來功能還挺強大,不過大部分可能在垂直領域有更好的解決方案。本文只說一下FTP的這個路由組件。(這段話可能說的不是太清楚,我自己也是迷迷糊糊,有能力的同學自己看下官網的解釋吧)

二、官網FTP API 翻譯整理

來源:http://camel.apache.org/ftp

2.1 關於jar包

<!-- https://mvnrepository.com/artifact/org.apache.camel/camel-ftp -->
<dependency>
    <groupId>org.apache.camel</groupId>
    <artifactId>camel-ftp</artifactId>
    <version>2.13.1</version>
</dependency>

這個是我們項目中用的Camel版本,比較老了,最新的是2.19.2

<!-- https://mvnrepository.com/artifact/org.apache.camel/camel-ftp -->
<dependency>
    <groupId>org.apache.camel</groupId>
    <artifactId>camel-ftp</artifactId>
    <version>2.19.2</version>
</dependency>

 2.2 URI的格式

ftp://[username@]hostname[:port]/directoryname[?options]

sftp://[username@]hostname[:port]/directoryname[?options]

ftps://[username@]hostname[:port]/directoryname[?options]

下面這個鏈接是我本地測試使用的,可以參考下,具體參數的意義請往下看。

ftp://173.5.206.53:2121/MHE/?username=ftpAdmin&amp;password=123456&amp;binary=true&amp;passiveMode=true&amp;delete=true&amp;delay=60000

其中directoryname表示文件夾目錄。目錄名稱是一個相對路徑。不支持絕對路徑。相對路徑可以包含嵌套文件夾,例如 /inbox/us。

對於Camel2.16之前的版本,directoryName必須是已經存在的,這個配置不支持autoCreate選項(自動創建哪個文件夾)。原因是FTP服務器上有權限限制。

對於Camel2.16,是支持autoCreate選項的。當消費者啟動,在執行輪詢計划之前,會自動到FTP服務器創建對應的文件目錄。默認值autoCreate是true.

如果沒有提供用戶名,會使用anonymous匿名登錄,密碼會隨機嘗試。
如果沒有提供端口號,Camel將根據協議提供缺省值(ftp sftp = ftp = 21日22日= 2222)。

你可以添加以下格式的URI查詢配置, ?option=value&option=value&...  如上文我的測試鏈接。&amp;是我在pom文件中轉譯使用的。

這個配置使用兩個不同的庫操作FTP。FTP和FTPS使用Apache Commons Net,而SFTP使用JCraft JSCH

FTPS組件僅可在Camel2.2及更新的版本。
FTPS(也稱為安全FTP)是一個擴展添加支持FTP傳輸層安全性(TLS)和安全套接字層(SSL)加密協議。

ps:因為我目前使用的是2.13.1,關於新版本的特性我也不是太了解。

2.3 URI的各種配置選項

Name

Default Value

Description

username

null

指定要使用的用戶名登錄遠程服務器。

password

null

指定的密碼用來登錄到遠程服務器。

account

null Camel 2.15.2: 指定的帳戶用於登錄到遠程FTP服務器(只對FTP和FTPS)

binary

false

指定文件傳輸模式,二進制或ASCII。默認是ASCII(false)。

disconnect

false

Camel 2.2: 使用后是否要斷開遠程FTP服務器。可用於消費者和生產者。只會斷開斷開當前連接到FTP服務器。如果你有一個你想要停止消費,那么你需要停止消費者路由。

localWorkDirectory

null

在使用時,可以使用本地工作目錄將遠程文件內容直接存儲在本地文件中,以避免將內容加載到內存中。這個是有好處的,如果你下載一個非常大的遠程文件,可以節省內存。詳情見下文。

passiveMode

false

FTP and FTPS only: 指定是否使用被動模式連接。默認是false。

download

true

Camel 2.11: FTP消費者是否應該下載該文件。如果將此選項設置為false,那么消息體將null,但消費者仍將觸發一個Camel Exchange獲得文件的詳細信息,如文件名,文件大小,等等。只是不能下載的文件。

streamDownload

false

Camel 2.11: 消費者是否應該下載整個文件前,默認的行為,或者是否應該通過InputStream從路由遠程資源讀取而不是從內存中的Camel Exchange數組獲取。如果下載失敗的或是本地目錄提供,這個選項可以忽略。此選項對於處理大型遠程文件非常有用。

execProt

null

Camel 2.4: FTPS only: 默認情況下,如果沒有禁用安全數據通道默認值,則使用選項p。可能的值是:
C: Clear 
S: Safe (SSL protocol only) 
E: Confidential (SSL protocol only) 
P: Private

execPbsz

null

Camel 2.4: FTPS only: 此選項指定安全數據通道的緩沖區大小。如果選擇useSecureDataChannel,但是沒有被顯式設置,就沒有使用價值了。

fastExistsCheck

false

Camel 2.8.2, 2.9: 如果將此選項設置為true,camel-ftp將直接使用列表文件檢查文件是否存在。由於一些FTP服務器可能不支持直接列出文件,如果選項是錯誤的,camel-ftp將使用舊的方法列出目錄並檢查文件是否存在。注意從Camel 2.10.1起這個選項也影響readLock=changed控制是否執行一個快速檢查更新文件信息。如果FTP服務器有大量文件,則可以使用它來加快進程。

reconnectDelay

1000

Delay in millis Camel will wait before performing a reconnect attempt.

connectTimeout

10000

Camel 2.4: Is the connect timeout in millis. This corresponds to using ftpClient.connectTimeoutfor the FTP/FTPS. For SFTP this option is also used when attempting to connect.

soTimeout

null / 30000

FTP and FTPS Only: Camel 2.4: Is the SocketOptions.SO_TIMEOUT value in millis. A good idea is to configure this to a value such as 300000 (5 minutes) to not hang a connection. On SFTP this option is set as timeout on the JSCH Session instance.

Also SFTP from Camel 2.14.3/2.15.3/2.16 onwards.

From Camel 2.16 onwards the default is 300000 (300 sec).

timeout

30000

FTP and FTPS Only: Camel 2.4: Is the data timeout in millis. This corresponds to usingftpClient.dataTimeout for the FTP/FTPS. For SFTP there is no data timeout.

chmod

null

SFTP Producer Only: Camel 2.9: 允許你設置chmod存儲文件。例如 chmod=640.

receiveBufferSize

32768 FTP/FTPS Only: Camel 2.15.1: 下載文件的緩沖區大小。默認大小是32 kb。

ftpClient

null

FTP and FTPS Only: Camel 2.1: Allows you to use a customorg.apache.commons.net.ftp.FTPClient instance.

disconnectOnBatchComplete false Camel 2.18: 是否在批處理完成后從遠程FTP服務器斷開連接。可用於消費者和生產者。斷開連接只會斷開與FTP服務器的當前連接。如果你有一個想要停止的消費者,那么你需要停止消費者路由
activePortRange   Camel 2.18: 在主動模式設置客戶端端口范圍。語法是:minPort-maxPort。端口號都是包括的,如10000 - 19999包括所有xxxx端口。

2.4 Camel默認的一些配置

FTP消費者在默認情況下將消耗遠程FTP服務器上的文件。你必須顯式地配置讀取后如何處理這個文件,如果你想要刪除的文件或移動到另一個位置。例如你可以使用delete=true刪除的文件,或者使用move=.done將文件移動到一個隱藏的子目錄。

常規的文件消費是不同的,因為它將默認移動文件.camel子目錄。Camel不做FTP默認的處理行為是它可能缺少權限,默認情況下無法移動或刪除文件。

三、具體功能實現

先放一個小DEMO

/**
 * Apache Camel FTP Demo
 * @author 小賣鋪的老爺爺
 */
public class HelloWorld extends RouteBuilder {
  //啟動FTP路由,實際項目中初始化應該是單獨的一個類
    public static void main(String[] args) throws Exception {
        // 這是camel上下文對象,整個路由的驅動全靠它了。
        ModelCamelContext camelContext = new DefaultCamelContext();
        // 啟動route
        camelContext.start();
        // 將我們的路由處理加入到上下文中
        camelContext.addRoutes(new HelloWorld());
    }

    @Override
    public void configure() throws Exception {
        //從FTP上下載文件到本地目錄,相關參數的意義,參考我上文貼出的API,實際項目中這些地址一般寫在配置文件中
        from("ftp://173.5.206.53:2121/MHE/?username=ftpAdmin&password=123456&binary=true&passiveMode=true&delete=true&delay=60000")
        //自定義的處理器,可以做各種邏輯處理,如文件名匹配下載等
        .process(new HttpProcessor())
        .to("file:d:/wms-fe/inFile");
    }
}

上文的demo是每隔一分鍾掃描ftp服務器目錄上是否有新文件,如果有匹配文件名復核條件的下載到本地,並將服務器上的文件刪除。當然我們也可以不刪除,將文件移到一個固定目錄備份,這里可以用move=XXX參數。.process(new HttpProcessor()) 其中這句是可以刪除的,刪除后就是只有有文件,Camel就會下載到本地了。

可能小Demo中代碼不是很齊全,因為Processor的處理邏輯的代碼可能有點問題,被我刪掉了。

下面看下我實際項目里面的處理邏輯吧,

/**
 * 
 * 此類描述的是:FTP下載、加壓、解析處理
*
*/ public class FTPMheRoute extends RouteBuilder { private String ftpDownURI; private String downDir; private String unpackDir; private String ftpFileCharset; private void initialize() { ftpDownURI = WmsFEUtil.getSysConfigValue("camel.ftp.download.uri").trim(); downDir = WmsFEUtil.getSysConfigValue("camel.ftp.download.zip.dir").trim(); unpackDir = WmsFEUtil.getSysConfigValue("camel.ftp.download.zip.unpack.dir").trim(); ftpFileCharset = WmsFEUtil.getSysConfigValue("camel.ftp.file.charset"); } // @Override public void configure() throws Exception { initialize(); downRoute(); unpackRoute(); parseRoute(); } /** * 此方法描述的是:下載路由 */ private void downRoute() { from(ftpDownURI).to(downDir).process(new MHEDownloadedProcessor()); // from(ftpDownURI) // .choice() // .when(new FTPFilter(ftpFileCharset, FTPFilter.TYPE_FTP_FILE_NORMAL)) // .process(new FTPProcessor(ftpFileCharset)) // .to(downDir); //.process(new MHEDownloadedProcessor()); OutUtil.log( FTPMheRoute.class,"Register Route :"+ " when find the zip file it will download to "+downDir ); } /** * 此方法描述的是:解壓路由 */ private void unpackRoute() { from(downDir).process(new MHEDownUnpackProcessor()); OutUtil.log( FTPMheRoute.class,"Register Route :"+ "camel listner @ "+ downDir , " when find the zip file it will unpack to "+unpackDir ); } /** * 此方法描述的是:解析路由 */ private void parseRoute() { from(unpackDir).process(new MHEDownExecuteProcessor()); OutUtil.log( FTPMheRoute.class,"Register Route :"+ "camel listner @ "+ unpackDir , " when find the txt file it will parse to DB." ); } }

 上面的代碼中,我使用三個路由連接,上一個路由中由最后一個元素處理完的Exchange對象,將被發送至由Direct連接的下一個路由起始位置(http://camel.apache.org/direct.html)。注意,兩個被連接的路由一定要是可用的,並且存在於同一個Camel服務中。

最后說一下Process,它用於接收從控制端點、路由選擇條件又或者另一個處理器的Exchange中傳來的消息信息,並進行處理。這里process(Exchange exchange)方法是必須進行實現的。

因為某些原因,這里我只貼下我解壓路由的處理邏輯。

/**
 * 
 * 此類描述的是:FTP解壓處理
 */
public class MHEDownUnpackProcessor implements Processor {

    private static final String TXT_PATTERN = ".*?\\.txt";
    private static final String ZIP_PATTERN = ".*?\\.zip";
    private static final String BUSINESSTYPE_FTPMHE = "FTPMHE";

    @Override
    public void process(Exchange exchange) throws Exception {
        Message message = exchange.getIn();
        GenericFile<?> gf =  (GenericFile<?>)message.getBody();
        File zipFile = (File) gf.getFile();
        unzip(zipFile);
    }

    /**
     * 此方法描述的是:解壓縮文件
     * @throws IOException 
     */
    private List<File> unzip(File zipFile) throws IOException {
        String zipName = zipFile.getName();
        if (Pattern.matches(ZIP_PATTERN, zipName)){
            List<File> list = ZipUtil.unzip(zipFile, getDownUnpackDir(), TXT_PATTERN);
            if (list == null){
                BaseLogUtil.addMsgLog(BUSINESSTYPE_FTPMHE, zipName, "", BaseLogUtil.SOURCE_OTHER, 
                        BaseLogUtil.TO_SYS_OTHER, BaseLogUtil.CHANNEL_FTP, BaseLogUtil.STATUS_FAIL, "", "解壓失敗,文件內容為空");
                ParseFileTools.move2Dir(zipFile, "fail");
            }else{
                //FileUtil.deleteFile(zipFile);
                ParseFileTools.move2Dir(zipFile, "success");
            }
            return list;
        }else{
            BaseLogUtil.addMsgLog(BUSINESSTYPE_FTPMHE, zipName, "", BaseLogUtil.SOURCE_OTHER, 
                    BaseLogUtil.TO_SYS_OTHER, BaseLogUtil.CHANNEL_FTP, BaseLogUtil.STATUS_FAIL, "", "解壓失敗,壓縮包為非法格式");
            ParseFileTools.move2Dir(zipFile, "fail");
        }
        return null;
    }

    /**
     * 此方法描述的是:獲得解壓臨時目錄
     */
    public static String getDownUnpackDir() {
        return getURI(WmsFEUtil.getSysConfigValue("camel.ftp.download.zip.unpack.dir"));
    }

    /**
     * 此方法描述的是:獲得文件路徑
     */
    public static String getURI(String fileURI) {
        String uri = fileURI.replaceFirst("file://", "");
        int lastParam = uri.lastIndexOf("?");
        if (lastParam != -1){
            uri = uri.substring(0, lastParam);
        }
        return uri;
    }
}
public class ZipUtil {
    
    /**
     * 此方法描述的是:ZIP壓縮
     * @param source 源文件
     * @param dest 壓縮文件
     * @version: 2015年3月2日 上午9:17:26
     */
    public static String zip(File source, File dest) {
        ZipOutputStream out = null;
        BufferedOutputStream bo = null;
        try{
            File zipParent = dest.getParentFile();
            if (!zipParent.exists()){
                zipParent.mkdirs();
            }
            out = new ZipOutputStream(new FileOutputStream(dest));
            bo = new BufferedOutputStream(out);
            zip(out, source, source.getName(), bo);
            return dest.getAbsolutePath();
        }catch(Exception e){
            OutUtil.error(e,e.getMessage());
        }finally{
            if (bo!=null){
                try {
                    bo.close();
                } catch (IOException e) {
                }
            }
            if (out!=null){
                try {
                    out.close(); // 輸出流關閉
                } catch (IOException e) {
                }
            }
        }
        return null;
    }

    public static void zip(ZipOutputStream out, File f, String base,BufferedOutputStream bo) throws Exception { // 方法重載
        out.putNextEntry(new ZipEntry(base)); // 創建zip壓縮進入點base
        FileInputStream in = new FileInputStream(f);
        BufferedInputStream bi = new BufferedInputStream(in);
        int b;
        try{
            while ((b = bi.read()) != -1) {
                bo.write(b); // 將字節流寫入當前zip目錄
            }
        }finally{
            bi.close();
            in.close(); // 輸入流關閉
            bo.close();
        }
    }
    
    
    /**  解壓縮(壓縮文件中包含多個文件)可代替上面的方法使用。
     * ZipInputStream類
     * 當我們需要解壓縮多個文件的時候,ZipEntry就無法使用了,
     * 如果想操作更加復雜的壓縮文件,我們就必須使用ZipInputStream類
     * */
    public static List<File> unzip(File sourceFile ,String outPath,String fileNameRegexp){
        List<File> listDestFile = new ArrayList<File>();
        File outFile = null;
        ZipFile zipFile = null;
        FileInputStream sourceInput = null;
        ZipInputStream zipInput = null;
        ZipEntry entry = null;
        InputStream input = null;
        OutputStream output = null;
        String sourceName = sourceFile.getName();
        try {
            zipFile = new ZipFile(sourceFile);
            sourceInput = new FileInputStream(sourceFile);
            zipInput = new ZipInputStream(sourceInput);
            while((entry = zipInput.getNextEntry()) != null){
                String entryName = entry.getName();
                if (!Pattern.matches(fileNameRegexp, entryName)){
                    continue ;
                }
                outFile = new File(outPath + File.separator + sourceName+"-"+entryName);
                if(!outFile.getParentFile().exists()){
                    outFile.getParentFile().mkdir();
                }
                if(!outFile.exists()){
                    outFile.createNewFile();
                }
                input = zipFile.getInputStream(entry);
                output = new FileOutputStream(outFile);
                int temp = 0;
                while((temp = input.read()) != -1){
                    output.write(temp);
                }
                input.close();
                output.close();
                listDestFile.add(outFile);
            }
            return listDestFile;
        } catch (Exception e) {
            e.printStackTrace();
        } finally{
            try {
                if (input!=null)
                    input.close();
                if (output!=null)
                    output.close();
                if (zipInput!=null)
                    zipInput.close();
                if (sourceInput!=null)
                    sourceInput.close();
                if (zipFile!=null)
                    zipFile.close();
            } catch (IOException e) {}
        }
        return null;
    }
    
    

    /**
     * 此方法描述的是:獲得ZIP文件同名解壓目錄
     * @version: 2015年3月5日 下午2:10:41
     */
    public static String getUnpackForder(File zipFile) {
        String filePath = zipFile.getAbsolutePath();
        return filePath.substring(0, filePath.lastIndexOf("."));
    }

    
    /**
     * 此方法描述的是:獲得ZIP文件指定解壓目錄
     * @version: 2015年3月6日 下午10:28:36
     */
    public static String getUnpackForder(File zipFile, String subDir) {
        return zipFile.getParent()+File.separator+subDir;
    }
    
}

 基本上就是上面這些,貼出的代碼只具有參考意義。最后目前還有一個問題,在解壓和解析處理的時候,我做了備份,成功的會移到succ文件夾,失敗的會移到fail文件夾,但是.camel默認生成的文件夾雖然沒什么用還是一直存在,不知道該如何禁止這個文件夾的生成。還請玩過Camel的大牛指導下。謝謝。

參考:

http://camel.apache.org

http://camel.apache.org/ftp.html

http://blog.csdn.net/column/details/sys-communication.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM