java 多線程 發布訂閱模式:發布者java.util.concurrent.SubmissionPublisher;訂閱者java.util.concurrent.Flow.Subscriber


1,什么是發布訂閱模式?

在軟件架構中,發布訂閱是一種消息范式,消息的發送者(稱為發布者)不會將消息直接發送給特定的接收者(稱為訂閱者)。而是將發布的消息分為不同的類別,無需了解哪些訂閱者(如果有的話)可能存在。同樣的,訂閱者可以表達對一個或多個類別的興趣,只接收感興趣的消息,無需了解哪些發布者(如果有的話)存在。
Java9開始新增了一個發布-訂閱框架,框架是基於異步響應流。發布,訂閱框架可以非常方便地處理異步線程之間的流數據交換( 比如兩個線程之間需要交換數據) 而且這個發布、訂閱框架不需要使用數據中心來緩沖數據,同時具有非常高效的性能。

 

2,發布訂閱模式的4個角色

  1. Flow.Publisher: 代表數據發布者,生產者
  2. Flow.Subscriber: 表數據訂閱者、消費者
  3. Flow.Subscription: 表發布者和訂閱者之間的鏈接紐帶。訂閱者既可通過調用該對象的request()方法來獲取數據項,也可通過調用對象的cancel()方法來取消訂閱。
  4. Flow.Processor: 數據處理器,它可同時作為發布者和訂閱者使用

測試用例:發布者每秒鍾發布一條消息,訂閱者每秒鍾訂閱一條消息。

注意:訂閱者處理消息,依賴當前線程的存活狀態,如果發布消息后當前程序代碼運行完畢會立即退出,訂閱者來不及執行任何程序。

此例 用鎖保持當前線程存活

import java.util.List;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * @ClassName PublisherFlowSubscriber
 * @projectName: object1
 * @author: Zhangmingda
 * @description: XXX
 * date: 2021/4/28.
 */
public class PublisherFlowSubscriber {
    /**
     * 定義用來保持線程不退出的鎖
     */
    private static Lock lock = new ReentrantLock(true);
    private static Condition condition = lock.newCondition();

    public static void main(String[] args) throws InterruptedException {
        /**
         * 定義一個發布者,需要設定要發送消息的泛型數據類型
         */
        SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
        /**
         * 定義一個訂閱者
         */
        MySubscirber<String> subscirber = new MySubscirber<>("訂閱者1");
        MySubscirber<String> subscirber2 = new MySubscirber<>("訂閱者2");
        /**
         * 通過發布者配置訂閱者 會觸發訂閱者的onSubscribe方法,他們之間的鏈接紐帶會通過參數傳遞給onSubscribe方法,如果注冊失敗會觸發onError方法
         */
        publisher.subscribe(subscirber);publisher.subscribe(subscirber2);

        /**
         * 測試發布消息
         */
        List<String> list =  List.of("張三", "李四", "王五", "趙六");
        list.forEach(string -> publisher.submit(string)); //向訂閱者發布數據,需要保持前台的線程存活,否則當前線程執行結束,發布者和訂閱者都被銷毀了。
        /**
         * 關閉消息發布
         */
        publisher.close(); //關閉后,如果當前線程未退出,待訂閱者所有消息都處理完畢才會運行訂閱者的onComplete方法
        lock.lock();
        //拋出鎖
        condition.await();
        lock.unlock();

    }

    /**
     * 定義訂閱者類,需要注意實現接口Flow.Subscriber 實現其泛型傳遞
     */
    private static class MySubscirber<T> implements Flow.Subscriber<T>{
        /**
         * 訂閱者自定義的屬性,名字,關聯的訂閱平台
         */
        private String name;
        private Flow.Subscription subscription;

        public MySubscirber(String name) {
            this.name = name;
        }

        /**
         * 訂閱的時候觸發的方法
         * @param subscription 訂閱者被關聯的訂閱平台
         */
        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            System.out.println(name + "開啟訂閱" + subscription);
            /**
             * 從訂閱平台獲取一條消息
             */
            subscription.request(1);
            /**
             * 將平台實例保存,便於復用
             */
            this.subscription = subscription;
        }

        /**
         * 獲取一條數據后觸發的方法
         * @param
         */
        @Override
        public void onNext(T t) {
            System.out.println(name + "獲取到了一條數據:" +t);
            //再次獲取一條數據...自循環觸發自己循環調用,一直將所有數據獲取完畢
            subscription.request(1);
            /**
             * 模擬處理耗時
             */
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        /**
         * 訂閱出錯時運行的方法
         * @param throwable 錯誤對象
         */
        @Override
        public void onError(Throwable throwable) {
            throwable.printStackTrace();
        }

        /**
         * 發布者停止發布,且訂閱者處理完接收數據后,觸發該方法
         */
        @Override
        public void onComplete() {
            System.out.println(name + "發布者關閉了發布");
            lock.lock();
            condition.signalAll();
            lock.unlock();
        }
    }
}

 

 

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM