Spring Boot: reading CSV data over SFTP and writing it to a database


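This is a fairly common task. A typical case is bank statement reconciliation: the statement is fetched over SFTP as a CSV file (SFTP is somewhat safer than plain FTP), and the processing usually runs on a schedule (Spring Boot's @Scheduled makes that easy).
Below I only cover the SFTP read, the CSV processing and the JDBC write.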
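In Spring Boot the schedule itself is usually just a method annotated with something like @Scheduled(cron = "0 0 2 * * *") in an application that has @EnableScheduling; Spring's cron has six fields (second, minute, hour, day, month, weekday), so that expression means every day at 02:00. The 02:00 time is only an assumed example. To make the cron semantics concrete without pulling in Spring, here is a plain-JDK sketch (the class DailyTrigger is hypothetical) that works out when such a daily run fires next and how long to wait for it:

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.LocalTime;

// Hypothetical helper illustrating what a daily cron such as "0 0 2 * * *" resolves to.
public class DailyTrigger {

    // Next firing time of a job that runs every day at runAt.
    public static LocalDateTime nextRun(LocalDateTime now, LocalTime runAt) {
        LocalDateTime today = now.toLocalDate().atTime(runAt);
        // If today's slot is still ahead, use it; otherwise roll over to tomorrow.
        return now.isBefore(today) ? today : today.plusDays(1);
    }

    // Initial delay until that run, e.g. for ScheduledExecutorService.scheduleAtFixedRate.
    public static Duration initialDelay(LocalDateTime now, LocalTime runAt) {
        return Duration.between(now, nextRun(now, runAt));
    }

    public static void main(String[] args) {
        LocalDateTime now = LocalDateTime.of(2019, 7, 28, 3, 0);
        System.out.println(nextRun(now, LocalTime.of(2, 0)));      // 2019-07-29T02:00
        System.out.println(initialDelay(now, LocalTime.of(2, 0))); // PT23H
    }
}
```

Outside Spring, that delay could be passed to ScheduledExecutorService.scheduleAtFixedRate together with a 24-hour period; inside Spring Boot, @Scheduled takes care of this for you.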

Dependencies

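spring-integration-sftp (wraps SFTP handling; JSch can also be used directly) and opencsv (simple, flexible CSV reading and conversion to beans)
easy-batch: a simple, flexible batch-processing framework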

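Sample CSV format

Only an excerpt is shown; the real file has more rows, this is just for the demo. Fields are separated by | and the first row holds the column names that the bean mapping below binds to.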

Environment setup

docker-compose runs the SFTP server and the PostgreSQL database the demo needs

  • docker-compose file
 
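version: "3"
services:
  postgres:
    image: postgres:alpine
    environment:
      - POSTGRES_PASSWORD=dalong
    ports:
      - 5432:5432
  # FTPS server; the SFTP code below does not use it
  ftp:
    image: mikatux/ftps-server
    environment:
      - "USER=demo"
      - "PASSWORD=demoapp"
    restart: always
    volumes:
      - "./data:/home/demo"
    network_mode: host
  # SFTP server the demo connects to (127.0.0.1:2222, user demo / password demoapp)
  sftp:
    image: atmoz/sftp
    volumes:
      # put the CSV file into ./data2/upload on the host
      - ./data2/upload:/home/demo/upload
      # custom sshd_config, shown below
      - ./sshd_config:/etc/ssh/sshd_config
    restart: always
    cap_add:
      - ALL
    ports:
      - "2222:22"
    # user:password:uid
    command: demo:demoapp:1001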

sshd_config (this file matters: during testing, using the default one directly surprisingly caused problems)

# $OpenBSD: sshd_config,v 1.100 2016/08/15 12:32:04 naddy Exp $
# This is the sshd server system-wide configuration file. See
# sshd_config(5) for more information.
# This sshd was compiled with PATH=/usr/local/bin:/usr/bin
# The strategy used for options in the default sshd_config shipped with
# OpenSSH is to specify options with their default value where
# possible, but leave them commented. Uncommented options override the
# default value.
# If you want to change the port on a SELinux system, you have to tell
# SELinux about this change.
# semanage port -a -t ssh_port_t -p tcp #PORTNUMBER
#
Port 22
#AddressFamily any
#ListenAddress 0.0.0.0
#ListenAddress ::
HostKey /etc/ssh/ssh_host_rsa_key
#HostKey /etc/ssh/ssh_host_dsa_key
#HostKey /etc/ssh/ssh_host_ecdsa_key
HostKey /etc/ssh/ssh_host_ed25519_key
# Ciphers and keying
#RekeyLimit default none
# Logging
#SyslogFacility AUTH
SyslogFacility AUTHPRIV
#LogLevel INFO
# Authentication:
#LoginGraceTime 2m
#PermitRootLogin no
#StrictModes yes
#MaxAuthTries 6
#MaxSessions 10
#PubkeyAuthentication yes
# The default is to check both .ssh/authorized_keys and .ssh/authorized_keys2
# but this is overridden so installations will only check .ssh/authorized_keys
#AuthorizedPrincipalsFile none
#AuthorizedKeysCommand none
#AuthorizedKeysCommandUser nobody
# For this to work you will also need host keys in /etc/ssh/ssh_known_hosts
#HostbasedAuthentication no
# Change to yes if you don't trust ~/.ssh/known_hosts for
# HostbasedAuthentication
#IgnoreUserKnownHosts no
# Don't read the user's ~/.rhosts and ~/.shosts files
#IgnoreRhosts yes
# To disable tunneled clear text passwords, change to no here!
#PasswordAuthentication yes
#PermitEmptyPasswords no
PasswordAuthentication yes
# Change to no to disable s/key passwords
#ChallengeResponseAuthentication yes
ChallengeResponseAuthentication no
# Kerberos options
#KerberosAuthentication no
#KerberosOrLocalPasswd yes
#KerberosTicketCleanup yes
#KerberosGetAFSToken no
#KerberosUseKuserok yes
# GSSAPI options
GSSAPIAuthentication yes
GSSAPICleanupCredentials no
#GSSAPIStrictAcceptorCheck yes
#GSSAPIKeyExchange no
#GSSAPIEnablek5users no
# Set this to 'yes' to enable PAM authentication, account processing,
# and session processing. If this is enabled, PAM authentication will
# be allowed through the ChallengeResponseAuthentication and
# PasswordAuthentication. Depending on your PAM configuration,
# PAM authentication via ChallengeResponseAuthentication may bypass
# the setting of "PermitRootLogin without-password".
# If you just want the PAM account and session checks to run without
# PAM authentication, then enable this but set PasswordAuthentication
# and ChallengeResponseAuthentication to 'no'.
# WARNING: 'UsePAM no' is not supported in Red Hat Enterprise Linux and may cause several
# problems.
#AllowAgentForwarding yes
#AllowTcpForwarding yes
#GatewayPorts no
X11Forwarding yes
#X11DisplayOffset 10
#X11UseLocalhost yes
#PermitTTY yes
#PrintMotd yes
#PrintLastLog yes
#TCPKeepAlive yes
#UseLogin no
#UsePrivilegeSeparation sandbox
#PermitUserEnvironment no
#Compression delayed
#ClientAliveInterval 0
#ClientAliveCountMax 3
#ShowPatchLevel no
UseDNS no
#PidFile /var/run/sshd.pid
#MaxStartups 10:30:100
#PermitTunnel no
#ChrootDirectory none
#VersionAddendum none
# no default banner path
#Banner none
# Accept locale-related environment variables
AcceptEnv LANG LC_CTYPE LC_NUMERIC LC_TIME LC_COLLATE LC_MONETARY LC_MESSAGES
AcceptEnv LC_PAPER LC_NAME LC_ADDRESS LC_TELEPHONE LC_MEASUREMENT
AcceptEnv LC_IDENTIFICATION LC_ALL LANGUAGE
AcceptEnv XMODIFIERS
# override default of no subsystems
# Force sftp and chroot jail
Subsystem sftp internal-sftp
ForceCommand internal-sftp
#ChrootDirectory %h
# Example of overriding settings on a per-user basis
#Match User anoncvs
# X11Forwarding no
# AllowTcpForwarding no
# PermitTTY no
# ForceCommand cvs server

Code walkthrough

  • pom.xml
<dependencies>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.jeasy</groupId>
    <artifactId>easy-batch-opencsv</artifactId>
    <version>6.0.0</version>
</dependency>
<dependency>
    <groupId>com.zaxxer</groupId>
    <artifactId>HikariCP</artifactId>
    <version>3.4.5</version>
</dependency>
<dependency>
    <groupId>org.postgresql</groupId>
    <artifactId>postgresql</artifactId>
    <version>42.2.6</version>
</dependency>
<dependency>
    <groupId>org.jeasy</groupId>
    <artifactId>easy-batch-flatfile</artifactId>
    <version>6.0.0</version>
</dependency>
<dependency>
    <groupId>org.jeasy</groupId>
    <artifactId>easy-batch-jdbc</artifactId>
    <!-- keep all easy-batch modules on the same version -->
    <version>6.0.0</version>
</dependency>
<dependency>
    <groupId>org.jeasy</groupId>
    <artifactId>easy-batch-core</artifactId>
    <version>6.0.0</version>
</dependency>
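<dependency>
    <!-- com.opencsv, not the legacy au.com.bytecode 2.4: only the former has com.opencsv.bean.CsvBindByName and CsvToBeanBuilder used below -->
    <groupId>com.opencsv</groupId>
    <artifactId>opencsv</artifactId>
    <version>5.2</version>
</dependency>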
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-sftp</artifactId>
    <version>5.3.2.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
    <exclusions>
        <exclusion>
            <groupId>org.junit.vintage</groupId>
            <artifactId>junit-vintage-engine</artifactId>
        </exclusion>
    </exclusions>
</dependency>
</dependencies>
<build>
<plugins>
    <plugin>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-maven-plugin</artifactId>
    </plugin>
</plugins>
</build>
  • Core bean configuration
    Mainly the PostgreSQL DataSource and the easy-batch job executor
 
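import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import org.jeasy.batch.core.job.JobExecutor;
import org.springframework.context.annotation.Bean;
import javax.sql.DataSource;
import java.util.Properties;

// Both beans go into a @Configuration class
@Bean
public DataSource dataSource() {
  HikariConfig config = new HikariConfig();
  // Properties passed to org.postgresql.ds.PGSimpleDataSource
  Properties properties = new Properties();
  properties.setProperty("serverName", "127.0.0.1");
  properties.setProperty("portNumber", "5432");
  properties.setProperty("user", "postgres");
  properties.setProperty("password", "dalong");
  properties.setProperty("sslmode", "disable");
  properties.setProperty("databaseName", "postgres");
  properties.setProperty("preparedStatementCacheQueries", "0");
  config.setDataSourceClassName("org.postgresql.ds.PGSimpleDataSource");
  config.setDataSourceProperties(properties);
  config.setMaximumPoolSize(10);
  // maxLifetime is in milliseconds; HikariCP resets anything below 30000 ms
  // to its default of 30 minutes (with a warning), so set that value explicitly
  config.setMaxLifetime(1_800_000);
  config.setConnectionTestQuery("SELECT 1");
  return new HikariDataSource(config);
}

// easy-batch executor; shutdown() is called when the Spring context closes
@Bean(destroyMethod = "shutdown")
public JobExecutor jobExecutor() {
  return new JobExecutor();
}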
  • SFTP download
    A simple demo
 
import org.springframework.integration.sftp.session.DefaultSftpSessionFactory;
import org.springframework.integration.sftp.session.SftpSession;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Properties;

public class Download {
    // Settings match the atmoz/sftp container: port 2222, user demo / password demoapp
    private DefaultSftpSessionFactory gimmeFactory() {
        DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(2222);
        factory.setUser("demo");
        factory.setPassword("demoapp");
        // JSch expects "yes", "no" or "ask" here; skipping host-key checks is only OK for a local demo
        Properties properties = new Properties();
        properties.setProperty("StrictHostKeyChecking", "no");
        factory.setSessionConfig(properties);
        factory.setAllowUnknownKeys(true);
        factory.setChannelConnectTimeout(Duration.ofSeconds(1000));
        return factory;
    }

    public String download() {
        SftpSession session = gimmeFactory().getSession();
        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
        try {
            // Relative to the demo user's home, i.e. the mounted ./data2/upload directory
            session.read("upload/HENAN0_settleJournal_20190728.txt", outputStream);
            // Assumes the bank file is UTF-8 encoded; change the charset if it is not
            return new String(outputStream.toByteArray(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } finally {
            // Release the SSH session, otherwise every call leaks a connection
            session.close();
        }
    }
}
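download() hard-codes a single day's file. Judging only from that one name, HENAN0_settleJournal_20190728.txt, the file name seems to embed the settlement date as yyyyMMdd, so a scheduled job could build the path itself. The helper below is a hypothetical sketch resting on that assumption (one file per day under upload/, named <prefix>_settleJournal_<yyyyMMdd>.txt); confirm the naming rule with the bank before relying on it.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;

// Hypothetical helper: assumes one file per settlement day named
// <prefix>_settleJournal_<yyyyMMdd>.txt under upload/, inferred from a single sample name.
public class SettleJournalPath {
    private static final DateTimeFormatter DAY = DateTimeFormatter.ofPattern("yyyyMMdd");

    public static String remotePath(String prefix, LocalDate settleDate) {
        return "upload/" + prefix + "_settleJournal_" + settleDate.format(DAY) + ".txt";
    }

    public static void main(String[] args) {
        // e.g. a run on 2019-07-29 picking up the previous day's file
        LocalDate settleDate = LocalDate.of(2019, 7, 29).minusDays(1);
        System.out.println(remotePath("HENAN0", settleDate)); // upload/HENAN0_settleJournal_20190728.txt
    }
}
```

The returned string is what would be passed to session.read(...) in place of the hard-coded path.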
  • CSV entity mapping
    Parsing is done with opencsv, whose annotation-based mapping is flexible and saves a lot of code
 
import com.opencsv.bean.CsvBindByName;
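// One row of the settlement file; the column names must match the header row exactly
public class Bank {
    // 集團號: group number
    @CsvBindByName(column = "集團號")
    private String id;
    // 企業用戶號: enterprise user number
    @CsvBindByName(column = "企業用戶號")
    private String user;
    // 商戶名稱: merchant name
    @CsvBindByName(column = "商戶名稱")
    private String message;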
    public Bank() {
    }
    public Bank(String id, String user, String message) {
        this.id = id;
        this.user = user;
        this.message = message;
    }
    public String getId() {
        return id;
    }
    public void setId(String id) {
        this.id = id;
    }
    public String getUser() {
        return user;
    }
    public void setUser(String user) {
        this.user = user;
    }
    public String getMessage() {
        return message;
    }
    public void setMessage(String message) {
        this.message = message;
    }
    @Override
    public String toString() {
        return "Bank{" + "id=" + id +
                ", user='" + user + '\'' +
                ", message='" + message + '\'' +
                '}';
    }
}
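@CsvBindByName ties each field to a header name rather than a column position, so the mapping survives the bank reordering its columns. The plain-JDK sketch below (the class HeaderBindingSketch is hypothetical and not part of opencsv) illustrates that idea under simplifying assumptions: | as separator, one record per line, no quoted fields. opencsv itself also handles quoting and type conversion, which is why the post uses it.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

// Plain-JDK illustration of header-name binding (the idea behind @CsvBindByName);
// assumes '|' separators, one record per line and no quoted fields.
public class HeaderBindingSketch {
    private static final String SEP = Pattern.quote("|");

    public static List<Map<String, String>> parse(String content, String... wanted) {
        String[] lines = content.split("\\R");
        // Column positions come from the header row; any BOM character is stripped first.
        List<String> header = new ArrayList<>();
        for (String name : lines[0].replace("\uFEFF", "").split(SEP, -1)) {
            header.add(name.trim());
        }
        List<Map<String, String>> rows = new ArrayList<>();
        for (int i = 1; i < lines.length; i++) {
            if (lines[i].trim().isEmpty()) {
                continue; // skip blank lines
            }
            String[] cells = lines[i].split(SEP, -1);
            Map<String, String> row = new HashMap<>();
            for (String column : wanted) {
                int idx = header.indexOf(column); // position looked up by name, not fixed
                if (idx >= 0 && idx < cells.length) {
                    row.put(column, cells[idx].trim());
                }
            }
            rows.add(row);
        }
        return rows;
    }
}
```

For the settlement file it would be called as parse(content, "集團號", "企業用戶號", "商戶名稱").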
  • Simple parsing and writing
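import com.jcraft.jsch.JSchException;
import com.opencsv.bean.CsvToBean;
import com.opencsv.bean.CsvToBeanBuilder;
import com.opencsv.bean.HeaderColumnNameMappingStrategy;
import org.jeasy.batch.core.job.Job;
import org.jeasy.batch.core.job.JobBuilder;
import org.jeasy.batch.core.job.JobReport;
import org.jeasy.batch.core.reader.IterableRecordReader;
import org.jeasy.batch.jdbc.BeanPropertiesPreparedStatementProvider;
import org.jeasy.batch.jdbc.JdbcRecordWriter;
import org.jeasy.batch.jdbc.PreparedStatementProvider;
import org.springframework.web.bind.annotation.RequestMapping;
import java.io.StringReader;
import java.util.List;

// Method of a @RestController into which the dataSource and jobExecutor beans are injected
@RequestMapping("/api3")
public void demo3() throws JSchException {
    Download download = new Download();
    String content = download.download();
    // Use opencsv's bean mapping and parsing directly to keep the code short.
    // Reading the decoded String avoids getBytes(), which would use the platform default charset
    HeaderColumnNameMappingStrategy<Bank> mappingStrategy = new HeaderColumnNameMappingStrategy<>();
    mappingStrategy.setType(Bank.class);
    CsvToBean<Bank> build = new CsvToBeanBuilder<Bank>(new StringReader(content))
            .withMappingStrategy(mappingStrategy)
            .withSeparator('|')
            .build();
    List<Bank> bankList = build.parse();
    // Insert in upsert mode. In practice, add a scheduled task and also expose a REST API so the job
    // can be re-run by hand when something fails; easy-batch listeners and monitoring are recommended
    // too (a Prometheus integration works well for seeing what is actually happening)
    String query = "INSERT INTO tweet VALUES (?, ?, ?) ON CONFLICT (id) DO NOTHING";
    // Bean properties bound, in this order, to the three ? placeholders
    String[] fields = {"id", "user", "message"};
    PreparedStatementProvider psp = new BeanPropertiesPreparedStatementProvider(Bank.class, fields);
    Job job = new JobBuilder()
            .batchSize(2)
            .reader(new IterableRecordReader(bankList))
            .errorThreshold(100)
            .writer(new JdbcRecordWriter(dataSource, query, psp))
            .build();
    JobReport jobReport = jobExecutor.execute(job);
    System.out.println(jobReport);
}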
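The upsert recommendation above is about re-runs: if the scheduled job and a manual call to the REST endpoint both process the same file, ON CONFLICT (id) DO NOTHING keeps the first row per id and skips the rest instead of failing or duplicating. The in-memory sketch below (the class UpsertSketch is hypothetical; no database is involved) imitates those semantics together with chunked writing like batchSize(2), just to show that feeding the same input twice inserts nothing the second time.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// In-memory imitation of "INSERT ... ON CONFLICT (id) DO NOTHING" plus chunked writes.
// Illustration only: no JDBC, the Map stands in for the tweet table keyed by id.
public class UpsertSketch {
    private final Map<String, String[]> table = new LinkedHashMap<>();
    private int batches;

    // First row for an id wins; later rows with the same id are skipped (DO NOTHING).
    private int insertOrIgnore(String[] row) {
        return table.putIfAbsent(row[0], row) == null ? 1 : 0;
    }

    // Writes records in chunks of batchSize and returns how many rows were actually inserted.
    public int write(List<String[]> records, int batchSize) {
        int inserted = 0;
        for (int from = 0; from < records.size(); from += batchSize) {
            int to = Math.min(from + batchSize, records.size());
            for (String[] row : records.subList(from, to)) {
                inserted += insertOrIgnore(row);
            }
            batches++; // in easy-batch, each chunk is handed to the writer as one batch
        }
        return inserted;
    }

    public int size() {
        return table.size();
    }

    public int batches() {
        return batches;
    }
}
```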
  • Result

    Note: the table must be created in the PostgreSQL database first
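The table definition itself is not shown. A hypothetical one that fits demo3 is sketched below as a small JDBC helper: the columns follow the positional VALUES (?, ?, ?) order (id, user, message), user is double-quoted because it is a reserved word in PostgreSQL, and id carries a PRIMARY KEY because PostgreSQL only accepts ON CONFLICT (id) when a unique constraint or index exists on that column. The TEXT column types are an assumption; adjust them to the real file.

```java
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;

// Hypothetical schema for the tweet table used by demo3.
public final class TweetSchema {
    // Column order matches the positional INSERT: id, user, message.
    // "user" is reserved in PostgreSQL and must be quoted; the PRIMARY KEY makes ON CONFLICT (id) valid.
    public static final String DDL =
            "CREATE TABLE IF NOT EXISTS tweet ("
            + "id TEXT PRIMARY KEY, "
            + "\"user\" TEXT, "
            + "message TEXT)";

    private TweetSchema() {
    }

    // Run once before the first job, with a connection from the DataSource bean.
    public static void create(Connection connection) throws SQLException {
        try (Statement statement = connection.createStatement()) {
            statement.execute(DDL);
        }
    }
}
```

A typical call would be try (Connection c = dataSource.getConnection()) { TweetSchema.create(c); } at startup.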

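Notes

For opencsv, a recent version is recommended, since its bean handling is more flexible. easy-batch is a very good batch-processing framework: simple and flexible. A reference diagram: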

[Figure: easy-batch reference diagram]

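SDK options for SFTP connections include JSch, SSHJ and Apache Commons VFS; JSch works well.
For the data processing itself, Spring Batch, Apache Camel and Spring Integration are all good choices besides easy-batch, but easy-batch is simpler and more flexible to use (mainly because it is lightweight).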

References

https://github.com/j-easy/easy-batch
http://www.jcraft.com/jsch/
http://opencsv.sourceforge.net/

