Spring Integration SFTP: The SFTP Outbound Gateway


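This post covers the SFTP outbound gateway in Spring Integration's SFTP support. In essence, the SFTP outbound gateway provides a set of commands (see Figure 1) for interacting with files on a remote server: listing files (as file objects or file names), uploading (single or multiple files), downloading (single or multiple files), deleting, and moving. It can be configured in several ways, for example with XML or with Spring Boot Java configuration. After introducing the gateway's commands, this post walks through a working setup built with Spring Boot.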

1. Commands
1.1 ls

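This command lists remote files. Whether it returns file objects or file names depends on the configured options:

  • -1: return a list of file names; the default is a list of FileInfo objects;
  • -a: include all files (including those whose names start with '.');
  • -f: do not sort the result;
  • -dirs: include directories (excluded by default);
  • -links: include symbolic links (excluded by default);
  • -R: list the remote directory recursively (not recursive by default).

A file name filter can be configured as well.
Return value: the message payload produced by ls is a list of file names or a list of FileInfo objects; each object carries the file's modification time, permissions, and other details.

The remote directory that ls acted on is provided in the file_remoteDirectory header.

Tip: with the recursive option -R, each file name includes its subdirectory elements, i.e. the path relative to the listed directory. If -dirs is also set, every subdirectory found during recursion is returned as an element of the list. In that case, avoid -1: a list of plain names cannot tell files from directories, whereas FileInfo objects can.

Here is an example: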

@Bean
@ServiceActivator(inputChannel = "sftpChannel2")
public MessageHandler handler2() {
    // Session factory, command, and the expression that yields the remote directory
    SftpOutboundGateway sftpOutboundGateway = new SftpOutboundGateway(sftpSessionFactory(),"ls","payload");
    sftpOutboundGateway.setOptions("-dirs"); // command options
    return sftpOutboundGateway;
}
// Invoke the command through a messaging gateway
@MessagingGateway
public interface MessageGateway {
    @Gateway(requestChannel = "sftpChannel2")
    List<FileInfo> listFileName(String dir); // dir is the remote directory to list
}
1.2 nlst

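This command retrieves remote file names and is equivalent to ls -1. It supports the following option:

  • -f: do not sort the file names.

The remote directory that nlst acted on is provided in the file_remoteDirectory header.

Return value: the message payload produced by nlst is a list of file names.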
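The post gives no sample for nlst, so here is a minimal sketch in the same style as the ls example. The channel name sftpNlstChannel, the class NlstGatewayConfig, and the interface NlstGateway are made up for illustration, and the session factory is assumed to be the sftpSessionFactory bean defined in SftpConfig.java further down.

```java
import java.util.List;

import com.jcraft.jsch.ChannelSftp.LsEntry;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.file.remote.session.SessionFactory;
import org.springframework.integration.sftp.gateway.SftpOutboundGateway;
import org.springframework.messaging.MessageHandler;

@Configuration
class NlstGatewayConfig {

    // "sftpNlstChannel" is a hypothetical channel name used only in this sketch.
    @Bean
    @ServiceActivator(inputChannel = "sftpNlstChannel")
    public MessageHandler nlstHandler(SessionFactory<LsEntry> sftpSessionFactory) {
        // The payload of the request message is evaluated as the remote directory.
        SftpOutboundGateway gateway =
                new SftpOutboundGateway(sftpSessionFactory, "nlst", "payload");
        gateway.setOptions("-f"); // optional: skip sorting of the names
        return gateway;
    }

    @MessagingGateway
    public interface NlstGateway {

        // Returns plain file names, unlike ls, which returns FileInfo objects by default.
        @Gateway(requestChannel = "sftpNlstChannel")
        List<String> listNames(String dir);
    }
}
```

Because the reply is a plain List of String, this variant suits callers that only need names and do not care about size, permissions, or timestamps.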

1.3 get

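This command retrieves a single remote file. It supports the following options:

  • -P: after the download, keep the local file's timestamp the same as on the remote server;
  • -stream: retrieve the remote file as a stream;
  • -D: delete the remote file after a successful transfer; the remote file is not deleted if the transfer is skipped because FileExistsMode is IGNORE and the local file already exists.

The file_remoteDirectory header contains the remote directory of the file, and the file_remoteFile header contains the file name.

Return value: the message payload produced by get is a File object; with -stream, the payload is an InputStream.

For text files, a common use case is to combine this command with a file splitter or a stream transformer. When a remote file is consumed as a stream, the caller is responsible for closing the Session once the stream has been read. The Session is provided in the closeableResource header, and IntegrationMessageHeaderAccessor offers a convenience method to access it.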
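The "read the stream, then close the session" rule can be sketched with plain JDK types. This is only an illustration: the class StreamedGetDemo and the method countLines are hypothetical, and UTF-8 is an assumed encoding. In a real flow the InputStream would be the reply payload of get -stream, and the Closeable would come from new IntegrationMessageHeaderAccessor(message).getCloseableResource().

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;

final class StreamedGetDemo {

    /**
     * Counts the lines of a streamed file and then releases the session.
     * The session is closed even if reading fails.
     */
    static long countLines(InputStream payload, Closeable session) throws IOException {
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(payload, StandardCharsets.UTF_8))) {
            return reader.lines().count();
        } finally {
            // Stand-in for the resource carried in the closeableResource header.
            if (session != null) {
                session.close();
            }
        }
    }

    public static void main(String[] args) throws IOException {
        // In-memory stand-ins for the remote stream and the SFTP session.
        InputStream fake = new ByteArrayInputStream(
                "a\nb\nc\n".getBytes(StandardCharsets.UTF_8));
        long lines = countLines(fake, () -> System.out.println("session closed"));
        System.out.println(lines + " lines");
    }
}
```

Putting the close call in a finally block keeps the session from leaking when the consumer throws halfway through the file.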

1.4 mget

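This command retrieves multiple remote files that match a pattern. It supports the following options:

  • -P: preserve the timestamps of the remote files;
  • -R: download recursively, traversing subdirectories;
  • -x: throw an exception if no files match the pattern (otherwise an empty list is returned);
  • -D: delete each remote file after a successful transfer; a remote file is not deleted if its transfer is skipped because FileExistsMode is IGNORE and the local file already exists.

The message payload is a List<File>, one File per retrieved file.

Note:
Since version 5.0, if FileExistsMode is IGNORE, the payload no longer contains files that were not fetched because they already existed locally.

The expression that determines the remote path should produce a result ending in *, for example myfiles/*, which fetches the complete tree under myfiles.

Also since version 5.0, a recursive MGET combined with FileExistsMode.REPLACE_IF_MODIFIED can be used to synchronize an entire remote directory tree locally. In this mode the local file's last-modified timestamp is set to the remote timestamp, regardless of the -P option.

With -R, the whole remote tree is retrieved by default. Files and directories in the tree can be filtered with a FileListFilter, supplied by reference or through the filename-pattern or filename-regex attribute. For example, filename-regex="(subDir|.*1.txt)" retrieves all files ending in 1.txt in the remote directory and in its subDir child directory.
Typically, the #remoteDirectory variable is used in local-directory-expression so that the remote directory structure is preserved locally.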
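To see what the filename-regex value quoted above lets through, the pattern can be tried against a few names with the JDK regex classes alone. This sketch assumes the filter tests each entry name with a full match, which is how such a pattern is normally applied; the class MgetRegexFilterDemo and the sample names are invented for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

final class MgetRegexFilterDemo {

    // Same expression as in the text; note that the dot before "txt" is
    // unescaped, so it matches any single character.
    private static final Pattern FILTER = Pattern.compile("(subDir|.*1.txt)");

    /** Returns the entry names (files or directories) that the pattern accepts. */
    static List<String> accepted(List<String> names) {
        List<String> result = new ArrayList<>();
        for (String name : names) {
            if (FILTER.matcher(name).matches()) {
                result.add(name);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> names =
                Arrays.asList("subDir", "otherDir", "a1.txt", "b2.txt", "report1.txt");
        System.out.println(accepted(names)); // [subDir, a1.txt, report1.txt]
    }
}
```

The directory name subDir has to be part of the expression because a directory that the filter rejects is not traversed at all; otherDir is dropped together with everything below it.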

Here is an example:

@Bean
@ServiceActivator(inputChannel = "sftpChannel3")
public MessageHandler handler3() {
    SftpOutboundGateway sftpOutboundGateway = new SftpOutboundGateway(sftpSessionFactory(),"mget","payload");
    sftpOutboundGateway.setOptions("-R");
    sftpOutboundGateway.setFileExistsMode(FileExistsMode.REPLACE_IF_MODIFIED);
    sftpOutboundGateway.setLocalDirectory(new File("E:\\sftp_tmp_dir"));
    sftpOutboundGateway.setAutoCreateLocalDirectory(true);  
    return sftpOutboundGateway;
}
@MessagingGateway
public interface MessageGateway {
	@Gateway(requestChannel = "sftpChannel3")
	List<File> listFile(String dir);
}
1.5 put

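This command sends a single file to the remote server.
The message payload can be a File object, a byte[] array, or a String.
The remote-filename-generator determines the name of the remote file. Other attributes include remote-directory, temporary-remote-directory, and so on.
Return value: the message payload produced by put is a String containing the full path of the file on the server after the transfer.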

1.6 mput

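This command sends multiple files to the server. It supports the following option:

  • -R: send the files recursively, including all files in subdirectories.

The message payload must be a File, or a String file path, that represents a local directory. Since version 5.1, a collection of File or String is also supported.
The attributes of put apply to mput as well. In addition, the files to send can be filtered with mput-pattern, mput-regex, mput-filter, and so on.
Since version 4.3, file permissions can be set on the uploaded files.

Return value: the message payload produced by mput is a List containing the remote paths of the transferred files.

Here is an example:

    // Important: put relies on an SftpRemoteFileTemplate; here it carries the remote directory.
    // The outbound gateway offers several constructors (see its source).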
    @Bean
    @ServiceActivator(inputChannel = "sftpChannel4")
    public MessageHandler handler4(){
        SftpRemoteFileTemplate  sftpRemoteFileTemplate = new SftpRemoteFileTemplate(sftpSessionFactory());
        sftpRemoteFileTemplate.setRemoteDirectoryExpression(new LiteralExpression("/send"));
        SftpOutboundGateway sftpOutboundGateway = new SftpOutboundGateway(sftpRemoteFileTemplate,"put","payload");
        sftpOutboundGateway.setBeanFactory(beanFactory);
        return sftpOutboundGateway;
    }

    @Bean
    @ServiceActivator(inputChannel = "sftpChannel5")
    public MessageHandler handler5(){
        SftpRemoteFileTemplate  sftpRemoteFileTemplate = new SftpRemoteFileTemplate(sftpSessionFactory());
        sftpRemoteFileTemplate.setRemoteDirectoryExpression(new LiteralExpression("/send"));        
        SftpOutboundGateway sftpOutboundGateway = new SftpOutboundGateway(sftpRemoteFileTemplate,"mput","payload");
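        // Configure the filter for local files
        sftpOutboundGateway.setMputFilter(new FileListFilter<File>() {
            @Override
            public List<File> filterFiles(File[] files) {
            	if(...){
            		...
            	}
                // Return the accepted files, never null; here every candidate is accepted.
                return java.util.Arrays.asList(files);
            }
        });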
        sftpOutboundGateway.setBeanFactory(beanFactory);
        return sftpOutboundGateway;
    }
1.7 rm

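This command deletes a remote file.
If the deletion succeeds, the message payload is Boolean.TRUE; otherwise it is Boolean.FALSE.
The file_remoteDirectory header contains the remote directory, and the file_remoteFile header contains the file name.

Here is an example: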

@Bean
@ServiceActivator(inputChannel = "sftpChannel6")
public MessageHandler handler6(){
    SftpOutboundGateway sftpOutboundGateway = new SftpOutboundGateway(sftpSessionFactory(),"rm","payload");
    sftpOutboundGateway.setBeanFactory(beanFactory);
    return sftpOutboundGateway;
}
1.8 mv

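This command moves a file to another location on the remote server. The gateway's expression gives the source path and the rename expression gives the target path.
Return value: true if the move succeeds, false otherwise.

Here is an example: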

    @Bean
    @ServiceActivator(inputChannel = "sftpChannel7")
    public MessageHandler handler7(){
        SftpOutboundGateway sftpOutboundGateway = new SftpOutboundGateway(sftpSessionFactory(),"mv","'send/22.TXT'");
        sftpOutboundGateway.setRenameExpression(new LiteralExpression("send1/22.TXT"));
        sftpOutboundGateway.setBeanFactory(beanFactory);
        return sftpOutboundGateway;
    }

 The hands-on part (a test project) follows.

First, the POM file:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.flower.springintegration</groupId>
    <artifactId>spring-integration-samples</artifactId>

    <version>v0.0.1</version>

    <name>SpringIntegrationExamples</name>

    <description>Spring Integration Samples</description>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.1.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

       <!-- <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-batch</artifactId>
        </dependency>-->

        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-sftp</artifactId>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.quartz-scheduler/quartz -->
        <dependency>
            <groupId>org.quartz-scheduler</groupId>
            <artifactId>quartz</artifactId>
            <version>2.2.1</version>
        </dependency>


        <dependency>
            <groupId>com.zaxxer</groupId>
            <artifactId>HikariCP</artifactId>
        </dependency>

        <!-- Version managed by spring-boot-starter-parent; brings in JUnit, spring-test and spring-boot-test -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

 Next, the YAML configuration:

spring:
  datasource:
    type: com.zaxxer.hikari.HikariDataSource
    url: jdbc:mysql://localhost:3306/springbatchexample?serverTimezone=GMT%2B8&useUnicode=true&characterEncoding=utf-8
    username: root
    password: root

sftp:
    host: 127.0.0.1
    port: 23  # port of this test server; SFTP normally listens on 22
    user: 47gamer
    password: wdnmd
    filePath:
        send: /send
        achieve: /achieve
        localPath: /sftp_tmp_dir

 Then the SFTP gateway configuration class, SftpConfig.java:

package com.flower.integration.sftp;

import com.jcraft.jsch.ChannelSftp.LsEntry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.DependsOn;
import org.springframework.expression.common.LiteralExpression;
import org.springframework.integration.annotation.*;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.file.filters.AcceptOnceFileListFilter;
import org.springframework.integration.file.filters.FileListFilter;
import org.springframework.integration.file.remote.FileInfo;
import org.springframework.integration.file.remote.session.CachingSessionFactory;
import org.springframework.integration.file.remote.session.SessionFactory;
import org.springframework.integration.file.support.FileExistsMode;
import org.springframework.integration.sftp.filters.SftpSimplePatternFileListFilter;
import org.springframework.integration.sftp.gateway.SftpOutboundGateway;
import org.springframework.integration.sftp.inbound.SftpInboundFileSynchronizer;
import org.springframework.integration.sftp.inbound.SftpInboundFileSynchronizingMessageSource;
import org.springframework.integration.sftp.outbound.SftpMessageHandler;
import org.springframework.integration.sftp.session.DefaultSftpSessionFactory;
import org.springframework.integration.sftp.session.SftpRemoteFileTemplate;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;

import javax.annotation.Resource;
import java.io.File;
import java.util.List;
import java.util.Properties;

/**
 * Sftp configuration.
 *
 * @author 47Gamer
 * @Date 2019-01-18
 */
@Configuration
@DependsOn("sftpProperty")
public class SftpConfig {

    @Resource(name = "sftpProperty")
    private SftpProperty sftpProperty;

    private static Logger log = LoggerFactory.getLogger(SftpConfig.class);


    @Value("${sftp.host}")
    private String sftpHost;

    @Value("${sftp.port:23}")
    private int sftpPort;

    @Value("${sftp.user}")
    private String sftpUser;

    @Value("${sftp.privateKey:#{null}}")
    private org.springframework.core.io.Resource sftpPrivateKey;

    @Value("${sftp.privateKeyPassphrase:}")
    private String sftpPrivateKeyPassphrase;

    @Value("${sftp.password}")
    private String sftpPassword;

   /* @Bean
    public SessionFactory<LsEntry> sftpSessionFactory() {
        System.out.println("@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@");
        DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(false);
        factory.setHost(sftpProperty.getHost());
        factory.setPort(sftpProperty.getPort());
        factory.setUser(sftpProperty.getUser());
        Properties jschProps = new Properties();
        //!important PreferredAuthentications must be configured; otherwise the console prompts for user name and password.
        jschProps.put("StrictHostKeyChecking", "no");
        jschProps.put("PreferredAuthentications",
                "password,gssapi-with-mic,publickey,keyboard-interactive");

        factory.setSessionConfig(jschProps);

      //  if (sftpPassword != null) {
            factory.setPassword(sftpProperty.getPassword());
//        } else {
//            factory.setPrivateKey(sftpPrivateKey);
//            factory.setPrivateKeyPassphrase(sftpPrivateKeyPassphrase);
//        }

        factory.setAllowUnknownKeys(true);
        //        // Cache settings: pool size (setPoolSize) and wait timeout (setSessionWaitTimeout).
        CachingSessionFactory<LsEntry> cachingSessionFactory =
                new CachingSessionFactory<LsEntry>(factory);
        cachingSessionFactory.setPoolSize(10);
//        cachingSessionFactory.setSessionWaitTimeout(1000);

        return cachingSessionFactory;
//        return new CachingSessionFactory<LsEntry>(factory);
    }*/

    /**
     * Creates the spring-integration-sftp session factory.
     * This avoids creating sessions through the raw JSch API.
     *
     * @return SessionFactory<LsEntry>
     */
    @Bean
    public SessionFactory<LsEntry> sftpSessionFactory(){
        System.out.println("######################################################");
        DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
        factory.setUser(sftpProperty.getUser());
        factory.setHost(sftpProperty.getHost());
        factory.setPort(sftpProperty.getPort());
        factory.setPassword(sftpProperty.getPassword());

        Properties jschProps = new Properties();
        //!important PreferredAuthentications must be configured; otherwise the console prompts for user name and password.
        jschProps.put("StrictHostKeyChecking", "no");
        jschProps.put("PreferredAuthentications",
                "password,gssapi-with-mic,publickey,keyboard-interactive");

        factory.setSessionConfig(jschProps);
        factory.setAllowUnknownKeys(true);

        // Cache settings: pool size (setPoolSize) and wait timeout (setSessionWaitTimeout).
        CachingSessionFactory<LsEntry> cachingSessionFactory =
                new CachingSessionFactory<LsEntry>(factory);
//        cachingSessionFactory.setPoolSize(2000);


        return  cachingSessionFactory;
    }

    /**
     * Configures the Outbound Channel Adapter.
     *
     * In essence it is a MessageHandler that receives the messages sent to the message channel.
     * @return MessageHandler
     */
    @ServiceActivator(inputChannel = "fileInChannel")
    @Bean
    public SftpMessageHandler sftpMessageHandler(){
        SftpMessageHandler sftpMsgHandler = new SftpMessageHandler(sftpSessionFactory());

        sftpMsgHandler.setRemoteDirectoryExpression(
                new LiteralExpression(sftpProperty.getSftpAchievePath()));
        sftpMsgHandler.setAutoCreateDirectory(true);
        sftpMsgHandler.setCharset("UTF-8");
        return sftpMsgHandler;
    }


    /**
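     * Configures the Inbound Channel Adapter.
     *
     * Monitors files on the SFTP server. When the poller finds a matching file, it is synchronized to the local server.
     * Requirements: the inboundFileChannel bean; a poller; and the file synchronizer bean, SftpInboundFileSynchronizer.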
     */
    @Bean
    @InboundChannelAdapter(value = "inboundFileChannel",
            poller = @Poller(cron = "0 1/10 * * * *", maxMessagesPerPoll = "1"))
    public MessageSource<File> fileMessageSource() {
        System.out.println("=========================================================");

        // Create the message source backed by the SftpInboundFileSynchronizer.
        SftpInboundFileSynchronizingMessageSource source =
                new SftpInboundFileSynchronizingMessageSource(sftpInboundFileSynchronizer());

        // Create the local directory automatically
        source.setAutoCreateLocalDirectory(true);
        source.setLocalDirectory(new File(sftpProperty.getLocalTempDir()));

        // Local file filter: pass each file downstream only once
        source.setLocalFilter(new AcceptOnceFileListFilter<File>());

        return source;

    }

    /**
     * Provides the channel bean for the inbound channel adapter.
     */
    @Bean
    public DirectChannel inboundFileChannel() {
        return new DirectChannel();
    }

    /**
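     * SftpInboundFileSynchronizer.
     *
     *  Synchronizes files from the SFTP server to the local server.
     *      <1> It can be injected into a service and invoked through its synchronizeToLocalDirectory method;
     *      <2> It can also back an inbound channel adapter that monitors the file server.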
     *
     * @return SftpInboundFileSynchronizer
     */
    @Bean(name = "synFileChannel")
    public SftpInboundFileSynchronizer sftpInboundFileSynchronizer (){

        SftpInboundFileSynchronizer fileSynchronize =
                new SftpInboundFileSynchronizer(sftpSessionFactory());
        fileSynchronize.setDeleteRemoteFiles(true);
        fileSynchronize.setPreserveTimestamp(true);

        // Important: the remote directory to synchronize from
        fileSynchronize.setRemoteDirectory(sftpProperty.getSftpSendPath());
        fileSynchronize.setFilter(new SftpSimplePatternFileListFilter("*.*"));
        //fileSynchronize.setLocalFilenameGeneratorExpression( );
        return fileSynchronize;
    }

    ///////////////////////////////////////////////////////////////////////

    /**
     * Configures the SFTP Outbound Gateway.
     *
     * @return MessageHandler
     */
    @Bean
    @ServiceActivator(inputChannel = "sftpChannel")
    public MessageHandler handler() {


        SftpOutboundGateway sftpOutboundGateway = new SftpOutboundGateway(sftpSessionFactory(),"ls","payload");
//        MessageChannel message = sftpOutboundGateway.getOutputChannel();

        sftpOutboundGateway.setLocalDirectory(new File("E:\\sftp_tmp_dir"));
        sftpOutboundGateway.setAutoCreateLocalDirectory(true);  // TODO dynamic path
        return sftpOutboundGateway;
    }

    @Bean
    @ServiceActivator(inputChannel = "sftpChannel2")
    public MessageHandler handler2() {
        SftpOutboundGateway sftpOutboundGateway = new SftpOutboundGateway(sftpSessionFactory(),"ls","payload");
        sftpOutboundGateway.setOptions("-dirs");
        sftpOutboundGateway.setLocalDirectory(new File("E:\\sftp_tmp_dir"));
        sftpOutboundGateway.setAutoCreateLocalDirectory(true);  // TODO dynamic path

        return sftpOutboundGateway;
    }

    @Bean
    @ServiceActivator(inputChannel = "sftpChannel3")
    public MessageHandler handler3() {
        System.out.println("=========================         3         ================================");

        SftpOutboundGateway sftpOutboundGateway = new SftpOutboundGateway(sftpSessionFactory(),"mget","payload");
        sftpOutboundGateway.setOptions("-R");
        sftpOutboundGateway.setFileExistsMode(FileExistsMode.REPLACE_IF_MODIFIED);
        sftpOutboundGateway.setLocalDirectory(new File("E:\\sftp_tmp_dir"));
        sftpOutboundGateway.setAutoCreateLocalDirectory(true);  // TODO dynamic path

        return sftpOutboundGateway;
    }

    @Autowired
    private BeanFactory beanFactory;

    // Outbound gateway: put relies on an SftpRemoteFileTemplate; here it carries the remote directory.
    // The outbound gateway offers several constructors (see its source).
    @Bean
    @ServiceActivator(inputChannel = "sftpChannel4")
    public MessageHandler handler4(){
        SftpRemoteFileTemplate  sftpRemoteFileTemplate = new SftpRemoteFileTemplate(sftpSessionFactory());
        sftpRemoteFileTemplate.setRemoteDirectoryExpression(new LiteralExpression("/send"));

        SftpOutboundGateway sftpOutboundGateway = new SftpOutboundGateway(sftpRemoteFileTemplate,"put","payload");
//        sftpOutboundGateway.setLocalDirectoryExpressionString("/get/");
        sftpOutboundGateway.setBeanFactory(beanFactory);
        return sftpOutboundGateway;
    }


    @Bean
    @ServiceActivator(inputChannel = "sftpChannel5")
    public MessageHandler handler5(){
        SftpRemoteFileTemplate  sftpRemoteFileTemplate = new SftpRemoteFileTemplate(sftpSessionFactory());
        sftpRemoteFileTemplate.setRemoteDirectoryExpression(new LiteralExpression("/send"));


        SftpOutboundGateway sftpOutboundGateway = new SftpOutboundGateway(sftpRemoteFileTemplate,"mput","payload");
//        sftpOutboundGateway.setLocalDirectoryExpressionString("/get/");
//        sftpOutboundGateway.setOptions("-R");
        sftpOutboundGateway.setMputFilter(new FileListFilter<File>() {
            @Override
            public List<File> filterFiles(File[] files) {
                // Accept every candidate file; a filter must not return null.
                return java.util.Arrays.asList(files);
            }
        });
        sftpOutboundGateway.setBeanFactory(beanFactory);
        return sftpOutboundGateway;
    }

    @Bean
    @ServiceActivator(inputChannel = "sftpChannel6")
    public MessageHandler handler6(){

        SftpOutboundGateway sftpOutboundGateway = new SftpOutboundGateway(sftpSessionFactory(),"rm","payload");
        sftpOutboundGateway.setBeanFactory(beanFactory);
        return sftpOutboundGateway;
    }

    @Bean
    @ServiceActivator(inputChannel = "sftpChannel7")
    public MessageHandler handler7(){

//        SftpRemoteFileTemplate  sftpRemoteFileTemplate = new SftpRemoteFileTemplate(sftpSessionFactory());
//        sftpRemoteFileTemplate.setRemoteDirectoryExpression(new LiteralExpression("/send"));


        SftpOutboundGateway sftpOutboundGateway = new SftpOutboundGateway(sftpSessionFactory(),"mv","'send/22.TXT'");
//        sftpOutboundGateway.setRenameExpression(new LiteralExpression("/send1"));
//        sftpOutboundGateway.setChmod(777);
//        sftpOutboundGateway.setRenameExpressionString("send1");

        sftpOutboundGateway.setRenameExpression(new LiteralExpression("send1/22.TXT"));
//        sftpOutboundGateway.setAutoCreateLocalDirectory(true);
        sftpOutboundGateway.setBeanFactory(beanFactory);
        return sftpOutboundGateway;
    }


    @MessagingGateway
    public interface UploadGateway {

        @Gateway(requestChannel = "sftpChannel")
        List<FileInfo> listFileInfo(String dir);

        @Gateway(requestChannel = "sftpChannel2")
        List<FileInfo> listFileName(String dir);

        @Gateway(requestChannel = "sftpChannel3")
        List<File> listFile(String dir);

        @Gateway(requestChannel = "sftpChannel4")
        String putFile(File source);

        @Gateway(requestChannel = "sftpChannel5")
        List<String> mputFile(File directory);

        @Gateway(requestChannel = "sftpChannel6")
        boolean removeFile(String file);

        @Gateway(requestChannel = "sftpChannel7")
        boolean moveFile(String file);

    }

}

The SftpProperty.java class maps the sftp settings from the YAML file:

package com.flower.integration.sftp;


import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

import java.util.Map;

@Component("sftpProperty")
@ConfigurationProperties(prefix = "sftp")
public class SftpProperty {


    private String host;

    private Integer port;

    private String user;

    private String password;

    private Map<String,String> filePath;

    ////////////////////////////////////////////////////
    public String getSftpSendPath(){
        return filePath.get("send");
    }

    public String getSftpAchievePath(){
        return filePath.get("achieve");
    }

    public String getLocalTempDir(){
        return filePath.get("localPath");
    }

    ///////////////////////////////////////////////////
    public String getHost() {
        return host;
    }

    public void setHost(String host) {
        this.host = host;
    }

    public Integer getPort() {
        return port;
    }

    public void setPort(Integer port) {
        this.port = port;
    }

    public String getUser() {
        return user;
    }

    public void setUser(String user) {
        this.user = user;
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }

    public Map<String, String> getFilePath() {
        return filePath;
    }

    public void setFilePath(Map<String, String> filePath) {
        this.filePath = filePath;
    }
}

 Service layer: SftpService.java

package com.flower.integration.sftp;

import com.jcraft.jsch.ChannelSftp;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.expression.common.LiteralExpression;
import org.springframework.integration.file.remote.session.SessionFactory;
import org.springframework.integration.file.support.FileExistsMode;
import org.springframework.integration.sftp.inbound.SftpInboundFileSynchronizer;
import org.springframework.integration.sftp.session.SftpRemoteFileTemplate;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.io.File;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

@Service("sftpService")
public class SftpService {


    private Logger log = LoggerFactory.getLogger(this.getClass());

    @Resource(name = "fileInChannel")
    protected MessageChannel messageChannel;

    @Autowired
    private SftpProperty sftpProperty;

    @Autowired
    private SessionFactory<ChannelSftp.LsEntry> sftpSessionFactory;

    /**
     * Sends a file to the SFTP server through the MessageChannel.
     *
     * @param localFilePath file local path.
     */
    public void sendFileToSftp(String localFilePath) {

        Path filePath = Paths.get(localFilePath);
        if (filePath.toFile().exists()) {
            Message<File> fileMessage = MessageBuilder.withPayload(filePath.toFile()).build();
            boolean result = messageChannel.send(fileMessage);
            String resultMsg = result ? "Success" : "Failure";
            log.info("File send to sftp {}, File: {}.", resultMsg, filePath.getFileName());
        } else {
            log.warn("No found file. {}", filePath.getFileName());
        }
    }

    /**
     * Deletes a file on the SFTP server.
     *
     * @param sessionFactory  sftp server.
     * @param remoteDirectory file directory.
     * @param fileName        file
     * @return true if the file was removed, false otherwise.
     */
    public boolean removeSftpRemoteFile(SessionFactory<ChannelSftp.LsEntry> sessionFactory, String remoteDirectory, String fileName) {
        SftpRemoteFileTemplate sftpRemoteFileTemplate = new SftpRemoteFileTemplate(sessionFactory);

        boolean direCheck = remoteDirectory.endsWith(sftpRemoteFileTemplate.getRemoteFileSeparator());
        if (!direCheck) {
            remoteDirectory += sftpRemoteFileTemplate.getRemoteFileSeparator();
        }
        boolean fileExist = sftpRemoteFileTemplate.exists(remoteDirectory + fileName);
        if (fileExist) {
            return sftpRemoteFileTemplate.remove(remoteDirectory + fileName);
        } else {
            log.warn("No found file in the directory, {}.", remoteDirectory);
            return false;
        }
    }

    /**
     * Renames a file on the SFTP server.
     *
     * @param sessionFactory  sftp server
     * @param remoteDirectory file directory path.
     * @param sourceFileName  source file name
     * @param targetFileName  rename target name
     */
    public void renameSftpRemoteFile(SessionFactory<ChannelSftp.LsEntry> sessionFactory, String remoteDirectory,
                                     String sourceFileName, String targetFileName) {
        SftpRemoteFileTemplate fileTemplate = new SftpRemoteFileTemplate(sessionFactory);

        boolean direCheck = remoteDirectory.endsWith(fileTemplate.getRemoteFileSeparator());
        if (!direCheck) {
            remoteDirectory += fileTemplate.getRemoteFileSeparator();
        }
        boolean fileExist = fileTemplate.exists(remoteDirectory + sourceFileName);
        if (fileExist) {
            fileTemplate.rename(remoteDirectory + sourceFileName, remoteDirectory + targetFileName);
        } else {
            log.warn("No found file in the directory, {}.", remoteDirectory);
        }
    }

    /**
     * Checks whether a file exists on the SFTP server.
     *
     * @param sessionFactory sftp server
     * @param directory      file directory
     * @param fileName       file name
     * @return true if file exist, or false.
     */
    public boolean fileExist(SessionFactory<ChannelSftp.LsEntry> sessionFactory, String directory, String fileName) {
        SftpRemoteFileTemplate fileTemplate = new SftpRemoteFileTemplate(sessionFactory);
        boolean fileNameCheck = directory.endsWith(fileTemplate.getRemoteFileSeparator());
        if (!fileNameCheck) {
            directory += fileTemplate.getRemoteFileSeparator();
        }

        return fileTemplate.exists(directory + fileName);
    }


    /**
     * Lists the files in a directory on the SFTP server.
     *
     * @param sessionFactory sftp server
     * @param directory      file directory
     * @param fileNameFilter file name filter
     * @return file name list match filter
     */
    public List<String> lsFileOfDirectory(SessionFactory<ChannelSftp.LsEntry> sessionFactory,
                                          String directory, String fileNameFilter) {
        SftpRemoteFileTemplate fileTemplate = new SftpRemoteFileTemplate(sessionFactory);

        if (!directory.endsWith(fileTemplate.getRemoteFileSeparator())) {
            directory += fileTemplate.getRemoteFileSeparator();
        }
        ChannelSftp.LsEntry[] files = fileTemplate.list(directory + fileNameFilter);
        List<String> fileNames = new ArrayList<>();
        for (ChannelSftp.LsEntry lsEntry : files) {
            boolean isDir = lsEntry.getAttrs().isDir();
            if (!isDir) {
                fileNames.add(lsEntry.getFilename());
            }
        }
        return fileNames;
    }

    @Autowired
    private BeanFactory beanFactory;

    /**
     * Sends a local file to the SFTP server.
     *
     * @param sessionFactory sftp server
     * @param filePath file local path
     * @param targetPath target directory
     * @param mode FileExistsMode
     *             null: default, replace the file;
     *             APPEND: append to the file if it exists;
     *             REPLACE: replace the file;
     *             APPEND_NO_FLUSH:
     *             FAIL:
     *             IGNORE:
     */
    public void sendSftpFile(SessionFactory<ChannelSftp.LsEntry> sessionFactory,
                             String filePath, String targetPath, FileExistsMode mode){
        SftpRemoteFileTemplate fileTemplate = new SftpRemoteFileTemplate(sessionFactory);
        try {
            // Configure the template for the remote SFTP server
            fileTemplate.setRemoteDirectoryExpression(new LiteralExpression(targetPath));
            fileTemplate.setAutoCreateDirectory(true);
            fileTemplate.setCharset("UTF-8");
            fileTemplate.setBeanFactory(beanFactory);
            fileTemplate.afterPropertiesSet();
        } catch (Exception e){
            log.warn(e.getMessage());
        }

        Path file = Paths.get(filePath);
        if (file.toFile().exists()){
            Message<File> message = MessageBuilder.withPayload(file.toFile()).build();
            if (null == mode){
                fileTemplate.send(message);
            } else {
                //fileTemplate.setFileNameGenerator(new DefaultFileNameGenerator());
                if (fileTemplate.isUseTemporaryFileName()){
                    fileTemplate.setUseTemporaryFileName(false);
                }
                fileTemplate.send(message, mode);
            }
        }
    }


    @Resource(name = "synFileChannel")
    private SftpInboundFileSynchronizer sftpInboundFileSynchronizer;

    public void synchronizedFileToLocal(String localDir){
        File dir = Paths.get(localDir).toFile();
        sftpInboundFileSynchronizer.synchronizeToLocalDirectory(dir);
    }

}
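
The FileExistsMode values listed in the Javadoc of sendSftpFile are easier to compare side by side. The sketch below is a local-file analogy only, not Spring Integration's implementation: the enum `ExistsMode`, the class `ExistsModeSketch` and the method `write` are hypothetical names introduced for illustration, and the behavior shown for each mode is an assumption based on the usual meaning of replace, append, fail and ignore.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class ExistsModeSketch {

    /** Hypothetical stand-in for FileExistsMode; this is not the Spring enum. */
    public enum ExistsMode { REPLACE, APPEND, FAIL, IGNORE }

    /**
     * Writes content to target according to the given mode.
     *
     * @return true if the target was written, false if the write was skipped
     */
    public static boolean write(Path target, String content, ExistsMode mode) throws IOException {
        byte[] bytes = content.getBytes(StandardCharsets.UTF_8);
        boolean exists = Files.exists(target);
        switch (mode) {
            case APPEND:
                // Create the file if needed, otherwise add to its end
                Files.write(target, bytes, StandardOpenOption.CREATE, StandardOpenOption.APPEND);
                return true;
            case FAIL:
                // Refuse to touch an existing file
                if (exists) {
                    throw new FileAlreadyExistsException(target.toString());
                }
                Files.write(target, bytes);
                return true;
            case IGNORE:
                // Leave an existing file untouched and report that nothing was written
                if (exists) {
                    return false;
                }
                Files.write(target, bytes);
                return true;
            case REPLACE:
            default:
                // Files.write truncates an existing file by default
                Files.write(target, bytes);
                return true;
        }
    }

    public static void main(String[] args) throws IOException {
        Path target = Files.createTempFile("exists-mode", ".txt");
        write(target, "a", ExistsMode.REPLACE);
        write(target, "b", ExistsMode.APPEND);
        boolean written = write(target, "c", ExistsMode.IGNORE);
        System.out.println(new String(Files.readAllBytes(target), StandardCharsets.UTF_8)); // ab
        System.out.println(written); // false
        Files.delete(target);
    }
}
```

In sendSftpFile itself, the temporary file name is switched off before a mode is passed to the template. The likely reason is that appending cannot be combined with uploading under a temporary name and renaming afterwards.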

Controller layer: used to test the service-layer methods

package com.flower.integration.sftp;


import com.jcraft.jsch.ChannelSftp;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.integration.file.remote.FileInfo;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import java.io.File;
import java.util.List;


@RestController
public class TestController {


    @Autowired
    private SftpService sftpService;

    @Autowired
    private SftpConfig.UploadGateway uploadGateway;

    @GetMapping("/sftp")
    public void testSftpSpringBatch() {

        List<FileInfo> fileList = uploadGateway.listFileInfo("/send");

        for (FileInfo file : fileList) {
            String fileName = file.getFilename();
            String filePath = file.getRemoteDirectory();
            ChannelSftp.LsEntry fileInfo = (ChannelSftp.LsEntry) file.getFileInfo();
            boolean isDir = file.isDirectory();
            boolean isLink = file.isLink();
            long modifyTime = file.getModified();
            System.out.println("=============================  " + fileName);
            System.out.println("==================  " + filePath);
            System.out.println("==================  " + fileInfo.getFilename());
            System.out.println("==================  " + isDir);
            System.out.println("==================  " + isLink);
            System.out.println("==================  " + modifyTime);
        }
    }

    @GetMapping("/sftp2")
    public void testSftpSpringBatch2() {

        List<FileInfo> fileNameList = uploadGateway.listFileName("/send");

        for (FileInfo fileName : fileNameList) {

            System.out.println("=============================  " + fileName);
        }
    }


    @GetMapping("/sftp3")
    public void testSftpSpringBatch3() throws InterruptedException {

        List<File> fileNameList = uploadGateway.listFile("/send");

        for (File fileName : fileNameList) {
            System.out.println("=============================  " + fileName);
        }
    }

    @GetMapping("/sftp4")
    public void testSftpSpringBatch4() throws InterruptedException {

        String result = uploadGateway.putFile(new File("G:\\Redis.pdf"));

        System.out.println("=============================  " + result);
    }

    @GetMapping("/sftp5")
    public void testSftpSpringBatch5() throws InterruptedException {

        List<String> result = uploadGateway.mputFile(new File("G:\\js"));


        for (String fileName : result) {
            System.out.println("=============================  " + fileName);
        }
    }

    @GetMapping("/sftp6")
    public void testSftpSpringBatch6() throws InterruptedException {

        boolean result = uploadGateway.removeFile("/send/2.txt");

        System.out.println("=============================  " + result);

    }

    @GetMapping("/sftp7")
    public void testSftpSpringBatch7() throws InterruptedException {

        boolean result = uploadGateway.moveFile("/22.TXT");

        System.out.println("=============================  " + result);

    }
}

SpringIntegrationApp.java: the application startup class

package com.flower.integration;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.security.servlet.SecurityAutoConfiguration;
import org.springframework.scheduling.annotation.EnableScheduling;

@SpringBootApplication
//@EnableScheduling
public class SpringIntegrationApp {

    public static void main(String[] args) {
        SpringApplication.run(SpringIntegrationApp.class, args);
        System.out.println("Spring-Integration application start success.");
    }

}

JUnit test class: create SpringIntegrationExamplesApplicationTests.java under the test folder yourself and run it

package com.flower.integration.sftp;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringIntegrationExamplesApplicationTests {

    @Test
    public void contextLoads() {
    }

}

Unit tests for the service layer: SftpServiceTest.java

package com.flower.integration.sftp;

import com.flower.integration.SpringIntegrationApp;
import com.jcraft.jsch.ChannelSftp;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.file.remote.session.SessionFactory;
import org.springframework.integration.file.support.FileExistsMode;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import java.util.List;

@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = SpringIntegrationApp.class)
@EnableIntegration
public class SftpServiceTest {

    @Autowired
    private SftpService sftpService;

    @Autowired
    private SftpProperty sftpProperty;

    @Autowired
    private SessionFactory<ChannelSftp.LsEntry> sftpSessionFactory;

    @Before
    public void before (){
        System.out.println("00000000000000000000000000000000000000000000000000000");
    }

    @After
    public void after(){

    }

    @Test
    public void sendFileToSftp() {

        //sftpService.sendFileToSftp();
    }

    @Test
    public void testRemoveSftpRemoteFile(){
        boolean result = sftpService.removeSftpRemoteFile(
                sftpSessionFactory, sftpProperty.getSftpSendPath(),"user333.csv");

        System.out.println("=======" + result);
    }

    @Test
    public void testRenameSftpRemoteFile(){
        sftpService.renameSftpRemoteFile(sftpSessionFactory, sftpProperty.getSftpSendPath(),"user.csv",
                "user111.csv");
    }

    @Test
    public void testfileExist(){
        boolean result = sftpService.fileExist(sftpSessionFactory, sftpProperty.getSftpSendPath(),"user111.csv");
        System.out.println("++++++++++++" + result);
    }

    @Test
    public void testlsFileOfDirectory(){
        List<String> result = sftpService.lsFileOfDirectory(sftpSessionFactory,
                sftpProperty.getSftpSendPath(),"*TXT");
        System.out.println("-------------------" + result.toString());
    }

    @Test
    public void testSendSftpFile() throws Exception {
        sftpService.sendSftpFile(sftpSessionFactory,
                "G:\\jquery.txt", sftpProperty.getSftpAchievePath(), FileExistsMode.REPLACE);
    }

    @Test
    public void testSynchronizedFileToLocal(){
        sftpService.synchronizedFileToLocal(sftpProperty.getLocalTempDir());
    }
}
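
testlsFileOfDirectory passes the pattern `*TXT` to select remote file names. As a rough illustration of how such a simple wildcard picks names, the sketch below translates `*` into a regular expression and filters a list of names. It assumes that `*` matches any run of characters and that matching is case-sensitive. Whether lsFileOfDirectory behaves exactly this way depends on the filter used on the service side. `WildcardSketch`, `matches` and `filter` are hypothetical names, not part of Spring Integration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

public class WildcardSketch {

    /** Returns true if name matches pattern, where '*' stands for any run of characters. */
    public static boolean matches(String pattern, String name) {
        // Keep empty leading/trailing parts so that "*TXT" and "TXT*" are handled
        String[] parts = pattern.split("\\*", -1);
        StringBuilder regex = new StringBuilder();
        for (int i = 0; i < parts.length; i++) {
            if (i > 0) {
                regex.append(".*");
            }
            // Quote the literal part so that '.' and other regex characters match themselves
            regex.append(Pattern.quote(parts[i]));
        }
        return Pattern.matches(regex.toString(), name);
    }

    /** Returns the names that match the pattern, in their original order. */
    public static List<String> filter(String pattern, List<String> names) {
        List<String> result = new ArrayList<>();
        for (String name : names) {
            if (matches(pattern, name)) {
                result.add(name);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList("22.TXT", "2.txt", "user.csv");
        System.out.println(filter("*TXT", names)); // [22.TXT]
    }
}
```

Under these assumptions a lower-case `2.txt` is not selected by `*TXT`, which is worth keeping in mind when the test returns fewer names than expected.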

 

