1.問題描述
對於大文件上傳解析,若直接上傳,會超時,可使用WebSocket長鏈接方式實時顯示文件的上傳狀態,實際上是從文件上傳到內容解析完成存入數據庫的過程,各個階段的進度可自定義。
本文使用SpringBoot+WebSocket+vue2.0+Element+nginx實現文件實時上傳顯示進度條,上傳的截圖如下:
2.解決方案
1)導入依賴
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-websocket</artifactId> </dependency>
2)開啟WebSocket的支持,並把該類注入到spring容器中
package com.zxh.example.config; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.web.socket.server.standard.ServerEndpointExporter; //開啟WebSocket的支持,並把該類注入到spring容器中 @Configuration public class WebSocketConfig { @Bean public ServerEndpointExporter serverEndpointExporter() { return new ServerEndpointExporter(); } }
3)編寫WebSocket服務
package com.zxh.example.service; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.springframework.stereotype.Component; import javax.websocket.*; import javax.websocket.server.PathParam; import javax.websocket.server.ServerEndpoint; import java.io.IOException; import java.util.concurrent.ConcurrentHashMap; /** * @author zhengkai.blog.csdn.net */ @ServerEndpoint("/wsServer/{userId}") @Component @Slf4j public class WebSocketServer { /** * 靜態變量,用來記錄當前在線連接數。應該把它設計成線程安全的。 */ private static int onlineCount = 0; /** * concurrent包的線程安全Set,用來存放每個客戶端對應的MyWebSocket對象。 */ private static ConcurrentHashMap<String, WebSocketServer> webSocketMap = new ConcurrentHashMap<>(); /** * 與某個客戶端的連接會話,需要通過它來給客戶端發送數據 */ private Session session; /** * 接收userId */ private String userId = ""; /** * 連接建立成功調用的方法 */ @OnOpen public void onOpen(Session session, @PathParam("userId") String userId) { this.session = session; this.userId = userId; if (webSocketMap.containsKey(userId)) { webSocketMap.remove(userId); webSocketMap.put(userId, this); //加入set中 } else { webSocketMap.put(userId, this); //加入set中 addOnlineCount(); //在線數加1 } log.info("用戶連接:" + userId + ",當前在線人數為:" + getOnlineCount()); try { sendMessage("連接成功"); } catch (IOException e) { log.error("用戶:" + userId + ",網絡異常!!!!!!"); } } /** * 連接關閉調用的方法 */ @OnClose public void onClose() { if (webSocketMap.containsKey(userId)) { webSocketMap.remove(userId); //從set中刪除 subOnlineCount(); } log.info("用戶退出:" + userId + ",當前在線人數為:" + getOnlineCount()); } /** * 收到客戶端消息后調用的方法 * * @param message 客戶端發送過來的消息 */ @OnMessage public void onMessage(String message, Session session) { log.info("用戶消息:" + userId + ",報文:" + message); //可以群發消息 //消息保存到數據庫、redis if (StringUtils.isNotBlank(message)) { try { //解析發送的報文 JSONObject jsonObject = JSON.parseObject(message); //追加發送人(防止串改) jsonObject.put("fromUserId", this.userId); String toUserId = jsonObject.getString("toUserId"); //傳送給對應toUserId用戶的websocket if (StringUtils.isNotBlank(toUserId) && webSocketMap.containsKey(toUserId)) { webSocketMap.get(toUserId).sendMessage(jsonObject.toJSONString()); } else { log.error("請求的userId:" + toUserId + "不在該服務器上"); //否則不在這個服務器上,發送到mysql或者redis } } catch (Exception e) { e.printStackTrace(); } } } /** * 出現錯誤 * * @param session * @param error */ @OnError public void onError(Session session, Throwable error) { log.error("用戶錯誤:" + this.userId + ",原因:" + error.getMessage()); error.printStackTrace(); } /** * 實現服務器主動推送 */ public void sendMessage(String message) throws IOException { this.session.getBasicRemote().sendText(message); } /** * 發送自定義消息 */ public static void sendInfo(String message, @PathParam("userId") String userId) throws IOException { log.info("發送消息到:" + userId + ",報文:" + message); if (StringUtils.isNotBlank(userId) && webSocketMap.containsKey(userId)) { webSocketMap.get(userId).sendMessage(message); } else { log.error("用戶" + userId + ",不在線!"); } } public static synchronized int getOnlineCount() { return onlineCount; } public static synchronized void addOnlineCount() { WebSocketServer.onlineCount++; } public static synchronized void subOnlineCount() { WebSocketServer.onlineCount--; } }
4)編寫文件上傳的controller
package com.zxh.example.controller; import com.zxh.example.service.TestService; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import org.springframework.web.multipart.MultipartFile; @RestController @RequestMapping("/api") @Slf4j public class TestController { @Autowired private TestService testService; @PostMapping("/upload") public String upload(MultipartFile file) { return testService.upload(file); } }
5)編寫文件上傳的實現類,實時解析文件並發送通知
package com.zxh.example.service; import cn.afterturn.easypoi.handler.inter.IReadHandler; import com.zxh.example.entity.User; import com.zxh.example.util.ExcelUtils; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; import org.springframework.web.multipart.MultipartFile; import java.io.IOException; import java.util.ArrayList; import java.util.List; @Service @Slf4j public class TestService { public String upload(MultipartFile file) { Integer[] percent = {1}; sendMessage(percent[0]); Integer percentMax1 = 20; Integer percentMax2 = 100; // 讀取Excel中的數據到list集合中 List<User> list = new ArrayList<>(); //解析excel,解析1%~20% ExcelUtils.importExcelBySax(file, User.class, 2, new IReadHandler<User>() { @Override public void handler(User o) { list.add(o); //每讀取指定行,推送1 if (list.size() % 10000 == 0 && percent[0] < percentMax1) { percent[0]++; sendMessage(percent[0]); } } @Override public void doAfterAll() { //解析成功 percent[0] = percentMax1; sendMessage(percent[0]); } }); //模擬數據插入,每1000條發送一次消息 21%~100% Integer maxSize = 1000; Integer queryCnt = list.size() % maxSize == 0 ? list.size() / maxSize : (list.size() / maxSize) + 1; Integer sendCnt = 10; for (int i = 0; i < queryCnt; i++) { Integer endIndex = (i + 1) * maxSize; if (endIndex > list.size()) { endIndex = list.size(); } //集合截取 List<User> tempList = new ArrayList<>(list.subList(i * maxSize, endIndex)); //模擬數據查詢 if (queryCnt % sendCnt == 0 && percent[0] < percentMax2) { percent[0]++; sendMessage(percent[0]); } } percent[0] = percentMax2; sendMessage(percent[0]); return "success"; } /** * 自定義封裝的發送方法 * @param msg */ private void sendMessage(Integer msg) { try { WebSocketServer.sendInfo(msg.toString(), "111"); } catch (IOException e) { log.error("消息發送異常:" + e.getMessage()); e.printStackTrace(); } } }
6)編寫全局的global.js,可在全局使用,方便各個頁面都能獲取到消息
export default { //websocket webSocket: {}, setWs: function (ws) { this.webSocket = ws }, wsUrl: `${location.protocol === 'https:' ? 'wss' : 'ws'}://${location.host}/wsServer/`, }
7)在main.js中注入global.js中的方法
import global from './global' Vue.prototype.global = global
8)在Vue的App.vue創建webscoketd對象,並注冊到全局
<template> <div id="app"> <router-view /> </div> </template> <script> export default { name: 'App', data() { return { socket: null } }, mounted() { this.initWs() }, methods: { //初始化 initWs() { if (typeof (WebSocket) === "undefined") { alert("您的瀏覽器不支持socket") } else { // 實例化socket 111是固定的用戶id,正式環境直接獲取當前登錄用戶id this.socket = new WebSocket(this.global.wsUrl + '111') this.global.setWs(this.socket) // 監聽socket連接 this.socket.onopen = () => { console.log("socket連接成功") } // 監聽socket錯誤信息 this.socket.onerror = () => { console.error("連接錯誤") } //監聽socket消息 this.socket.onmessage = (msg) => { // console.log(msg) } // 監聽socket關閉信息 this.socket.onclose = (e) => { console.error("socket已經關閉") console.error(e) } } }, }, } </script> <style> #app { height: 100%; } </style>
9)在vue.config.js配置協議,轉發到后台服務(本地開發)
module.exports = { devServer: { host: '0.0.0.0', // //設置端口號 port: 8006, //自動打開瀏覽器 open: true, proxy: { '/api': { target: 'http://localhost:8080', }, //websocket配置,正式環境設置nginx代理 '/wsServer': { target: 'http://localhost:8080' }, }, }, }
10)編寫上傳文件的頁面
<template> <div> <el-button type="primary" icon="el-icon-upload" @click="handleUpload" style="margin-left: 10px;">導入 </el-button> <el-upload ref="importUpload" :auto-upload="false" :show-file-list="false" :on-change="postFile" style="display: inline" action="#"> <el-button id="uploadButton1" style="display: none" slot="trigger" /> </el-upload> <el-dialog title="上傳進度" :visible.sync="uploadDialog" width="30%" @close="closeDialog" :close-on-click-modal="false"> <p> <div class="time-content">已用時間:{{timesStr}}</div> </p> <el-progress :percentage="percentMsg" :text-inside="true" :stroke-width="23"></el-progress> <div class="status-content"> <p v-if="importStatus == 1"> <span class="status-content-icon-span">上傳中,請稍后......</span> </p> <p v-if="importStatus == 2"><i class="el-icon-success"></i> <span class="status-content-icon-span">上傳成功</span> </p> <p v-if="importStatus == 3"><i class="el-icon-error"></i> <span class="status-content-icon-span">上傳失敗</span> </p> </div> </el-dialog> </div> </template> <script> import { user } from "@/api/user"; let that export default { data() { return { uploadDialog: false, websocket: "", percentMsg: 0, times: 0, timesStr: '00:00', timesId: null, importStatus: 0, //上傳狀態,0未上傳,1上傳中,2上傳成功,3上傳失敗 } }, created() { that = this }, watch: { 'percentMsg': function (val) { if (val === 100 && this.timesId) { clearInterval(this.timesId) } }, 'importStatus': function (val) { if (val === 3 && this.timesId) { clearInterval(this.timesId) } } }, mounted() { this.getSystemWs() }, methods: { getSystemWs() { this.global.webSocket.onmessage = res => { if (res && res.data) { this.percentMsg = Number(res.data) } else { this.importStatus = 3 } } }, //上傳開始計時 startUpload() { this.timesId = setInterval(function () { let timesStr = that.timesStr that.times++ let m = parseInt(that.times / 60) let s = that.times % 60 if (that.times != 0 && s % 60 == 0) { m = that.times / 60 s = 0 } if (m < 10) { timesStr = '0' + m } else { timesStr = m } timesStr += ":" if (s < 10) { timesStr = timesStr + '0' } timesStr = timesStr + s that.timesStr = timesStr }, 1000); }, handleUpload() { const uploadObj1 = document.getElementById("uploadButton1"); uploadObj1.click(); }, beforeUpload(file) { if (file.type == "" || file.type == null || file.type == undefined) { const FileExt = file.name.replace(/.+\./, "").toLowerCase(); if ( FileExt == "xls" || FileExt == "xlsx" || FileExt == "XLS" || FileExt == "XLSX" ) { return true; } else { this.$message.error("上傳文件必須是Excel格式!"); return false; } } return true; }, postFile(file) { this.percentMsg = 0 this.startUpload() var fileData = new FormData(); fileData.append("file", file.raw); let headers = { "Content-Type": "multipart/form-data" }; this.uploadDialog = true; user.upload(fileData).then(res => { if (res == 'success') { this.importStatus = 2 } else { this.importStatus = 3 } }); }, closeDialog() { if (this.timesId) { clearInterval(this.timesId) } this.percentMsg = 0 this.times = 0 this.timesStr = '00:00' if (this.importStatus == 2) { this.getList() } this.importStatus = 0 }, }, } </script> <style> .time-content { text-align: right; width: 100%; } .status-content { margin-top: 40px; width: 100%; text-align: center; } .status-content .el-icon-success { font-size: 30px; vertical-align: -20%; color: #67C23A; } .status-content .el-icon-error { font-size: 30px; vertical-align: -20%; color: #ee3838; } .status-content .el-icon-warning { font-size: 30px; vertical-align: -20%; color: #E6A23C; } .status-content-icon-span { margin-left: 10px; } </style>
3.注意事項
3.1nginx代理配置
11)在上線時是需要使用nginx代理的,故需使用nginx代理前端的WebSocket
在nginx.conf做如下配置:
... #請求體大小 client_max_body_size 20M; ... server { listen 81; server_name localhost; location / { root html; try_files $uri $uri/ /index.html; } location ~^/api/ { proxy_pass http://127.0.0.1:8080; proxy_read_timeout 600s; #默認是60s,若不配置則超過60s會出現504狀態碼 } #websocket代理配置 location ~^/wsServer/ { proxy_pass http://127.0.0.1:8080; # 開啟nginx對websocket的支持 proxy_http_version 1.1; proxy_set_header Upgrade $http_upgrade; proxy_set_header Connection "upgrade"; proxy_read_timeout 36000s; #10小時未傳輸數據則關閉連接 } ...
默認情況下,如果代理服務器在60秒內未傳輸任何數據,則連接將關閉。請求體的大小根據實際情況修改。若不配置,則上傳文件超過默認值1MB時就會出現413錯誤狀態碼。
3.2多節點問題
在單節點服務時,上述即可滿足需求,但多節點服務時,通過nginx代理,若連接和請求都在同一台服務器時,可正常使用,但也會出現和A服務器連接了WebSocket,但在導入時請求的是B服務器的情況,此時B服務器並不會發送消息給前端,導致導入時不顯示進度。此時就需要使用分布式的通知方式,下面使用redis的發布訂閱功能進行消息的通知。
1)導入redis依賴
<!-- redis --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency>
2)創建redis消息實體,
package com.zxh.model; import lombok.Data; import lombok.experimental.Accessors; import java.util.List; /** * redis發布訂閱的消息實體 */ @Data @Accessors(chain = true) public class RedisMessage { //消息類型,1全部廣播,2個人信息 private Integer category; //消息 private String message; //要發送的用戶組 private List<String> userList; }
方便消息的封裝。
2)創建業務處理類,監聽redis消息發布
主要用於監聽消息的發布,收到消息時進行相關業務的處理。
package com.zxh.common.listener; import com.alibaba.fastjson.JSON; import com.zxh.common.util.CollectionUtil; import com.zxh.model.RedisMessage; import com.zxh.server.WebSocketServer; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.springframework.data.redis.connection.Message; import org.springframework.data.redis.connection.MessageListener; import org.springframework.stereotype.Component; import java.io.IOException; /** * redis消息訂閱-業務處理 */ @Component @Slf4j public class RedisMessageListener implements MessageListener { //重寫onMessage,處理相關發布訂閱的業務 @SneakyThrows @Override public void onMessage(Message message, byte[] bytes) { String body = new String(message.getBody(), "UTF-8"); RedisMessage redisMessage = JSON.parseObject(body, RedisMessage.class); if (redisMessage != null) { Integer category = redisMessage.getCategory(); //個人信息 if (category == 2) { //根據用戶id消息 if (CollectionUtil.isNotEmpty(redisMessage.getUserList())) { redisMessage.getUserList().stream().forEach(userId -> { try { WebSocketServer.sendInfo(redisMessage.getMessage(),userId); } catch (IOException e) { e.printStackTrace(); } }); } else { log.warn("無用戶信息,發送信息失敗"); } } else if (category == 1) { } } } }
3)配置redis發布訂閱
package com.zxh.configure; import com.zxh.common.SystemConst; import com.zxh.common.listener.RedisMessageListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.cache.annotation.EnableCaching; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.data.redis.listener.PatternTopic; import org.springframework.data.redis.listener.RedisMessageListenerContainer; import org.springframework.data.redis.listener.adapter.MessageListenerAdapter; /** * redis發布訂閱配置 */ @Configuration @EnableCaching public class RedisPubSubConfig { Logger logger = LoggerFactory.getLogger(this.getClass()); /** * 配置 交換機消息,添加多個 messageListener參數,配置不同的交換機 * * @param connectionFactory * @param listenerAdapter * @return */ @Bean RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) { RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(connectionFactory); container.addMessageListener(listenerAdapter, new PatternTopic("channel:test1")); return container; } /** * 消息監聽器適配器,綁定消息處理器,利用反射技術調用消息處理器的業務方法 * * @param listener 業務處理類 * @return */ @Bean MessageListenerAdapter listenerAdapter(RedisMessageListener listener) { logger.info("redis消息監聽器加載成功--------->>>>>>"); // onMessage 就是方法名,基於反射調用 return new MessageListenerAdapter(listener, "onMessage"); } @Bean StringRedisTemplate template(RedisConnectionFactory connectionFactory) { return new StringRedisTemplate(connectionFactory); } }
4)調用redis的發布功能
修改TestService的sendMessage的方法,把使用WebSocket發送信息改為把消息發布到redis中。
@Service @Slf4j public class TestService { ..... @Autowired private StringRedisTemplate stringRedisTemplate; private void sendMessage(Integer msg) { List<String> userList = Arrays.asList("1111");//使用redis的發布訂閱發送消息 RedisMessage redisMessage = new RedisMessage().setCategory(2); redisMessage.setMessage(msg.toString()).setUserList(userList); stringRedisTemplate.convertAndSend("channel:test1", JSON.toJSONString(redisMessage)); } }
redis發布后,監聽器監聽到有消息時,使用WebSocket進行消息推送。每台服務器都會推送,只有服務連接成功的一台服務器才能通知到前台成功