一、引言
在上一專題中,商家發貨和用戶確認收貨功能引入了消息隊列來實現的,引入消息隊列的好處可以保證消息的順序處理,並且具有良好的可擴展性。但是上一專題消息隊列是基於內存中隊列對象來實現,這樣實現有一個弊端,就是一旦服務重啟或出現故障時,此時消息隊列中的消息會丟失,並且也記錄不了日志。所以就會出現,商家發貨成功后,用戶並沒有收到郵件通知,並且也沒有日志讓我們發現是否發送了郵件通知。為了解決這個問題,就需要引入一種可恢復的消息隊列。目前有很多開源的消息隊列都支持可恢復的,例如TibcoEms.net等。然而,微軟的MSMQ也是支持這種特性的。並且MSMQ還支持分布式部署,關於MSMQ更多內容可以參考:http://www.cnblogs.com/zhili/p/MSMQ.html
在本專題中將介紹為網上書店案例引入分布式消息隊列和分布式緩存的實現。
二、分布式消息隊列的實現
MSMQ的實現原理是:消息的發送者把自己想要發送的信息放入一個容器,然后把它保存到一個系統公用空間的消息隊列中,本地或異地的消息接收程序再從該隊列中取出發給它的消息進行處理。所以,即使服務器突然重啟,消息也會存在於系統公用空間的消息隊列中,待服務器重新啟動后,可以繼續接受消息進行處理,從而解決上一專題存在的問題。另外,上一專題的消息隊列只能被用在當前服務器中,而MSMQ支持分布式部署,不同機器都可以對MSMQ進行接收消息來處理,此時MSMQ起到一個中間件的作用。
在為網上書店引入分布式消息隊列之前,讓我們先理一下實現思路:
- 上一專題中把發貨事件和收貨事件發布到EventBus中,而此時需要用MsmqEventBus來替代EventBus。而MsmqEventBus的實現就很簡單了,完全可以參考EventBus來實現,只是此時消息並不是進入Queue對象中,而是通過MessageQueue對象發送到系統的消息隊列中。
- 而Commit方法即從系統的消息隊列中出隊來獲得消息。再獲得消息的處理器時,與上一專題的實現有點不同,因為把事件對象發送到消息隊列時,需要先把事件對象先序列化為Message對象再放入消息隊列中,而出隊的也是消息對象,而不是上一專題中的發貨事件對象。所以此時需要把出隊的消息對象反序列化為對應的事件對象。
有了上面的實現思路,接下來讓我們一起看看MsmqEventBus的具體實現代碼吧。
public class MsmqEventBus : DisposableObject, IEventBus { public void Publish<TMessage>(TMessage message) where TMessage : class, IEvent { // 將消息放入Message中Body屬性進行序列化發送到消息隊列中 var msmqMessage = new Message(message) { Formatter = new XmlMessageFormatter(new[] { message.GetType() }), Label = message.GetType().ToString()}; _messageQueue.Send(msmqMessage); _committed = false; } public void Publish<TMessage>(IEnumerable<TMessage> messages) where TMessage : class, IEvent { messages.ToList().ForEach(m => { _messageQueue.Send(m); _committed = false; }); } public void Commit() { if (this._useInternalTransaction) { using (var transaction = new MessageQueueTransaction()) { try { transaction.Begin(); var message = _messageQueue.Receive(); if (message != null) { message.Formatter = new XmlMessageFormatter(new[] { typeof(string) }); var evntType = ConvertStringToType(message.Body.ToString()); var method = _publishMethod.MakeGenericMethod(evntType); var evnt = Activator.CreateInstance(evntType); method.Invoke(_aggregator, new object[] { evnt }); transaction.Commit(); } } catch { transaction.Abort(); throw; } } } else { // 從msmq消息隊列中出隊,此時獲得的對象是消息對象 var message = _messageQueue.Receive(); if (message != null) { // 指定反序列化的對象,由於我們之前把對應的事件類型保存在MessageQueue中的Label屬性 // 所以此時可以通過Label屬性來獲得目標序列化類型 message.Formatter = new XmlMessageFormatter(new[] { ConvertStringToType(message.Label) }); // 這樣message.Body獲得就是對應的事件對象,后面的處理邏輯就和EventBus一樣了 var evntType =message.Body.GetType(); var method = _publishMethod.MakeGenericMethod(evntType); method.Invoke(_aggregator, new object[] { message.Body }); } } _committed = true; } }
結合上面代碼的注釋和前面實現思路的介紹,相信理解MsmqEventBus應該沒什么問題了。接下來,我們需要在配置文件中指定EventBus為MsmqEventBus類,另外需要在你本地專有隊列中創建"OnlineStoreQueue"隊列來接受消息。具體的配置文件修改為:
<!--Event Bus--> <!--<register type="OnlineStore.Events.Bus.IEventBus, OnlineStore.Events" mapTo="OnlineStore.Events.Bus.EventBus, OnlineStore.Events"> <lifetime type="singleton" /> </register>--> <!--注入MsmqEventBus--> <register type="OnlineStore.Events.Bus.IEventBus, OnlineStore.Events" mapTo="OnlineStore.Events.Bus.MsmqEventBus, OnlineStore.Events"> <lifetime type="singleton" /> <constructor> <param name="path" value=".\Private$\OnlineStoreQueue" /> </constructor> </register> </container>
到此,分布式消息隊列的實現就完成了,具體分布式消息隊列的實現效果和上一專題使用EventBus的效果是一樣的,這里就不再貼圖了,大家可以自行下載源碼查看。
三、緩存的實現
在實際開發過程中,緩存的實現是必不可少的,對於已經查詢過的數據可以直接從緩存中進行讀取返回給調用者,利用緩存不但可以加快響應速度,還能減輕數據庫服務器的壓力。在大型電子商務網站中,緩存的實現更是必不可少的功能。然而緩存的實現也有兩種,一種是分布式緩存,另一種本地緩存。在大型網站中,更多實現的是分布式緩存,對於一些少用戶的企業系統,可能才會使用到本地緩存。所以在本專題中,將在網上書店案例中對這兩種緩存分別進行實現。
3.1 本地緩存的實現
首先,我們來介紹本地緩存的實現。由於這里需要實現兩種緩存,根據面向接口編程原則,我們自然首先需要定義一個緩存接口,然后這兩種具體緩存都需要實現該接口。針對緩存接口,無非是緩存數據的添加,移除,更新等操作,所以緩存接口的定義如下所示:
// 緩存接口的定義 public interface ICacheProvider { /// <summary> /// 向緩存中添加一個對象 /// </summary> /// <param name="key">緩存的鍵值</param> /// <param name="valueKey">緩存值的鍵值</param> /// <param name="value">緩存的對象</param> void Add(string key, string valueKey, object value); void Update(string key, string valueKey, object value); object Get(string key, string valueKey); void Remove(string key); bool Exists(string key); bool Exists(string key, string valueKey); }
在介紹本地緩存的實現之前,讓我們先來思考下本地緩存的實現思路——就是在本地緩存類中定義一個字典對象,添加緩存就是往該字典插入鍵值對而已,其中key就是緩存數據對應的鍵值,value就是真正的緩存數據,如果緩存在字典中存在的話,就直接根據鍵值查找出緩存數據進行返回。
然而網上書店的本地緩存是基於Enterprise Library Caching庫來實現的,其實現思路和我之前介紹的思路也是一樣的,只不過此時字典對象不需要我們在類中定義,此時直接用Enterprise Library Caching庫中定義的就好。有了上面的分析,本地緩存的實現理解起來也就不那么難了,具體本地緩存的實現代碼如下所示:
// 表示基於Microsoft Patterns & Practices - Enterprise Library Caching Application Block的緩存機制的實現 // 該類簡單理解為對Enterprise Library Caching中的CacheManager封裝 // 該緩存實現不支持分布式緩存,更多信息參考: // http://stackoverflow.com/questions/7799664/enterpriselibrary-caching-in-load-balance public class EntLibCacheProvider : ICacheProvider { // 獲得CacheManager實例,該實例的注冊通過cachingConfiguration進行注冊進去的,具體看配置文件 private readonly ICacheManager _cacheManager = CacheFactory.GetCacheManager(); #region ICahceProvider public void Add(string key, string valueKey, object value) { Dictionary<string, object> dict = null; if (_cacheManager.Contains(key)) { dict = (Dictionary<string, object>) _cacheManager[key]; dict[valueKey] = value; } else { dict = new Dictionary<string, object> { { valueKey, value }}; } _cacheManager.Add(key, dict); } public void Update(string key, string valueKey, object value) { Add(key, valueKey, value); } public object Get(string key, string valueKey) { if (!_cacheManager.Contains(key)) return null; var dict = (Dictionary<string, object>)_cacheManager[key]; if (dict != null && dict.ContainsKey(valueKey)) return dict[valueKey]; else return null; } // 從緩存中移除對象 public void Remove(string key) { _cacheManager.Remove(key); } // 判斷指定的鍵值的緩存是否存在 public bool Exists(string key) { return _cacheManager.Contains(key); } // 判斷指定的鍵值和緩存鍵值的緩存是否存在 public bool Exists(string key, string valueKey) { return _cacheManager.Contains(key) && ((Dictionary<string, object>)_cacheManager[key]).ContainsKey(valueKey); } #endregion }
到此,網上書店案例中本地緩存的實現就完成了。由於本地緩存不支持分布式部署,所有的緩存都存在於單獨緩存服務器中,然而,針對一些大型網站來說,這樣的實現並不適合,因為在大型網站中,需要通過多個緩存服務進行集群,需要使得緩存均勻分布在集群中的緩存服務器中。此時就需要引入分布式緩存的實現。下面讓我們具體看看分布式緩存如何在該案例中實現。
3.2 分布式緩存的實現
分布式緩存可以通過具體的算法把緩存均勻地分布在集群中緩存服務器中,從而用戶請求的不同數據可以路由到對應的緩存服務器中進行添加、更新或獲得。分布式緩存的實現有很多種方式,可以利用Memcached和Redis開源庫來實現。然而,微軟的Windows Azure也提供了分布式緩存的實現,本案例中分布式緩存就是基於Windows Azure的。在對分布式緩存實現之前,需要先下載對應的dll,然后再在項目中進行引用。需要下載的dll已經包含在項目根目錄下的libs文件夾下,具體需要下載的程序集截圖如下所示:
然后在基礎設施層引入這些程序集,之前就可以去實現基於Windows Azure的分布式緩存了。具體的實現代碼如下所示:
// 分布式緩存,該類是對微軟分布式緩存服務的封裝 // 在該案例中沒用用到該緩存,但是提供在這里讓大家明白微軟的分布式緩存實現,並不是只有memcached和Redis // Redis參考:http://www.cnblogs.com/ceecy/p/3279407.html 和 http://blog.csdn.net/suifeng3051/article/details/23739295 // 關於微軟分布式緩存更多介紹參考:http://www.cnblogs.com/shanyou/archive/2010/06/29/AppFabricCaching.html // 和http://www.cnblogs.com/mlj322/archive/2010/04/05/1704624.html public class AppfabricCacheProvider : ICacheProvider { private readonly DataCacheFactory _factory = new DataCacheFactory(); private readonly DataCache _cache; public AppfabricCacheProvider() { this._cache = _factory.GetDefaultCache(); } #region ICacheProvider Members public void Add(string key, string valueKey, object value) { // DataCache中不包含Contain方法,所有用Get方法來判斷對應的key值是否在緩存中存在 var val = (Dictionary<string, object>)_cache.Get(key); if (val == null) { val = new Dictionary<string, object> {{ valueKey, value}}; _cache.Add(key, val); } else { if (!val.ContainsKey(valueKey)) val.Add(valueKey, value); else val[valueKey] = value; _cache.Put(key, val); } } public void Update(string key, string valueKey, object value) { Add(key, valueKey, value); } public object Get(string key, string valueKey) { return Exists(key, valueKey) ? ((Dictionary<string, object>)_cache.Get(key))[valueKey] : null; } public void Remove(string key) { _cache.Remove(key); } public bool Exists(string key) { return _cache.Get(key) != null; } public bool Exists(string key, string valueKey) { var val = _cache.Get(key); if (val == null) return false; return ((Dictionary<string, object>)val).ContainsKey(valueKey); } #endregion }
通過上面的步驟,分布式緩存的實現就完成了。其實,分布式緩存和本地緩存不同之處就在於:分布式緩存支持對應的算法可以把緩存存放在不同的服務器上,而本地緩存只能存在於本地,而不能跨機器分布。所以對於大型網站,分布式緩存才是最好的選擇,由於分布式緩存的實現和部署,無疑會增加開發和維護成本,所以對於一些小型系統(指定是單數據庫服務器系統),可以考慮使用本地緩存。
在本案例中,由於本人沒有Windows Azure環境,所以對於分布式緩存的實現也不能進行測試,所以本案例中使用的還是本地緩存。要使緩存生效,還需要對配置文件進行修改。具體配置文件修改為:
<unity xmlns="http://schemas.microsoft.com/practices/2010/unity"> <sectionExtension type="Microsoft.Practices.Unity.InterceptionExtension.Configuration.InterceptionConfigurationExtension, Microsoft.Practices.Unity.Interception.Configuration" /> <container> <extension type="Interception" /> <!--Cache Provider--> <register type="OnlineStore.Infrastructure.Caching.ICacheProvider, OnlineStore.Infrastructure" mapTo="OnlineStore.Infrastructure.Caching.EntLibCacheProvider, OnlineStore.Infrastructure" /> <!--........--> </container> </unity>
其實,通過上面的配置之后,緩存還是不能生效的,因為我們一般把緩存放在獲得數據方法之前進行調用,在用戶對獲得數據方法調用之前,首先從緩存中進行查找,如果存在,則直接返回緩存中的數據給調用者就可以了,如果不存在再調用獲得數據方法從數據庫中讀取,讀取成功后添加到緩存中再返回給調用者。既然要在方法調用前來查找緩存,從中你是否想到了什么呢?不錯,就是面向切面編程,即AOP。所以要讓緩存生效,在該案例中還需要支持AOP。至於AOP的支持,我將會在下一專題進行介紹。
四、總結
到這里,本專題的內容就結束了,正如前面所說的,在下一專題,我將在網上書店案例中引入對AOP的支持。
本專題所有源碼下載地址:https://github.com/lizhi5753186/OnlineStore_Second/