Spring Integration Sftp 文件傳送
目前在國內項目開發中,使用Spring Integration技術的比較少,尤其是中文的參考文獻和項目案例,更是罕有。鑒於此,本文詳細介紹spring integration sftp模塊在Sftp服務器和本地服務器之間文件的傳送。
SFTP(Secure File Transfer Protocol) 安全文件傳送協議,允許文件通過流的形式在網絡之間進行傳輸。文件傳輸的過程涉及到兩個重要的因素,安全渠道(secure channel,如SSH)以及SFTP聯接身份的識別(SFTP session)。Spring Integration提供三種方式來支持文件在SFTP服務器的發送和接收:Inbound Channel Adapter,Outbound Channel Adapter,Outbound Gateway。
幾個概念:
(1)SFTP Session Factory
在配置SFTP adapters之前,需要配置SFTP Session Factory;Spring Integration提供了如下xml和spring boot的定義方式。
每次使用 SFTP adapter,都需要Session Factory會話對象,一般情況,都會創建一個新的SFTP會話。同時還提供了Session的緩存功能。Spring integration中的Session Factory是依賴於JSch庫來提供。
JSch支持在一個連接配置上多個channel的操作。原生的JSch技術開發,在打開一個channel操作之前,需要建立Session的連接。同樣的,默認情況,Spring Integration為每一個channel操作使用單獨的物理連接。在3.0版本發布之后,Cache Session Factory 出現 (CachingSessionFactory),將Session Factory包裝在緩存中,支持Session共享,可以在一個連接上支持多個JSch Channel的操作。如果緩存被重置,在最后一次channel關閉之后,才會斷開連接。
下面是XML方式定義的Session Factory的bean:
<beans:bean id="sftpSessionFactory" class="org.springframework.integration.sftp.session.DefaultSftpSessionFactory"> <beans:property name="host" value="localhost"/> <beans:property name="privateKey" value="classpath:META-INF/keys/sftpTest"/> <beans:property name="privateKeyPassphrase" value="springIntegration"/> <beans:property name="port" value="22"/> <beans:property name="user" value="songhj"/> </beans:bean>
下面是使用springboot方式定義的Session Factory:
@Bean public SessionFactory<LsEntry> sftpSessionFactory(){ DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true); factory.setUser("songhj"); factory.setHost("127.0.0.1"); factory.setPort("22"); factory.setPassword("password"); return factory; }
(2)SFTP Session Caching
在3.0之前的版本中,會話默認情況下是自動緩存的。可以使用cache-sessions屬性禁用自動緩存,但是該解決方案沒有提供配置其他會話緩存屬性的方法。例如,您不能限制緩存會話的數量。
3.0版本之后,提供了一個CachingSessionFactory。它提供了sessionCacheSize和sessionWaitTimeout屬性。控制緩存中維護多少活動會話(默認情況下是無界的)。如果已經達到了sessionCacheSize閾值,那么任何獲取另一個會話的嘗試都將被阻塞,直到其中一個緩存會話可用,或者直到會話的等待時間過期(默認等待時間是Integer.MAX_VALUE)。sessionWaitTimeout屬性啟用該值的配置。緩存會話,然后將其包裝在CachingSessionFactory的實例中。
<bean id="sftpSessionFactory" class="org.springframework.integration.sftp.session.DefaultSftpSessionFactory"> <property name="host" value="localhost"/> </bean> <bean id="cachingSessionFactory" class="org.springframework.integration.file.remote.session.CachingSessionFactory"> <constructor-arg ref="sftpSessionFactory"/> <property name="sessionCacheSize" value="10"/> <property name="sessionWaitTimeout" value="1000"/> </bean>
從Spring Integration version 3.0開始,CachingConnectionFactory提供了resetCache()方法。當被調用時,所有空閑會話將立即關閉,而正在使用的會話將在返回到緩存時關閉。當使用isSharedSession=true時,通道是關閉的,而共享會話僅在最后一個通道關閉時才關閉。對會話的新請求將根據需要建立新的會話。
(3)RemoteFileTemplate
從Spring Integration 3.0開始,通過SftpSession對象提供了一個新類Remote File Template。提供了Sftp文件發送、檢索、刪除和重命名文件的方法。此外,還提供了一個執行方法,允許調用者在會話上執行多個操作。在所有情況下,模板負責可靠地關閉會話。
SftpRemoteFileTemplate繼承Remote File Template,可以很容易的實現對SFTP文件的發送(包括文件追加,替換等),檢索,刪除和重命名。
(4)SFTP Inbound Channel Adapter
SFTP Inbound Channel Adapter 是一個特殊的監聽器,這個認識很重要,它將連接到服務器,並監聽遠程文件操作事件(例如,創建新文件),並初始化文件傳輸實例。它的作用實質就是將sftp遠程服務器文件發送到本地文件夾下。
下面是XML方式的 sftp inbound channel adapter配置實例:
<int-sftp:inbound-channel-adapter id="sftpAdapterAutoCreate" session-factory="sftpSessionFactory" channel="requestChannel" filename-pattern="*.txt" remote-directory="/foo/bar" preserve-timestamp="true" local-directory="file:target/foo" auto-create-local-directory="true" local-filename-generator-expression="#this.toUpperCase() + '.a'" local-filter="myFilter" delete-remote-files="false"> <int:poller fixed-rate="1000"/> </int-sftp:inbound-channel-adapter>
從上面的配置可以理解,由inbound-channel-adapter元素創建,同時提供各種屬性的值,local-directory:文件被轉移的目標文件夾;remote-directory:文件來源遠程文件夾目錄;session-factory:session 會話工廠;channel:需要的渠道;文件過濾器以及是否刪除轉移后的文件等屬性。
下面是springboot方式配置的Sftp Inbound channel adapter:
@Bean @InboundChannelAdapter(value = "inboundFileChannel", poller = @Poller(cron = "1/10 * * * * *", maxMessagesPerPoll = "1")) public MessageSource<File> fileMessageSource() { //創建sftpInboundFileSynchronizer,並綁定到message source. SftpInboundFileSynchronizingMessageSource source = new SftpInboundFileSynchronizingMessageSource(sftpInboundFileSynchronizer()); //自動創建本地文件夾 source.setAutoCreateLocalDirectory(true); source.setLocalDirectory(new File(sftpProperty.getLocalTempDir())); //設置文件過濾器 source.setLocalFilter(new AcceptOnceFileListFilter<File>()); return source; } /** * 為Inbound-channel-adapter提供bean */ @Bean public DirectChannel inboundFileChannel() { return new DirectChannel(); } /** * SftpInboundFileSynchronizer, * * 同步sftp文件至本地服務器. * <1> 可以放在service中獲取bean使用.toLocal方法; * <2> 也可以使用inbound-channel-adapter中,做監控文件服務器的動態。 * * @return SftpInboundFileSynchronizer */ @Bean(name = "synFileChannel") public SftpInboundFileSynchronizer sftpInboundFileSynchronizer (){ SftpInboundFileSynchronizer fileSynchronize = new SftpInboundFileSynchronizer(sftpSessionFactory()); fileSynchronize.setDeleteRemoteFiles(true); fileSynchronize.setPreserveTimestamp(true); //!important fileSynchronize.setRemoteDirectory(sftpProperty.getSftpSendPath()); fileSynchronize.setFilter(new SftpSimplePatternFileListFilter("*.*")); //fileSynchronize.setLocalFilenameGeneratorExpression( ); fileSynchronize.setPreserveTimestamp(true); return fileSynchronize; }
文件名設置:默認情況下,傳輸的文件將與原始文件具有相同的名稱。Inbound channel adapter提供local-filename-generator- Expression屬性,允許提供SpEL表達式來生成本地文件的名稱。
文件修改時間戳:從Spring Integration 3.0開始,reserve-timestamp屬性(默認為false);設置為true時,本地文件修改后的時間戳將設置為從服務器接收到的時間;否則將設置為當前時間。
文件過濾器:filename-pattern屬性指定了簡單的文件過濾模式。同時還提供了filename-regex屬性來指定使用正則表達式(例如filename-regex=".*.test$")來過濾文件,以確定檢索哪些遠程文件。除了上面的過濾器之外,spring integration還提供了其他篩選器,如AcceptOnceFileListFilter,CompositeFileListFilter,SftpPersistentAcceptOnceFileListFilter,前者的作用是避免同步以前獲取的文件。
(5)SFTP Outbound Channel Adapter
Spring Integration的 sftp outbound channel adapter是一個特殊的Message Handler,它將連接到遠程sftp服務器目錄。並為接收到的Message的payload,初始化一個文件傳輸渠道。 sftp outbound adapter支持以下幾種傳輸對象:
1)Java.io.File:實際文件對象;
2)byte[]:字節數組
3)Java.lang.String:字符串
下面是基於xml格式配置:
<int-sftp:outbound-channel-adapter id="sftpOutboundAdapter" session-factory="sftpSessionFactory" channel="inputChannel" charset="UTF-8" remote-directory="foo/bar" remote-filename-generator-expression="payload.getName() + '-foo'"/>
由上面的配置,可以看到outbound channel adapter的屬性;可以動態計算文件名或現有目錄路徑的表達式。目前Spring Integration並沒有提供單獨的注解來配置此adapter,可以使用@ServiceActivator 定義bean,來實現該配置。
下面是基於Spring boot框架配置,配置了Inbound Channel Adapter。
@SpringBootApplication public class SftpJavaApplication { @Bean public SessionFactory<LsEntry> sftpSessionFactory() { DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true); factory.setHost("localhost"); factory.setPort(port); factory.setUser("foo"); factory.setPassword("foo"); factory.setAllowUnknownKeys(true); factory.setTestSession(true); return new CachingSessionFactory<LsEntry>(factory); } @Bean public SftpInboundFileSynchronizer sftpInboundFileSynchronizer() { SftpInboundFileSynchronizer fileSynchronizer = new SftpInboundFileSynchronizer(sftpSessionFactory()); fileSynchronizer.setDeleteRemoteFiles(false); fileSynchronizer.setRemoteDirectory("foo"); fileSynchronizer.setFilter(new SftpSimplePatternFileListFilter("*.xml")); return fileSynchronizer; } //配置Sftp Inbound Channel Adapter. //Inbound Channel Adapter實質就是一個服務監控器,監控sftp上服務器上文件的創建。 @Bean @InboundChannelAdapter(channel = "sftpChannel", poller = @Poller(fixedDelay = "5000")) public MessageSource<File> sftpMessageSource() { SftpInboundFileSynchronizingMessageSource source = new SftpInboundFileSynchronizingMessageSource(sftpInboundFileSynchronizer()); source.setLocalDirectory(new File("sftp-inbound")); source.setAutoCreateLocalDirectory(true); source.setLocalFilter(new AcceptOnceFileListFilter<File>()); source.setMaxFetchSize(1); return source; } //配置Sftp Outbound Channel Adapter. //Outbound Channel Adapter實質上就是一個MessageHandler,接收發送的消息。 @Bean @ServiceActivator(inputChannel = "sftpChannel") public MessageHandler handler() { return new MessageHandler() { @Override public void handleMessage(Message<?> message) throws MessagingException { System.out.println(message.getPayload()); } }; } }
Channel Adapter*
管道適配器,其就是一個端點Endpoint,顧名思義,就是用來適配連接Message channel與其他系統的。Channel Adapter分為inbound和outbound。剛一開始接觸spring-integration時對這兩個概念的非常不理解,在這里可以簡單化,以當前的項目spring-integration為基准,所謂inbound就是從其他系統接收Message,獲取資源,如sftp,file,Http等。而outbound就是將資源通過消息管道,發送資源到target目標服務器。
(6)Service Activator
Service activator的作用時將存在的service注冊為一個端點,將該服務實例同spring-integration消息服務連接起來。該端點必須配置input channel,若調用的方法有返回值,還可以配置out channel。
Service activator調用某個服務的操作來處理消息,如在下面的發送文件到sftp服務,就是將SftpMessageHandler來處理channel中的消息。
Spring-Integration-Sftp實戰測試:
一、導入依賴
<dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-sftp</artifactId> </dependency>
二、Sftp配置類
@Configuration @EnableIntegration public class SpringSftpConfig { @Autowired private SftpProperties sftpProperties; @Bean public SessionFactory<ChannelSftp.LsEntry> sftpSessionFactory() { DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true); factory.setHost(sftpProperties.getHost()); factory.setPort(sftpProperties.getPort()); factory.setUser(sftpProperties.getUsername()); if (StringUtils.isNotEmpty(sftpProperties.getPrivateKey())) { factory.setPrivateKey(new ClassPathResource(sftpProperties.getPrivateKey())); factory.setPrivateKeyPassphrase(sftpProperties.getPassphrase()); } else { factory.setPassword(sftpProperties.getPassword()); } factory.setAllowUnknownKeys(true); CachingSessionFactory<ChannelSftp.LsEntry> cachingSessionFactory = new CachingSessionFactory<>(factory); cachingSessionFactory.setPoolSize(10); cachingSessionFactory.setSessionWaitTimeout(10000); return cachingSessionFactory; } @Bean public SftpRemoteFileTemplate sftpRemoteFileTemplate() { return new SftpRemoteFileTemplate(sftpSessionFactory()); } @Bean public SftpInboundFileSynchronizer sftpInboundFileSynchronizer() { SftpInboundFileSynchronizer fileSynchronizer = new SftpInboundFileSynchronizer(sftpSessionFactory()); fileSynchronizer.setDeleteRemoteFiles(true); fileSynchronizer.setRemoteDirectory(sftpProperties.getRoot()); fileSynchronizer.setFilter(new SftpSimplePatternFileListFilter("*.txt")); return fileSynchronizer; } @Bean @ServiceActivator(inputChannel = "lsChannel") public MessageHandler lsHandler() { SftpOutboundGateway sftpOutboundGateway = new SftpOutboundGateway(sftpSessionFactory(), "ls", "payload"); sftpOutboundGateway.setOptions("-dirs"); //配置項 return sftpOutboundGateway; } @Bean @ServiceActivator(inputChannel = "downloadChannel") public MessageHandler downloadHandler() { SftpOutboundGateway sftpOutboundGateway = new SftpOutboundGateway(sftpSessionFactory(), "mget", "payload"); sftpOutboundGateway.setOptions("-R"); sftpOutboundGateway.setFileExistsMode(FileExistsMode.REPLACE_IF_MODIFIED); sftpOutboundGateway.setLocalDirectory(new File(sftpProperties.getLocalPath())); sftpOutboundGateway.setAutoCreateLocalDirectory(true); return sftpOutboundGateway; } @Bean @ServiceActivator(inputChannel = "uploadChannel") public MessageHandler uploadHandler() { SftpMessageHandler handler = new SftpMessageHandler(sftpSessionFactory()); handler.setRemoteDirectoryExpression(new LiteralExpression(sftpProperties.getRoot())); handler.setFileNameGenerator(message -> { if (message.getPayload() instanceof File) { return ((File) message.getPayload()).getName(); } else { throw new IllegalArgumentException("File expected as payload."); } }); return handler; } @MessagingGateway public interface SftpGateway { @Gateway(requestChannel = "lsChannel") List<FileInfo> listFile(String dir); @Gateway(requestChannel = "downloadChannel") List<File> downloadFiles(String dir); @Gateway(requestChannel = "uploadChannel") void uploadFile(File file); } }
其中,SftpProperties是第二篇中的配置類。SessionFactory來創建連接工廠,進行連接操作。SftpRemoteFileTemplate是Spring-integration-sftp進行sftp操作的模板類,相當於jsch的ChannelSftp。但是SftpRemoteFileTemplate的上傳下載方法不大好用,是通過InputStream和OutputStream進行操作的。因此這里通過Spring integration的MessageHandler來進行上傳下載操作。
@Autowired private SpringSftpConfig.SftpGateway sftpGateway;
sftpGateway.listFile("/Users/kungfupanda").stream().forEach(System.out::println); sftpGateway.downloadFiles("/Users/kungfupanda/sftpserver/RELREVMOD_ddmmyyyy.txt").stream().forEach(System.out::println); sftpGateway.uploadFile(new File("RELREVMOD_ddmmyyyy.txt"));
三、啟動集成掃描
在啟動類上加上@IntegrationComponentScan注解
@IntegrationComponentScan @SpringBootApplication @EnableScheduling public class CDDAlertApplication{ public static void main(String... args){ SpringApplication.run(CDDAlertApplication.class, args); } }
sftp.properties配置信息文件
sftp.client.protocol=sftp sftp.client.host=localhost sftp.client.port=22 sftp.client.username=kungfupanda sftp.client.password=xxxxx sftp.client.root=/Users/kungfupanda sftp.client.privateKey= sftp.client.passphrase= sftp.client.sessionStrictHostKeyChecking=no sftp.client.sessionConnectTimeout=15000 sftp.client.channelConnectedTimeout=15000 sftp.client.localPath=./_files