前言
上一篇博客上已經實現了使用EventBus對具體事件行為的分發處理,某種程度上也算是基於事件驅動思想編程了。但是如上篇博客結尾處一樣,我們源碼的執行效率依然達不到心里預期。在下單流程里我們明顯可以將部分行為進行異步處理,提升下單操作的執行效率。
Redis基礎命令
Redis有兩種方式可支持我們實現MQ功能,1、使用列表(List)相關命令特性;2、使用publish、subscribe命令特性;
這里我是采取列表相關命令實現。
使用列表(List)相關命令的特性實現
- 壓入數據(發布消息)
使用列表(List)的LPUSHRPUSH命令可以從列表左邊和右邊壓入數據;
LPUSH
將一個或多個值插入到列表頭部(此處可以將列表想象成一個從左到右的鏈表數據結構,LPUSH就是將指定的值插入最左側!)
如下命令,將多個元素壓入list1的頭部(最左側)
LPUSH list1 測試1 測試2
執行結果如下:

上面是寫入多個元素,我們也可以寫入單個元素
LPUSH list1 測試3

需要留意,每次執行完LPUSH后,Redis會返回當前列表的長度。
RPUSH
在指定列表的尾部(相當於一個鏈表的最右側)添加單個或多個元素
如下命令,還是在list1上添加多個元素,並查看執行后的list1元素信息
RPUSH list 測試4 測試5

同理,RPUSH也可直接寫入單個元素,和LPUSH一樣。
- 拉取數據(消費數據)
這里的拉取數據不單單是讀取List內的元素,而是將元素從列表中取出來
BLPOP
移出並獲取列表的第一個元素(從左至右), 如果列表沒有元素會阻塞當前線程,直到等待超時或發現可彈出元素為止。
如下命令,從list1這個列表獲取從左至右第一個元素,在100秒內如果獲取則結束阻塞,否則阻塞到100秒之后。
BLPOP list1 100
執行結果如下:

需要留意的是BLPOP命令如果拉取到數據則會返回兩行數據,1行為列表的key名稱,1行為獲取到的元素值。如果直到阻塞結束都沒有獲取到元素值則直接返回命令執行超時。如下圖:

BRPOP
移出並獲取列表的最后一個元素(從左至右), 如果列表沒有元素會阻塞當前線程直到等待超時或發現可彈出元素為止。該命令與BLPOP除了獲取的元素位置不同,其他特性全部一致。
LPOP
移出並獲取列表的第一個元素(從左至右),如獲取到元素則返回元素信息,沒有元素則立即返回null。
如下命令:
LPOP list1

RPOP
移出並獲取列表的最后一個元素(從左至右),如獲取到元素則返回元素信息,沒有元素則立即返回null。該命令與LPOP除了獲取的元素位置不同其他特性全部一致;
RPOPLPUSH
移除列表的最后一個元素(最右側的元素),並將該元素添加到另一個列表並返回。該命令如獲取到元素則返回元素信息,否則返回錯誤信息。
可以通過RPOPLPUSH這個命令的特性對MQ內一致性要求較高的業務進行處理,在從列表獲取元素成功后將該元素添加到一個備份列表,在業務處理完畢后再從備份列表將該元素刪除。
執行下面命令測試下:
RPOPLPUSH list1 listback

BRPOPLPUSH
從列表中彈出一個值,將彈出的元素插入到另外一個列表中並返回它; 如果列表沒有元素會阻塞列表直到等待超時或發現可彈出元素為止。
該命令其實就是在BRPOP的基礎上將LPUSH的功能加上了,依舊也保留了指定超時時間內未獲取到元素則阻塞線程。
執行下面命令測試下:
BRPOPLPUSH list1 listback 10
執行結果如下:

完善代碼
基於上面Redis的相關命令,我們再完善下上篇博客的代碼。這里我們需要新增一個控制台啟動項,將它作為消費服務,原來的控制台即訂單保存的控制台作為消息發布的服務。
下單代碼更改為下面的樣子:
/// <summary>
/// 異步方式觸發訂單相關事件
/// </summary>
public static void AsynEventHandle()
{
Guid userId = Guid.NewGuid();
Stopwatch stopwatch = new Stopwatch();
stopwatch.Start();
var order = new OrderModel()
{
CreateTime = DateTime.Now,
Id = Guid.NewGuid(),
Money = (decimal)300.00,
Number = 1,
ProductName = "鮮花一束",
UserId = userId
};
Console.WriteLine($"模擬存儲訂單【采取Redis做消息隊列的異步方式】");
Thread.Sleep(1000);
FullRedis fullRedis = new FullRedis("127.0.0.1:6379", "", 1);
//這里嘗試過使用redis 的訂閱發布模式,在執行發布命令時候發現值但凡出現空格或者"符號則會異常...
fullRedis.LPUSH("orders", new OrderModel[] { order });
stopwatch.Stop();
Console.WriteLine($"下單總耗時:{stopwatch.ElapsedMilliseconds}毫秒");
Console.ReadLine();
}
可以看到,我們已經將事件總線相關代碼給移除了,上面代碼除了向Redis的隊列(List)里寫入元素外就只是對訂單進行了持久化動作,所以看代碼就知道執行效率的提升了。
接下來,看消費服務的代碼。
static void Main(string[] args)
{
XTrace.UseConsole();
Console.WriteLine("進入Redis消息訂閱者模式訂單消息推送訂閱者客戶端!");
EventBus eventBus = new EventBus();
eventBus.EventRegister(typeof(OrderCreateEventNotifyHandle), typeof(OrderCreateEventData));
eventBus.EventRegister(typeof(OrderCreateEventStockLockHandle), typeof(OrderCreateEventData));
FullRedis fullRedis = new FullRedis("127.0.0.1:6379", "", 1);
fullRedis.Log = XTrace.Log;
fullRedis.Timeout = 30000;
OrderModel order = null;
while (order == null)
{
order = fullRedis.BLPOP<OrderModel>("orders", 20);
if (order != null)
{
Console.WriteLine($"得到訂單信息:{JsonConvert.SerializeObject(order)}");
//執行相關事件
eventBus.Trigger(new OrderCreateEventData()
{
Order = order,
});
//再次設置為null方便循環讀取
order = null;
}
}
Console.ReadLine();
}
消費服務首先從Redis里通過BLPOP從orders列表中獲取元素,再觸發事件總線,執行訂單保存相關業務處理。
最終看下執行效率如何?
消息發布的執行效率(訂單保存)

消息消費

可以看到目前消息發布的執行效率下單總耗時間為1170毫秒,我們再改為同步的測試下結果:

可以看到,同步執行的結果是3035毫秒。
小結
兩種方式相差了將近2000毫秒~ 而且后續如果再繼續擴展訂單存儲相關處理的話同步執行的響應時間會更加拉長,而采取Redis MQ的方式配合事件總線我們可以將整個業務拆分為獨立的應用,采取分布式的方式提高響應效率,同時事件總線的加入方便我們后續業務的擴展。
消息發布端將訂單信息寫入到列表后如果消息消費者在拉取到數據后業務執行過程中代碼出現異常導致無法滿足業務的完整性如何處理
答:可以使用上述Redis命令中的RPOPLPUSH或BRPOPLPUSH在拉取元素后寫入到一個備份的列表中,在我們的邏輯代碼執行完畢后在將備份列表中的該元素值移除。
上述代碼已發布到Github,有需要的自行下載。
地址為:https://github.com/QQ897878763/OrderRedisSample
