websocket+多线程+jdbc大数据量导表


1. 项目背景 : 前几天以及最近的博客,都在完成一个接口,前端点击“导入”按钮,后台完成大数据量导入,并且在导入的过程中持续告知导入情况;

2. 思考分析 : 大数据量导入,假设是10W条,那么如果是使用关系型数据库则可以使用jdbc的fetch游标移动完成工作,并且在插入的时候可以使用多线程加快速度,

在最开始的时候查询出总数据量,除以我们规定的5000次导入一次就得到了子线程应当执行的次数,并返回给前端,之后子线程开始执行,每执行一次,返回一个

状态码给前端,则前端依据此2个环节可推断进度,并能显示对应的进度条效果。

3. 代码实践 : 

  3.1 pom 加入 websokcet依赖 

  

 <!--  引入websokcet -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
            <version>1.3.5.RELEASE</version>
        </dependency>

   3.2 springboot 注入 websocket配置类 

  

package root.websocket;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

/**
 * @Date: 2018/10/25 10:10
 * @Description:  注入 websocekt配置类
 */
@Configuration
public class WebSocketConfig {

    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }

}
View Code

  3.3 springboot  加入自己要对外的websocket类

  

package root.websocket;

import org.apache.ibatis.session.SqlSession;
import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Component;
import root.report.control.dict.TestTwo;
import root.report.db.DbFactory;
import root.report.service.DictService;

import javax.websocket.OnClose;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.concurrent.CopyOnWriteArraySet;

/**
 * @Date: 2018/10/25 10:11
 * @Description: 注入dictSocket接口
 */
@ServerEndpoint(value = "/websocket/dictSocket")
@Component
public class ImportDictValueSocket {

    private static Logger log = Logger.getLogger(ImportDictValueSocket.class);

    private DictService dictService;
    //此处是解决无法注入的关键
    private static ApplicationContext applicationContext;
    public static void setApplicationContext(ApplicationContext applicationContext) {
        ImportDictValueSocket.applicationContext = applicationContext;
    }

    //静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。
    private static int onlineCount = 0;

    //concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。
    private static CopyOnWriteArraySet<ImportDictValueSocket> webSocketSet = new CopyOnWriteArraySet<ImportDictValueSocket>();

    //与某个客户端的连接会话,需要通过它来给客户端发送数据
    private Session session;

    /**
     * 连接建立成功调用的方法*/
    @OnOpen
    public void onOpen(Session session) {
        this.session = session;
        webSocketSet.add(this);     //加入set中
        addOnlineCount();           //在线数加1
        /*System.out.println("有新连接加入!当前在线人数为" + getOnlineCount());
        try {
            sendMessage("当前在线人数"+getOnlineCount()+",当前sessionID:"+this.session.getId());
        } catch (IOException e) {
            System.out.println("IO异常");
        }*/
    }

    /**
     * 连接关闭调用的方法
     */
    @OnClose
    public void onClose() {
        webSocketSet.remove(this);  //从set中删除
        subOnlineCount();           //在线数减1
        log.info("有一连接关闭!当前在线人数为" + getOnlineCount());
    }

    /**
     * 收到客户端消息后调用的方法
     *
     * @param message 客户端发送过来的消息*/
    @OnMessage
    public void onMessage(String message, Session session) throws IOException, InterruptedException {
        log.info("来自客户端的消息:" + message);
        // 调用  TestTwo的 导入方法,一旦连接成功 , 则 持续通信
        try {
            dictService = applicationContext.getBean(DictService.class);
            String result = dictService.importFuncDictValueByDictId(session,Integer.parseInt(message));   // 需要加锁与否?
            synchronized (result){
                log.info("结果为"+result);
                sendMessage(result);   // 返回  "over"
                session.close();  // 关闭掉
            }
        }catch (Exception e){
            log.error(e.getMessage());
            sendMessage(e.getMessage());
        }

        //群发消息
      /*  for (ImportDictValueSocket item : webSocketSet) {
            try {
                item.sendMessage(message);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }*/
    }

    /**
     * 发生错误时调用
     @OnError
     public void onError(Session session, Throwable error) {
     System.out.println("发生错误");
     error.printStackTrace();
     }  **/


     public void sendMessage(String message) throws IOException {
        this.session.getBasicRemote().sendText(message);
     }

     /**
      * 群发自定义消息
      * */
    public static void sendInfo(String message) throws IOException {
        for (ImportDictValueSocket item : webSocketSet) {
            try {
                item.sendMessage(message);
            } catch (IOException e) {
                continue;
            }
        }
    }

    public static synchronized int getOnlineCount() {
        return onlineCount;
    }

    public static synchronized void addOnlineCount() {
        ImportDictValueSocket.onlineCount++;
    }

    public static synchronized void subOnlineCount() {
        ImportDictValueSocket.onlineCount--;
    }
}
View Code

  3.3-1  注意到 : 我们在 websocket 当中 无法正常引入 @Service等 业务class,这些必须手动在 websocket生成,或者我们从 容器当中读到,所以我们要在启动类当中做如下处理:

  

    public static void main(String[] args) {
        ConfigurableApplicationContext configurableApplicationContext = SpringApplication.run(ReportServerApplication.class, args);
        // 解决websocket 当中无法注入bean的方法
        ImportDictValueSocket.setApplicationContext(configurableApplicationContext);
    }

  3.3-2 业务类代码 

  

package root.report.service;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.serializer.SerializerFeature;
import org.apache.commons.lang3.StringUtils;
import org.apache.ibatis.session.SqlSession;
import org.apache.log4j.Logger;
import org.dom4j.*;
import org.dom4j.io.OutputFormat;
import org.dom4j.io.XMLWriter;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.xml.sax.SAXException;
import root.configure.AppConstants;
import root.configure.MybatisCacheConfiguration;
import root.report.db.DbFactory;
import root.report.util.JsonUtil;
import root.report.util.XmlUtil;

import javax.websocket.Session;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.math.BigDecimal;
import java.sql.*;
import java.util.*;
import java.util.Date;
import java.util.concurrent.*;
import java.util.function.Consumer;

@Service
public class DictService {
// 测试使用 fetch 按照指定规格读取数据  ,  源表为 test_dict 目标表名为 test_import
    public void fetchBySql(SqlSession sqlSession,String sourceSql,Consumer<ResultSet> consumer){
        // 通过 conn 得到 真正的连接(要解析 数据库名,从而真正的 数据库连接通道)
        // 而在我们这里则不需要这么复杂,前面过程都由DbFactory 完成了,传递进来的sqlSession 就是一个 连接好的通道
        Connection conn = sqlSession.getConnection();
        PreparedStatement stm = null;
        // 开始时间
        Long begin = new Date().getTime();
        String sql = sourceSql;
        try {
            final int countRow = 1000;
            stm = conn.prepareStatement(sql,ResultSet.TYPE_FORWARD_ONLY,ResultSet.CONCUR_READ_ONLY);
            // stm.setFetchSize(Integer.MIN_VALUE);      // 设置游标  1000 每次读取一千行数据
            stm.setFetchSize(countRow);
            stm.setFetchDirection(ResultSet.FETCH_REVERSE);     //
            ResultSet rs = stm.executeQuery();
            // rs.setFetchSize(Integer.MIN_VALUE);
            consumer.accept(rs);    // 把 当前的 rs 传递给消费者
        }catch (Exception e){
            e.printStackTrace();
        }
    }
public String importFuncDictValueByDictId(Session session, int dict_id){

        // 先决条件 : 根据 dict_id 得到 sourceSql
        SqlSession sqlSession = DbFactory.Open(DbFactory.FORM);
        String dbName = sqlSession.selectOne("dict.getDictDbByDictId",dict_id);
        if(StringUtils.isBlank(dbName)) return("此DictId所对应的数据库为空,无法操作!");
        final int countRow = 1000;
        final List<Map<String, Object>> list = new ArrayList<>();

        // 初始化对应的数据库
        SqlSession sourceSqlSession = DbFactory.Open(dbName);
        if(sourceSqlSession==null)  return("数据库无法连接,无法操作!");
        String sourceSql = null;
        try {
            sourceSql = this.getSqlTemplate("数据字典",String.valueOf(dict_id),false);
        } catch (Exception e){
            log.info(e.getMessage());
            return "无法从xml文件得到对应的sql,无法查询!";
        }

        Long begin = new Date().getTime();
        Connection conn  = sqlSession.getConnection();
        final int poolSize = 5;
        ExecutorService fixedThreadPool = Executors.newFixedThreadPool(poolSize);
        fetchBySql(sourceSqlSession,sourceSql,rs -> {
            try {
                String prefix = "INSERT INTO func_dict_value (dict_id,value_code,value_name) VALUES ";
                final StringBuffer suffix = new StringBuffer();
                // 设置事务为非自动提交
                conn.setAutoCommit(false);   //  非提交能减少日志的生成,从而加快执行速度
                PreparedStatement pst = (PreparedStatement) conn.prepareStatement("");
                List<Map> mapList = new ArrayList<>();
                if (rs != null) {
                    rs.last();      // 移动到最后面
                    BigDecimal result = new BigDecimal((double) rs.getRow()/5000).setScale(0, BigDecimal.ROUND_UP);
                    int countSize = result.intValue();
                    try {
                        session.getBasicRemote().sendText(String.valueOf(countSize));    // 第一次直接发送给websocket 客户端1个 本次执行总数统计.
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                    rs.first();  // 移动到最前面   // 坑处1,如果使用了firt的话,那么直接达到了第一条,则不要使用 while(rs.next())
                    if(rs!=null && StringUtils.isNotBlank(rs.getInt("code")+"")){
                        Map<String, Object> map = new HashMap<>();
                        map.put("code",rs.getInt("code"));
                        map.put("name",rs.getString("name"));
                        mapList.add(map);
                    }
                    while (rs.next()) {
                        Map<String, Object> map = new HashMap<>();
                        map.put("code",rs.getInt("code"));
                        map.put("name",rs.getString("name"));
                        mapList.add(map);
                        if(rs.getRow()%5000==0){
                            //  执行批量插入操作
                            List<Map> finalMapList = mapList;
                            final StringBuffer sb = new StringBuffer();
                            Future<String> stringFuture = fixedThreadPool.submit(
                                    new Callable<String>() {
                                        @Override
                                        public String call() throws Exception {
                                            String name = Thread.currentThread().getName();
                                            long threadId = Thread.currentThread().getId();
                                            log.info("thread name: "+name+",id为"+threadId+"执行了一次");
                                            for (Map tempMap : finalMapList) {
                                                // 构建SQL后缀
                                                sb.append("(" +dict_id+",'"+ tempMap.get("code") + "'," + "'" + tempMap.get("name") + "'),");
                                            }
                                            return "success";
                                        }
                                    }
                            );
                            mapList = new ArrayList<>();
                            try {
                                String s = stringFuture.get();
                                if ("success".equals(s)) {
                                    String sql = prefix + sb.substring(0, sb.length() - 1);  // 构建完整SQL
                                    pst.addBatch(sql);   // 添加执行SQL
                                    pst.executeBatch();  // 执行操作
                                    session.getBasicRemote().sendText("success");
                                }
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            } catch (ExecutionException e) {
                                e.printStackTrace();
                            } catch (IOException e) {
                                e.printStackTrace();
                            }
                        }
                    }
                    // 执行完之后 mapList 是个 不足5000个的,这个时候我们再去执行一次 添加操作
                    List<Map> finalMapList = mapList;
                    System.out.println("mapList大小为:"+mapList.size());
                    System.out.println("finalMapList:"+finalMapList.size());
                    final StringBuffer sb = new StringBuffer();
                    Future<String> stringFuture = fixedThreadPool.submit(
                            new Callable<String>() {
                                @Override
                                public String call() throws Exception {
                                    String name = Thread.currentThread().getName();
                                    long threadId = Thread.currentThread().getId();
                                    log.info("thread name: "+name+"id为"+threadId+"执行了一次");
                                    for (Map tempMap : finalMapList) {
                                        // 构建SQL后缀
                                        sb.append("(" +dict_id+",'"+ tempMap.get("code") + "'," + "'" + tempMap.get("name") + "'),");
                                    }
                                    return "success";
                                }
                            }
                    );
                    try {
                        String s = stringFuture.get();
                        if ("success".equals(s)) {
                            String sql = prefix + sb.substring(0, sb.length() - 1);  // 构建完整SQL
                            pst.addBatch(sql);   // 添加执行SQL
                            pst.executeBatch();  // 执行操作
                            session.getBasicRemote().sendText("success");
                        }
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } catch (ExecutionException e) {
                        e.printStackTrace();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                    conn.commit();
                    pst.close();
                    conn.close();
                }
                Long end = new Date().getTime();
                log.info("条数据从远程库导入到本地花费时间 : " + (end - begin)  + " ms");
            } catch (SQLException e) {
                // e.printStackTrace();  // 批量执行遇到异常直接 用log打印,不要中断
                log.info(e.getMessage());
            }finally {
                fixedThreadPool.shutdown();   // 一定要shutdown  否则线程只是被回收到了线程池
            }
        });
        return "over";
    }
}
View Code

  3.4 使用在线工具 websocket测试 

  http://www.blue-zero.com/WebSocket/

  测试效果截图 :

  

   数据库效果截图 :

  

 


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM