JAVA多線程實戰


背景

由於項目上要和其他系統交互,而該系統采用同步接口,我們采用單線程調用,接收一條數據平均需要4~6s。而我們需要匯總近三個月的訂單信息,大約一次有幾千條數據,所以進行一次交互大概需要幾百分鍾。經過溝通后,對方系統不願意修改他們的程序,所以我們這邊就要改成多線程多次同時調用接口,來達到降低處理時間的效果。

代碼

話不多說,先上代碼

public ReturnT execute(Map<String, String> map, SchedulerTool tool) {
        tool.info("<==== MakeOrderOutJob is starting: " + map);

        //獲取最大線程數
        int threadNum = Integer.parseInt(map.get("threadNum"));

        //獲取線程結束超時時間
        int timeOut = Integer.parseInt(map.get("timeOut"));

        //獲取傳輸最大條數
        int max = Integer.parseInt(map.get("max"));

        //獲取待發送的生產訂單數據
        List<MakeOrder> makeOrderList = makeOrderRepository.getMakeOrder(max);
        tool.info("<==== MakeOrderOutJob makeOrderList size: " + makeOrderList.size());

        //返回消息集合
        List<MakeOrderReturnVO> makeOrderReturnVoList = new ArrayList<>();

        try {
            //手動創建線程池
            ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("thread-call-runner-%d").build();
            ExecutorService executor = new ThreadPoolExecutor(threadNum, threadNum, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), namedThreadFactory);
            //循環所有待傳出數據,調用接口
            makeOrderList.stream().forEach(one -> {
                executor.execute(new ThreadPoolMakeOrderOutTask(tool, user, password, url, one, moRequirementRepository.getMoRequirementList(one.getKid()), makeOrderReturnVoList, restUtil, makeOrderRepository));
            });
            executor.shutdown();
            while (!executor.awaitTermination(timeOut, TimeUnit.SECONDS)) {
                executor.shutdownNow();
            }

            tool.info("<==== MakeOrderOutJob makeOrderReturnVoList size: " + makeOrderReturnVoList.size());
            //循環返回結果更新接口表
            makeOrderReturnVoList.stream().forEach((vo) -> {
                tool.info("<==== MakeOrderOutJob update status: " + vo.getMessageV1());
                //更新接口表狀態
                vo.setStatus(vo.getType());
                tool.info("<==== MakeOrderOutJob update status success: " + vo.getMessageV1());
            });
        } catch (Exception e) {
            e.printStackTrace();
            tool.error("<==== MakeOrderOutJob is error: " + e.getMessage());
            return ReturnT.FAILURE;
        }
        return ReturnT.SUCCESS;
    }

總結

這里用到了線程池ExecutorService

1.線程的創建

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             threadFactory, defaultHandler);
    }

線程池一共有五種

  • newCachedThreadPool:用來創建一個可以無限擴大的線程池
  • newFixedThreadPool:創建一個固定大小的線程池
  • newSingleThreadExecutor:創建一個單線程的線程池
  • newScheduledThreadPool:可以延時啟動,定時啟動的線程池
  • newWorkStealingPool:創建一個擁有多個任務隊列的線程池

經BD后發現所有的線程池最終都是通過ThreadPoolExecutor創建的

其中各個參數說明:

  • corePoolSize : 核心線程數,一旦創建將不會再釋放。如果創建的線程數還沒有達到指定的核心線程數量,將會繼續創建新的核心線程,直到達到最大核心線程數后,核心線程數將不在增加;如果沒有空閑的核心線程,同時又未達到最大線程數,則將繼續創建非核心線程;如果核心線程數等於最大線程數,則當核心線程都處於激活狀態時,任務將被掛起,等待空閑線程來執行。
  • maximumPoolSize : 最大線程數,允許創建的最大線程數量。如果最大線程數等於核心線程數,則無法創建非核心線程;如果非核心線程處於空閑時,超過設置的空閑時間,則將被回收,釋放占用的資源。
  • keepAliveTime : 當線程空閑時,所允許保存的最大時間,超過這個時間,線程將被釋放銷毀,但只針對於非核心線程。
  • unit : 時間單位,TimeUnit.SECONDS等。
  • workQueue : 任務隊列,存儲暫時無法執行的任務,等待空閑線程來執行任務。
  • threadFactory :  線程工程,用於創建線程。本例用到new ThreadFactoryBuilder().setNameFormat("thread-call-runner-%d").build();來為線程創建名稱。
  • handler : 當線程邊界和隊列容量已經達到最大時,用於處理阻塞時的程序

 

2.創建好了線程后就要執行任務,實現多線程有三種方法

  • 繼承Thread類,重寫run方法
  • 實現Runnable接口,重寫run方法
  • 實現Callable接口,重寫call方法(有返回值)

本例中ThreadPoolMakeOrderOutTask用到了第2種方法

 

3.任務執行完畢,要結束線程池任務

本例調用shutdown() 方法在終止前允許執行以前提交的任務,shutdown()方法的作用是:停止接收新任務,原來的任務繼續執行

然后調用awaitTermination(long timeOut, TimeUnit unit)方法,使當前線程阻塞,直到:

  • 等所有已提交的任務(包括正在跑的和隊列中等待的)執行完
  • 或者 等超時時間到了(timeout 和 TimeUnit設定的時間)
  • 或者 線程被中斷,拋出InterruptedException

然后會監測 ExecutorService 是否已經關閉,返回true(shutdown請求后所有任務執行完畢)或false(已超時)

 最后調用shutdownNow()方法,停止接收新任務,原來的任務停止執行

 

結果

經測試,啟用多線程后,發現交互時間確實有明顯提升。

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM