手寫一個消息隊列以及延遲消息隊列


一、什么是消息隊列?

消息隊列(Message Queue),是分布式系統中重要的組件,其通用的使用場景可以簡單地描述為:
當不需要立即獲得結果,但是並發量又需要進行控制的時候,差不多就是需要使用消息隊列的時候

二、消息隊列有什么用?

1. 提高響應速度

異步處理,串行化的功能變成並行化,從而提升系統性能,縮短響應時間
常用於秒殺、發送短信通知等,需要立即返回結果的場景

2. 流量控制

在高並發的情況,為了避免大量的請求沖擊后端服務,可以使用消息隊列暫存請求,后端服務按照自己的重能力,從隊列中消費,例如秒殺、埋點場景。
這樣可以隨時增加服務的實例數量水平擴容,而不用對系統的其他部分做修改

3.系統解耦

例如一個下單的信息需要同步多個子系統,每個子系統都需要保存訂單的數據的一部分,如果光靠訂單服務的團隊去維護所有的子系統數據同步,代價太大
解決方法是,通過發布訂閱模型,訂單服務在訂單變化時發送一條消息到一個主題中,所有的下游子系統都訂單主題,這樣可以每個子系統都可以獲得一份完整的訂單數據
即使是增加、減少子系統,也不會對訂單服務造成影響

三、消息隊列有什么缺點?

  1. 同步消息改成了異步,增加了系統的調用鏈,增加了系統的復雜度
  2. 降低了數據一致性,如果要保持一致性,需要高代價的補償(如分布式事務、對賬)
  3. 引入了消息隊列帶來的延遲問題

四、如何自定義一個消息隊列?

我們可使用 Queue 來實現消息隊列,Queue 大體可分為以下三類:

雙端隊列(Deque)是 Queue 的子類也是 Queue 的補充類,頭部和尾部都支持元素插入和獲取;
阻塞隊列指的是在元素操作時(添加或刪除),如果沒有成功,會阻塞等待執行,比如當添加元素時,如果隊列元素已滿,隊列則會阻塞等待直到有空位時再插入;
非阻塞隊列,和阻塞隊列相反,它會直接返回操作的結果,而非阻塞等待操作,雙端隊列也屬於非阻塞隊列。

自定義消息隊列的實現代碼如下:
import java.util.LinkedList;
import java.util.Queue;

/**
 * @author james
 * @version 1.0.0
 * @Description 自定義實現消息隊列
 * @createTime 2020年08月15日 16:34:00
 */
public class CustomQueue {

    /**
     * 定義消息隊列
     */
    private static Queue<String> queue = new LinkedList<>();

    public static void main(String[] args) {
        producer();  // 調用生產者
        consumer();  // 調用消費者
    }

    public static void consumer() {
        while (!queue.isEmpty()){
            System.out.println(queue.poll());
        }
    }

    public static void  producer(){
        queue.add("hello,");
        queue.add("queue");
        queue.add("!");
    }
}

以上程序的執行結果為

hello,
queue
!

可以看出消息是以先進先出順序進行消費的。

實現自定義延遲隊列需要實現 Delayed 接口,重寫 getDelay() 方法,延遲隊列完整實現代碼如下:

import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

/**
 * @author james
 * @version 1.0.0
 * @Description 延時消息隊列
 * @createTime 2020年08月16日 10:27:00
 */
public class MyDelay implements Delayed {

    /**
     * 延遲截止時間(毫秒)
     */
    long delayTime = System.currentTimeMillis();


    private String msg;

    public String getMsg() {
        return msg;
    }

    public void setMsg(String msg) {
        this.msg = msg;
    }


    public MyDelay(long delayTime, String msg) {
        this.delayTime = this.delayTime + delayTime;
        this.msg = msg;
    }

    /**
     * 獲取剩余時間
     *
     * @param unit
     * @return
     */
    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(delayTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed o) {
        if (this.getDelay(TimeUnit.MILLISECONDS) > o.getDelay(TimeUnit.MILLISECONDS)) {
            return 1;
        } else if (this.getDelay(TimeUnit.MILLISECONDS) < o.getDelay(TimeUnit.MILLISECONDS)) {
            return -1;
        } else {
            return 0;
        }
    }

    @Override
    public String toString() {
        return this.msg;
    }
}
import java.text.DateFormat;
import java.util.Date;
import java.util.concurrent.DelayQueue;


/**
 * @author james
 * @version 1.0.0
 * @Description TODO
 * @createTime 2020年08月16日 10:45:00
 */
public class CustomDelayQueue {

    public static final DelayQueue delayQueue = new DelayQueue();

    public static void main(String[] args) throws InterruptedException {
        producer();  // 調用生產者
        consumer();  // 調用消費者
    }

    public static void consumer() throws InterruptedException {
        System.out.println("開始執行時間:" +
                DateFormat.getDateTimeInstance().format(new Date()));
        while (!delayQueue.isEmpty()) {
            System.out.println(delayQueue.take());
        }
        System.out.println("結束執行時間:" +
                DateFormat.getDateTimeInstance().format(new Date()));
    }

    public static void producer() {
        // 添加消息
        delayQueue.put(new MyDelay(1000, "hello,"));
        delayQueue.put(new MyDelay(60000, "delayQueue"));
    }
}

同樣,一個簡易版的延遲消息隊列就這樣完成了!

關注我的技術公眾號,每天都有優質技術文章推送。
微信掃一掃下方二維碼即可關注:
在這里插入圖片描述


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM