Spring Integration Sftp 文件傳送
目前在國內項目開發中,使用Spring Integration技術的比較少,尤其是中文的參考文獻和項目案例,更是罕有。鑒於此,本文詳細介紹spring integration sftp模塊在Sftp服務器和本地服務器之間文件的傳送。
SFTP(Secure File Transfer Protocol) 安全文件傳送協議,允許文件通過流的形式在網絡之間進行傳輸。文件傳輸的過程涉及到兩個重要的因素,安全渠道(secure channel,如SSH)以及SFTP聯接身份的識別(SFTP session)。Spring Integration提供三種方式來支持文件在SFTP服務器的發送和接收:Inbound Channel Adapter,Outbound Channel Adapter,Outbound Gateway。
幾個概念:
(1)SFTP Session Factory
在配置SFTP adapters之前,需要配置SFTP Session Factory;Spring Integration提供了如下xml和spring boot的定義方式。
每次使用 SFTP adapter,都需要Session Factory會話對象,一般情況,都會創建一個新的SFTP會話。同時還提供了Session的緩存功能。Spring integration中的Session Factory是依賴於JSch庫來提供。
JSch支持在一個連接配置上多個channel的操作。原生的JSch技術開發,在打開一個channel操作之前,需要建立Session的連接。同樣的,默認情況,Spring Integration為每一個channel操作使用單獨的物理連接。在3.0版本發布之后,Cache Session Factory 出現 (CachingSessionFactory),將Session Factory包裝在緩存中,支持Session共享,可以在一個連接上支持多個JSch Channel的操作。如果緩存被重置,在最后一次channel關閉之后,才會斷開連接。

下面是XML方式定義的Session Factory的bean:
<beans:bean id="sftpSessionFactory"
class="org.springframework.integration.sftp.session.DefaultSftpSessionFactory">
<beans:property name="host" value="localhost"/>
<beans:property name="privateKey" value="classpath:META-INF/keys/sftpTest"/>
<beans:property name="privateKeyPassphrase" value="springIntegration"/>
<beans:property name="port" value="22"/>
<beans:property name="user" value="songhj"/>
</beans:bean>
下面是使用springboot方式定義的Session Factory:
@Bean
public SessionFactory<LsEntry> sftpSessionFactory(){
DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
factory.setUser("songhj");
factory.setHost("127.0.0.1");
factory.setPort("22");
factory.setPassword("password");
return factory;
}
(2)SFTP Session Caching
在3.0之前的版本中,會話默認情況下是自動緩存的。可以使用cache-sessions屬性禁用自動緩存,但是該解決方案沒有提供配置其他會話緩存屬性的方法。例如,您不能限制緩存會話的數量。
3.0版本之后,提供了一個CachingSessionFactory。它提供了sessionCacheSize和sessionWaitTimeout屬性。控制緩存中維護多少活動會話(默認情況下是無界的)。如果已經達到了sessionCacheSize閾值,那么任何獲取另一個會話的嘗試都將被阻塞,直到其中一個緩存會話可用,或者直到會話的等待時間過期(默認等待時間是Integer.MAX_VALUE)。sessionWaitTimeout屬性啟用該值的配置。緩存會話,然后將其包裝在CachingSessionFactory的實例中。
<bean id="sftpSessionFactory"
class="org.springframework.integration.sftp.session.DefaultSftpSessionFactory">
<property name="host" value="localhost"/>
</bean>
<bean id="cachingSessionFactory"
class="org.springframework.integration.file.remote.session.CachingSessionFactory">
<constructor-arg ref="sftpSessionFactory"/>
<property name="sessionCacheSize" value="10"/>
<property name="sessionWaitTimeout" value="1000"/>
</bean>
從Spring Integration version 3.0開始,CachingConnectionFactory提供了resetCache()方法。當被調用時,所有空閑會話將立即關閉,而正在使用的會話將在返回到緩存時關閉。當使用isSharedSession=true時,通道是關閉的,而共享會話僅在最后一個通道關閉時才關閉。對會話的新請求將根據需要建立新的會話。
(3)RemoteFileTemplate
從Spring Integration 3.0開始,通過SftpSession對象提供了一個新類Remote File Template。提供了Sftp文件發送、檢索、刪除和重命名文件的方法。此外,還提供了一個執行方法,允許調用者在會話上執行多個操作。在所有情況下,模板負責可靠地關閉會話。
SftpRemoteFileTemplate繼承Remote File Template,可以很容易的實現對SFTP文件的發送(包括文件追加,替換等),檢索,刪除和重命名。



(4)SFTP Inbound Channel Adapter
SFTP Inbound Channel Adapter 是一個特殊的監聽器,這個認識很重要,它將連接到服務器,並監聽遠程文件操作事件(例如,創建新文件),並初始化文件傳輸實例。它的作用實質就是將sftp遠程服務器文件發送到本地文件夾下。
下面是XML方式的 sftp inbound channel adapter配置實例:
<int-sftp:inbound-channel-adapter id="sftpAdapterAutoCreate"
session-factory="sftpSessionFactory"
channel="requestChannel"
filename-pattern="*.txt"
remote-directory="/foo/bar"
preserve-timestamp="true"
local-directory="file:target/foo"
auto-create-local-directory="true"
local-filename-generator-expression="#this.toUpperCase() + '.a'"
local-filter="myFilter"
delete-remote-files="false">
<int:poller fixed-rate="1000"/>
</int-sftp:inbound-channel-adapter>
從上面的配置可以理解,由inbound-channel-adapter元素創建,同時提供各種屬性的值,local-directory:文件被轉移的目標文件夾;remote-directory:文件來源遠程文件夾目錄;session-factory:session 會話工廠;channel:需要的渠道;文件過濾器以及是否刪除轉移后的文件等屬性。
下面是springboot方式配置的Sftp Inbound channel adapter:
@Bean
@InboundChannelAdapter(value = "inboundFileChannel",
poller = @Poller(cron = "1/10 * * * * *", maxMessagesPerPoll = "1"))
public MessageSource<File> fileMessageSource() {
//創建sftpInboundFileSynchronizer,並綁定到message source.
SftpInboundFileSynchronizingMessageSource source =
new SftpInboundFileSynchronizingMessageSource(sftpInboundFileSynchronizer());
//自動創建本地文件夾
source.setAutoCreateLocalDirectory(true);
source.setLocalDirectory(new File(sftpProperty.getLocalTempDir()));
//設置文件過濾器
source.setLocalFilter(new AcceptOnceFileListFilter<File>());
return source;
}
/**
* 為Inbound-channel-adapter提供bean
*/
@Bean
public DirectChannel inboundFileChannel() {
return new DirectChannel();
}
/**
* SftpInboundFileSynchronizer,
*
* 同步sftp文件至本地服務器.
* <1> 可以放在service中獲取bean使用.toLocal方法;
* <2> 也可以使用inbound-channel-adapter中,做監控文件服務器的動態。
*
* @return SftpInboundFileSynchronizer
*/
@Bean(name = "synFileChannel")
public SftpInboundFileSynchronizer sftpInboundFileSynchronizer (){
SftpInboundFileSynchronizer fileSynchronize =
new SftpInboundFileSynchronizer(sftpSessionFactory());
fileSynchronize.setDeleteRemoteFiles(true);
fileSynchronize.setPreserveTimestamp(true);
//!important
fileSynchronize.setRemoteDirectory(sftpProperty.getSftpSendPath());
fileSynchronize.setFilter(new SftpSimplePatternFileListFilter("*.*"));
//fileSynchronize.setLocalFilenameGeneratorExpression( );
fileSynchronize.setPreserveTimestamp(true);
return fileSynchronize;
}
文件名設置:默認情況下,傳輸的文件將與原始文件具有相同的名稱。Inbound channel adapter提供local-filename-generator- Expression屬性,允許提供SpEL表達式來生成本地文件的名稱。
文件修改時間戳:從Spring Integration 3.0開始,reserve-timestamp屬性(默認為false);設置為true時,本地文件修改后的時間戳將設置為從服務器接收到的時間;否則將設置為當前時間。
文件過濾器:filename-pattern屬性指定了簡單的文件過濾模式。同時還提供了filename-regex屬性來指定使用正則表達式(例如filename-regex=".*.test$")來過濾文件,以確定檢索哪些遠程文件。除了上面的過濾器之外,spring integration還提供了其他篩選器,如AcceptOnceFileListFilter,CompositeFileListFilter,SftpPersistentAcceptOnceFileListFilter,前者的作用是避免同步以前獲取的文件。
(5)SFTP Outbound Channel Adapter
Spring Integration的 sftp outbound channel adapter是一個特殊的Message Handler,它將連接到遠程sftp服務器目錄。並為接收到的Message的payload,初始化一個文件傳輸渠道。 sftp outbound adapter支持以下幾種傳輸對象:
1)Java.io.File:實際文件對象;
2)byte[]:字節數組
3)Java.lang.String:字符串
下面是基於xml格式配置:
<int-sftp:outbound-channel-adapter id="sftpOutboundAdapter" session-factory="sftpSessionFactory" channel="inputChannel" charset="UTF-8" remote-directory="foo/bar" remote-filename-generator-expression="payload.getName() + '-foo'"/>
由上面的配置,可以看到outbound channel adapter的屬性;可以動態計算文件名或現有目錄路徑的表達式。目前Spring Integration並沒有提供單獨的注解來配置此adapter,可以使用@ServiceActivator 定義bean,來實現該配置。
下面是基於Spring boot框架配置,配置了Inbound Channel Adapter。
@SpringBootApplication
public class SftpJavaApplication {
@Bean
public SessionFactory<LsEntry> sftpSessionFactory() {
DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
factory.setHost("localhost");
factory.setPort(port);
factory.setUser("foo");
factory.setPassword("foo");
factory.setAllowUnknownKeys(true);
factory.setTestSession(true);
return new CachingSessionFactory<LsEntry>(factory);
}
@Bean
public SftpInboundFileSynchronizer sftpInboundFileSynchronizer() {
SftpInboundFileSynchronizer fileSynchronizer = new SftpInboundFileSynchronizer(sftpSessionFactory());
fileSynchronizer.setDeleteRemoteFiles(false);
fileSynchronizer.setRemoteDirectory("foo");
fileSynchronizer.setFilter(new SftpSimplePatternFileListFilter("*.xml"));
return fileSynchronizer;
}
//配置Sftp Inbound Channel Adapter.
//Inbound Channel Adapter實質就是一個服務監控器,監控sftp上服務器上文件的創建。
@Bean
@InboundChannelAdapter(channel = "sftpChannel", poller = @Poller(fixedDelay = "5000"))
public MessageSource<File> sftpMessageSource() {
SftpInboundFileSynchronizingMessageSource source =
new SftpInboundFileSynchronizingMessageSource(sftpInboundFileSynchronizer());
source.setLocalDirectory(new File("sftp-inbound"));
source.setAutoCreateLocalDirectory(true);
source.setLocalFilter(new AcceptOnceFileListFilter<File>());
source.setMaxFetchSize(1);
return source;
}
//配置Sftp Outbound Channel Adapter.
//Outbound Channel Adapter實質上就是一個MessageHandler,接收發送的消息。
@Bean
@ServiceActivator(inputChannel = "sftpChannel")
public MessageHandler handler() {
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
System.out.println(message.getPayload());
}
};
}
}
Channel Adapter*
管道適配器,其就是一個端點Endpoint,顧名思義,就是用來適配連接Message channel與其他系統的。Channel Adapter分為inbound和outbound。剛一開始接觸spring-integration時對這兩個概念的非常不理解,在這里可以簡單化,以當前的項目spring-integration為基准,所謂inbound就是從其他系統接收Message,獲取資源,如sftp,file,Http等。而outbound就是將資源通過消息管道,發送資源到target目標服務器。


(6)Service Activator
Service activator的作用時將存在的service注冊為一個端點,將該服務實例同spring-integration消息服務連接起來。該端點必須配置input channel,若調用的方法有返回值,還可以配置out channel。
Service activator調用某個服務的操作來處理消息,如在下面的發送文件到sftp服務,就是將SftpMessageHandler來處理channel中的消息。

Spring-Integration-Sftp實戰測試:
一、導入依賴
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-sftp</artifactId>
</dependency>
二、Sftp配置類
@Configuration @EnableIntegration public class SpringSftpConfig { @Autowired private SftpProperties sftpProperties; @Bean public SessionFactory<ChannelSftp.LsEntry> sftpSessionFactory() { DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true); factory.setHost(sftpProperties.getHost()); factory.setPort(sftpProperties.getPort()); factory.setUser(sftpProperties.getUsername()); if (StringUtils.isNotEmpty(sftpProperties.getPrivateKey())) { factory.setPrivateKey(new ClassPathResource(sftpProperties.getPrivateKey())); factory.setPrivateKeyPassphrase(sftpProperties.getPassphrase()); } else { factory.setPassword(sftpProperties.getPassword()); } factory.setAllowUnknownKeys(true); CachingSessionFactory<ChannelSftp.LsEntry> cachingSessionFactory = new CachingSessionFactory<>(factory); cachingSessionFactory.setPoolSize(10); cachingSessionFactory.setSessionWaitTimeout(10000); return cachingSessionFactory; } @Bean public SftpRemoteFileTemplate sftpRemoteFileTemplate() { return new SftpRemoteFileTemplate(sftpSessionFactory()); } @Bean public SftpInboundFileSynchronizer sftpInboundFileSynchronizer() { SftpInboundFileSynchronizer fileSynchronizer = new SftpInboundFileSynchronizer(sftpSessionFactory()); fileSynchronizer.setDeleteRemoteFiles(true); fileSynchronizer.setRemoteDirectory(sftpProperties.getRoot()); fileSynchronizer.setFilter(new SftpSimplePatternFileListFilter("*.txt")); return fileSynchronizer; } @Bean @ServiceActivator(inputChannel = "lsChannel") public MessageHandler lsHandler() { SftpOutboundGateway sftpOutboundGateway = new SftpOutboundGateway(sftpSessionFactory(), "ls", "payload"); sftpOutboundGateway.setOptions("-dirs"); //配置項 return sftpOutboundGateway; } @Bean @ServiceActivator(inputChannel = "downloadChannel") public MessageHandler downloadHandler() { SftpOutboundGateway sftpOutboundGateway = new SftpOutboundGateway(sftpSessionFactory(), "mget", "payload"); sftpOutboundGateway.setOptions("-R"); sftpOutboundGateway.setFileExistsMode(FileExistsMode.REPLACE_IF_MODIFIED); sftpOutboundGateway.setLocalDirectory(new File(sftpProperties.getLocalPath())); sftpOutboundGateway.setAutoCreateLocalDirectory(true); return sftpOutboundGateway; } @Bean @ServiceActivator(inputChannel = "uploadChannel") public MessageHandler uploadHandler() { SftpMessageHandler handler = new SftpMessageHandler(sftpSessionFactory()); handler.setRemoteDirectoryExpression(new LiteralExpression(sftpProperties.getRoot())); handler.setFileNameGenerator(message -> { if (message.getPayload() instanceof File) { return ((File) message.getPayload()).getName(); } else { throw new IllegalArgumentException("File expected as payload."); } }); return handler; } @MessagingGateway public interface SftpGateway { @Gateway(requestChannel = "lsChannel") List<FileInfo> listFile(String dir); @Gateway(requestChannel = "downloadChannel") List<File> downloadFiles(String dir); @Gateway(requestChannel = "uploadChannel") void uploadFile(File file); } }
其中,SftpProperties是第二篇中的配置類。SessionFactory來創建連接工廠,進行連接操作。SftpRemoteFileTemplate是Spring-integration-sftp進行sftp操作的模板類,相當於jsch的ChannelSftp。但是SftpRemoteFileTemplate的上傳下載方法不大好用,是通過InputStream和OutputStream進行操作的。因此這里通過Spring integration的MessageHandler來進行上傳下載操作。
@Autowired private SpringSftpConfig.SftpGateway sftpGateway;
sftpGateway.listFile("/Users/kungfupanda").stream().forEach(System.out::println);
sftpGateway.downloadFiles("/Users/kungfupanda/sftpserver/RELREVMOD_ddmmyyyy.txt").stream().forEach(System.out::println);
sftpGateway.uploadFile(new File("RELREVMOD_ddmmyyyy.txt"));
三、啟動集成掃描
在啟動類上加上@IntegrationComponentScan注解
@IntegrationComponentScan @SpringBootApplication @EnableScheduling public class CDDAlertApplication{ public static void main(String... args){ SpringApplication.run(CDDAlertApplication.class, args); } }
sftp.properties配置信息文件
sftp.client.protocol=sftp sftp.client.host=localhost sftp.client.port=22 sftp.client.username=kungfupanda sftp.client.password=xxxxx sftp.client.root=/Users/kungfupanda sftp.client.privateKey= sftp.client.passphrase= sftp.client.sessionStrictHostKeyChecking=no sftp.client.sessionConnectTimeout=15000 sftp.client.channelConnectedTimeout=15000 sftp.client.localPath=./_files
