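Bank reconciliation is a fairly common task: the files are CSV, read over SFTP (somewhat more secure than FTP), and the processing usually runs on a schedule (Spring Boot's @Scheduled makes that easy).
What follows only covers reading from SFTP, parsing the CSV, and writing through JDBC.
Components used
spring-integration-sftp (a wrapper around SFTP handling; JSch can also be used directly) and opencsv, a simple, flexible library for CSV reading and bean mapping
easy-batch, a simple and flexible batch processing framework
Reference CSV format
Only an excerpt is shown; the real file contains more than this, and it is just for demonstration.
Environment setup
Use docker-compose to run SFTP and the pg database the demo depends on
- docker-compose file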
version: "3"
services:
  postgres:
    image: postgres:alpine
    environment:
      - POSTGRES_PASSWORD=dalong
    ports:
      - 5432:5432
  ftp:
    image: mikatux/ftps-server
    environment:
      - "USER=demo"
      - "PASSWORD=demoapp"
    restart: always
    volumes:
      - "./data:/home/demo"
    network_mode: host
  sftp:
    image: atmoz/sftp
    volumes:
      - ./data2/upload:/home/demo/upload
      - ./sshd_config:/etc/ssh/sshd_config
    restart: always
    cap_add:
      - ALL
    ports:
      - "2222:22"
    command: demo:demoapp:1001
sshd_config (this file matters: during testing, the default configuration turned out not to work)
# $OpenBSD: sshd_config,v 1.100 2016/08/15 12:32:04 naddy Exp $
# This is the sshd server system-wide configuration file. See
# sshd_config(5) for more information.
# This sshd was compiled with PATH=/usr/local/bin:/usr/bin
# The strategy used for options in the default sshd_config shipped with
# OpenSSH is to specify options with their default value where
# possible, but leave them commented. Uncommented options override the
# default value.
# If you want to change the port on a SELinux system, you have to tell
# SELinux about this change.
# semanage port -a -t ssh_port_t -p tcp #PORTNUMBER
#
Port 22
#AddressFamily any
#ListenAddress 0.0.0.0
#ListenAddress ::
HostKey /etc/ssh/ssh_host_rsa_key
#HostKey /etc/ssh/ssh_host_dsa_key
#HostKey /etc/ssh/ssh_host_ecdsa_key
HostKey /etc/ssh/ssh_host_ed25519_key
# Ciphers and keying
#RekeyLimit default none
# Logging
#SyslogFacility AUTH
SyslogFacility AUTHPRIV
#LogLevel INFO
# Authentication:
#LoginGraceTime 2m
#PermitRootLogin no
#StrictModes yes
#MaxAuthTries 6
#MaxSessions 10
#PubkeyAuthentication yes
# The default is to check both .ssh/authorized_keys and .ssh/authorized_keys2
# but this is overridden so installations will only check .ssh/authorized_keys
#AuthorizedPrincipalsFile none
#AuthorizedKeysCommand none
#AuthorizedKeysCommandUser nobody
# For this to work you will also need host keys in /etc/ssh/ssh_known_hosts
#HostbasedAuthentication no
# Change to yes if you don't trust ~/.ssh/known_hosts for
# HostbasedAuthentication
#IgnoreUserKnownHosts no
# Don't read the user's ~/.rhosts and ~/.shosts files
#IgnoreRhosts yes
# To disable tunneled clear text passwords, change to no here!
#PasswordAuthentication yes
#PermitEmptyPasswords no
PasswordAuthentication yes
# Change to no to disable s/key passwords
#ChallengeResponseAuthentication yes
ChallengeResponseAuthentication no
# Kerberos options
#KerberosAuthentication no
#KerberosOrLocalPasswd yes
#KerberosTicketCleanup yes
#KerberosGetAFSToken no
#KerberosUseKuserok yes
# GSSAPI options
GSSAPIAuthentication yes
GSSAPICleanupCredentials no
#GSSAPIStrictAcceptorCheck yes
#GSSAPIKeyExchange no
#GSSAPIEnablek5users no
# Set this to 'yes' to enable PAM authentication, account processing,
# and session processing. If this is enabled, PAM authentication will
# be allowed through the ChallengeResponseAuthentication and
# PasswordAuthentication. Depending on your PAM configuration,
# PAM authentication via ChallengeResponseAuthentication may bypass
# the setting of "PermitRootLogin without-password".
# If you just want the PAM account and session checks to run without
# PAM authentication, then enable this but set PasswordAuthentication
# and ChallengeResponseAuthentication to 'no'.
# WARNING: 'UsePAM no' is not supported in Red Hat Enterprise Linux and may cause several
# problems.
#AllowAgentForwarding yes
#AllowTcpForwarding yes
#GatewayPorts no
X11Forwarding yes
#X11DisplayOffset 10
#X11UseLocalhost yes
#PermitTTY yes
#PrintMotd yes
#PrintLastLog yes
#TCPKeepAlive yes
#UseLogin no
#UsePrivilegeSeparation sandbox
#PermitUserEnvironment no
#Compression delayed
#ClientAliveInterval 0
#ClientAliveCountMax 3
#ShowPatchLevel no
UseDNS no
#PidFile /var/run/sshd.pid
#MaxStartups 10:30:100
#PermitTunnel no
#ChrootDirectory none
#VersionAddendum none
# no default banner path
#Banner none
# Accept locale-related environment variables
AcceptEnv LANG LC_CTYPE LC_NUMERIC LC_TIME LC_COLLATE LC_MONETARY LC_MESSAGES
AcceptEnv LC_PAPER LC_NAME LC_ADDRESS LC_TELEPHONE LC_MEASUREMENT
AcceptEnv LC_IDENTIFICATION LC_ALL LANGUAGE
AcceptEnv XMODIFIERS
# override default of no subsystems
# Force sftp and chroot jail
Subsystem sftp internal-sftp
ForceCommand internal-sftp
#ChrootDirectory %h
# Example of overriding settings on a per-user basis
#Match User anoncvs
# X11Forwarding no
# AllowTcpForwarding no
# PermitTTY no
# ForceCommand cvs server
Code walkthrough
- pom.xml
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.jeasy</groupId>
        <artifactId>easy-batch-opencsv</artifactId>
        <version>6.0.0</version>
    </dependency>
    <dependency>
        <groupId>com.zaxxer</groupId>
        <artifactId>HikariCP</artifactId>
        <version>3.4.5</version>
    </dependency>
    <dependency>
        <groupId>org.postgresql</groupId>
        <artifactId>postgresql</artifactId>
        <version>42.2.6</version>
    </dependency>
    <dependency>
        <groupId>org.jeasy</groupId>
        <artifactId>easy-batch-flatfile</artifactId>
        <version>6.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.jeasy</groupId>
        <artifactId>easy-batch-jdbc</artifactId>
        <version>6.1.0</version>
    </dependency>
    <dependency>
        <groupId>org.jeasy</groupId>
        <artifactId>easy-batch-core</artifactId>
        <version>6.0.0</version>
    </dependency>
    <dependency>
        <groupId>au.com.bytecode</groupId>
        <artifactId>opencsv</artifactId>
        <version>2.4</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.integration</groupId>
        <artifactId>spring-integration-sftp</artifactId>
        <version>5.3.2.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
        <exclusions>
            <exclusion>
                <groupId>org.junit.vintage</groupId>
                <artifactId>junit-vintage-engine</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
</dependencies>
<build>
    <plugins>
        <plugin>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-maven-plugin</artifactId>
        </plugin>
    </plugins>
</build>
- Core bean configuration
Mainly the pg DataSource and the easy-batch JobExecutor
@Bean
public DataSource dataSource() {
    // Connection properties passed through to PGSimpleDataSource
    Properties properties = new Properties();
    properties.setProperty("serverName", "127.0.0.1");
    properties.setProperty("portNumber", "5432");
    properties.setProperty("databaseName", "postgres");
    properties.setProperty("user", "postgres");
    properties.setProperty("password", "dalong");
    properties.setProperty("sslmode", "disable");
    properties.setProperty("preparedStatementCacheQueries", "0");

    HikariConfig config = new HikariConfig();
    config.setMaximumPoolSize(10);
    // Note: maxLifetime is in milliseconds, and HikariCP falls back to its
    // default when the value is below 30 seconds, so 3 has no effect as written
    config.setMaxLifetime(3);
    config.setDataSourceClassName("org.postgresql.ds.PGSimpleDataSource");
    config.setDataSourceProperties(properties);
    config.setConnectionTestQuery("SELECT 1");
    return new HikariDataSource(config);
}

@Bean(destroyMethod = "shutdown")
public JobExecutor jobExecutor() {
    return new JobExecutor();
}
- SFTP download
A simple demo
import com.jcraft.jsch.JSchException;
import org.springframework.integration.sftp.session.DefaultSftpSessionFactory;
import org.springframework.integration.sftp.session.SftpSession;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Properties;

public class Download {

    private DefaultSftpSessionFactory gimmeFactory() {
        DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(2222);
        factory.setUser("demo");
        factory.setPassword("demoapp");
        // Demo only: accept any host key. JSch expects "yes", "no" or "ask" here.
        Properties properties = new Properties();
        properties.setProperty("StrictHostKeyChecking", "no");
        factory.setSessionConfig(properties);
        factory.setAllowUnknownKeys(true);
        factory.setChannelConnectTimeout(Duration.ofSeconds(1000));
        return factory;
    }

    public String download() throws JSchException {
        SftpSession session = gimmeFactory().getSession();
        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
        try {
            // Read the whole remote file into memory and decode it as UTF-8
            session.read("upload/HENAN0_settleJournal_20190728.txt", outputStream);
            return new String(outputStream.toByteArray(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new RuntimeException(e);
        } finally {
            session.close(); // release the SFTP connection
        }
    }
}
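The demo above hard-codes one file name. For a scheduled run the path would normally be derived from the date. A minimal sketch using only the JDK; the helper class is hypothetical, and reading the trailing digits of the file name as a yyyyMMdd settlement date is an assumption based on the single name shown above:

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;

// Hypothetical helper: builds the remote path for a given settlement date.
// The "upload/" directory and "HENAN0_settleJournal_" prefix come from the demo;
// the yyyyMMdd date suffix is an assumption.
final class SettleFileName {
    private static final DateTimeFormatter DAY = DateTimeFormatter.BASIC_ISO_DATE; // yyyyMMdd

    private SettleFileName() {
    }

    static String remotePath(LocalDate settleDate) {
        return "upload/HENAN0_settleJournal_" + settleDate.format(DAY) + ".txt";
    }

    public static void main(String[] args) {
        // A nightly job would typically fetch the previous day's file
        System.out.println(remotePath(LocalDate.now().minusDays(1)));
    }
}
```

The result can be passed to session.read(...) in place of the literal path.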
- CSV entity mapping
Parsing is done with opencsv, whose flexible annotation-based mapping saves a lot of code
import com.opencsv.bean.CsvBindByName;

public class Bank {

    // The column values must match the header names in the first line of the file
    @CsvBindByName(column = "集團號")
    private String id;

    @CsvBindByName(column = "企業用戶號")
    private String user;

    @CsvBindByName(column = "商戶名稱")
    private String message;

    public Bank() {
    }

    public Bank(String id, String user, String message) {
        this.id = id;
        this.user = user;
        this.message = message;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getUser() {
        return user;
    }

    public void setUser(String user) {
        this.user = user;
    }

    public String getMessage() {
        return message;
    }

    public void setMessage(String message) {
        this.message = message;
    }

    @Override
    public String toString() {
        return "Bank{" + "id=" + id +
                ", user='" + user + '\'' +
                ", message='" + message + '\'' +
                '}';
    }
}
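To make the idea of binding by header name concrete, here is a hand-rolled sketch in plain Java for a pipe-separated file. It is an illustration only, not how opencsv is implemented (opencsv also handles quoting, type conversion and more). The class name and the ASCII header names in the sample are made up; the real file uses the Chinese headers referenced in the Bank class.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

// Illustration only: maps each data row to {header name -> cell value}.
final class HeaderMappingSketch {
    private static final String SEPARATOR = Pattern.quote("|");

    private HeaderMappingSketch() {
    }

    // The first line is the header; every following non-blank line is a record.
    static List<Map<String, String>> parse(String content) {
        String[] lines = content.split("\\R");
        String[] header = lines[0].split(SEPARATOR, -1);
        List<Map<String, String>> rows = new ArrayList<>();
        for (int i = 1; i < lines.length; i++) {
            if (lines[i].isEmpty()) {
                continue; // skip blank lines
            }
            String[] cells = lines[i].split(SEPARATOR, -1);
            Map<String, String> row = new LinkedHashMap<>();
            for (int c = 0; c < header.length && c < cells.length; c++) {
                row.put(header[c].trim(), cells[c].trim());
            }
            rows.add(row);
        }
        return rows;
    }

    public static void main(String[] args) {
        // Made-up sample data with stand-in header names
        String sample = "group_no|user_no|merchant_name\n1001|u-01|demo shop\n";
        System.out.println(parse(sample));
    }
}
```

Because lookup is by name rather than by position, extra columns in the real file can simply be ignored, which is why the Bank class only needs to declare the three fields it cares about.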
- Simple parsing and loading
@RequestMapping("/api3")
public void demo3() throws JSchException, IOException {
    Download download = new Download();
    String content = download.download();
    // Let opencsv do both the parsing and the bean mapping, which keeps the code short.
    // Reading from the String directly avoids a round trip through the platform default charset.
    Reader reader = new StringReader(content);
    HeaderColumnNameMappingStrategy<Bank> mappingStrategy = new HeaderColumnNameMappingStrategy<>();
    mappingStrategy.setType(Bank.class);
    CsvToBean<Bank> csvToBean = new CsvToBeanBuilder<Bank>(reader)
            .withMappingStrategy(mappingStrategy)
            .withSeparator('|')
            .build();
    List<Bank> bankList = csvToBean.parse();
    // Insert in upsert style: rows whose id already exists are skipped (DO NOTHING).
    // In a real setup, add a scheduled task and also expose a REST API so a run can be
    // triggered by hand when handling failures. Adding easy-batch listeners and monitoring
    // is recommended as well (Prometheus integration works nicely for seeing what is going on).
    String query = "insert into tweet VALUES(?,?,?) ON conflict(id) DO NOTHING ";
    String[] fields = {"id", "user", "message"};
    PreparedStatementProvider psp = new BeanPropertiesPreparedStatementProvider(Bank.class, fields);
    Job job = new JobBuilder()
            .batchSize(2)
            .reader(new IterableRecordReader(bankList))
            .errorThreshold(100)
            .writer(new JdbcRecordWriter(dataSource, query, psp))
            .build();
    JobReport jobReport = jobExecutor.execute(job);
    System.out.println(jobReport);
}
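The batchSize(2) setting means records reach the writer in groups of two. The sketch below shows roughly what a batched JDBC writer does with such groups, using only the JDK. It is not easy-batch's actual implementation; the class and method names are hypothetical, and rows are simplified to String arrays whose elements line up with the ? placeholders.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

// Illustration only: chunk records, then send each chunk as one JDBC batch.
final class BatchSketch {
    private BatchSketch() {
    }

    // Splits items into consecutive groups of at most batchSize elements.
    static <T> List<List<T>> partition(List<T> items, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int from = 0; from < items.size(); from += batchSize) {
            int to = Math.min(from + batchSize, items.size());
            batches.add(new ArrayList<>(items.subList(from, to)));
        }
        return batches;
    }

    // Binds each row to the statement and executes one batch per group.
    static void write(Connection conn, String sql, List<String[]> rows, int batchSize)
            throws SQLException {
        try (PreparedStatement ps = conn.prepareStatement(sql)) {
            for (List<String[]> batch : partition(rows, batchSize)) {
                for (String[] row : batch) {
                    for (int i = 0; i < row.length; i++) {
                        ps.setString(i + 1, row[i]); // JDBC parameters are 1-based
                    }
                    ps.addBatch();
                }
                ps.executeBatch(); // one round trip per group
            }
        }
    }
}
```

A batch size of 2 is only useful for a demo; larger groups reduce the number of round trips to the database.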
- Result
Note: the target table has to be created in the pg database first
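The INSERT statement in the demo needs a table named tweet with three columns and a unique id (required by ON CONFLICT (id)). The original does not show the table definition, so the sketch below is a guess: the column names mirror the Bank fields, the TEXT types are assumed, and the class name is hypothetical.

```java
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;

// Hypothetical helper that creates the target table if it does not exist yet.
final class TweetTable {
    // Assumed schema: only the table name, the column order and the unique id
    // follow from the INSERT statement. "user" is quoted because it is a
    // reserved word in PostgreSQL.
    static final String DDL =
            "CREATE TABLE IF NOT EXISTS tweet ("
                    + "id TEXT PRIMARY KEY, "
                    + "\"user\" TEXT, "
                    + "message TEXT)";

    private TweetTable() {
    }

    static void create(Connection conn) throws SQLException {
        try (Statement st = conn.createStatement()) {
            st.execute(DDL);
        }
    }
}
```

Calling TweetTable.create(dataSource.getConnection()) once at startup, or running the same statement in psql, would be enough for the demo.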

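Notes
A recent opencsv version is recommended, since its bean handling is more flexible. easy-batch is a very nice batch processing framework, simple and flexible.
[Figure: reference diagram]
SDK options for SFTP connections include JSch, SSHJ and Apache Commons VFS; JSch works well.
For the data processing itself, spring-batch, Apache Camel and spring-integration are good choices besides easy-batch, but easy-batch is simpler to use and more flexible (mainly because it is lightweight).
References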
https://github.com/j-easy/easy-batch
http://www.jcraft.com/jsch/
http://opencsv.sourceforge.net/
