Spring Integration sftp 專欄詳解


Spring Integration Sftp 文件傳送

目前在國內項目開發中,使用Spring Integration技術的比較少,尤其是中文的參考文獻和項目案例,更是罕有。鑒於此,本文詳細介紹spring integration sftp模塊在Sftp服務器和本地服務器之間文件的傳送。
SFTP(Secure File Transfer Protocol) 安全文件傳送協議,允許文件通過流的形式在網絡之間進行傳輸。文件傳輸的過程涉及到兩個重要的因素,安全渠道(secure channel,如SSH)以及SFTP聯接身份的識別(SFTP session)。Spring Integration提供三種方式來支持文件在SFTP服務器的發送和接收:Inbound Channel Adapter,Outbound Channel Adapter,Outbound Gateway。

幾個概念:

(1)SFTP Session Factory
在配置SFTP adapters之前,需要配置SFTP Session Factory;Spring Integration提供了如下xml和spring boot的定義方式。
每次使用 SFTP adapter,都需要Session Factory會話對象,一般情況,都會創建一個新的SFTP會話。同時還提供了Session的緩存功能。Spring integration中的Session Factory是依賴於JSch庫來提供

JSch支持在一個連接配置上多個channel的操作。原生的JSch技術開發,在打開一個channel操作之前,需要建立Session的連接。同樣的,默認情況,Spring Integration為每一個channel操作使用單獨的物理連接。在3.0版本發布之后,Cache Session Factory 出現 (CachingSessionFactory),將Session Factory包裝在緩存中,支持Session共享,可以在一個連接上支持多個JSch Channel的操作。如果緩存被重置,在最后一次channel關閉之后,才會斷開連接。

 

下面是XML方式定義的Session Factory的bean:

<beans:bean id="sftpSessionFactory"
    class="org.springframework.integration.sftp.session.DefaultSftpSessionFactory">
    <beans:property name="host" value="localhost"/>
    <beans:property name="privateKey" value="classpath:META-INF/keys/sftpTest"/>
    <beans:property name="privateKeyPassphrase" value="springIntegration"/>
    <beans:property name="port" value="22"/>
    <beans:property name="user" value="songhj"/>
</beans:bean>

 下面是使用springboot方式定義的Session Factory:

@Bean
public SessionFactory<LsEntry> sftpSessionFactory(){
    DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
    factory.setUser("songhj");
    factory.setHost("127.0.0.1");
    factory.setPort("22");
    factory.setPassword("password");
    return factory;
}

(2)SFTP Session Caching

在3.0之前的版本中,會話默認情況下是自動緩存的。可以使用cache-sessions屬性禁用自動緩存,但是該解決方案沒有提供配置其他會話緩存屬性的方法。例如,您不能限制緩存會話的數量。
3.0版本之后,提供了一個CachingSessionFactory。它提供了sessionCacheSize和sessionWaitTimeout屬性。控制緩存中維護多少活動會話(默認情況下是無界的)。如果已經達到了sessionCacheSize閾值,那么任何獲取另一個會話的嘗試都將被阻塞,直到其中一個緩存會話可用,或者直到會話的等待時間過期(默認等待時間是Integer.MAX_VALUE)。sessionWaitTimeout屬性啟用該值的配置。緩存會話,然后將其包裝在CachingSessionFactory的實例中。

<bean id="sftpSessionFactory"
    class="org.springframework.integration.sftp.session.DefaultSftpSessionFactory">
    <property name="host" value="localhost"/>
</bean>
<bean id="cachingSessionFactory"
    class="org.springframework.integration.file.remote.session.CachingSessionFactory">
    <constructor-arg ref="sftpSessionFactory"/>
    <property name="sessionCacheSize" value="10"/>
    <property name="sessionWaitTimeout" value="1000"/>
</bean>

 從Spring Integration version 3.0開始,CachingConnectionFactory提供了resetCache()方法。當被調用時,所有空閑會話將立即關閉,而正在使用的會話將在返回到緩存時關閉。當使用isSharedSession=true時,通道是關閉的,而共享會話僅在最后一個通道關閉時才關閉。對會話的新請求將根據需要建立新的會話。

(3)RemoteFileTemplate

 從Spring Integration 3.0開始,通過SftpSession對象提供了一個新類Remote File Template。提供了Sftp文件發送、檢索、刪除和重命名文件的方法。此外,還提供了一個執行方法,允許調用者在會話上執行多個操作。在所有情況下,模板負責可靠地關閉會話。
SftpRemoteFileTemplate繼承Remote File Template,可以很容易的實現對SFTP文件的發送(包括文件追加,替換等),檢索,刪除和重命名。

 

 

 

 

 (4)SFTP Inbound Channel Adapter

SFTP Inbound Channel Adapter 是一個特殊的監聽器,這個認識很重要,它將連接到服務器,並監聽遠程文件操作事件(例如,創建新文件),並初始化文件傳輸實例。它的作用實質就是將sftp遠程服務器文件發送到本地文件夾下。
下面是XML方式的 sftp inbound channel adapter配置實例:

<int-sftp:inbound-channel-adapter id="sftpAdapterAutoCreate"
     session-factory="sftpSessionFactory"
   channel="requestChannel"
   filename-pattern="*.txt"
   remote-directory="/foo/bar"
   preserve-timestamp="true"
   local-directory="file:target/foo"
   auto-create-local-directory="true"
   local-filename-generator-expression="#this.toUpperCase() + '.a'"
   local-filter="myFilter"
   delete-remote-files="false">
  <int:poller fixed-rate="1000"/>
</int-sftp:inbound-channel-adapter>

 從上面的配置可以理解,由inbound-channel-adapter元素創建,同時提供各種屬性的值,local-directory:文件被轉移的目標文件夾;remote-directory:文件來源遠程文件夾目錄;session-factory:session 會話工廠;channel:需要的渠道;文件過濾器以及是否刪除轉移后的文件等屬性。

下面是springboot方式配置的Sftp Inbound channel adapter:

@Bean
@InboundChannelAdapter(value = "inboundFileChannel",
        poller = @Poller(cron = "1/10 * * * * *", maxMessagesPerPoll = "1"))
public MessageSource<File> fileMessageSource() {
 
    //創建sftpInboundFileSynchronizer,並綁定到message source.
    SftpInboundFileSynchronizingMessageSource source =
            new SftpInboundFileSynchronizingMessageSource(sftpInboundFileSynchronizer());

    //自動創建本地文件夾
    source.setAutoCreateLocalDirectory(true);
    source.setLocalDirectory(new File(sftpProperty.getLocalTempDir()));
    //設置文件過濾器
    source.setLocalFilter(new AcceptOnceFileListFilter<File>());
    return source;
}

/**
 * 為Inbound-channel-adapter提供bean
 */
@Bean
public DirectChannel inboundFileChannel() {
    return new DirectChannel();
}

/**
 * SftpInboundFileSynchronizer,
 *
 *  同步sftp文件至本地服務器.
 *      <1> 可以放在service中獲取bean使用.toLocal方法;
 *      <2> 也可以使用inbound-channel-adapter中,做監控文件服務器的動態。
 *
 * @return SftpInboundFileSynchronizer
 */
@Bean(name = "synFileChannel")
public SftpInboundFileSynchronizer sftpInboundFileSynchronizer (){
    SftpInboundFileSynchronizer fileSynchronize =
            new SftpInboundFileSynchronizer(sftpSessionFactory());
    fileSynchronize.setDeleteRemoteFiles(true);
    fileSynchronize.setPreserveTimestamp(true);
    //!important
    fileSynchronize.setRemoteDirectory(sftpProperty.getSftpSendPath());
    fileSynchronize.setFilter(new SftpSimplePatternFileListFilter("*.*"));
    //fileSynchronize.setLocalFilenameGeneratorExpression( );
    fileSynchronize.setPreserveTimestamp(true);
    return fileSynchronize;
}

文件名設置:默認情況下,傳輸的文件將與原始文件具有相同的名稱。Inbound channel adapter提供local-filename-generator- Expression屬性,允許提供SpEL表達式來生成本地文件的名稱。

文件修改時間戳:從Spring Integration 3.0開始,reserve-timestamp屬性(默認為false);設置為true時,本地文件修改后的時間戳將設置為從服務器接收到的時間;否則將設置為當前時間。

文件過濾器:filename-pattern屬性指定了簡單的文件過濾模式。同時還提供了filename-regex屬性來指定使用正則表達式(例如filename-regex=".*.test$")來過濾文件,以確定檢索哪些遠程文件。除了上面的過濾器之外,spring integration還提供了其他篩選器,如AcceptOnceFileListFilter,CompositeFileListFilter,SftpPersistentAcceptOnceFileListFilter,前者的作用是避免同步以前獲取的文件。

(5)SFTP Outbound Channel Adapter

Spring Integration的 sftp outbound channel adapter是一個特殊的Message Handler,它將連接到遠程sftp服務器目錄。並為接收到的Message的payload,初始化一個文件傳輸渠道。 sftp outbound adapter支持以下幾種傳輸對象:
1)Java.io.File:實際文件對象;
2)byte[]:字節數組
3)Java.lang.String:字符串

下面是基於xml格式配置:

<int-sftp:outbound-channel-adapter id="sftpOutboundAdapter"
				session-factory="sftpSessionFactory"
				channel="inputChannel"
				charset="UTF-8"
				remote-directory="foo/bar"
				remote-filename-generator-expression="payload.getName() + '-foo'"/>

由上面的配置,可以看到outbound channel adapter的屬性;可以動態計算文件名或現有目錄路徑的表達式。目前Spring Integration並沒有提供單獨的注解來配置此adapter,可以使用@ServiceActivator 定義bean,來實現該配置。

下面是基於Spring boot框架配置,配置了Inbound Channel Adapter。

@SpringBootApplication
public class SftpJavaApplication {
    @Bean
    public SessionFactory<LsEntry> sftpSessionFactory() {
        DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
        factory.setHost("localhost");
        factory.setPort(port);
        factory.setUser("foo");
        factory.setPassword("foo");
        factory.setAllowUnknownKeys(true);
        factory.setTestSession(true);
        return new CachingSessionFactory<LsEntry>(factory);
    }

    @Bean
    public SftpInboundFileSynchronizer sftpInboundFileSynchronizer() {
        SftpInboundFileSynchronizer fileSynchronizer = new SftpInboundFileSynchronizer(sftpSessionFactory());
        fileSynchronizer.setDeleteRemoteFiles(false);
        fileSynchronizer.setRemoteDirectory("foo");
        fileSynchronizer.setFilter(new SftpSimplePatternFileListFilter("*.xml"));
        return fileSynchronizer;
    }
    //配置Sftp Inbound Channel Adapter.
    //Inbound Channel Adapter實質就是一個服務監控器,監控sftp上服務器上文件的創建。
    @Bean
    @InboundChannelAdapter(channel = "sftpChannel", poller = @Poller(fixedDelay = "5000"))
    public MessageSource<File> sftpMessageSource() {
        SftpInboundFileSynchronizingMessageSource source =
                new SftpInboundFileSynchronizingMessageSource(sftpInboundFileSynchronizer());
        source.setLocalDirectory(new File("sftp-inbound"));
        source.setAutoCreateLocalDirectory(true);
        source.setLocalFilter(new AcceptOnceFileListFilter<File>());
        source.setMaxFetchSize(1);
        return source;
    }
    //配置Sftp Outbound Channel Adapter.
    //Outbound Channel Adapter實質上就是一個MessageHandler,接收發送的消息。
    @Bean
    @ServiceActivator(inputChannel = "sftpChannel")
    public MessageHandler handler() {
        return new MessageHandler() {
            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                System.out.println(message.getPayload());
            }
        };
    }
}

 Channel Adapter*
管道適配器,其就是一個端點Endpoint,顧名思義,就是用來適配連接Message channel與其他系統的。Channel Adapter分為inbound和outbound。剛一開始接觸spring-integration時對這兩個概念的非常不理解,在這里可以簡單化,以當前的項目spring-integration為基准,所謂inbound就是從其他系統接收Message,獲取資源,如sftp,file,Http等。而outbound就是將資源通過消息管道,發送資源到target目標服務器。

 

 

 

 (6)Service Activator

Service activator的作用時將存在的service注冊為一個端點,將該服務實例同spring-integration消息服務連接起來。該端點必須配置input channel,若調用的方法有返回值,還可以配置out channel。

Service activator調用某個服務的操作來處理消息,如在下面的發送文件到sftp服務,就是將SftpMessageHandler來處理channel中的消息。

Spring-Integration-Sftp實戰測試:

一、導入依賴

<dependency>
     <groupId>org.springframework.integration</groupId>
     <artifactId>spring-integration-sftp</artifactId>
</dependency>

二、Sftp配置類

@Configuration
@EnableIntegration
public class SpringSftpConfig {
 
    @Autowired
    private SftpProperties sftpProperties;
 
    @Bean
    public SessionFactory<ChannelSftp.LsEntry> sftpSessionFactory() {
        DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
        factory.setHost(sftpProperties.getHost());
        factory.setPort(sftpProperties.getPort());
        factory.setUser(sftpProperties.getUsername());
        if (StringUtils.isNotEmpty(sftpProperties.getPrivateKey())) {
            factory.setPrivateKey(new ClassPathResource(sftpProperties.getPrivateKey()));
            factory.setPrivateKeyPassphrase(sftpProperties.getPassphrase());
        } else {
            factory.setPassword(sftpProperties.getPassword());
        }
        factory.setAllowUnknownKeys(true);
        CachingSessionFactory<ChannelSftp.LsEntry> cachingSessionFactory = new CachingSessionFactory<>(factory);
        cachingSessionFactory.setPoolSize(10);
        cachingSessionFactory.setSessionWaitTimeout(10000);
        return cachingSessionFactory;
    }
 
    @Bean
    public SftpRemoteFileTemplate sftpRemoteFileTemplate() {
        return new SftpRemoteFileTemplate(sftpSessionFactory());
    }
 
    @Bean
    public SftpInboundFileSynchronizer sftpInboundFileSynchronizer() {
        SftpInboundFileSynchronizer fileSynchronizer = new SftpInboundFileSynchronizer(sftpSessionFactory());
        fileSynchronizer.setDeleteRemoteFiles(true);
        fileSynchronizer.setRemoteDirectory(sftpProperties.getRoot());
        fileSynchronizer.setFilter(new SftpSimplePatternFileListFilter("*.txt"));
        return fileSynchronizer;
    }
 
    @Bean
    @ServiceActivator(inputChannel = "lsChannel")
    public MessageHandler lsHandler() {
        SftpOutboundGateway sftpOutboundGateway = new SftpOutboundGateway(sftpSessionFactory(), "ls", "payload");
        sftpOutboundGateway.setOptions("-dirs"); //配置項
        return sftpOutboundGateway;
    }
 
    @Bean
    @ServiceActivator(inputChannel = "downloadChannel")
    public MessageHandler downloadHandler() {
        SftpOutboundGateway sftpOutboundGateway = new SftpOutboundGateway(sftpSessionFactory(), "mget", "payload");
        sftpOutboundGateway.setOptions("-R");
        sftpOutboundGateway.setFileExistsMode(FileExistsMode.REPLACE_IF_MODIFIED);
        sftpOutboundGateway.setLocalDirectory(new File(sftpProperties.getLocalPath()));
        sftpOutboundGateway.setAutoCreateLocalDirectory(true);
        return sftpOutboundGateway;
    }
 
    @Bean
    @ServiceActivator(inputChannel = "uploadChannel")
    public MessageHandler uploadHandler() {
        SftpMessageHandler handler = new SftpMessageHandler(sftpSessionFactory());
        handler.setRemoteDirectoryExpression(new LiteralExpression(sftpProperties.getRoot()));
        handler.setFileNameGenerator(message -> {
            if (message.getPayload() instanceof File) {
                return ((File) message.getPayload()).getName();
            } else {
                throw new IllegalArgumentException("File expected as payload.");
            }
        });
        return handler;
    }
 
    @MessagingGateway
    public interface SftpGateway {
        @Gateway(requestChannel = "lsChannel")
        List<FileInfo> listFile(String dir);
 
        @Gateway(requestChannel = "downloadChannel")
        List<File> downloadFiles(String dir);
 
        @Gateway(requestChannel = "uploadChannel")
        void uploadFile(File file);
    }
 
}

其中,SftpProperties是第二篇中的配置類。SessionFactory來創建連接工廠,進行連接操作。SftpRemoteFileTemplate是Spring-integration-sftp進行sftp操作的模板類,相當於jsch的ChannelSftp。但是SftpRemoteFileTemplate的上傳下載方法不大好用,是通過InputStream和OutputStream進行操作的。因此這里通過Spring integration的MessageHandler來進行上傳下載操作。

@Autowired
private SpringSftpConfig.SftpGateway sftpGateway;
sftpGateway.listFile("/Users/kungfupanda").stream().forEach(System.out::println);
sftpGateway.downloadFiles("/Users/kungfupanda/sftpserver/RELREVMOD_ddmmyyyy.txt").stream().forEach(System.out::println);
sftpGateway.uploadFile(new File("RELREVMOD_ddmmyyyy.txt"));

三、啟動集成掃描

在啟動類上加上@IntegrationComponentScan注解

@IntegrationComponentScan
@SpringBootApplication
@EnableScheduling
public class CDDAlertApplication{
 
    public static void main(String... args){
        SpringApplication.run(CDDAlertApplication.class, args);
    }
}

sftp.properties配置信息文件

sftp.client.protocol=sftp
sftp.client.host=localhost
sftp.client.port=22
sftp.client.username=kungfupanda
sftp.client.password=xxxxx
sftp.client.root=/Users/kungfupanda
sftp.client.privateKey=
sftp.client.passphrase=
sftp.client.sessionStrictHostKeyChecking=no
sftp.client.sessionConnectTimeout=15000
sftp.client.channelConnectedTimeout=15000
sftp.client.localPath=./_files

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM