一、什么是消息隊列?
消息隊列(Message Queue),是分布式系統中重要的組件,其通用的使用場景可以簡單地描述為:
當不需要立即獲得結果,但是並發量又需要進行控制的時候,差不多就是需要使用消息隊列的時候
二、消息隊列有什么用?
1. 提高響應速度
異步處理,串行化的功能變成並行化,從而提升系統性能,縮短響應時間
常用於秒殺、發送短信通知等,需要立即返回結果的場景
2. 流量控制
在高並發的情況,為了避免大量的請求沖擊后端服務,可以使用消息隊列暫存請求,后端服務按照自己的重能力,從隊列中消費,例如秒殺、埋點場景。
這樣可以隨時增加服務的實例數量水平擴容,而不用對系統的其他部分做修改
3.系統解耦
例如一個下單的信息需要同步多個子系統,每個子系統都需要保存訂單的數據的一部分,如果光靠訂單服務的團隊去維護所有的子系統數據同步,代價太大
解決方法是,通過發布訂閱模型,訂單服務在訂單變化時發送一條消息到一個主題中,所有的下游子系統都訂單主題,這樣可以每個子系統都可以獲得一份完整的訂單數據
即使是增加、減少子系統,也不會對訂單服務造成影響
三、消息隊列有什么缺點?
- 同步消息改成了異步,增加了系統的調用鏈,增加了系統的復雜度
- 降低了數據一致性,如果要保持一致性,需要高代價的補償(如分布式事務、對賬)
- 引入了消息隊列帶來的延遲問題
四、如何自定義一個消息隊列?
我們可使用 Queue 來實現消息隊列,Queue 大體可分為以下三類:
雙端隊列(Deque)是 Queue 的子類也是 Queue 的補充類,頭部和尾部都支持元素插入和獲取;
阻塞隊列指的是在元素操作時(添加或刪除),如果沒有成功,會阻塞等待執行,比如當添加元素時,如果隊列元素已滿,隊列則會阻塞等待直到有空位時再插入;
非阻塞隊列,和阻塞隊列相反,它會直接返回操作的結果,而非阻塞等待操作,雙端隊列也屬於非阻塞隊列。
自定義消息隊列的實現代碼如下:
import java.util.LinkedList;
import java.util.Queue;
/**
* @author james
* @version 1.0.0
* @Description 自定義實現消息隊列
* @createTime 2020年08月15日 16:34:00
*/
public class CustomQueue {
/**
* 定義消息隊列
*/
private static Queue<String> queue = new LinkedList<>();
public static void main(String[] args) {
producer(); // 調用生產者
consumer(); // 調用消費者
}
public static void consumer() {
while (!queue.isEmpty()){
System.out.println(queue.poll());
}
}
public static void producer(){
queue.add("hello,");
queue.add("queue");
queue.add("!");
}
}
以上程序的執行結果為
hello,
queue
!
可以看出消息是以先進先出順序進行消費的。
實現自定義延遲隊列需要實現 Delayed 接口,重寫 getDelay() 方法,延遲隊列完整實現代碼如下:
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
/**
* @author james
* @version 1.0.0
* @Description 延時消息隊列
* @createTime 2020年08月16日 10:27:00
*/
public class MyDelay implements Delayed {
/**
* 延遲截止時間(毫秒)
*/
long delayTime = System.currentTimeMillis();
private String msg;
public String getMsg() {
return msg;
}
public void setMsg(String msg) {
this.msg = msg;
}
public MyDelay(long delayTime, String msg) {
this.delayTime = this.delayTime + delayTime;
this.msg = msg;
}
/**
* 獲取剩余時間
*
* @param unit
* @return
*/
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(delayTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(Delayed o) {
if (this.getDelay(TimeUnit.MILLISECONDS) > o.getDelay(TimeUnit.MILLISECONDS)) {
return 1;
} else if (this.getDelay(TimeUnit.MILLISECONDS) < o.getDelay(TimeUnit.MILLISECONDS)) {
return -1;
} else {
return 0;
}
}
@Override
public String toString() {
return this.msg;
}
}
import java.text.DateFormat;
import java.util.Date;
import java.util.concurrent.DelayQueue;
/**
* @author james
* @version 1.0.0
* @Description TODO
* @createTime 2020年08月16日 10:45:00
*/
public class CustomDelayQueue {
public static final DelayQueue delayQueue = new DelayQueue();
public static void main(String[] args) throws InterruptedException {
producer(); // 調用生產者
consumer(); // 調用消費者
}
public static void consumer() throws InterruptedException {
System.out.println("開始執行時間:" +
DateFormat.getDateTimeInstance().format(new Date()));
while (!delayQueue.isEmpty()) {
System.out.println(delayQueue.take());
}
System.out.println("結束執行時間:" +
DateFormat.getDateTimeInstance().format(new Date()));
}
public static void producer() {
// 添加消息
delayQueue.put(new MyDelay(1000, "hello,"));
delayQueue.put(new MyDelay(60000, "delayQueue"));
}
}
同樣,一個簡易版的延遲消息隊列就這樣完成了!
關注我的技術公眾號,每天都有優質技術文章推送。
微信掃一掃下方二維碼即可關注: