springboot的異步調用


package com.handsight.platform.fras.aapp;

import java.util.Locale;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.context.i18n.LocaleContextHolder;
import org.springframework.stereotype.Component;

import com.handsight.platform.common.util.LogUtil;
import com.handsight.platform.fras.data.StaticObject;
import com.handsight.platform.fras.thread.service.AsyncService;

@Component
public class StartupListener implements ApplicationListener<ContextRefreshedEvent> {

    private final static Logger logger = LoggerFactory.getLogger(StartupListener.class);

    @Autowired
    private AsyncService asyncService;

    public StartupListener() {
    };

    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {

        // 地域語言
        setSystemDefaultLanguage();

        logger.info("---------------- Start execute Async thread completed.");
        // 刪除用戶的相關訪問信息線程
        asyncService.execDeleteLogOutUserCacheInfo();

        // 啟動圖片批量發送線程
        asyncService.sendImageBatch();

        logger.info("---------------- End execute Async thread completed.");

    }

    /**
     * 設置系統默認語言
     */
    private  void setSystemDefaultLanguage() {
        Locale locale= LocaleContextHolder.getLocale();
        locale = Locale.CHINA;
//        if(!Constants.SYSTEM_DEFAULT_LANGUAGE.equals(locale.getLanguage()) ) {
//            locale = Locale.US;
//        }
        LocaleContextHolder.setLocale(locale);
        StaticObject.locale =locale;
        LogUtil.info("This language is {0}", StaticObject.locale.getLanguage());
    }

}

使用:

package com.handsight.platform.fras.thread.service;

import java.net.Socket;
import java.util.List;

import com.handsight.platform.common.exception.AppException;
import com.handsight.platform.fras.mgt.pojo.T_user;
import com.handsight.platform.fras.mgt.pojo.TransferAlgorithmReq;

/**
 * 異步線程的接口
 *
 * @author wangh
 *
 */
public interface AsyncService {


    /**
     * 批量發送圖片
     *
     * @param userToken
     * @param facePhotoString
     * @throws AppException
     * @throws Exception
     */
    public void sendImageBatch();

    /**
     * 異步任務實現類: 用戶退出后刪除其可變用戶令牌與賬號的緩存信息;以及session信息
     */
    void execDeleteLogOutUserCacheInfo();

    /**
     * 異步任務實現類: 將硬件狀態數據存入數據庫
     */
    void execHardwareStatusDataToDBAsync();

    /**
     * 異步任務實現類:向算法發送任務 by http
     */
    public void workSendTaskThread(String userToken, String facePhotoString) throws AppException, Exception;

    /**
     * 異步任務實現類:向算法發送任務 by http
     *
     * @throws Exception
     */
    public void sendImageTaskThread(List<TransferAlgorithmReq> transferAlgorithmBeanList) throws AppException, Exception;

    /**
     * 異步任務實現類:保存用戶令牌及 用戶相關信息
     *
     * @param user
     * @throws Exception
     */
    public void saveUserInfo(T_user user) throws Exception;

    /**
     * 存儲用戶登錄地點信息
     *
     * @param user
     * @return
     */
    public void saveLoginLocationInfo(T_user user);

    /**
     * 更新用戶信息
     *
     * @param user
     * @return
     */
    public void updateUserInfo(T_user user) throws Exception;

    /***
     * 存儲用戶人臉信息及特征值
     *
     * @param user
     * @param userToken
     * @param currentFaceCode
     * @throws Exception
     */
    public void saveUserFaceCode(T_user user, String userToken, String currentFaceCode) throws Exception;

    /**
     * 異步任務實現類:接受算法產生的圖片特征碼 by http
     */
    public void workReciveResultThread(Socket socket);

    /**
     * 異步任務實現類:更新用戶人臉特征庫
     */
    public void updateUserFaceCodeListThread();

    /**
     *
     * 消息推送
     *
     * @param platform
     * @param pushKey
     * @param content
     * @throws Exception
     */
    public void pushMsg(String platform, String pushKey, String content) throws Exception;

    /**
     * 將月度登錄失敗次數加一
     */
    public void addOneMonthFailedNum(String userAccount) throws Exception;

    /**
     * 異步任務實現類:向算法發送任務
     */
    @Deprecated
    public void workSendTaskThread_skt(Socket socket);

    /**
     * 異步任務實現類:接受算法產生的圖片特征碼 socket
     */
    @Deprecated
    public void workReciveResultThread_skt(Socket socket);
}

實現:

package com.handsight.platform.fras.thread.service.impl;

import com.handsight.platform.common.exception.AppException;
import com.handsight.platform.common.util.HttpRequestUtil;
import com.handsight.platform.common.util.JsonUtil;
import com.handsight.platform.common.util.LogUtil;
import com.handsight.platform.common.util.UuidUtil;
import com.handsight.platform.fras.cache.UserCache;
import com.handsight.platform.fras.constant.Constants;
import com.handsight.platform.fras.constant.ErrorCodeMsg;
import com.handsight.platform.fras.data.StaticObject;
import com.handsight.platform.fras.mapper.UserMapper;
import com.handsight.platform.fras.mgt.pojo.T_user;
import com.handsight.platform.fras.mgt.pojo.TransferAlgorithmReq;
import com.handsight.platform.fras.mgt.pojo.TransferAlgorithmRes;
import com.handsight.platform.fras.mgt.pojo.UserFaceBean;
import com.handsight.platform.fras.pojo.MessageBean;
import com.handsight.platform.fras.service.CommonService;
import com.handsight.platform.fras.service.PushService;
import com.handsight.platform.fras.service.RedisService;
import com.handsight.platform.fras.thread.service.AsyncService;
import com.handsight.platform.fras.util.BeanUtil;
import com.handsight.platform.fras.util.EnumUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.NameValuePair;
import org.apache.http.message.BasicNameValuePair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.io.BufferedReader;
import java.io.DataOutputStream;
import java.io.InputStreamReader;
import java.net.Socket;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static com.handsight.platform.common.constant.Constants.HTTP_RES_CODE;
import static com.handsight.platform.common.constant.Constants.HTTP_RES_CONTENT;
import static com.handsight.platform.fras.constant.Constants.QUEUE_KEY_LOGOUT_USER_TOKEN;

@Service
public class AsyncServiceImpl implements AsyncService {

    private final static Logger logger = LoggerFactory.getLogger(AsyncServiceImpl.class);

    /** 定義一個每次要數據的大小(35K) */
    public final static int PART_SIZE = 35 * 1024;

    static boolean sendTaskThreadStart = false;

    @Autowired
    private RedisService redisService;

    @Autowired
    private PushService pushService;

    @Autowired
    private CommonService commonService;

    @Autowired
    private UserMapper userMapper;

    @Autowired
    private UserCache userCache;

    @Value("${fras.server.ip}")
    private String serverIp;

    @Value("${fras.send.work.port}")
    private String sendWorkPort;

    @Value("${fras.send.batch.list.size}")
    private int batchListSize;
    @Value("${fras.send.batch.interval.ms.times}")
    private int sendIntervalTimes;


    @Override
    @Async // ("frasAsyncServiceExecutor")
    public  void sendImageBatch() {
        long start = 0L;
         List<TransferAlgorithmReq> transferAlgorithmBeanList = null;
        while (true) {
            try {

                int cnt =0;
                // 每指定個數發送一次
                while (!StaticObject.imageQueue.isEmpty() ) {

                    if (StaticObject.transferAlgorithmBeanList.size() < batchListSize) {
                        redisService.hmSet("a---batchListSize", UuidUtil.uuid(), StaticObject.transferAlgorithmBeanList.size() ); // TODO
                        StaticObject.transferAlgorithmBeanList.add(StaticObject.imageQueue.take());
                        redisService.hmSet("cnt", UuidUtil.uuid(), ++cnt);
                        start = System.currentTimeMillis();
                    } else {
                        System.out.println("輸出集合"+StaticObject.transferAlgorithmBeanList.size());
                        transferAlgorithmBeanList = BeanUtil.deepCopy(StaticObject.transferAlgorithmBeanList);
                        dealWithTask(transferAlgorithmBeanList);// 處理請求
                    }
                }
                // 不足指定個數每指定秒數發送一次
                if (StaticObject.transferAlgorithmBeanList.size() > 0 && (((System.currentTimeMillis() - start)) >= sendIntervalTimes)) {

                    transferAlgorithmBeanList = BeanUtil.deepCopy(StaticObject.transferAlgorithmBeanList);
                    dealWithTask(transferAlgorithmBeanList);// 處理請求
                } else {
                    Thread.sleep(10);
                }
            } catch (Exception e) {
                logger.error("提取特征碼異常!", e);
            } finally {
            }
        }

    }

    /**
     * 處理任務隊列
     * 當已發送的匹數大於3個時將延遲發送1秒
     *
     * @param transferAlgorithmBeanList
     */
    private void dealWithTask(List<TransferAlgorithmReq> transferAlgorithmBeanList) {
        try {
            redisService.hmSet("a---executer", UuidUtil.uuid(), transferAlgorithmBeanList.size());
            long num = redisService.increment(Constants.CACHE_KEY_ALGORITHM_TASK_REQ_NUM, 1L);
            if( num > 3L) {
                Thread.sleep(1000);
            }
            // 批量發送圖片
            sendImageBatchTask(transferAlgorithmBeanList);

//            new Thread(new DealQueueThread(transferAlgorithmBeanList)).start();
            StaticObject.transferAlgorithmBeanList.clear();
        } catch (Exception e) {
            logger.error("特征碼提取處理異常", e);
        } finally {
        }
    }

    /**
     * 批量發送圖片
     *
     * @param transferAlgorithmBeanList
     * @throws AppException
     * @throws Exception
     */
    private void sendImageBatchTask(List<TransferAlgorithmReq> transferAlgorithmBeanList) throws AppException, Exception {

        long start = System.currentTimeMillis();
        String dataRequest = "";
        try {

            // 以表單形式向所有算法服務器發送指令 a
            dataRequest = JsonUtil.getJsonString(transferAlgorithmBeanList);
            List<NameValuePair> params = new ArrayList<NameValuePair>();
            NameValuePair pair = new BasicNameValuePair("data", dataRequest);
            params.add(pair);
            String httpUrlServer = Constants.HTTP_PREFIX + serverIp + Constants.COLON_SIGN + sendWorkPort + "/" + Constants.SEND_WORK_BATCH_METHOD + "/";

            logger.info("傳輸數據完畢, 耗時:" + ((System.currentTimeMillis() - start) ) + "ms,  num:" + transferAlgorithmBeanList.size() + "  size: " + dataRequest.length() / 1024);

            // 處理結果
            dealResponseResult(HttpRequestUtil.sendPostWithParamsForString(httpUrlServer, "", 1, params));

            logger.info("獲取結果耗時:" + (System.currentTimeMillis() - start));
        } catch (AppException ape) {

            ErrorCodeMsg errorCodeMsg = EnumUtil.getByCode(ape.getCode(), ErrorCodeMsg.class);
            setExceptionForUser(errorCodeMsg, transferAlgorithmBeanList);
        }  catch (Exception e) {
            setExceptionForUser(ErrorCodeMsg.AI_SERVER_ABNORMAL, transferAlgorithmBeanList);
        } finally {
            redisService.increment(Constants.CACHE_KEY_ALGORITHM_TASK_REQ_NUM, -1L);
        }
    }

    /**
     * 處理http的響應結果
     *
     * @param resultMap
     */
    private void dealResponseResult(Map<String, Object> resultMap) throws Exception {

        if (resultMap.containsKey(Constants.EXCEPTION_ALGORITHM)) {

            String msg = commonService.getMessage(ErrorCodeMsg.AI_SERVER_ABNORMAL.getMsg());
            throw new AppException(ErrorCodeMsg.AI_SERVER_ABNORMAL.getCode(), msg);
        } else {

            String code = (String) resultMap.get(HTTP_RES_CODE);
            String content = (String) resultMap.get(HTTP_RES_CONTENT);

            if (!Constants.HTTP_STATUS_CODE_200.equals(code)) {
                logger.error("提取圖片的特征碼出現異常:code: " + code + " msg: " + content);
                String msg = commonService.getMessage(ErrorCodeMsg.LOGIN_FAILED_TRY_AGAIN.getMsg());
                throw new AppException(ErrorCodeMsg.LOGIN_FAILED_TRY_AGAIN.getCode(), msg);
            } else {

                List<TransferAlgorithmRes> resultList = JsonUtil.readJson2Array(content, TransferAlgorithmRes.class);
                for (TransferAlgorithmRes res : resultList) {
                    redisService.setForTimeMIN(Constants.FRAS_CACHE_KEY_ALGORITHM_RESULT + res.getId(), res, 10);
                }
            }
        }
    }

    /**
     *
     * 為每個用戶設置異常時的信息
     *
     * @param erroCodeMsg
     * @param transferAlgorithmBeanList
     */
    private void setExceptionForUser(ErrorCodeMsg erroCodeMsg, List<TransferAlgorithmReq> transferAlgorithmBeanList) {

        int msgCode = erroCodeMsg.getCode();
        String msg = commonService.getMessage(erroCodeMsg.getMsg());
        for (TransferAlgorithmReq req : transferAlgorithmBeanList) {
            redisService.setForTimeMIN(Constants.CACHE_KEY_EXCEPTION_ALGORITHM + req.getId(), new MessageBean(msgCode, msg), 10);
        }
    }

    /**
     * 異步任務實現類:向算法發送任務 by http
     *
     * @throws Exception
     */
    @Override
//    @Async
    public void sendImageTaskThread(List<TransferAlgorithmReq> transferAlgorithmBeanList) throws AppException, Exception {

        long start = System.currentTimeMillis();
        String dataRequest = "";
        try {
            for (TransferAlgorithmReq req : transferAlgorithmBeanList) {
                System.out.println("http-------------------------------" + req.getId());
            }

            List<TransferAlgorithmRes> resultList = new ArrayList<TransferAlgorithmRes>();
            dataRequest = JsonUtil.getJsonString(transferAlgorithmBeanList);
            // 以表單形式向所有算法服務器發送指令
            List<NameValuePair> params = new ArrayList<NameValuePair>();
//            NameValuePair pair = new BasicNameValuePair("id", "1");
//            params.add(pair);
            NameValuePair pair = new BasicNameValuePair("data", dataRequest);
            params.add(pair);
            String httpUrlServer = Constants.HTTP_PREFIX + serverIp + Constants.COLON_SIGN + sendWorkPort + "/" + Constants.SEND_WORK_BATCH_METHOD + "/";
            logger.info("傳輸數據完畢, 2222耗時:" + ((System.currentTimeMillis() - start) / 1000) + "  num:" + transferAlgorithmBeanList.size() + "  size: " + dataRequest.length() / 1024);
            Map<String, Object> resultMap = HttpRequestUtil.sendPostWithParamsForString(httpUrlServer, "", 2, params);

            if (resultMap.containsKey(Constants.EXCEPTION_ALGORITHM)) {
                String msg = commonService.getMessage(ErrorCodeMsg.AI_SERVER_ABNORMAL.getMsg());
                throw new AppException(ErrorCodeMsg.AI_SERVER_ABNORMAL.getCode(), msg);
            } else {
                String code = (String) resultMap.get(HTTP_RES_CODE);
                String content = (String) resultMap.get(HTTP_RES_CONTENT);

                if (!Constants.HTTP_STATUS_CODE_200.equals(code)) {
                    logger.error("提取圖片的特征碼出現異常:code: " + code + " msg: " + content);
                    String msg = commonService.getMessage(ErrorCodeMsg.LOGIN_FAILED_TRY_AGAIN.getMsg());
                    throw new AppException(ErrorCodeMsg.LOGIN_FAILED_TRY_AGAIN.getCode(), msg);
                } else {
                    resultList = JsonUtil.readJson2Array(content, TransferAlgorithmRes.class);
                    for (TransferAlgorithmRes res : resultList) {
                        redisService.setForTimeMIN(Constants.FRAS_CACHE_KEY_ALGORITHM_RESULT + res.getId(), res, 10);
                    }
                }
            }
            logger.info("獲取結果333耗時:" + (System.currentTimeMillis() - start) / 1000);
        } catch (Exception e) {
            throw e;
        } finally {
        }
    }

    /**
     * 異步任務實現類:向算法發送任務 by http
     *
     * @throws Exception
     */
    @Override
//    @Async("asyncServiceExecutor")、
    @Async
    public void workSendTaskThread(String userToken, String facePhotoString) throws AppException, Exception {

        Map<String, Object> resultMap = null;

        long start = 0;
        try {

            if (!sendTaskThreadStart) {
                sendTaskThreadStart = true;
                start = System.currentTimeMillis();
                while (true) {
                    if ((System.currentTimeMillis() - start) / 1000 > 5) {
                        break;
                    } else {
//                        TransferAlgorithmReq transferAlgorithmBean = (TransferAlgorithmReq) redisService.rightPop(Constants.QUEUE_TASK);
                        TransferAlgorithmReq transferAlgorithmBean = StaticObject.imageQueue.poll();
                        if (StaticObject.transferAlgorithmBeanList != null && StaticObject.transferAlgorithmBeanList.size() <= 4) {
                            if (transferAlgorithmBean != null) {
                                StaticObject.transferAlgorithmBeanList.add(transferAlgorithmBean);
                                logger.info("bean:" + transferAlgorithmBean.getId());
                            }
                        } else {
                            redisService.leftPush(Constants.QUEUE_TASK, transferAlgorithmBean);
                            break;
                        }
                    }
                }
            } else {
                return;
            }

            List<TransferAlgorithmRes> resultList = new ArrayList<TransferAlgorithmRes>();

            // 以表單形式向所有算法服務器發送指令
            List<NameValuePair> params = new ArrayList<NameValuePair>();
            NameValuePair pair = new BasicNameValuePair("id", userToken);
            params.add(pair);
            pair = new BasicNameValuePair("img", facePhotoString); // JsonUtil.getJsonString(StaticObject.transferAlgorithmBeanList) TODO
            params.add(pair);
            String httpUrlServer = Constants.HTTP_PREFIX + serverIp + Constants.COLON_SIGN + sendWorkPort + "/" + Constants.SEND_WORK_METHOD + "/";
            resultMap = HttpRequestUtil.sendPostWithParamsForString(httpUrlServer, "", 2, params);

            if (resultMap.containsKey(Constants.EXCEPTION_ALGORITHM)) {
                String msg = commonService.getMessage(ErrorCodeMsg.AI_SERVER_ABNORMAL.getMsg());
                throw new AppException(ErrorCodeMsg.AI_SERVER_ABNORMAL.getCode(), msg);
            } else {
                String code = (String) resultMap.get(HTTP_RES_CODE);
                String content = (String) resultMap.get(HTTP_RES_CONTENT);

                TransferAlgorithmRes sd = new TransferAlgorithmRes(userToken, 1002, "001,191,101"); // TODO
                TransferAlgorithmRes bean = JsonUtil.json2Obj(JsonUtil.getJsonString(sd), TransferAlgorithmRes.class); // TODO
                List<TransferAlgorithmRes> lst2 = new ArrayList<TransferAlgorithmRes>(); // TODO
                lst2.add(bean);// TODO
                content = JsonUtil.getJsonString(lst2);// TODO

                if (!Constants.HTTP_STATUS_CODE_200.equals(code)) {
                    logger.error("提取圖片的特征碼出現異常:code: " + code + " msg: " + content);
                    String msg = commonService.getMessage(ErrorCodeMsg.LOGIN_FAILED_TRY_AGAIN.getMsg());
                    throw new AppException(ErrorCodeMsg.LOGIN_FAILED_TRY_AGAIN.getCode(), msg);
                } else {
                    resultList = JsonUtil.readJson2Array(content, TransferAlgorithmRes.class);
                    for (TransferAlgorithmRes res : resultList) {
                        redisService.setForTimeMIN(Constants.FRAS_CACHE_KEY_ALGORITHM_RESULT + res.getId(), res, 10);
                    }

//                    redisService.setForTimeMIN(Constants.FRAS_CACHE_KEY_ALGORITHM_RESULT + userToken, content, 10);
                }
            }
            logger.info("傳輸數據完畢,耗時:" + (System.currentTimeMillis() - start + "  size:" + facePhotoString.length() / 1024));
        } catch (Exception e) {
            throw e;
        } finally {
            StaticObject.transferAlgorithmBeanList.clear();
            sendTaskThreadStart = false;
        }

//        return resultMap;
    }

    /**
     * 異步任務實現類:保存用戶令牌及 用戶相關信息
     *
     * @param user
     * @throws Exception
     */
    @Override
    @Async
    @Transactional(rollbackFor = Exception.class)
    public void saveUserInfo(T_user user) throws Exception {

        int cnt = 0;
        while (true) {
            try {
                try {
                    cnt++;
                    // 保存用戶信息
                    commonService.saveUserInfo(user);
                    break;
                } catch (Exception e) {
                    if (cnt > 20) {
                        logger.error("數據保存失敗,開始重試次數:" + cnt, e);
                        throw e;
                    } else {
                        Thread.sleep(5000);
                    }
                    logger.error("數據保存失敗,開始重試次數:" + cnt, e);
                }
            } catch (Exception e) {
                logger.error("數據庫連接發生異常,未保存數據 " + user.toString(), e);
                break;
            } finally {
            }
        }
    }

    /**
     * 存儲用戶登錄地點信息
     *
     * @param user
     * @return
     */
    @Override
    @Async
    public void saveLoginLocationInfo(T_user user) {
        int cnt = 0;
        while (true) {
            try {
                try {
                    cnt++;
                    commonService.checkSQLReturnCode(userMapper.saveLoginLocationInfo(user));
                    break;
                } catch (Exception e) {
                    if (cnt > 20) {
                        logger.error("數據保存失敗,開始重試次數:" + cnt, e);
                        throw e;
                    } else {
                        Thread.sleep(5000);
                    }
                    logger.error("數據保存失敗,開始重試次數:" + cnt, e);
                }
            } catch (Exception e) {
                logger.error("數據庫連接發生異常,未保存數據 " + user.toString(), e);
                break;
            } finally {
            }
        }
    }

    /***
     * 存儲用戶人臉信息及特征值
     *
     * @param user
     * @param userToken
     * @param currentFaceCode
     * @throws Exception
     */
    @Override
    public void saveUserFaceCode(T_user user, String userAccount, String currentFaceCode) throws Exception {

        int cnt = 0;
        while (true) {
            try {
                try {
                    cnt++;
                    commonService.saveUserFaceCode(user, userAccount, currentFaceCode);
                    break;
                } catch (Exception e) {
                    if (cnt > 20) {
                        logger.error("數據保存失敗,開始重試次數:" + cnt, e);
                        throw e;
                    } else {
                        Thread.sleep(5000);
                    }
                    logger.error("數據保存失敗,開始重試次數:" + cnt, e);
                }
            } catch (Exception e) {
                logger.error("數據庫連接發生異常,未保存數據 " + user.toString(), e);
                break;
            } finally {
            }
        }
    }

    /**
     * 更新用戶信息
     *
     * @param user
     * @return
     */
    @Override
    public void updateUserInfo(T_user user) throws Exception {

        int cnt = 0;
        while (true) {
            try {
                try {
                    cnt++;
                    userMapper.updateUserInfo(user);
                    // 更新用戶信息
                    userCache.getUserInfo(user, true);
                    break;
                } catch (Exception e) {
                    if (cnt > 20) {
                        logger.error("數據保存失敗,開始重試次數:" + cnt, e);
                        throw e;
                    } else {
                        Thread.sleep(5000);
                    }
                    logger.error("數據保存失敗,開始重試次數:" + cnt, e);
                }
            } catch (Exception e) {
                logger.error("數據庫連接發生異常,未保存數據 " + user.toString(), e);
                break;
            } finally {
            }
        }

    }

    /**
     * 異步任務實現類: 用戶退出后刪除其可變用戶令牌與賬號的緩存信息;以及session信息
     */
    @Override
    @Async
    public void execDeleteLogOutUserCacheInfo() {

        while (true) {
            String userTokenJson = null;
            // 加鎖
            // lock.lock();
            try {
                try {
                    userTokenJson = (String) redisService.rightPop(QUEUE_KEY_LOGOUT_USER_TOKEN);
                    if (!waitForData(userTokenJson)) {
                        continue;
                    }
                } catch (Exception e) {
                }

                try {
                    // session
                    String sessionKey = Constants.SPRING_SESSION_NAME_NAMESPCE + userTokenJson;
                    if (redisService.hasKey(sessionKey)) {
                        redisService.delete(sessionKey);
                    }
                } catch (Exception e) {
                    logger.error("redis 連接異常", e);
                    redisService.leftPush(QUEUE_KEY_LOGOUT_USER_TOKEN, userTokenJson);
                    throw e;
                }
            } catch (Exception e) {
                logger.error("刪除已退出的用戶session信息失敗,用戶令牌:" + userTokenJson, e);
            } finally {
                // 解鎖
                // lock.unlock();
            }
        }
    }

    /**
     * 異步任務實現類:接受算法產生的圖片特征碼 by http
     */
    @Override
//    @Async("asyncServiceExecutor")
    public void workReciveResultThread(Socket socket) {
        while (true) {
            try {

                /**
                 * 在從Socket的InputStream中接收數據時,像上面那樣一點點的讀就太復雜了, 有時候我們就會換成使用BufferedReader來一次讀一行
                 *
                 * BufferedReader的readLine方法是一次讀一行的,這個方法是阻塞的,直到它讀到了一行數據為止程序才會繼續往下執行,
                 * 那么readLine什么時候才會讀到一行呢?直到程序遇到了換行符或者是對應流的結束符readLine方法才會認為讀到了一行,
                 * 才會結束其阻塞,讓程序繼續往下執行。
                 * 所以我們在使用BufferedReader的readLine讀取數據的時候一定要記得在對應的輸出流里面一定要寫入換行符(
                 * 流結束之后會自動標記為結束,readLine可以識別),寫入換行符之后一定記得如果輸出流不是馬上關閉的情況下記得flush一下,
                 * 這樣數據才會真正的從緩沖區里面寫入。
                 */
                BufferedReader br = new BufferedReader(new InputStreamReader(socket.getInputStream(), "UTF-8"));
                StringBuilder sb = new StringBuilder();
                String temp = "";
                while ((temp = br.readLine()) != null) {
                    sb.append(temp.substring(1, temp.length() - 1).replaceAll("'", ""));
                    break;
                }

                String[] arr = sb.toString().split(Constants.COLON_SIGN);

                redisService.setForTimeMS(Constants.CACHE_KEY_RESULT + arr[0].trim(), arr[1].trim(), Constants.TIME_KEY_RESULT);

                logger.info("數據接受 --- Form Cliect[port:" + socket.getPort() + "] 消息內容:" + sb.toString());

            } catch (Exception e) {
                logger.error("work Recive Result Thread abnormal!", e);
            }
        }
    }

    /**
     * 異步任務實現類:向算法發送任務 by socket
     */
    @Deprecated
    @Override
//    @Async("asyncServiceExecutor")
    public void workSendTaskThread_skt(Socket socket) {

        String faceJson = "";
        while (true) {
            try {
                // 獲取任務
                faceJson = (String) redisService.rightPop(Constants.QUEUE_TASK);
                if (StringUtils.isBlank(faceJson)) {
                    continue;
                }

//                UserFaceBean userFaceBean = JsonUtil.json2Obj(faceJson, UserFaceBean.class);

                DataOutputStream outputStream = null;
                outputStream = new DataOutputStream(socket.getOutputStream());
                byte[] jsonByte = faceJson.getBytes();

                logger.info("發送的數據長度為:" + jsonByte.length);
                Map<String, Integer> sizeMap = new HashMap<String, Integer>();
                int fileSize = jsonByte.length;
                sizeMap.put("size", fileSize);

                // 告訴服務器要發送文件的大小
                outputStream.write(JsonUtil.getJsonString(sizeMap).getBytes());

                int partCount = fileSize / PART_SIZE;
                int rest = fileSize % PART_SIZE;

                // 每次發送35K大小的數據
                for (int index = 0; index < partCount; index++) {
                    int beginIndex = index * PART_SIZE;
                    int endIndex = (index + 1) * PART_SIZE;
                    String temp = faceJson.substring(beginIndex, endIndex);
                    outputStream.write(temp.getBytes());
                }
                // 發送剩余的數據
                if (rest != 0) {
                    int beginIndex = partCount * PART_SIZE;
                    int endIndex = partCount * PART_SIZE + rest;
                    String temp = faceJson.substring(beginIndex, endIndex);
                    outputStream.write(temp.getBytes());
                }

                outputStream.flush();
                Thread.sleep(10);
                logger.info("傳輸數據完畢");
            } catch (Exception e) {
                redisService.leftPush(Constants.QUEUE_TASK, faceJson);
                logger.error("work Send Task Thread error!", e);
            }
        }
    }

    /**
     * 異步任務實現類:接受算法產生的圖片特征碼 by socket
     */
    @Deprecated
    @Override
//    @Async("asyncServiceExecutor")
    public void workReciveResultThread_skt(Socket socket) {
        while (true) {
            try {

                /**
                 * 在從Socket的InputStream中接收數據時,像上面那樣一點點的讀就太復雜了, 有時候我們就會換成使用BufferedReader來一次讀一行
                 *
                 * BufferedReader的readLine方法是一次讀一行的,這個方法是阻塞的,直到它讀到了一行數據為止程序才會繼續往下執行,
                 * 那么readLine什么時候才會讀到一行呢?直到程序遇到了換行符或者是對應流的結束符readLine方法才會認為讀到了一行,
                 * 才會結束其阻塞,讓程序繼續往下執行。
                 * 所以我們在使用BufferedReader的readLine讀取數據的時候一定要記得在對應的輸出流里面一定要寫入換行符(
                 * 流結束之后會自動標記為結束,readLine可以識別),寫入換行符之后一定記得如果輸出流不是馬上關閉的情況下記得flush一下,
                 * 這樣數據才會真正的從緩沖區里面寫入。
                 */
                BufferedReader br = new BufferedReader(new InputStreamReader(socket.getInputStream(), "UTF-8"));
                StringBuilder sb = new StringBuilder();
                String temp = "";
                while ((temp = br.readLine()) != null) {
                    sb.append(temp.substring(1, temp.length() - 1).replaceAll("'", ""));
                    break;
                }

                String[] arr = sb.toString().split(Constants.COLON_SIGN);

                redisService.setForTimeMS(Constants.CACHE_KEY_RESULT + arr[0].trim(), arr[1].trim(), Constants.TIME_KEY_RESULT);

                logger.info("數據接受 --- Form Cliect[port:" + socket.getPort() + "] 消息內容:" + sb.toString());

            } catch (Exception e) {
                logger.error("work Recive Result Thread abnormal!", e);
            }
        }
    }

    /**
     * 異步任務實現類: 將硬件狀態數據存入數據庫
     */
    @Override
//    @Async("asyncServiceExecutor")
    public void execHardwareStatusDataToDBAsync() {
        while (true) {
            String json = null;
            // 加鎖
            // lock.lock();
            try {
                try {
//                    json = (String) redisService.rightPop(REDIS_HARDWARE_RUNNING_DATA);
                    if (!waitForData(json)) {
                        continue;
                    }
                } catch (Exception e) {
                }

                try {
//                    TODO
//                    hdwareStatusService.gatherHardwareStatusInfo(json);
                } catch (Exception e) {
                    if (!(e instanceof AppException)) {
                        LogUtil.error("", e);
//                        redisService.leftPush(REDIS_HARDWARE_RUNNING_DATA, json);
                    }
                    LogUtil.error("hardware status data insert db abnormal:" + json, e);
                }
            } catch (Exception e) {
                LogUtil.error(json, e);
            } finally {
                // 解鎖
                // lock.unlock();
            }
        }
    }

    /**
     * 異步任務實現類:更新用戶人臉特征庫
     */
    @Override
//    @Async("asyncServiceExecutor")
    public void updateUserFaceCodeListThread() {

        while (true) {
            String userFaceJson = null;
            // 加鎖
            // lock.lock();
            try {
                try {
                    userFaceJson = (String) redisService.rightPop(Constants.CACHE_KEY_UPDATE_FACE_CODE);
                    if (!waitForData(userFaceJson)) {
                        continue;
                    }
                } catch (Exception e) {
                    logger.error("redis exception", e);
                }

                try {
//                    TODO
//                    hdwareStatusService.gatherHardwareStatusInfo(json);
                    updateUserFaceCode(userFaceJson);
                } catch (Exception e) {
                    if (!(e instanceof AppException)) {
                        logger.error("", e);
                        redisService.leftPush(Constants.CACHE_KEY_UPDATE_FACE_CODE, userFaceJson);
                    }
                    LogUtil.error("hardware status data insert db abnormal:" + userFaceJson, e);
                }
            } catch (Exception e) {
                logger.error("system exception", e);
            } finally {
                // 解鎖
                // lock.unlock();
            }
        }

    }

    /**
     * 消息推送
     *
     * @param platform
     * @param pushKey
     * @param content
     * @throws Exception
     * @return
     */
    @Override
//     @Async("asyncServiceExecutor")
    @Async
    public void pushMsg(String platform, String pushKey, String content) throws Exception {
        // 消息推送
        pushService.pushMsg(platform, pushKey, content);
    }

    /**
     * 將月度登錄失敗次數加一
     */
    @Override
//    @Async("asyncServiceExecutor")
    @Async
    public void addOneMonthFailedNum(String userAccount) throws Exception {
        // 更新月度登錄失敗次數
        commonService.checkSQLReturnCode(userMapper.addOneMonthFailedNum(userAccount));

        // 更新用戶月度登錄失敗次數
        T_user param = new T_user();
        param.setUserAccount(userAccount);
        userCache.getUserInfo(param, true);
    }

    /**
     * 更新用戶特征碼
     *
     * @param userFaceJson
     * @throws Exception
     */
    private void updateUserFaceCode(String userFaceJson) throws Exception {
        UserFaceBean faceBean = JsonUtil.json2Obj(userFaceJson, UserFaceBean.class);
        commonService.saveUserFaceCode(null, faceBean.getUserToken(), faceBean.getFeatureCodes());
    }

    /**
     * 數據為空時, 返回false;否則,返回true
     *
     * @param json
     * @return
     */
    private boolean waitForData(String json) {
        if (json == null) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                LogUtil.error(json, e);
            }
            return false;
        } else {
            return true;
        }
    }
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM