靈感來源於一個豬隊友給我的題目
看到這個,我抓住的關鍵字是:任何子任務失敗,要通知所有子任務執行取消邏輯。
這不就是消息廣播嗎?觀察者模式!
干活
首先是收聽者
package com.example.broadcast; /** * 每個節點即是廣播者,也是收聽者 */ public interface Listener { /** * 設置調度中心 */ void setCenter(DispatchCenter center); /** * 主動通知其它收聽者 */ void notice(String msg); /** * 自己收到通知的處理邏輯 * @param msg */ void whenReceived(String msg); /** * 收聽者標志:唯一 * @return */ String identify(); }
然后是調度中心
package com.example.broadcast; /** * 調度中心 */ public interface DispatchCenter { /** * 廣播 * @param own 廣播的時候,要排除自己 * @param msg 廣播消息 */ void broadcast(String own, String msg); /** * 添加收聽者 * @param listener */ void addListener(Listener listener); }
調度中心實現
package com.example.broadcast; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; public class DispatchCenterImpl implements DispatchCenter { private static final Map<String, Listener> MAP = new ConcurrentHashMap<>(); @Override public void broadcast(String own, String msg) { MAP.forEach((k,v) -> { // 不用給自己發通知 if (!k.equals(own)){ v.whenReceived(msg); } }); } @Override public void addListener(Listener listener) { listener.setCenter(this); MAP.put(listener.identify(), listener); } }
剩下三個收聽者
package com.example.broadcast; import java.util.UUID; public class ListenerA implements Listener { private DispatchCenter center; private String identify; public ListenerA() { identify = UUID.randomUUID().toString(); } @Override public void setCenter(DispatchCenter center) { this.center = center; } @Override public void notice(String msg) { center.broadcast(identify, msg); } @Override public void whenReceived(String msg) { System.out.println(this.getClass().getName() + "收到消息:" + msg); } @Override public String identify() { return identify; } }
B和C除了類名不一樣,其他都一樣,不再贅述。目錄如下
測試
package com.example.broadcast; import java.util.Random; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class Main { public static void main(String[] args) { DispatchCenter center = new DispatchCenterImpl(); ListenerA listenerA = new ListenerA(); ListenerB listenerB = new ListenerB(); ListenerC listenerC = new ListenerC(); center.addListener(listenerA); center.addListener(listenerB); center.addListener(listenerC); ExecutorService executorService = Executors.newFixedThreadPool(3); // A觸發1條事件 executorService.submit(() -> { int i = 1; while (i > 0){ listenerA.notice(listenerA.getClass().getName() + "說:我有" + new Random().nextInt(1000000) + "元"); i--; } }); // B觸發2條事件 executorService.submit(() -> { int i = 2; while (i > 0){ listenerB.notice(listenerB.getClass().getName() + "說:我有" + new Random().nextInt(1000000) + "元"); i--; } }); // C觸發3條事件 executorService.submit(() -> { int i = 3; while (i > 0){ listenerC.notice(listenerC.getClass().getName() + "說:我有" + new Random().nextInt(1000000) + "元"); i--; } }); executorService.shutdown(); } }
輸出:
流程圖
當其中的B節點,發生了錯誤,除了把自己處理好之外
1. 向調度中心發送廣播請求,並攜帶需要的消息
2. 調度中心遍歷收聽者,挨個通知(執行)每一個收聽者接受消息的邏輯
關於停止任務
因為題目要求,【快速取消】所有子任務
關於線程停止的方法也有很多:
1. 優雅退出run方法
2. 暴力stop
3. run方法拋出異常
如果說要求,A異常了,B和C收到消息之后,線程立即停止,不能有一點遲疑,說實話我還沒想到該怎么做。因為你要知道,實際上的任務的run方法內部,不太可能是個while循環,人家可能就是個順序執行,所以停止標志位的方式,並不適用。
我先寫個按照標志位停止的“玩具”。
修改三個收聽者代碼和測試類
package com.example.broadcast; import lombok.SneakyThrows; import java.util.Random; import java.util.UUID; public class ListenerA implements Listener,Runnable { private DispatchCenter center; private String identify; public ListenerA() { identify = UUID.randomUUID().toString(); } @Override public void setCenter(DispatchCenter center) { this.center = center; } @Override public void notice(String msg) { center.broadcast(identify, msg); } @Override public void whenReceived(String msg) { System.out.println(this.getClass().getName() + "收到消息:" + msg); } @Override public String identify() { return identify; } @SneakyThrows @Override public void run() { // 5秒之后,模擬發生異常 Thread.sleep(5000); notice(this.getClass().getName() + "說:我有" + new Random().nextInt(1000000) + "元"); System.out.println(this.getClass().getName() + "程序異常,並已經傳播了消息..."); } }
package com.example.broadcast; import lombok.SneakyThrows; import java.util.UUID; public class ListenerB implements Listener,Runnable { private DispatchCenter center; private String identify; private volatile Boolean stopFlag = false; public ListenerB() { identify = UUID.randomUUID().toString(); } @Override public void setCenter(DispatchCenter center) { this.center = center; } @Override public void notice(String msg) { center.broadcast(identify, msg); } @Override public void whenReceived(String msg) { System.out.println(this.getClass().getName() + "_" + Thread.currentThread().getName() + "收到消息:" + msg); // 停止當前線程 stopFlag = true; } @Override public String identify() { return identify; } @SneakyThrows @Override public void run() { while (!stopFlag){ Thread.sleep(1000); System.out.println(this.getClass().getName() + "_" + Thread.currentThread().getName() + "__B在執行任務"); } System.out.println(this.getClass().getName() + "_" + Thread.currentThread().getName() + "__B Dead"); } }
package com.example.broadcast; import lombok.SneakyThrows; import java.util.UUID; public class ListenerC implements Listener,Runnable { private DispatchCenter center; private String identify; private volatile Boolean stopFlag = false; public ListenerC() { identify = UUID.randomUUID().toString(); } @Override public void setCenter(DispatchCenter center) { this.center = center; } @Override public void notice(String msg) { center.broadcast(identify, msg); } @Override public void whenReceived(String msg) { System.out.println(this.getClass().getName() + "_" + Thread.currentThread().getName() + "收到消息:" + msg); // 停止當前線程 stopFlag = true; } @Override public String identify() { return identify; } @SneakyThrows @Override public void run() { while (!stopFlag){ Thread.sleep(1000); System.out.println(this.getClass().getName() + "_" + Thread.currentThread().getName() + "__C在執行任務"); } System.out.println(this.getClass().getName() + "_" + Thread.currentThread().getName() + "__C Dead"); } }
測試
package com.example.broadcast; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class Main { public static void main(String[] args) { DispatchCenter center = new DispatchCenterImpl(); ListenerA listenerA = new ListenerA(); ListenerB listenerB = new ListenerB(); ListenerC listenerC = new ListenerC(); center.addListener(listenerA); center.addListener(listenerB); center.addListener(listenerC); ExecutorService executorService = Executors.newFixedThreadPool(3); // A executorService.submit(listenerA); // B executorService.submit(listenerB); // C executorService.submit(listenerC); executorService.shutdown(); } }
再想一想
這個問題想想並不簡單:
1.這不是單一線程處理異常的情況(如果只是單一線程,自己的異常自己捕獲並處理即可)
2. A出現了異常,B收到了取消任務通知,問題在於, 1)不知道B目前執行到哪里了,沒辦法讓B停下手中的工作。2)如果殺死B線程,那么執行一半的任務,會不會導致什么程序異常,或者臟數據之類的?
3. 穩妥一點的方法就是,A出現了異常,B收到了通知之后,照常執行任務,只是當收到了異常通知的時候,會在正常邏輯的后面調用一個任務回退方法;而所有任務正常工作,則不會調用這個回退方法。
4. 這個思路讓我想到了分布式事務,是不是有內味了?
改動收聽者接口,增加回滾方法
package com.example.broadcast; /** * 每個節點即是廣播者,也是收聽者 */ public interface Listener { /** * 設置調度中心 */ void setCenter(DispatchCenter center); /** * 主動通知其它收聽者 */ void notice(String msg); /** * 自己收到通知的處理邏輯 * @param msg */ void whenReceived(String msg); /** * 收聽者標志:唯一 * @return */ String identify(); /** * 發生異常時,任務回退方法 */ void rollback(); }
A
package com.example.broadcast; import lombok.SneakyThrows; import java.util.Random; import java.util.UUID; public class ListenerA implements Listener,Runnable { private DispatchCenter center; private String identify; public ListenerA() { identify = UUID.randomUUID().toString(); } @Override public void setCenter(DispatchCenter center) { this.center = center; } @Override public void notice(String msg) { center.broadcast(identify, msg); } @Override public void whenReceived(String msg) { System.out.println(this.getClass().getName() + "收到消息:" + msg); } @Override public String identify() { return identify; } @Override public void rollback() { System.out.println(this.getClass().getName() + "任務回退!!!"); } @SneakyThrows @Override public void run() { // 5秒之后,模擬發生異常 Thread.sleep(5000); notice(this.getClass().getName() + "說:我有" + new Random().nextInt(1000000) + "元"); // A異常,回滾 rollback(); } }
B
package com.example.broadcast; import lombok.SneakyThrows; import java.util.UUID; public class ListenerB implements Listener,Runnable { private DispatchCenter center; private String identify; private volatile Boolean rollbackFlag = false; public ListenerB() { identify = UUID.randomUUID().toString(); } @Override public void setCenter(DispatchCenter center) { this.center = center; } @Override public void notice(String msg) { center.broadcast(identify, msg); } @Override public void whenReceived(String msg) { System.out.println(this.getClass().getName() + "_" + Thread.currentThread().getName() + "收到消息:" + msg); // 任務需要回滾 rollbackFlag = true; } @Override public String identify() { return identify; } @Override public void rollback() { System.out.println(this.getClass().getName() + "任務回退!!!"); } @SneakyThrows @Override public void run() { // 模擬任務耗時,執行6秒的任務 for (int i = 0; i < 3; i++){ Thread.sleep(2000); System.out.println(this.getClass().getName() + "_" + Thread.currentThread().getName() + "__B在執行任務"); } if (rollbackFlag){ rollback(); } } }
C
package com.example.broadcast; import lombok.SneakyThrows; import java.util.UUID; public class ListenerC implements Listener,Runnable { private DispatchCenter center; private String identify; private volatile Boolean rollbackFlag = false; public ListenerC() { identify = UUID.randomUUID().toString(); } @Override public void setCenter(DispatchCenter center) { this.center = center; } @Override public void notice(String msg) { center.broadcast(identify, msg); } @Override public void whenReceived(String msg) { System.out.println(this.getClass().getName() + "_" + Thread.currentThread().getName() + "收到消息:" + msg); // 任務需要回滾 rollbackFlag = true; } @Override public String identify() { return identify; } @Override public void rollback() { System.out.println(this.getClass().getName() + "任務回退!!!"); } @SneakyThrows @Override public void run() { // 模擬任務耗時,執行9秒的任務 for (int i = 0; i < 3; i++){ Thread.sleep(3000); System.out.println(this.getClass().getName() + "_" + Thread.currentThread().getName() + "__C在執行任務"); } if (rollbackFlag){ rollback(); } } }
測試Main不變,執行