public interface CallBack {
/*
為什么要寫這個回調接口呢?
*因為可能不止主調A需要用到被調的處理過程,如果很多地方需要用到被調程序
* 那么傳入被調的方法就不可能只傳主調A類,所以要定義一個接口,
* 傳入被調的處理方法的參數就是這個接口對象
* */
public void solve(String result);
}
public class CallbackRequest implements Callback{
private CallbackResponse callbackResponse;
public CallbackRequest(CallbackResponse callbackResponse) {
this.callbackResponse = callbackResponse;
}
//主調需要解決一個問題,所以他把問題交給被調處理,被調單獨創建一個線程,不影響主調程序的運行
public void request(final String question){
System.out.println("主調程序問了一個問題");
new Thread(()->{
//B想要幫A處理東西,就必須知道誰讓自己處理的,所以要傳入a,也要知道a想處理什么,所以要傳入question
callbackResponse.handler(this, question);
}).start();
//A把要處理的事情交給b之后,就可以自己去玩耍了,或者去處理其他事情
afterAsk();
}
private void afterAsk(){
System.out.println("主調程序繼續處理其他事情");
}
@Override
public void solve(String result) {
System.out.println("被調程序接到答案后進行處理" + result);
}
}
public class CallbackResponse {
public void handler(Callback callback, String request) {
System.out.println(callback.getClass()+"問的問題是:"+ request);
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
String result="\n答案是2";
callback.solve(result);
}
}
public class CallbackTest {
public static void main(String[] args) {
CallbackResponse callbackResponse = new CallbackResponse();
CallbackRequest callbackRequest = new CallbackRequest(callbackResponse);
callbackRequest.request("1+1");
}
}
輸出:
主調程序問了一個問題
主調程序繼續處理其他事情
class javapratice.CallbackRequest問的問題是:1+1
被調程序接到答案后進行處理
答案是2
3、異步回調
//多線程中的“回調”
public class CallBackMultiThread {
//這里簡單地使用future和callable實現了線程執行完后
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executor = Executors.newCachedThreadPool();
Future<String> future = executor.submit(new Callable<String>() {
@Override
public String call() throws Exception {
System.out.println("call");
TimeUnit.SECONDS.sleep(1);
return "str";
}
});
//手動阻塞調用get通過call方法獲得返回值。
System.out.println(future.get());
//需要手動關閉,不然線程池的線程會繼續執行。
executor.shutdown();
//使用futuretask同時作為線程執行單元和數據請求單元。
FutureTask<Integer> futureTask = new FutureTask(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
System.out.println("dasds");
return new Random().nextInt();
}
});
new Thread(futureTask).start();
//阻塞獲取返回值
System.out.println(futureTask.get());
}
}
注:比起future.get(),其實更推薦使用get (long timeout, TimeUnit unit)方法,設置了超時時間可以防止程序無限制的等待future的結果。
-
不支持手動完成:這個意思指的是,我提交了一個任務,但是執行太慢了,我通過其他路徑已經獲取到了任務結果,現在沒法把這個任務結果,通知到正在執行的線程,所以必須主動取消或者一直等待它執行完成。
-
不支持進一步的非阻塞調用:這個指的是我們通過Future的get方法會一直阻塞到任務完成,但是我還想在獲取任務之后,執行額外的任務,因為Future不支持回調函數,所以無法實現這個功能。
-
不支持鏈式調用:這個指的是對於Future的執行結果,我們想繼續傳到下一個Future處理使用,從而形成一個鏈式的pipline調用,這在Future中是沒法實現的。
-
不支持多個Future合並:比如我們有10個Future並行執行,我們想在所有的Future運行完畢之后,執行某些函數,是沒法通過Future實現的。
-
不支持異常處理:Future的API沒有任何的異常處理的api,所以在異步運行時,如果出了問題是不好定位的。
Future vs CompletableFuture
Futrue在Java里面,通常用來表示一個異步任務的引用,比如我們將任務提交到線程池里面,然后我們會得到一個Futrue,在Future里面有isDone方法來 判斷任務是否處理結束,還有get方法可以一直阻塞直到任務結束然后獲取結果,但整體來說這種方式,還是同步的,因為需要客戶端不斷阻塞等待或者不斷輪詢才能知道任務是否完成。
public class TestCompletableFuture {
public static void main(String[] args) throws Exception{
CompletableFuture<String> completableFuture=new CompletableFuture<String>();
Runnable runnable=new Runnable() {
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName()+" 執行.....");
completableFuture.complete("success");//在子線程中完成主線程completableFuture的完成
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
Thread t1=new Thread(runnable);
t1.start();//啟動子線程
String result=completableFuture.get();//主線程阻塞,等待完成
System.out.println(Thread.currentThread().getName()+" 1x: "+result);
}
}
輸出結果:
Thread-0 執行.....
main 1x: success
2、運行一個簡單的沒有返回值的異步任務
public class TestCompletableFuture {
public static void main(String[] args) throws Exception{
CompletableFuture<Void> future=CompletableFuture.runAsync(new Runnable() {
@Override
public void run() {
try {
System.out.println(Thread.currentThread().getName()+"正在執行一個沒有返回值的異步任務。");
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
future.get();
System.out.println(Thread.currentThread().getName()+" 結束。");
}
}
輸出如下:
ForkJoinPool.commonPool-worker-1正在執行一個沒有返回值的異步任務。
main 結束。
從上面代碼我們可以看到CompletableFuture默認運行使用的是ForkJoin的的線程池。當然,你也可以用lambda表達式使得代碼更精簡。
public class TestCompletableFuture {
public static void main(String[] args) throws Exception{
CompletableFuture<String> future = CompletableFuture.supplyAsync(new Supplier<String>(){
@Override
public String get() {
try {
System.out.println(Thread.currentThread().getName()+"正在執行一個有返回值的異步任務。");
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "OK";
}
});
String result=future.get();
System.out.println(Thread.currentThread().getName()+" 結果:"+result);
}
}
輸出結果:
ForkJoinPool.commonPool-worker-1正在執行一個有返回值的異步任務。
main 結果:OK
當然,上面默認的都是ForkJoinPool我們也可以換成Executor相關的Pool,其api都有支持如下:
static CompletableFuture<Void> runAsync(Runnable runnable) static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor) static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
public class TestCompletableFuture {
public static void asyncCallback() throws ExecutionException, InterruptedException {
CompletableFuture<String> task=CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
System.out.println("線程" + Thread.currentThread().getName() + " supplyAsync");
return "123";
}
});
CompletableFuture<Integer> result1 = task.thenApply(number->{
System.out.println("線程" + Thread.currentThread().getName() + " thenApply1 ");
return Integer.parseInt(number);
});
CompletableFuture<Integer> result2 = result1.thenApply(number->{
System.out.println("線程" + Thread.currentThread().getName() + " thenApply2 ");
return number*2;
});
System.out.println("線程" + Thread.currentThread().getName()+" => "+result2.get());
}
public static void main(String[] args) throws Exception{
asyncCallback();
}
}
輸出結果:
線程ForkJoinPool.commonPool-worker-1 supplyAsync
線程main thenApply1
線程main thenApply2
線程main => 246
2、thenAccept()
public class TestCompletableFuture {
public static void asyncCallback() throws ExecutionException, InterruptedException {
CompletableFuture<String> task=CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
System.out.println(Thread.currentThread().getName()+" supplyAsync");
return "123";
}
});
CompletableFuture<Integer> chain1 = task.thenApply(number->{
System.out.println(Thread.currentThread().getName()+" thenApply1");
return Integer.parseInt(number);
});
CompletableFuture<Integer> chain2 = chain1.thenApply(number->{
System.out.println(Thread.currentThread().getName()+" thenApply2");
return number*2;
});
CompletableFuture<Void> result=chain2.thenAccept(product->{
System.out.println(Thread.currentThread().getName()+" thenAccept="+product);
});
result.get();
System.out.println(Thread.currentThread().getName()+" end");
}
public static void main(String[] args) throws Exception {
asyncCallback();
}
}
結果如下:
ForkJoinPool.commonPool-worker-1 supplyAsync
main thenApply1
main thenApply2
main thenAccept=246
main end
public class TestCompletableFuture {
public static void asyncCallback() throws ExecutionException, InterruptedException {
CompletableFuture.supplyAsync(()->{
System.out.println(Thread.currentThread().getName()+"supplyAsync: 一階段任務");
return null;
}).thenRun(()->{
System.out.println(Thread.currentThread().getName()+"thenRun: 收尾任務");
}).get();
}
public static void main(String[] args) throws Exception {
asyncCallback();
}
}
結果:
ForkJoinPool.commonPool-worker-1supplyAsync: 一階段任務
mainthenRun: 收尾任務
這里注意,截止到目前,前面的例子代碼只會涉及兩個線程,一個是主線程一個是ForkJoinPool池的線程,但其實上面的每一步都是支持異步運行的,其api如下:
// thenApply() variants <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn) <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn) <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
我們看下改造后的一個例子:
public class TestCompletableFuture {
public static void asyncCallback() throws ExecutionException, InterruptedException {
CompletableFuture<String> ref1= CompletableFuture.supplyAsync(()->{
try {
System.out.println(Thread.currentThread().getName() + " supplyAsync開始執行任務1.... ");
TimeUnit.SECONDS.sleep(1);
} catch (Exception e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " supplyAsync: 任務1");
return null;
});
CompletableFuture<String> ref2= CompletableFuture.supplyAsync(()->{
try {
} catch (Exception e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " thenApplyAsync: 任務2");
return null;
});
CompletableFuture<String> ref3=ref2.thenApplyAsync(value->{
System.out.println(Thread.currentThread().getName() +" thenApplyAsync: 任務2的子任務");
return " finish";
});
Thread.sleep(4000);
System.out.println(Thread.currentThread().getName() + ref3.get());
}
public static void main(String[] args) throws Exception {
asyncCallback();
}
}
輸出結果如下:
ForkJoinPool.commonPool-worker-1 supplyAsync開始執行任務1....
ForkJoinPool.commonPool-worker-2 thenApplyAsync: 任務2
ForkJoinPool.commonPool-worker-2 thenApplyAsync: 任務2的子任務
ForkJoinPool.commonPool-worker-1 supplyAsync: 任務1
main finish
我們可以看到,ForkJoin池的線程1,執行了前面的三個任務,但是第二個任務的子任務,因為我們了使用也異步提交所以它用的線程是ForkJoin池的線程2,最終由於main線程處執行了get是最后結束的。
ExecutorService pool = Executors.newFixedThreadPool(5);
final CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
... }, pool);
4、thenCompose():合並兩個有依賴關系的CompletableFutures的執行結果
public class TestCompletableFuture {
public static void asyncCompose() throws ExecutionException, InterruptedException {
CompletableFuture<String> future1=CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
return "1";
}
});
CompletableFuture<String>nestedResult = future1.thenCompose(value->
CompletableFuture.supplyAsync(()->{
return value+"2";
}));
System.out.println(nestedResult.get());
}
public static void main(String[] args) throws Exception {
asyncCompose();
}
}
輸出結果:12
public class TestCompletableFuture {
public static void asyncCombine() throws ExecutionException, InterruptedException {
CompletableFuture<Double> d1= CompletableFuture.supplyAsync(new Supplier<Double>() {
@Override
public Double get() {
return 1d;
}
});
CompletableFuture<Double> d2= CompletableFuture.supplyAsync(new Supplier<Double>() {
@Override
public Double get() {
return 2d;
}
});
CompletableFuture<Double> result= d1.thenCombine(d2,(number1,number2)->{
return number1+number2;
});
System.out.println(result.get());
}
public static void main(String[] args) throws Exception {
asyncCombine();
}
}
輸出結果:3d
6、合並多個任務的結果allOf與anyOf
public class TestCompletableFuture {
public static void mutilTaskTest() throws ExecutionException, InterruptedException {
//添加n個任務
CompletableFuture<Double> array[]=new CompletableFuture[3];
for ( int i = 0; i < 3; i++) {
array[i]=CompletableFuture.supplyAsync(new Supplier<Double>() {
@Override
public Double get() {
return Math.random();
}
});
}
//獲取結果的方式一
// CompletableFuture.allOf(array).get();
// for(CompletableFuture<Double> cf:array){
// if(cf.get()>0.6){
// System.out.println(cf.get());
// }
// }
//獲取結果的方式二,過濾大於指定數字,在收集輸出
List<Double> rs= Stream.of(array).map(CompletableFuture::join).filter(number->number>0.6).collect(Collectors.toList());
System.out.println(rs);
}
public static void main(String[] args) throws Exception {
mutilTaskTest();
}
}
結果如下(結果可能不一致):
[0.85538057702618, 0.7692532053269862, 0.6441387373310598]
注意其中的join方法和get方法類似,僅僅在於在Future不能正常完成的時候拋出一個unchecked的exception,這可以確保它用在Stream的map方法中,直接使用get是沒法在map里面運行的。
public class TestCompletableFuture {
public static void mutilTaskTest() throws ExecutionException, InterruptedException {
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
try {
TimeUnit.SECONDS.sleep(4);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "wait 4 seconds";
}
});
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "wait 2 seconds";
}
});
CompletableFuture<String> f3 = CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
try {
TimeUnit.SECONDS.sleep(4);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "wait 10 seconds";
}
});
CompletableFuture<Object> result = CompletableFuture.anyOf(f1, f2, f3);
System.out.println(result.get());
}
public static void main(String[] args) throws Exception {
mutilTaskTest();
}
}
輸出結果:
wait 2 seconds
注意由於Anyof返回的是其中任意一個Future所以這里沒有明確的返回類型,統一使用Object接受,留給使用端處理。
public class TestCompletableFuture {
public static void exceptionProcess() throws ExecutionException, InterruptedException {
int age=-1;
CompletableFuture<String> task= CompletableFuture.supplyAsync(new Supplier<String>(){
@Override
public String get(){
if(age<0){
throw new IllegalArgumentException("性別必須大於0");
}
if(age<18){
return "未成年人";
}
return "成年人";
}
}).exceptionally(ex->{
System.out.println(ex.getMessage());
return "發生 異常"+ex.getMessage();
});
System.out.println(task.get());
}
public static void main(String[] args) throws Exception {
exceptionProcess();
}
}
結果如下:
java.lang.IllegalArgumentException: 性別必須大於0
發生 異常java.lang.IllegalArgumentException: 性別必須大於0
此外還有另外一種異常捕捉方法handle,無論發生異常都會執行,示例如下:
public class TestCompletableFuture {
public static void exceptionProcess() throws ExecutionException, InterruptedException {
int age = -10;
CompletableFuture<String> task= CompletableFuture.supplyAsync(new Supplier<String>(){
@Override
public String get(){
if(age<0){
throw new IllegalArgumentException("性別必須大於0");
}
if(age<18){
return "未成年人";
}
return "成年人";
}
}).handle((res,ex)->{
System.out.println("執行handle");
if(ex!=null){
System.out.println("發生異常");
return "發生 異常"+ex.getMessage();
}
return res;
});
System.out.println(task.get());
}
public static void main(String[] args) throws Exception {
exceptionProcess();
}
}
輸出結果:
執行handle
發生異常
發生 異常java.lang.IllegalArgumentException: 性別必須大於0
注意上面的方法如果正常執行,也會執行handle方法。
-
orTimeout()
-
completeOnTimeout()
-
Executor delayedExecutor(long delay, TimeUnit unit, Executor executor)
-
Executor delayedExecutor(long delay, TimeUnit unit)
