A lightweight message queue with redisTemplate: asynchronous Excel processing with Tencent Cloud COS file upload and download


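Background
Our project needed a feature where the front end uploads an Excel file and the back end reads the data, processes it, and returns the rows that failed. The simplest approach is synchronous: the client uploads the file and blocks until the response arrives. That makes for a poor user experience, because processing can take a long time and nobody wants to sit and wait. The project does not yet use a message-queue middleware such as ActiveMQ, and Redis's lpush and rpop work well as a lightweight message queue, so I built the feature on top of them.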
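
To make the queue semantics concrete, here is a minimal sketch of why lpush plus rpop behaves as a first-in, first-out queue. It uses an in-memory ArrayDeque as a stand-in for the Redis list, so the class ListQueueSketch and its methods are illustrative assumptions, not part of the project or of any Redis client.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// In-memory stand-in for a Redis list (an assumption for illustration; this is not a Redis client).
final class ListQueueSketch {
    private final Deque<String> list = new ArrayDeque<>();

    // Mirrors LPUSH: insert at the head (left end) of the list.
    void leftPush(String value) {
        list.addFirst(value);
    }

    // Mirrors RPOP: remove from the tail (right end); returns null when the list is empty.
    String rightPop() {
        return list.pollLast();
    }

    public static void main(String[] args) {
        ListQueueSketch queue = new ListQueueSketch();
        queue.leftPush("file-1");
        queue.leftPush("file-2");
        queue.leftPush("file-3");
        // The oldest message leaves first, so the list acts as a FIFO queue.
        System.out.println(queue.rightPop());
        System.out.println(queue.rightPop());
        System.out.println(queue.rightPop());
    }
}
```

Pushing on one end and popping from the other is what gives the ordering; pushing and popping on the same end would turn the list into a stack.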

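I. Topics covered

  1. Reading and writing Excel files: Alibaba's easyexcel SDK
  2. File upload and download: Tencent Cloud Object Storage (COS)
  3. Remote service calls: restTemplate
  4. Producer and consumer: redisTemplate leftPush and rightPop operations
  5. Asynchronous data processing: Executors thread pools
  6. Reading a remote file as a stream: HttpClient
  7. User authentication with a custom annotation: JWT token authentication, with an interceptor that intercepts request entry points annotated with @LoginRequired

All implemented in Java, of course.
That is a lot of ground to cover, and each topic could be studied as a subject of its own. This article presents the complete implementation; later I plan to split the topics into separate posts.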

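II. Project directory structure

[Figure: project structure]

Note: the database DAO layer lives in a separate module and is not the focus of this article.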

III. Main Maven dependencies

  1. easyexcel
		<properties>
			<easyexcel-latestVersion>1.1.2-beta4</easyexcel-latestVersion>
		</properties>

		<dependency>
			<groupId>com.alibaba</groupId>
			<artifactId>easyexcel</artifactId>
			<version>${easyexcel-latestVersion}</version>
		</dependency>
  2. JWT
		<dependency>
			<groupId>io.jsonwebtoken</groupId>
			<artifactId>jjwt</artifactId>
			<version>0.7.0</version>
		</dependency>
  3. redis
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-redis</artifactId>
			<version>1.3.5.RELEASE</version>
		</dependency>
  4. Tencent COS
		<dependency>
			<groupId>com.qcloud</groupId>
			<artifactId>cos_api</artifactId>
			<version>5.4.5</version>
		</dependency>

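IV. Process flow

  1. The user uploads a file
  2. The file is stored in Tencent COS
  3. The uploaded file's id and an upload record are saved to the database
  4. A Redis import message is produced, i.e. the file id is pushed to Redis
  5. The request ends and returns a "processing" status
  6. The Redis message is consumed
  7. The file is read from COS and the data is processed asynchronously
  8. The failed rows are uploaded to COS as an Excel file for the user to download, and the status is updated to "processing complete"
  9. The client polls for the processing status and can download the error file
  10. Done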
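
The steps above can be sketched end to end without any infrastructure. The following is a simplified model under stated assumptions: a LinkedBlockingQueue stands in for the Redis list, a ConcurrentHashMap stands in for the import-record table, and the names ImportFlowSketch, submit, consumeOne and status are hypothetical. The COS upload and the actual data processing are left out.

```java
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Simplified model of the import flow; all names here are illustrative assumptions.
final class ImportFlowSketch {
    enum Status { PROCESSING, DONE }

    // Stand-in for the Redis list that carries file ids.
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    // Stand-in for the database table that stores import records.
    private final Map<String, Status> records = new ConcurrentHashMap<>();

    // Steps 3-5: save the record, produce a message, answer immediately.
    Status submit(String fileId) {
        records.put(fileId, Status.PROCESSING);
        queue.add(fileId);
        return Status.PROCESSING;
    }

    // Steps 6-8: wait up to timeoutMillis for one message, then mark it done.
    // Returns false when no message arrived in time.
    boolean consumeOne(long timeoutMillis) {
        try {
            String fileId = queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
            if (fileId == null) {
                return false;
            }
            // The real data processing and error-file upload would happen here.
            records.put(fileId, Status.DONE);
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // Step 9: what the client's polling request would read.
    Status status(String fileId) {
        return records.get(fileId);
    }

    public static void main(String[] args) {
        ImportFlowSketch flow = new ImportFlowSketch();
        System.out.println(flow.submit("file-1"));
        flow.consumeOne(100);
        System.out.println(flow.status("file-1"));
    }
}
```

The point of the design is that submit returns before any processing happens, so the HTTP request stays short however long the import takes.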

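V. Results

  1. Uploading a file
    [Screenshot]

  2. Import records in the database
    [Screenshot]

  3. The imported data
    [Screenshot]

  4. Downloading the error file
    [Screenshot]

  5. Error messages for the failed rows
    [Screenshot]

  6. Querying import records
    [Screenshot]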

VI. Code implementation

1. Excel import controller

    @LoginRequired
    @RequestMapping(value = "doImport", method = RequestMethod.POST)
    public JsonResponse doImport(@RequestParam("file") MultipartFile file, HttpServletRequest request) {
        PLUser user = getUser(request);
        return orderImportService.doImport(file, user.getId());
    }

2. Service layer

    @Override
    public JsonResponse doImport(MultipartFile file, Integer userId) {
        if (null == file || file.isEmpty()) {
            throw new ServiceException("The file must not be empty");
        }

        String filename = file.getOriginalFilename();
        if (!checkFileSuffix(filename)) {
            throw new ServiceException("Only Excel files in xlsx format are currently supported");
        }

        // Store the file
        String fileId = saveToOss(file);
        if (StringUtils.isBlank(fileId)) {
            throw new ServiceException("File upload failed, please try again later");
        }

        // Save the import record to the database
        saveRecordToDB(userId, fileId, filename);

        // Produce an order-import message
        redisProducer.produce(RedisKey.orderImportKey, fileId);

        return JsonResponse.ok("Import accepted, processing...");
    }

    /**
     * Validate the file format
     * @param fileName the original file name
     * @return true if the name ends with .xlsx (case-insensitive)
     */
    private static boolean checkFileSuffix(String fileName) {
        if (StringUtils.isBlank(fileName) || fileName.lastIndexOf(".") <= 0) {
            return false;
        }

        int pointIndex = fileName.lastIndexOf(".");
        String suffix = fileName.substring(pointIndex).toLowerCase();
        return ".xlsx".equals(suffix);
    }

    /**
     * Store the file in Tencent COS via the upload service
     * @param file the uploaded file
     * @return the stored file id, or null if the upload failed
     */
    private String saveToOss(MultipartFile file) {
        String fileId;
        // Temporary local copy so that restTemplate can send it as a multipart file
        File f = new File(file.getOriginalFilename());
        try {
            inputStreamToFile(file.getInputStream(), f);
            FileSystemResource resource = new FileSystemResource(f);

            MultiValueMap<String, Object> param = new LinkedMultiValueMap<>();
            param.add("file", resource);

            ResponseResult responseResult = restTemplate.postForObject(txOssUploadUrl, param, ResponseResult.class);
            fileId = (String) responseResult.getData();
        } catch (Exception e) {
            // Covers I/O errors, a failed remote call and a null response
            fileId = null;
        } finally {
            if (f.exists()) {// delete the temporary file
                f.delete();
            }
        }

        return fileId;
    }

3. Redis producer

@Service
public class RedisProducerImpl implements RedisProducer {

    @Autowired
    private RedisTemplate redisTemplate;

    @Override
    public JsonResponse produce(String key, String msg) {
        Map<String, String> map = Maps.newHashMap();
        map.put("fileId", msg);
        redisTemplate.opsForList().leftPush(key, map);
        return JsonResponse.ok();
    }

}

4. Redis consumer

@Service
public class RedisConsumer {

    @Autowired
    public RedisTemplate redisTemplate;

    @Value("${txOssFileUrl}")
    private String txOssFileUrl;

    @Value("${txOssUploadUrl}")
    private String txOssUploadUrl;

    @PostConstruct
    public void init() {
        processOrderImport();
    }

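    /**
     * Process order imports
     */
    private void processOrderImport() {
        ExecutorService executorService = Executors.newCachedThreadPool();
        executorService.execute(() -> {
            while (true) {
                try {
                    // Block for up to 1 second waiting for a message
                    Object object = redisTemplate.opsForList().rightPop(RedisKey.orderImportKey, 1, TimeUnit.SECONDS);
                    if (null == object) {
                        continue;
                    }
                    String msg = JSON.toJSONString(object);
                    executorService.execute(new OrderImportTask(msg, txOssFileUrl, txOssUploadUrl));
                } catch (Exception e) {
                    // Keep the polling thread alive if Redis is briefly unavailable
                    e.printStackTrace();
                    try {
                        TimeUnit.SECONDS.sleep(1);
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            }
        });
    }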

}
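
One thing to keep in mind with the consumer above: Executors.newCachedThreadPool() creates threads without an upper bound, so a burst of import messages starts one thread per file. A possible alternative, sketched here as an assumption rather than as the project's code, is a fixed-size ThreadPoolExecutor with a bounded queue and CallerRunsPolicy, so that the polling thread slows down when the workers are saturated. The class name BoundedWorkerPoolSketch and the pool sizes are made up for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of a bounded worker pool; the sizes are arbitrary example values.
final class BoundedWorkerPoolSketch {

    // Fixed number of threads plus a bounded backlog. When both are full,
    // CallerRunsPolicy runs the task on the submitting thread, which
    // naturally throttles whoever is pulling messages off the queue.
    static ThreadPoolExecutor newWorkerPool(int threads, int queueCapacity) {
        return new ThreadPoolExecutor(
                threads, threads,
                0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(queueCapacity),
                new ThreadPoolExecutor.CallerRunsPolicy());
    }

    // Submits taskCount trivial tasks and returns how many completed.
    static int runTasks(int taskCount) {
        ThreadPoolExecutor pool = newWorkerPool(2, 2);
        AtomicInteger done = new AtomicInteger();
        for (int i = 0; i < taskCount; i++) {
            pool.execute(done::incrementAndGet);
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return done.get();
    }

    public static void main(String[] args) {
        System.out.println(runTasks(10));
    }
}
```

No task is dropped with this policy; the trade-off is that the polling thread may spend time processing a file itself instead of fetching the next message.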

5. Task thread class

public class OrderImportTask implements Runnable {

    private final String msg;
    private final String txOssFileUrl;
    private final String txOssUploadUrl;

    // Looked up from the Spring context in autowireBean(), because this task
    // is created with new and is therefore not managed by Spring
    private RestTemplate restTemplate;
    private TransactionTemplate transactionTemplate;
    private OrderImportService orderImportService;

    public OrderImportTask(String msg, String txOssFileUrl, String txOssUploadUrl) {
        this.msg = msg;
        this.txOssFileUrl = txOssFileUrl;
        this.txOssUploadUrl = txOssUploadUrl;
    }

    /**
     * Look up the required beans
     */
    private void autowireBean() {
        this.restTemplate = BeanContext.getApplicationContext().getBean(RestTemplate.class);
        this.transactionTemplate = BeanContext.getApplicationContext().getBean(TransactionTemplate.class);
        this.orderImportService = BeanContext.getApplicationContext().getBean(OrderImportService.class);
    }

    @Override
    public void run() {
        // Look up the required beans
        autowireBean();

        JSONObject jsonObject = JSON.parseObject(msg);
        String fileId = jsonObject.getString("fileId");

        MultiValueMap<String, Object> param = new LinkedMultiValueMap<>();
        param.add("id", fileId);

        // Resolve the file id to a download URL
        ResponseResult responseResult = restTemplate.postForObject(txOssFileUrl, param, ResponseResult.class);
        if (null == responseResult) {
            return;
        }
        String fileUrl = (String) responseResult.getData();
        if (StringUtils.isBlank(fileUrl)) {
            return;
        }

        InputStream inputStream = HttpClientUtil.readFileFromURL(fileUrl);
        List<Object> list = ExcelUtil.read(inputStream);
        process(list, fileId);
    }

    /**
     * Upload a file to COS
     * @param file the local file to upload
     * @return the stored file id, or null if the upload failed
     */
    private String saveToOss(File file) {
        String fileId;
        try {
            FileSystemResource resource = new FileSystemResource(file);
            MultiValueMap<String, Object> param = new LinkedMultiValueMap<>();
            param.add("file", resource);

            ResponseResult responseResult = restTemplate.postForObject(txOssUploadUrl, param, ResponseResult.class);
            fileId = (String) responseResult.getData();
        } catch (Exception e) {
            fileId = null;
        }
        return fileId;
    }

    // process(list, fileId) holds the business logic and is omitted
}

Note: the business logic that processes the data is omitted here.

6. Uploading files to COS

    @RequestMapping("/txOssUpload")
    @ResponseBody
    public ResponseResult txOssUpload(@RequestParam("file") MultipartFile file) throws UnsupportedEncodingException {
        if (null == file || file.isEmpty()) {
            return ResponseResult.fail("The file must not be empty");
        }

        String originalFilename = file.getOriginalFilename();
        originalFilename = MimeUtility.decodeText(originalFilename);// fix garbled non-ASCII (e.g. Chinese) file names
        String contentType = getContentType(originalFilename);
        String key;

        InputStream ins = null;
        File f = null;

        try {
            ins = file.getInputStream();
            f = new File(originalFilename);
            inputStreamToFile(ins, f);
            key = iFileStorageClient.txOssUpload(new FileInputStream(f), originalFilename, contentType);
        } catch (Exception e) {
            return ResponseResult.fail(e.getMessage());
        } finally {
            if (null != ins) {
                try {
                    ins.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            // f is still null if getInputStream() failed
            if (null != f && f.exists()) {// delete the temporary file
                f.delete();
            }
        }

        return ResponseResult.ok(key);
    }

    public static void inputStreamToFile(InputStream ins, File file) {
        // try-with-resources closes both streams even if the copy fails
        try (InputStream in = ins; OutputStream os = new FileOutputStream(file)) {
            int bytesRead;
            byte[] buffer = new byte[8192];
            while ((bytesRead = in.read(buffer, 0, 8192)) != -1) {
                os.write(buffer, 0, bytesRead);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public String txOssUpload(FileInputStream inputStream, String key, String contentType) {
        key = Uuid.getUuid() + "-" + key;
        OSSUtil.txOssUpload(inputStream, key, contentType);
        try {
            if (null != inputStream) {
                inputStream.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        return key;
    }

    public static void txOssUpload(FileInputStream inputStream, String key, String contentType) {
        ObjectMetadata objectMetadata = new ObjectMetadata();
        try{
            int length = inputStream.available();
            objectMetadata.setContentLength(length);
        }catch (Exception e){
            logger.info(e.getMessage());
        }
        objectMetadata.setContentType(contentType);
        cosclient.putObject(txbucketName, key, inputStream, objectMetadata);
    }
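
The upload code above takes the content length from inputStream.available(), which only reports how many bytes can be read without blocking and is not guaranteed to be the total size for every kind of stream. A hedged alternative is to buffer the upload in a temporary file and use the file size. The sketch below uses only java.nio; TempFileSketch, Upload and withTempCopy are hypothetical names, and the callback stands in for the real COS call.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Sketch: buffer a stream in a temp file so the exact length is known before uploading.
final class TempFileSketch {

    // Hypothetical callback standing in for the actual storage client call.
    interface Upload {
        void accept(InputStream in, long length) throws IOException;
    }

    // Copies source to a temp file, passes a fresh stream and the exact size
    // to the callback, and always deletes the temp file afterwards.
    static long withTempCopy(InputStream source, Upload upload) {
        Path temp = null;
        try {
            temp = Files.createTempFile("upload-", ".tmp");
            Files.copy(source, temp, StandardCopyOption.REPLACE_EXISTING);
            long length = Files.size(temp);
            try (InputStream in = Files.newInputStream(temp)) {
                upload.accept(in, length);
            }
            return length;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } finally {
            if (temp != null) {
                try {
                    Files.deleteIfExists(temp);
                } catch (IOException ignored) {
                    // Best effort cleanup; nothing useful to do here.
                }
            }
        }
    }

    public static void main(String[] args) {
        byte[] data = {1, 2, 3, 4, 5};
        long length = withTempCopy(new ByteArrayInputStream(data),
                (in, len) -> System.out.println("would upload " + len + " bytes"));
        System.out.println(length);
    }
}
```

Files.createTempFile also avoids writing into the working directory under the client-supplied file name, which is what new File(originalFilename) does.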

7. Downloading files

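    /**
     * Download a file from Tencent Cloud
     * @param response the HTTP response to write the file to
     * @param id the file id (COS object key)
     * @return always null, because the file is written directly to the response
     */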
    @RequestMapping("/txOssDownload")
    public Object txOssDownload(HttpServletResponse response, String id) {
        COSObjectInputStream cosObjectInputStream = iFileStorageClient.txOssDownload(id, response);
        String contentType = getContentType(id);
        FileUtil.txOssDownload(response, contentType, cosObjectInputStream, id);
        return null;
    }

    public static void txOssDownload(HttpServletResponse response, String contentType, InputStream fileStream, String fileName) {
        response.reset();
        OutputStream os = null;
        try {
            response.setContentType(contentType + "; charset=utf-8");
            if(!contentType.equals(PlConstans.FileContentType.image)){
                try {
                    response.setHeader("Content-Disposition", "attachment; filename=" + new String(fileName.getBytes("UTF-8"), "ISO8859-1"));
                } catch (UnsupportedEncodingException e) {
                    response.setHeader("Content-Disposition", "attachment; filename=" + fileName);
                    logger.error("encoding file name failed", e);
                }
            }

            os = response.getOutputStream();

            // Copy the COS stream to the response in 1 MB chunks
            byte[] b = new byte[1024 * 1024];
            int len;
            while ((len = fileStream.read(b)) > 0) {
                os.write(b, 0, len);
                os.flush();
            }
        } catch (IOException e) {
            logger.error("writing file to response failed", e);
        } finally {
            IOUtils.closeQuietly(os);
            IOUtils.closeQuietly(fileStream);
        }
    }
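
The download code re-encodes the file name from UTF-8 to ISO8859-1 so that non-ASCII names survive in the Content-Disposition header. Another option, shown here as a sketch and not as what the project does, is the filename* parameter with percent-encoded UTF-8 as described in RFC 6266 and RFC 5987. ContentDispositionSketch and attachmentHeader are hypothetical names.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

// Sketch: build a Content-Disposition value with a percent-encoded UTF-8 file name.
final class ContentDispositionSketch {

    static String attachmentHeader(String fileName) {
        try {
            // URLEncoder targets HTML forms: it turns spaces into "+" and leaves "*" as is,
            // so both are patched to their percent-encoded form for use in a header.
            String encoded = URLEncoder.encode(fileName, "UTF-8")
                    .replace("+", "%20")
                    .replace("*", "%2A");
            return "attachment; filename*=UTF-8''" + encoded;
        } catch (UnsupportedEncodingException e) {
            // UTF-8 is a charset every Java platform is required to support.
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(attachmentHeader("error data.xlsx"));
    }
}
```

A caller would pass the result to response.setHeader("Content-Disposition", ...). Whether to also send a plain filename= fallback for older clients is a separate decision that this sketch leaves open.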

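8. Reading a remote file as a stream

	/**
	 * Read a remote file as a stream
	 * @param url the file URL
	 * @return the response body stream, or null if the URL is blank or the request fails
	 */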
	public static InputStream readFileFromURL(String url) {
		if (StringUtils.isBlank(url)) {
			return null;
		}

		HttpClient httpClient = new DefaultHttpClient();
		HttpGet methodGet = new HttpGet(url);
		try {
			HttpResponse response = httpClient.execute(methodGet);
			if (response.getStatusLine().getStatusCode() == 200) {
				HttpEntity entity = response.getEntity();
				return entity.getContent();
			}
		} catch (Exception e) {
			e.printStackTrace();
		}
		return null;
	}

9. ExcelUtil

    /**
     * Read an Excel file
     * @param inputStream the file input stream
     * @return the rows as a list
     */
    public static List<Object> read(InputStream inputStream) {
        return EasyExcelFactory.read(inputStream, new Sheet(1, 1));
    }

    /**
     * Write an Excel file
     * @param data the rows to write
     * @param clazz the row model class
     * @param saveFilePath the path to save the file to
     * @throws IOException if the file cannot be written
     */
    public static void write(List<? extends BaseRowModel> data, Class<? extends BaseRowModel> clazz, String saveFilePath) throws IOException {
        File tempFile = new File(saveFilePath);
        OutputStream out = new FileOutputStream(tempFile);
        ExcelWriter writer = EasyExcelFactory.getWriter(out);
        Sheet sheet = new Sheet(1, 3, clazz, "Sheet1", null);
        writer.write(data, sheet);
        writer.finish();
        out.close();
    }

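Note: that completes the whole flow. The code for the other topics is included below for reference.

VII. Miscellaneous

1. The @LoginRequired annotation

/**
 * Put this annotation on Controller methods that require login verification
 */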
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface LoginRequired {
}

2. MyControllerAdvice

@ControllerAdvice
public class MyControllerAdvice {

    @ResponseBody
    @ExceptionHandler(TokenValidationException.class)
    public JsonResponse tokenValidationExceptionHandler() {
        return JsonResponse.loginInvalid();
    }

    @ResponseBody
    @ExceptionHandler(ServiceException.class)
    public JsonResponse serviceExceptionHandler(ServiceException se) {
        return JsonResponse.fail(se.getMsg());
    }

    @ResponseBody
    @ExceptionHandler(Exception.class)
    public JsonResponse exceptionHandler(Exception e) {
        e.printStackTrace();
        return JsonResponse.fail(e.getMessage());
    }

}

3. AuthenticationInterceptor

public class AuthenticationInterceptor implements HandlerInterceptor {

    private static final String CURRENT_USER = "user";

    @Autowired
    private UserService userService;

    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) {
        // Pass straight through if the handler is not mapped to a method
        if (!(handler instanceof HandlerMethod)) {
            return true;
        }
        HandlerMethod handlerMethod = (HandlerMethod) handler;
        Method method = handlerMethod.getMethod();

        // Check whether the endpoint carries @LoginRequired; if so, login is required
        LoginRequired methodAnnotation = method.getAnnotation(LoginRequired.class);
        if (methodAnnotation != null) {
            // Verify the token
            Integer userId = JwtUtil.verifyToken(request);
            PLUser plUser = userService.selectByPrimaryKey(userId);
            if (null == plUser) {
                throw new RuntimeException("User does not exist, please log in again");
            }
            request.setAttribute(CURRENT_USER, plUser);
            return true;
        }
        return true;
    }

    @Override
    public void postHandle(HttpServletRequest httpServletRequest, HttpServletResponse httpServletResponse, Object o, ModelAndView modelAndView) throws Exception {
    }

    @Override
    public void afterCompletion(HttpServletRequest httpServletRequest, HttpServletResponse httpServletResponse, Object o, Exception e) throws Exception {
    }
}
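
The interceptor's decision rests on a plain reflection lookup: a method counts as protected when it carries the annotation at runtime. The following sketch isolates that check with the JDK alone. The nested LoginRequired here is a local copy of the annotation for the sake of a self-contained example, and DemoController and requiresLogin are hypothetical.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

// Sketch: detect a marker annotation on a method via reflection.
final class LoginRequiredSketch {

    // Local copy of the marker annotation. RUNTIME retention is what makes
    // it visible to reflection; with CLASS or SOURCE the lookup finds nothing.
    @Target(ElementType.METHOD)
    @Retention(RetentionPolicy.RUNTIME)
    @interface LoginRequired {
    }

    // Hypothetical controller with one protected and one open endpoint.
    static class DemoController {
        @LoginRequired
        public void doImport() {
        }

        public void health() {
        }
    }

    // Returns true when the named public no-arg method carries @LoginRequired.
    static boolean requiresLogin(Class<?> type, String methodName) {
        try {
            Method method = type.getMethod(methodName);
            return method.isAnnotationPresent(LoginRequired.class);
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(requiresLogin(DemoController.class, "doImport"));
        System.out.println(requiresLogin(DemoController.class, "health"));
    }
}
```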

4. JwtUtil

    public static final long EXPIRATION_TIME = 2592_000_000L; // valid for 30 days, in milliseconds
    public static final String SECRET = "pl_token_secret";
    public static final String HEADER = "token";
    public static final String USER_ID = "userId";

    /**
     * Generate a token from the userId
     * @param userId the user id to store in the token
     * @return the signed JWT
     */
    public static String generateToken(String userId) {
        HashMap<String, Object> map = new HashMap<>();
        map.put(USER_ID, userId);
        String jwt = Jwts.builder()
                .setClaims(map)
                .setExpiration(new Date(System.currentTimeMillis() + EXPIRATION_TIME))
                .signWith(SignatureAlgorithm.HS512, SECRET)
                .compact();
        return jwt;
    }

    /**
     * Verify the token
     * @param request the current HTTP request
     * @return the userId if verification succeeds
     */
    public static Integer verifyToken(HttpServletRequest request) {
        String token = request.getHeader(HEADER);
        if (token != null) {
            try {
                Map<String, Object> body = Jwts.parser()
                        .setSigningKey(SECRET)
                        .parseClaimsJws(token)
                        .getBody();

                for (Map.Entry entry : body.entrySet()) {
                    Object key = entry.getKey();
                    Object value = entry.getValue();
                    if (key.toString().equals(USER_ID)) {
                        return Integer.valueOf(value.toString());// userId
                    }
                }
                return null;
            } catch (Exception e) {
                logger.error(e.getMessage());
                throw new TokenValidationException("unauthorized");
            }
        } else {
            throw new TokenValidationException("missing token");
        }
    }
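
For readers curious what HS512 signing amounts to, here is a stripped-down sketch using only the JDK: an HMAC-SHA512 over the base64url-encoded header and payload, compared in constant time on verification. It illustrates the idea and is not a substitute for the jjwt library used above; it performs no expiry or claim checks, and Hs512Sketch, sign and verify are hypothetical names.

```java
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.MessageDigest;
import java.util.Base64;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;

// Sketch of the HS512 idea: token = base64url(header).base64url(payload).base64url(HMAC).
final class Hs512Sketch {
    private static final Base64.Encoder B64 = Base64.getUrlEncoder().withoutPadding();

    static String sign(String payloadJson, byte[] secret) {
        String header = B64.encodeToString("{\"alg\":\"HS512\"}".getBytes(StandardCharsets.UTF_8));
        String payload = B64.encodeToString(payloadJson.getBytes(StandardCharsets.UTF_8));
        String signingInput = header + "." + payload;
        return signingInput + "." + B64.encodeToString(hmac(signingInput, secret));
    }

    // Checks only the signature; expiry and claims are deliberately not handled here.
    static boolean verify(String token, byte[] secret) {
        int lastDot = token.lastIndexOf('.');
        if (lastDot <= 0) {
            return false;
        }
        byte[] expected = hmac(token.substring(0, lastDot), secret);
        byte[] actual;
        try {
            actual = Base64.getUrlDecoder().decode(token.substring(lastDot + 1));
        } catch (IllegalArgumentException e) {
            return false; // signature part is not valid base64url
        }
        // Constant-time comparison avoids leaking how many bytes matched.
        return MessageDigest.isEqual(expected, actual);
    }

    private static byte[] hmac(String data, byte[] secret) {
        try {
            Mac mac = Mac.getInstance("HmacSHA512");
            mac.init(new SecretKeySpec(secret, "HmacSHA512"));
            return mac.doFinal(data.getBytes(StandardCharsets.UTF_8));
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        byte[] secret = "example-secret".getBytes(StandardCharsets.UTF_8);
        String token = sign("{\"userId\":\"42\"}", secret);
        System.out.println(verify(token, secret));
    }
}
```

Because anyone holding the secret can mint valid tokens, a production setup would normally load it from configuration rather than keep it as a constant in source code.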

Closing: OK, done. Off to bed, I'm exhausted.

