多線程(10) — Future模式


  Future模式是多線程開發中常用常見的一種設計模式,它的核心思想是異步調用。在調用一個函數方法時候,如果函數執行很慢,我們就要進行等待,但這時我們可能不着急要結果,因此我們可以讓被調者立即返回,讓它在后台慢慢處理這個請求,對於調用者來說可以先處理一些其他事物,在真正需要數據的場合再去嘗試獲得需要的數據。對於Future模式來說,雖然它無法立即給出你需要的數據,但是它們返回一個契約給你,將來你可以憑借這個契約去重新獲取你需要的信息。主要的角色有:

  • Main:系統啟動,調用Client發出請求。
  • Client:返回Data對象,立即返回FutureData,並開啟ClientThread線程裝配RealData
  • Data:返回數據的接口。
  • FutureData:Future數據構造很快,但是是一個虛擬的數據,需要裝配RealData
  • RealData:真實數據,其構造是比較慢的

  Future模式簡單實現:一個核心接口Data,就是客戶端希望獲取的數據,在Future模式中,這個Data接口有兩個重要的實現,一個是RealData,是真實數據,就是最終要獲得的信息。另外一個是FutureData,它是用來提取RealData的一個訂單,因此FutureData可以立即返回。

public interface Data {
    public String getResult();
}

  FutureData實現了一個快速返回的RealData包裝,只是一個包裝,或者說是虛擬實現,它可以很快構造並返回。當使用FutureData的getResult()方法,如果實際數據沒准備好程序會阻塞,等RealData准備好並注入FutureData才最終返回數據。

注意:FutureData是Future模式的關鍵,實際上是真實數據RealData的代理,封裝了獲取RealData的等待過程。

public class FutureData implements Data {

    protected RealData realdata = null;
    protected boolean isReady = false;
    
    public synchronized void setRealData(RealData realdata){
        if(isReady){
            return;
        }
        this.realdata = realdata;
        isReady = true;
        notifyAll();
    }
    
    @Override
    public synchronized String getResult() {
        while(!isReady){
            try {
                wait();
            } catch (InterruptedException e) {}
        }
        return realdata.result;
    }
}

  RealData是最終需要使用的數據模型,它的構造很慢,用sleep()函數模擬這個過程,簡單地模擬一個字符串的構造。

public class RealData implements Data {

    protected final String result;
    public RealData(String para){
        // RealData的構造可能很慢,需要用戶等待很久,這里用sleep模擬
        StringBuffer sb = new StringBuffer();
        for (int i = 0; i < 10; i++) {
            sb.append(para);
            try {
                // 這里使用sleep代替一個很慢的操作過程
                Thread.sleep(100);
            } catch (InterruptedException e) {}
        }
        result = sb.toString();
    }
    @Override
    public String getResult() {
        return result;
    }
}

  接下來就是客戶端程序,Client主要實現了獲取FutureData,並開啟構造RealData的線程,並在接受請求后,很快返回FutureData。注意,它不會等待數據真的構造完畢再返回,而是立即返回FutureData,即使這個時候FutureData內沒有真實數據。

public class Client {
    public Data request(final String queryStr){
        final FutureData future = new FutureData();
        new Thread(){

            @Override
            public void run() {
                RealData realdata = new RealData(queryStr);
                future.setRealData(realdata);
            }
            
        }.start();
        return future;
    }
}
public static void main(String[] args) {
    Client client = new Client();
    // 這里立即返回,得到的是Future而不是RealData
    Data data = client.request("Hello");
    System.out.println("請求完成");
    try {
        Thread.sleep(2000);// 這個過程代替RealData被創建的過程,也就是自己的業務處理過程
    } catch (InterruptedException e) {}
    // 使用真實的數據
    System.out.println("數據 = "+data.getResult());
}

 

JDK中的Future模式

  JDK中,Future接口類似於模式中的契約,通過它,可以得到真實的數據。RunnableFuture繼承了Future和Runnable倆接口,其中run()方法用於構造真實的數據,它有一個具體的實現FutureTask類,這個實現類有個內部類Sync,一些實質性的工作會委托Sync實現,而Sync類最終會調用Callable接口,完成實際數據的組裝工作。Callable接口只有一個方法call(),它會返回需要構造的實際數據。這個Callable接口也是Future框架和應用程序之間的重要接口。要實現自己的業務系統,通常需要實現自己的Callable對象,此外,FutureTask類也是與應用密切相關,通常可以使用Callable實例構造一個FutureTask實例,並將它交給線程池。下面來舉個使用的例子:

public class RealData implements Callable<String>{
    private String para;
    public RealDatajdk(String para){
        this.para = para;
    }
    
    @Override
    public String call() throws Exception {
        StringBuffer sb = new StringBuffer();
        for (int i = 0; i < 10; i++) {
            sb.append(para);
            Thread.sleep(100);
        }
        return sb.toString();
    }
}

  上述代碼實現了Callable接口,它的call()方法會構造我們需要的真實數據並返回,當然這個過程可能是緩慢的,這里使用Thread.sleep()方法模擬它。

public class FutureMain {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        FutureTask<String> future = new FutureTask<String>(new RealData("a"));// 構造FutureTask
        ExecutorService executor = Executors.newFixedThreadPool(1);
        // 執行FutureTask,相當於上例中client.request("a")發送請求
        // 在這里開啟線程進行RealData的call()方法執行
        executor.submit(future);
        System.out.println("請求完畢");
        try {
            // 這里可以做額外的數據操作,使用sleep代替其他業務邏輯處理
            Thread.sleep(2000);
        } catch (InterruptedException e) {}
        // 取得call()方法的返回值,如果call()方法還沒有執行完,就會等待
        System.out.println("數據 = "+future.get());
    }
}

除基本的功能外JDK還為Future接口提供了一些簡單的控制方法

boolean cancel(boolean mayInterruptIfRunning);           // 取消任務
boolean isCancelled();                                   // 是否已經取消任務
boolean isDone();                                        // 是否已完成
V get() throws InterruptedException, ExecutionException; // 取得返回對象
V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;// 取得返回對象,可以設置超時時間

Guava對Future模式的支持

  JDK自帶的簡單Future模式,可以使用get()方法得到處理結果,但是這個方法是阻塞的,因此不利於高並發應用。在Guava中增強了Future模式,增加了對模式完成時的回調接口,使得Future完成時可以自動通知應用程序進行后續處理。使用Guava改寫上一節中的FutureMain可以得到更好的效果。

public class FutureDemo {
    public static void main(String[] args) throws InterruptedException{
        ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));
        ListenableFuture<String> task = service.submit(new RealData("x"));
        task.addListener(new Runnable(){
            @Override
            public void run() {
                System.out.print("異步處理成功:");
                try {
                    System.out.println(task.get());
                } catch (Exception e) {}
            }
        },MoreExecutors.directExecutor());
        System.out.println("main task done......");
        Thread.sleep(3000);
    }
}

  MoreExecutors.listeningDecorator()方法將一個普通的線程池包裝為一個包含通知功能的Future線程池。第5行將Callable任務提交到線程池中,並得到一個ListenableFuture。與Future相比,ListenableFuture擁有完成時的通知功能,addListener向ListenableFuture中添加回調函數,即當Future執行完成后,執行addListener中第一個參數中代碼

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM