websocket+多線程+jdbc大數據量導表


1. 項目背景 : 前幾天以及最近的博客,都在完成一個接口,前端點擊“導入”按鈕,后台完成大數據量導入,並且在導入的過程中持續告知導入情況;

2. 思考分析 : 大數據量導入,假設是10W條,那么如果是使用關系型數據庫則可以使用jdbc的fetch游標移動完成工作,並且在插入的時候可以使用多線程加快速度,

在最開始的時候查詢出總數據量,除以我們規定的5000次導入一次就得到了子線程應當執行的次數,並返回給前端,之后子線程開始執行,每執行一次,返回一個

狀態碼給前端,則前端依據此2個環節可推斷進度,並能顯示對應的進度條效果。

3. 代碼實踐 : 

  3.1 pom 加入 websokcet依賴 

  

 <!--  引入websokcet -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
            <version>1.3.5.RELEASE</version>
        </dependency>

   3.2 springboot 注入 websocket配置類 

  

package root.websocket;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

/**
 * @Date: 2018/10/25 10:10
 * @Description:  注入 websocekt配置類
 */
@Configuration
public class WebSocketConfig {

    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }

}
View Code

  3.3 springboot  加入自己要對外的websocket類

  

package root.websocket;

import org.apache.ibatis.session.SqlSession;
import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Component;
import root.report.control.dict.TestTwo;
import root.report.db.DbFactory;
import root.report.service.DictService;

import javax.websocket.OnClose;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.concurrent.CopyOnWriteArraySet;

/**
 * @Date: 2018/10/25 10:11
 * @Description: 注入dictSocket接口
 */
@ServerEndpoint(value = "/websocket/dictSocket")
@Component
public class ImportDictValueSocket {

    private static Logger log = Logger.getLogger(ImportDictValueSocket.class);

    private DictService dictService;
    //此處是解決無法注入的關鍵
    private static ApplicationContext applicationContext;
    public static void setApplicationContext(ApplicationContext applicationContext) {
        ImportDictValueSocket.applicationContext = applicationContext;
    }

    //靜態變量,用來記錄當前在線連接數。應該把它設計成線程安全的。
    private static int onlineCount = 0;

    //concurrent包的線程安全Set,用來存放每個客戶端對應的MyWebSocket對象。
    private static CopyOnWriteArraySet<ImportDictValueSocket> webSocketSet = new CopyOnWriteArraySet<ImportDictValueSocket>();

    //與某個客戶端的連接會話,需要通過它來給客戶端發送數據
    private Session session;

    /**
     * 連接建立成功調用的方法*/
    @OnOpen
    public void onOpen(Session session) {
        this.session = session;
        webSocketSet.add(this);     //加入set中
        addOnlineCount();           //在線數加1
        /*System.out.println("有新連接加入!當前在線人數為" + getOnlineCount());
        try {
            sendMessage("當前在線人數"+getOnlineCount()+",當前sessionID:"+this.session.getId());
        } catch (IOException e) {
            System.out.println("IO異常");
        }*/
    }

    /**
     * 連接關閉調用的方法
     */
    @OnClose
    public void onClose() {
        webSocketSet.remove(this);  //從set中刪除
        subOnlineCount();           //在線數減1
        log.info("有一連接關閉!當前在線人數為" + getOnlineCount());
    }

    /**
     * 收到客戶端消息后調用的方法
     *
     * @param message 客戶端發送過來的消息*/
    @OnMessage
    public void onMessage(String message, Session session) throws IOException, InterruptedException {
        log.info("來自客戶端的消息:" + message);
        // 調用  TestTwo的 導入方法,一旦連接成功 , 則 持續通信
        try {
            dictService = applicationContext.getBean(DictService.class);
            String result = dictService.importFuncDictValueByDictId(session,Integer.parseInt(message));   // 需要加鎖與否?
            synchronized (result){
                log.info("結果為"+result);
                sendMessage(result);   // 返回  "over"
                session.close();  // 關閉掉
            }
        }catch (Exception e){
            log.error(e.getMessage());
            sendMessage(e.getMessage());
        }

        //群發消息
      /*  for (ImportDictValueSocket item : webSocketSet) {
            try {
                item.sendMessage(message);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }*/
    }

    /**
     * 發生錯誤時調用
     @OnError
     public void onError(Session session, Throwable error) {
     System.out.println("發生錯誤");
     error.printStackTrace();
     }  **/


     public void sendMessage(String message) throws IOException {
        this.session.getBasicRemote().sendText(message);
     }

     /**
      * 群發自定義消息
      * */
    public static void sendInfo(String message) throws IOException {
        for (ImportDictValueSocket item : webSocketSet) {
            try {
                item.sendMessage(message);
            } catch (IOException e) {
                continue;
            }
        }
    }

    public static synchronized int getOnlineCount() {
        return onlineCount;
    }

    public static synchronized void addOnlineCount() {
        ImportDictValueSocket.onlineCount++;
    }

    public static synchronized void subOnlineCount() {
        ImportDictValueSocket.onlineCount--;
    }
}
View Code

  3.3-1  注意到 : 我們在 websocket 當中 無法正常引入 @Service等 業務class,這些必須手動在 websocket生成,或者我們從 容器當中讀到,所以我們要在啟動類當中做如下處理:

  

    public static void main(String[] args) {
        ConfigurableApplicationContext configurableApplicationContext = SpringApplication.run(ReportServerApplication.class, args);
        // 解決websocket 當中無法注入bean的方法
        ImportDictValueSocket.setApplicationContext(configurableApplicationContext);
    }

  3.3-2 業務類代碼 

  

package root.report.service;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.serializer.SerializerFeature;
import org.apache.commons.lang3.StringUtils;
import org.apache.ibatis.session.SqlSession;
import org.apache.log4j.Logger;
import org.dom4j.*;
import org.dom4j.io.OutputFormat;
import org.dom4j.io.XMLWriter;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.xml.sax.SAXException;
import root.configure.AppConstants;
import root.configure.MybatisCacheConfiguration;
import root.report.db.DbFactory;
import root.report.util.JsonUtil;
import root.report.util.XmlUtil;

import javax.websocket.Session;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.math.BigDecimal;
import java.sql.*;
import java.util.*;
import java.util.Date;
import java.util.concurrent.*;
import java.util.function.Consumer;

@Service
public class DictService {
// 測試使用 fetch 按照指定規格讀取數據  ,  源表為 test_dict 目標表名為 test_import
    public void fetchBySql(SqlSession sqlSession,String sourceSql,Consumer<ResultSet> consumer){
        // 通過 conn 得到 真正的連接(要解析 數據庫名,從而真正的 數據庫連接通道)
        // 而在我們這里則不需要這么復雜,前面過程都由DbFactory 完成了,傳遞進來的sqlSession 就是一個 連接好的通道
        Connection conn = sqlSession.getConnection();
        PreparedStatement stm = null;
        // 開始時間
        Long begin = new Date().getTime();
        String sql = sourceSql;
        try {
            final int countRow = 1000;
            stm = conn.prepareStatement(sql,ResultSet.TYPE_FORWARD_ONLY,ResultSet.CONCUR_READ_ONLY);
            // stm.setFetchSize(Integer.MIN_VALUE);      // 設置游標  1000 每次讀取一千行數據
            stm.setFetchSize(countRow);
            stm.setFetchDirection(ResultSet.FETCH_REVERSE);     //
            ResultSet rs = stm.executeQuery();
            // rs.setFetchSize(Integer.MIN_VALUE);
            consumer.accept(rs);    // 把 當前的 rs 傳遞給消費者
        }catch (Exception e){
            e.printStackTrace();
        }
    }
public String importFuncDictValueByDictId(Session session, int dict_id){

        // 先決條件 : 根據 dict_id 得到 sourceSql
        SqlSession sqlSession = DbFactory.Open(DbFactory.FORM);
        String dbName = sqlSession.selectOne("dict.getDictDbByDictId",dict_id);
        if(StringUtils.isBlank(dbName)) return("此DictId所對應的數據庫為空,無法操作!");
        final int countRow = 1000;
        final List<Map<String, Object>> list = new ArrayList<>();

        // 初始化對應的數據庫
        SqlSession sourceSqlSession = DbFactory.Open(dbName);
        if(sourceSqlSession==null)  return("數據庫無法連接,無法操作!");
        String sourceSql = null;
        try {
            sourceSql = this.getSqlTemplate("數據字典",String.valueOf(dict_id),false);
        } catch (Exception e){
            log.info(e.getMessage());
            return "無法從xml文件得到對應的sql,無法查詢!";
        }

        Long begin = new Date().getTime();
        Connection conn  = sqlSession.getConnection();
        final int poolSize = 5;
        ExecutorService fixedThreadPool = Executors.newFixedThreadPool(poolSize);
        fetchBySql(sourceSqlSession,sourceSql,rs -> {
            try {
                String prefix = "INSERT INTO func_dict_value (dict_id,value_code,value_name) VALUES ";
                final StringBuffer suffix = new StringBuffer();
                // 設置事務為非自動提交
                conn.setAutoCommit(false);   //  非提交能減少日志的生成,從而加快執行速度
                PreparedStatement pst = (PreparedStatement) conn.prepareStatement("");
                List<Map> mapList = new ArrayList<>();
                if (rs != null) {
                    rs.last();      // 移動到最后面
                    BigDecimal result = new BigDecimal((double) rs.getRow()/5000).setScale(0, BigDecimal.ROUND_UP);
                    int countSize = result.intValue();
                    try {
                        session.getBasicRemote().sendText(String.valueOf(countSize));    // 第一次直接發送給websocket 客戶端1個 本次執行總數統計.
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                    rs.first();  // 移動到最前面   // 坑處1,如果使用了firt的話,那么直接達到了第一條,則不要使用 while(rs.next())
                    if(rs!=null && StringUtils.isNotBlank(rs.getInt("code")+"")){
                        Map<String, Object> map = new HashMap<>();
                        map.put("code",rs.getInt("code"));
                        map.put("name",rs.getString("name"));
                        mapList.add(map);
                    }
                    while (rs.next()) {
                        Map<String, Object> map = new HashMap<>();
                        map.put("code",rs.getInt("code"));
                        map.put("name",rs.getString("name"));
                        mapList.add(map);
                        if(rs.getRow()%5000==0){
                            //  執行批量插入操作
                            List<Map> finalMapList = mapList;
                            final StringBuffer sb = new StringBuffer();
                            Future<String> stringFuture = fixedThreadPool.submit(
                                    new Callable<String>() {
                                        @Override
                                        public String call() throws Exception {
                                            String name = Thread.currentThread().getName();
                                            long threadId = Thread.currentThread().getId();
                                            log.info("thread name: "+name+",id為"+threadId+"執行了一次");
                                            for (Map tempMap : finalMapList) {
                                                // 構建SQL后綴
                                                sb.append("(" +dict_id+",'"+ tempMap.get("code") + "'," + "'" + tempMap.get("name") + "'),");
                                            }
                                            return "success";
                                        }
                                    }
                            );
                            mapList = new ArrayList<>();
                            try {
                                String s = stringFuture.get();
                                if ("success".equals(s)) {
                                    String sql = prefix + sb.substring(0, sb.length() - 1);  // 構建完整SQL
                                    pst.addBatch(sql);   // 添加執行SQL
                                    pst.executeBatch();  // 執行操作
                                    session.getBasicRemote().sendText("success");
                                }
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            } catch (ExecutionException e) {
                                e.printStackTrace();
                            } catch (IOException e) {
                                e.printStackTrace();
                            }
                        }
                    }
                    // 執行完之后 mapList 是個 不足5000個的,這個時候我們再去執行一次 添加操作
                    List<Map> finalMapList = mapList;
                    System.out.println("mapList大小為:"+mapList.size());
                    System.out.println("finalMapList:"+finalMapList.size());
                    final StringBuffer sb = new StringBuffer();
                    Future<String> stringFuture = fixedThreadPool.submit(
                            new Callable<String>() {
                                @Override
                                public String call() throws Exception {
                                    String name = Thread.currentThread().getName();
                                    long threadId = Thread.currentThread().getId();
                                    log.info("thread name: "+name+"id為"+threadId+"執行了一次");
                                    for (Map tempMap : finalMapList) {
                                        // 構建SQL后綴
                                        sb.append("(" +dict_id+",'"+ tempMap.get("code") + "'," + "'" + tempMap.get("name") + "'),");
                                    }
                                    return "success";
                                }
                            }
                    );
                    try {
                        String s = stringFuture.get();
                        if ("success".equals(s)) {
                            String sql = prefix + sb.substring(0, sb.length() - 1);  // 構建完整SQL
                            pst.addBatch(sql);   // 添加執行SQL
                            pst.executeBatch();  // 執行操作
                            session.getBasicRemote().sendText("success");
                        }
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } catch (ExecutionException e) {
                        e.printStackTrace();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                    conn.commit();
                    pst.close();
                    conn.close();
                }
                Long end = new Date().getTime();
                log.info("條數據從遠程庫導入到本地花費時間 : " + (end - begin)  + " ms");
            } catch (SQLException e) {
                // e.printStackTrace();  // 批量執行遇到異常直接 用log打印,不要中斷
                log.info(e.getMessage());
            }finally {
                fixedThreadPool.shutdown();   // 一定要shutdown  否則線程只是被回收到了線程池
            }
        });
        return "over";
    }
}
View Code

  3.4 使用在線工具 websocket測試 

  http://www.blue-zero.com/WebSocket/

  測試效果截圖 :

  

   數據庫效果截圖 :

  

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM