Callable+ThreadPoolExecutor實現多線程並發並獲得返回值(轉)


出處:https://blog.csdn.net/kity9420/article/details/80740466

 

前言
  經常會遇到一些性能問題,比如調用某個接口,可能要循環調用100次,並且需要拿到每一次調用的返回結果,通常我們都是放在for循環中一次次的串行調用,這種方式可想而知道有多慢,那怎么解決這個問題呢?

 

多線程
  為了解決以上問題,我使用的方式是多線程。多線程常規的有兩種實現方式,即繼承Tread類,實現Runnable接口,但是這兩種實現方式,有一個共同的問題,就是沒有返回值,對於我們來說,獲得每個線程的返回值,是個很困難的問題,因此不能用Tread類或Runnable接口,我用的是Callable和ThreadPoolExecutor,Callable的process方法可以允許有返回值,ThreadPoolExecutor的invokeAll或submit方法可以拿到線程的執行結果

 

案例
  假設需要給100個用戶發送郵件,並需要每個用戶的返回結果,先看下代碼結構

               

  CallableTemplate.java 

package com.gdut.thread.multiThread;

import java.util.concurrent.Callable;

/**
 * 多線程模板類
 * @author yang.han
 *
 * @param <V>
 */
public abstract class CallableTemplate<V> implements Callable<V>{
    
    /**
     * 前置處理,子類可以Override該方法
     */
    public void beforeProcess() {
        System.out.println("before process");
    }
    
    /**
     * 處理業務邏輯的方法,需要子類去Override
     * @param <V>
     * @return
     */
    public abstract V process();
    
    /**
     * 后置處理,子類可以Override該方法
     */
    public void afterProcess() {
        System.out.println("after process");
    }

    @Override
    public V call() throws Exception {
        beforeProcess();
        V result = process();
        afterProcess();
        return result;
    }

}

  CallableTemplate類實現了Callable接口,並實現了process方法,該類是一個抽象類,接收任意返回值的類型,beforeProcess方法為前置處理,afterProcess的后置處理,process為具體的業務邏輯抽象方法,該方法在子類中實現

 

IConcurrentThreadPool.java

package com.gdut.thread.multiThread;

import java.util.List;
import java.util.concurrent.ExecutionException;

public interface IConcurrentThreadPool {

    /**
     * 初始化線程池
     */
    void initConcurrentThreadPool();
    
    /**
     * 提交單個任務
     * @param <V>
     * @param task
     * @return
     * @throws InterruptedException
     * @throws ExecutionException
     */
    <V> V submit(CallableTemplate<V> task) throws InterruptedException, ExecutionException;
    
    /**
     * 提交多個任務
     * @param <V>
     * @param tasks
     * @return
     * @throws InterruptedException 
     * @throws ExecutionException 
     */
    <V> List<V> invokeAll(List<? extends CallableTemplate<V>> tasks) throws InterruptedException, ExecutionException;
}

  IConcurrentThreadPool是多線程接口類,聲名了三個方法,initConcurrentThreadPool:初始化線程池,submit:提交單個任務的線程,並有返回值,invokeAll:提交多個任務的線程,並有返回值

 

ConcurrentThreadPool.java

package com.gdut.thread.multiThread;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ConcurrentThreadPool implements IConcurrentThreadPool{
    
    private ThreadPoolExecutor threadPoolExecutor;
    // 核心線程數
    private int corePoolSize = 10;
    // 最大線程數
    private int maximumPoolSize = 20;
    // 超時時間30秒
    private long keepAliveTime = 30;

    @Override
    public void initConcurrentThreadPool() {
        threadPoolExecutor = new ThreadPoolExecutor(corePoolSize,
                                                    maximumPoolSize, 
                                                    keepAliveTime, 
                                                    TimeUnit.SECONDS,
                                                    new LinkedBlockingDeque<Runnable>()
                                                    );
    }

    @Override
    public <V> V submit(CallableTemplate<V> task) throws InterruptedException, ExecutionException {
        Future<V> result = threadPoolExecutor.submit(task);
        return result.get();
    }

    @Override
    public <V> List<V> invokeAll(List<? extends CallableTemplate<V>> tasks) throws InterruptedException, ExecutionException {
        List<Future<V>> tasksResult = threadPoolExecutor.invokeAll(tasks);
        List<V> resultList = new ArrayList<V>();
        
        for(Future<V> future : tasksResult) {
            resultList.add(future.get());
        }
        return resultList;
    }

}

  ConcurrentThreadPool是創建線程池的實現類,用到了ThreadPoolExecutor線程池類及這個類的invokeAll方法和submit方法,這兩個方法的返回值,都可以通過Future類的get方法獲得

 

ICallableTaskFrameWork.java

package com.gdut.thread.multiThread;

import java.util.List;
import java.util.concurrent.ExecutionException;

public interface ICallableTaskFrameWork {
    <V> List<V> submitsAll(List<? extends CallableTemplate<V>> tasks)
            throws InterruptedException, ExecutionException;
}

  ICallableTaskFrameWork是定義的線程任務框架接口,所有的多線程調用,都通過該接口發起

 

CallableTaskFrameWork.java

package com.gdut.thread.multiThread;

import java.util.List;
import java.util.concurrent.ExecutionException;

public class CallableTaskFrameWork implements ICallableTaskFrameWork{

    private IConcurrentThreadPool concurrentThreadPool = new ConcurrentThreadPool();
    
    @Override
    public <V> List<V> submitsAll(List<? extends CallableTemplate<V>> tasks)
            throws InterruptedException, ExecutionException {
        concurrentThreadPool.initConcurrentThreadPool();
        return concurrentThreadPool.invokeAll(tasks);
    }

}

  CallableTaskFrameWork是ICallableTaskFrameWork 的實現類,在submitsAll實現方法中,通過調用線程池對象IConcurrentThreadPool接口的invokeAll方法來發起多線程的調用,這里注意一個,在submitAll實現方法中,我手動的調用了初始化線程池的方法concurrentThreadPool.initConcurrentThreadPool(),在真實的項目上,應該在應用啟動的時候就調用該方法來初始化線程池

測試類代碼 
SendMessageService.java,假設這是一個發送郵件信息的服務類

package com.gdut.thread.multiThread;

public class SendMessageService {
    public void sendMessage(String email,String content){
        System.out.println("發送郵件。。。");
    }
}

 

SendMessageHander.java,多線程發送郵件的處理類

package com.gdut.thread.multiThread;

import java.util.HashMap;
import java.util.Map;

public class SendMessageHander extends CallableTemplate<Map<String, String>>{
    
    private String email;
    private String content;
    public SendMessageHander(String email,String content) {
        this.email = email;
        this.content = content;
    }

    @Override
    public Map<String, String> process() {
        SendMessageService sendMessageService = new SendMessageService();
        sendMessageService.sendMessage(email, content);
        Map<String, String> map = new HashMap<String, String>();
        map.put(email, content);
        return map;
    }

}

  這個類繼承了上面的CallableTemplate,我們要的返回值是Map,因此泛型類型是Map,在類中還重寫了process方法,在方法中調用發送郵件的業務邏輯接口SendMessageService.sendMessage,並將返回結果組裝成Map返回,這里我就簡單處理了,將郵件地址及內容放在Map中直接返回了;另外還要注意這個類有個有參構造器,通過構建器可以接收需要傳遞進來的參數

 

SendMessageTest.java,測試類

package com.gdut.thread.multiThread;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ExecutionException;

public class SendMessageTest {

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ICallableTaskFrameWork callableTaskFrameWork = new CallableTaskFrameWork();
        
        List<CallableTemplate<Map<String, String>>> tasks = new ArrayList<CallableTemplate<Map<String, String>>>();
        
        SendMessageHander sendMessageHander = null;
        
        // 將需要發送郵件的郵件地址及內容組裝好,放在一個集合中
        for (int i = 0; i < 1000; i++) {
            sendMessageHander = new SendMessageHander("email" + i, "content" + i);
            tasks.add(sendMessageHander);
        }
        
        //通過多線程一次性發起郵件,並拿到返回結果集
        List<Map<String, String>> results = callableTaskFrameWork.submitsAll(tasks);
        
        // 解析返回結果集
        for (Map<String, String> map : results) {
            for (Entry<String, String> entry : map.entrySet()) {
                System.out.println(entry.getKey() + "\t" + entry.getValue());
            }
        }
    }

}

運行結果 

             

 

 

附錄:還可以看這邊文章: java並發異步編程 原來十個接口的活現在只需要一個接口就搞定!

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM