前言
消息隊列是軟件系統領域用來實現系統間通信最廣泛的中間件。基於消息隊列的方式是指由應用中的某個系統負責發送消息,由關心這條消息的相關系統負責接收消息,並在收到消息后進行各自系統內的業務處理。消息可以非常簡單,比如只包含文本字符串;也可以很復雜,比如包含字節流、字節數組,還可以包含嵌入對象,甚至是Java對象(經過序列化的對象)。
消息在被發送后可以立即返回,由消息隊列來負責消息的傳遞,消息發布者只管將消息發布到消息隊列而不用管誰來取,消息使用者只管從消息隊列中取消息而不管是誰發布的,這樣發布者和使用者都不用知道對方的存在(見下圖)。
為何要用消息隊列
從上面的描述可以看出,消息隊列(MQ)是一種系統間相互協作的通信機制。那么什么時候需要使用消息隊列呢?
舉個例子。某天產品人員說“系統要增加一個鍋爐設備報警功能,當鍋爐設備溫度大於260度后,用戶能收到郵件”。在實際場景中這種需求很常見,開發人員覺得這個很簡單,就是提供一個判斷邏輯,當鍋爐設備溫度大於260度進行判斷,然后發送郵件,最好返回報警信息以警示。
該功能上線運行了一段時間后,產品人員說“設備高溫后收到郵件的響應有點慢,很多人都提出這個意見,能不能優化一下”。開發人員首先想到的優化方案是將鍋爐設備溫度判斷邏輯與發送郵件分開執行,怎么分呢?可以單獨開啟線程來做發送郵件的事情。
沒多久,產品人員又說“現在設備高溫並收到郵件的響應是快了,但有用戶反映沒收到報警郵件,能不能在發送郵件的時候先保存所發送郵件的內容,如果郵件發送失敗了則進行補發”。
看着開發人員愁眉苦臉的樣子,產品人員說“在郵件發送這塊平台部門已經做好方案了,你直接用他們提供的服務就行”。開發人員一聽,趕緊和平台部門溝通,對方的答復是“我們提供一個類似於郵局信箱的東西,你直接往這個信箱里寫上發送郵件的地址、郵件標題和內容,之后就不用你操心了,我們會直接從信箱里取消息,向你所填寫的郵件地址發送響應郵箱”。
這個故事講的就是使用消息隊列的典型場景---異步處理。消息隊列還可用於解決解耦、流量削峰、日志收集等問題。
簡單實現一個消息隊列
回到消息隊列這個術語本身,它包含了兩個關鍵詞: 消息和隊列。消息是指在應用間傳送的數據,消息的表現形式是多樣的,可以簡單到只包含文本字符串,也可以復雜到有一個結構化的對象定義格式。對於隊列,從抽象意義上來理解,就是指消息的進和出。從時間順序上說,進和出並不一定是同步進行的,所以需要一個容器來暫存和處理消息。因此,一個典型意義上的消息隊列,至少需要包含消息的發送、接受和暫存功能。
- Broker: 消息處理中心,負責消息的接受、存儲、轉發等。
- Producer: 消息生產者,負責產生和發送消息和消息處理中心。
- Consumer: 消息消費者,負責從消息處理中心獲取消息,並進行相應的處理。
可以看到,消息隊列服務的核心是消息處理中心,它至少要具備消息發送、消息接受和消息暫存功能。所以,我們就從消息處理中心開始逐步搭建一個消息隊列。
消息處理中心
先看一下消息處理中心類(InMemoryStorage)的實現
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingDeque;
/**
* @author james mu
* @date 2020/7/27 21:47
*/
public final class InMemoryStorage {
//保存消息數據的容器,<topic,消息阻塞隊列> 鍵值對
private final ConcurrentHashMap<String, BlockingQueque<QueueMsg>> storage;
private static InMemoryStorage instance;
private InMemoryStorage() {
storage = new ConcurrentHashMap<>();
}
//利用雙重檢查加鎖(double-checked locking),首先檢查是否示例已經創建了,如果尚未創建,"才"進行同步。這樣以來,只有第一次會同步,這正是我們想要的。
public static InMemoryStorage getInstance() {
if (instance == null) {
synchronized (InMemoryStorage.class) {
if (instance == null) {
instance = new InMemoryStorage();
}
}
}
return instance;
}
//保存消息到主題中,若topic對應的value為空,會將第二個參數的返回值存入並返回
public boolean put(String topic, QueueMsg msg) {
return storage.computeIfAbsent(topic, (t) -> new LinkedBlockingDeque<>()).add(msg);
}
//獲得主題中的消息
public <T extends QueueMsg> List<T> get(String topic) {
//判斷map中是否包含此topic
if (storage.containsKey(topic)) {
List<T> entities;
//從此主題對應的阻塞隊列中出隊一個元素
T first = (T) storage.get(topic).poll();
if (first != null) {
entities = new ArrayList<>();
entities.add(first);
List<QueueMsg> otherList = new ArrayList<>();
//移動阻塞隊列中最大999個元素到arrayList中
storage.get(topic).drainTo(otherList, 999);
for (QueueMsg other : otherList) {
entities.add((T) other);
}
} else {
entities = Collections.emptyList();
}
}
return Collections.emptyList();
}
//刪除此map中所有的鍵值對
public void cleanup() {
storage.clear();
}
}
作為一個消息處理中心中,至少要有一個數據容器用來保存接受到的消息。
Java中的隊列(Queue)是提供該功能的一種簡單的數據結構,同時為簡化隊列操作的並發訪問處理,我們選擇了它的一個子類LinkedBlockingDeque。該類提供了對數據的插入、獲取、查詢等操作,其底層將數據以鏈表的形式保存。如果用 offer方法插入數據時隊列沒滿,則數據插入成功,並立 即返回:如果隊列滿了,則直接返回 false。 如果用 poll方法刪除數據時隊列不為空, 則返回隊 列頭部的數據;如果隊列為空,則立刻返回 null。
消息格式定義
隊列消息接口定義(QueueMsg)
/**
* @author james mu
* @date 2020/7/27 22:00
*/
public interface QueueMsg {
//消息鍵
String getKey();
//消息頭
QueueMsgHeaders getHeaders();
//消息負載byte數組
byte[] getData();
}
隊列消息頭接口定義(QueueMsgHeaders)
import java.util.Map;
/**
* @author james mu
* @date 2020/7/27 21:55
*/
public interface QueueMsgHeaders {
//消息頭放入
byte[] put(String key, byte[] value);
//消息頭通過key獲取byte數組
byte[] get(String key);
//消息頭數據全部讀取方法
Map<String, byte[]> getData();
}
隊列消息格式(ProtoQueueMsg)
/**
* @author jamesmsw
* @date 2021/2/19 2:23 下午
*/
public class ProtoQueueMsg implements QueueMsg {
private final String key;
private final String value;
private final QueueMsgHeaders headers;
public ProtoQueueMsg(String key, String value) {
this(key, value, new DefaultQueueMsgHeaders());
}
public ProtoQueueMsg(String key, String value, QueueMsgHeaders headers) {
this.key = key;
this.value = value;
this.headers = headers;
}
@Override
public String getKey() {
return key;
}
@Override
public QueueMsgHeaders getHeaders() {
return headers;
}
@Override
public byte[] getData() {
return value.getBytes();
}
}
默認隊列消息頭(DefaultQueueMsgHeaders)
import java.util.HashMap;
import java.util.Map;
/**
* @author james mu
* @date 2020/7/27 21:57
*/
public class DefaultQueueMsgHeaders implements QueueMsgHeaders {
protected final Map<String, byte[]> data = new HashMap<>();
@Override
public byte[] put(String key, byte[] value) {
return data.put(key, value);
}
@Override
public byte[] get(String key) {
return data.get(key);
}
@Override
public Map<String, byte[]> getData() {
return data;
}
}
消息生產者
import iot.technology.mqtt.storage.msg.QueueMsg;
import iot.technology.mqtt.storage.queue.QueueCallback;
/**
* @author james mu
* @date 2020/8/31 11:05
*/
public class Producer<T extends QueueMsg> {
private final InMemoryStorage storage = InMemoryStorage.getInstance();
private final String defaultTopic;
public Producer(String defaultTopic) {
this.defaultTopic = defaultTopic;
}
public void send(String topicName, T msg) {
boolean result = storage.put(topicName, msg);
}
}
消息消費者
import lombok.extern.slf4j.Slf4j;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
/**
* @author james mu
* @date 2020/8/31 11:23
*/
@Slf4j
public class Consumer<T extends QueueMsg> {
private final InMemoryStorage storage = InMemoryStorage.getInstance();
private volatile Set<String> topics;
private volatile boolean stopped;
private volatile boolean subscribed;
private final String topic;
//虛構函數
public Consumer(String topic) {
this.topic = topic;
stopped = false;
}
public String getTopic() {
return topic;
}
public void subscribe() {
topics = Collections.singleton(topic);
subscribed = true;
}
//批量訂閱主題
public void subscribe(Set<String> topics) {
this.topics = topics;
subscribed = true;
}
public void unsubscribe() {
stopped = true;
}
//不斷讀取topic集合下阻塞隊列中的數據集合
public List<T> poll(long durationInMillis) {
if (subscribed) {
List<T> messages = topics
.stream()
.map(storage::get)
.flatMap(List::stream)
.map(msg -> (T) msg).collect(Collectors.toList());
if (messages.size() > 0) {
return messages;
}
try {
Thread.sleep(durationInMillis);
} catch (InterruptedException e) {
if (!stopped) {
log.error("Failed to sleep.", e);
}
}
}
return Collections.emptyList();
}
}
至此,一個簡單的消息隊列中就實現完畢了。
有的同學可能會質疑我上面設計的實戰性,不用擔心,在下一節中,我將帶大家通過閱讀高達8k+🌟的Thingsboard的內存型消息隊列源碼,看下是否和我上面的設計一致。