1,什么是發布訂閱模式?
在軟件架構中,發布訂閱是一種消息范式,消息的發送者(稱為發布者)不會將消息直接發送給特定的接收者(稱為訂閱者)。而是將發布的消息分為不同的類別,無需了解哪些訂閱者(如果有的話)可能存在。同樣的,訂閱者可以表達對一個或多個類別的興趣,只接收感興趣的消息,無需了解哪些發布者(如果有的話)存在。
Java9開始新增了一個發布-訂閱框架,框架是基於異步響應流。發布,訂閱框架可以非常方便地處理異步線程之間的流數據交換( 比如兩個線程之間需要交換數據) 而且這個發布、訂閱框架不需要使用數據中心來緩沖數據,同時具有非常高效的性能。
2,發布訂閱模式的4個角色
- Flow.Publisher: 代表數據發布者,生產者
- Flow.Subscriber: 表數據訂閱者、消費者
- Flow.Subscription: 表發布者和訂閱者之間的鏈接紐帶。訂閱者既可通過調用該對象的request()方法來獲取數據項,也可通過調用對象的cancel()方法來取消訂閱。
- Flow.Processor: 數據處理器,它可同時作為發布者和訂閱者使用
測試用例:發布者每秒鍾發布一條消息,訂閱者每秒鍾訂閱一條消息。
注意:訂閱者處理消息,依賴當前線程的存活狀態,如果發布消息后當前程序代碼運行完畢會立即退出,訂閱者來不及執行任何程序。
此例 用鎖保持當前線程存活
import java.util.List; import java.util.concurrent.Flow; import java.util.concurrent.SubmissionPublisher; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; /** * @ClassName PublisherFlowSubscriber * @projectName: object1 * @author: Zhangmingda * @description: XXX * date: 2021/4/28. */ public class PublisherFlowSubscriber { /** * 定義用來保持線程不退出的鎖 */ private static Lock lock = new ReentrantLock(true); private static Condition condition = lock.newCondition(); public static void main(String[] args) throws InterruptedException { /** * 定義一個發布者,需要設定要發送消息的泛型數據類型 */ SubmissionPublisher<String> publisher = new SubmissionPublisher<>(); /** * 定義一個訂閱者 */ MySubscirber<String> subscirber = new MySubscirber<>("訂閱者1"); MySubscirber<String> subscirber2 = new MySubscirber<>("訂閱者2"); /** * 通過發布者配置訂閱者 會觸發訂閱者的onSubscribe方法,他們之間的鏈接紐帶會通過參數傳遞給onSubscribe方法,如果注冊失敗會觸發onError方法 */ publisher.subscribe(subscirber);publisher.subscribe(subscirber2); /** * 測試發布消息 */ List<String> list = List.of("張三", "李四", "王五", "趙六"); list.forEach(string -> publisher.submit(string)); //向訂閱者發布數據,需要保持前台的線程存活,否則當前線程執行結束,發布者和訂閱者都被銷毀了。 /** * 關閉消息發布 */ publisher.close(); //關閉后,如果當前線程未退出,待訂閱者所有消息都處理完畢才會運行訂閱者的onComplete方法 lock.lock(); //拋出鎖 condition.await(); lock.unlock(); } /** * 定義訂閱者類,需要注意實現接口Flow.Subscriber 實現其泛型傳遞 */ private static class MySubscirber<T> implements Flow.Subscriber<T>{ /** * 訂閱者自定義的屬性,名字,關聯的訂閱平台 */ private String name; private Flow.Subscription subscription; public MySubscirber(String name) { this.name = name; } /** * 訂閱的時候觸發的方法 * @param subscription 訂閱者被關聯的訂閱平台 */ @Override public void onSubscribe(Flow.Subscription subscription) { System.out.println(name + "開啟訂閱" + subscription); /** * 從訂閱平台獲取一條消息 */ subscription.request(1); /** * 將平台實例保存,便於復用 */ this.subscription = subscription; } /** * 獲取一條數據后觸發的方法 * @param */ @Override public void onNext(T t) { System.out.println(name + "獲取到了一條數據:" +t); //再次獲取一條數據...自循環觸發自己循環調用,一直將所有數據獲取完畢 subscription.request(1); /** * 模擬處理耗時 */ try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } /** * 訂閱出錯時運行的方法 * @param throwable 錯誤對象 */ @Override public void onError(Throwable throwable) { throwable.printStackTrace(); } /** * 發布者停止發布,且訂閱者處理完接收數據后,觸發該方法 */ @Override public void onComplete() { System.out.println(name + "發布者關閉了發布"); lock.lock(); condition.signalAll(); lock.unlock(); } } }

