消息隊列
首先做簡單的引入。
MQ主要是用來:
- 解耦應用、
- 異步化消息
- 流量削峰填谷
目前使用的較多的有ActiveMQ、RabbitMQ、ZeroMQ、Kafka、MetaMQ、RocketMQ等。
網上的資源對各種情況都有詳細的解釋,在此不做過多贅述。本文
僅介紹如何使用Redis實現輕量級MQ的過程。
為什么要用Redis實現輕量級MQ?
在業務的實現過程中,就算沒有大量的流量,解耦和異步化幾乎也是處處可用,此時MQ就顯得尤為重要。但與此同時MQ也是一個蠻重的組件,例如我們如果用RabbitMQ就必須為它搭建一個服務器,同時如果要考慮可用性,就要為服務端建立一個集群,而且在生產如果有問題也需要查找功能。在中小型業務的開發過程中,可能業務的其他整個實現都沒這個重。過重的組件服務會成倍增加工作量。
所幸的是,Redis提供的list數據結構非常適合做消息隊列。
但是如何實現即時消費?如何實現ack機制?這些是實現的關鍵所在。
如何實現即時消費?
網上所流傳的方法是使用Redis中list的操作BLPOP或BRPOP,即列表的阻塞式(blocking)彈出。
讓我們來看看阻塞式彈出的使用方式:
BRPOP key [key ...] timeout
此命令的說明是:
1、當給定列表內沒有任何元素可供彈出的時候,連接將被 BRPOP 命令阻塞,直到等待超時或發現可彈出元素為止。
2、當給定多個key參數時,按參數 key 的先后順序依次檢查各個列表,彈出第一個非空列表的尾部元素。
另外,BRPOP 除了彈出元素的位置和 BLPOP 不同之外,其他表現一致。
以此來看,列表的阻塞式彈出有兩個特點:
1、如果list中沒有任務的時候,該連接將會被阻塞
2、連接的阻塞有一個超時時間,當超時時間設置為0時,即可無限等待,直到彈出消息
由此看來,此方式是可行的,但此為傳統的觀察者模式,業務簡單則可使用,如A的任務只由B去執行。但如果A和Z的任務,B和C都能執行,那使用這種方式就相形見肘。這個時候就應該使用訂閱/發布模式,使業務系統更加清晰。
好在Redis也支持Pub/Sub(發布/訂閱)。在消息A入隊list的同時發布(PUBLISH)消息B到頻道channel,此時已經訂閱channel的worker就接收到了消息B,知道了list中有消息A進入,即可循環lpop或rpop來消費list中的消息。流程如下:

其中的worker可以是單獨的線程,也可以是獨立的服務,其充當了Consumer和業務處理者角色。下面做實例說明。
即時消費實例
示例場景為:worker要做同步文件功能,等到有文件生成時立馬同步。
首先開啟一個線程代表worker,來訂閱頻道channel:
-
-
public class SubscribeService {
-
-
-
private RedisService redisService;
-
-
private SynListener synListener;//訂閱者
-
-
-
public void subscribe() {
-
new Thread(new Runnable() {
-
-
-
public void run() {
-
LogCvt.info( "服務已訂閱頻道:{}", channel);
-
redisService.subscribe(synListener, channel);
-
}
-
}).start();
-
-
}
-
}
代碼中的SynListener即為所聲明的訂閱者,channel為訂閱的頻道名稱,具體的訂閱邏輯如下:
-
-
public class SynListener extends JedisPubSub {
-
-
-
private DispatchMessageHandler dispatchMessageHandler;
-
-
-
public void onMessage(String channel, String message) {
-
LogCvt.info( "channel:{},receives message:{}",channel,message);
-
try {
-
//處理業務(同步文件)
-
dispatchMessageHandler.synFile();
-
} catch (Exception e) {
-
LogCvt.error(e.getMessage(),e);
-
}
-
}
-
}
處理業務的時候,就去list中去消費消息:
-
-
public class DispatchMessageHandler {
-
-
-
private RedisService redisService;
-
-
private MessageHandler messageHandler;
-
-
public void synFile(){
-
while(true){
-
try {
-
String message = redisService.lpop(RedisKeyUtil.syn_file_queue_key());
-
if (null == message){
-
break;
-
}
-
Thread.currentThread().setName(Tools.uuid());
-
// 隊列數據處理
-
messageHandler.synfile(message);
-
} catch (Exception e) {
-
LogCvt.error(e.getMessage(),e);
-
}
-
}
-
}
-
-
}
這樣我們就達到了消息的實時消費的目的。
如何實現ack機制?
ack,即消息確認機制(Acknowledge)。
首先來看RabbitMQ的ack機制:
- Publisher把消息通知給Consumer,如果Consumer已處理完任務,那么它將向Broker發送ACK消息,告知某條消息已被成功處理,可以從隊列中移除。如果Consumer沒有發送回ACK消息,那么Broker會認為消息處理失敗,會將此消息及后續消息分發給其他Consumer進行處理(redeliver flag置為true)。
- 這種確認機制和TCP/IP協議確立連接類似。不同的是,TCP/IP確立連接需要經過三次握手,而RabbitMQ只需要一次ACK。
- 值的注意的是,RabbitMQ當且僅當檢測到ACK消息未發出且Consumer的連接終止時才會將消息重新分發給其他Consumer,因此不需要擔心消息處理時間過長而被重新分發的情況。
那么在我們用Redis實現消息隊列的ack機制的時候該怎么做呢?
需要注意兩點:
- work處理失敗后,要回滾消息到原始pending隊列
- 假如worker掛掉,也要回滾消息到原始pending隊列
上面第一點可以在業務中完成,即失敗后執行回滾消息。
實現方案
(該方案主要解決worker掛掉的情況)
- 維護兩個隊列:pending隊列和doing表(hash表)。
- workers定義為ThreadPool。
- 由pending隊列出隊后,workers分配一個線程(單個worker)去處理消息——給目標消息append一個當前時間戳和當前線程名稱,將其寫入doing表,然后該worker去消費消息,完成后自行在doing表擦除信息。
- 啟用一個定時任務,每隔一段時間去掃描doing隊列,檢查每隔元素的時間戳,如果超時,則由worker的ThreadPoolExecutor去檢查線程是否存在,如果存在則取消當前任務執行,並把事務rollback。最后把該任務從doing隊列中pop出,再重新push進pending隊列。
- 在worker的某線程中,如果處理業務失敗,則主動回滾,並把任務從doing隊列中移除,重新push進pending隊列。
總結
Redis作為消息隊列是有很大局限性的。因為其主要特性及用途決定它只能實現輕量級的消息隊列。寫在最后:沒有絕對好的技術,只有對業務最友好的技術,謹此獻給所有developer。
