“一切都是消息”--iMSF(即時消息服務框架)之【發布-訂閱】模式


MSF的名字是 Message Service Framework 的簡稱,由於目前框架主要功能在於處理即時(immediately)消息,所以iMSF就是 immediately Message Service Framework,中文名稱:即時消息服務框架,它是PDF.NET框架的一部分。
在后續的文章中,iMSF跟MSF是一個意思,或者你也可以給它取一個好聽的中文名稱:愛美XX :)

在上一篇,“一切都是消息”--MSF(消息服務框架)之【請求-響應】模式 ,我們演示了MSF實現簡單的請求-響應模式的示例,今天來看看如何實現【發布-訂閱】模式。簡單來說,該模式的工作過程是:

客戶端發起訂閱--》服務器接受訂閱--》服務器處理被訂閱的服務方法--》 服務器將處理結果推送給客戶端--》客戶端收到消息--》客戶端關閉訂閱連接

 

MSF的【發布-訂閱】通信模式,支持2種模式,分別是:

一、定時推送模式

這是最普通最常見的推送模式,只要客戶端訂閱了MSF的服務,服務器會每隔一秒向客戶端推送一次服務處理結果。在下面的示例中,我們先來演示一個簡單的“服務器時間服務”的功能。

1.1,編寫“時間服務”

在TestService項目添加一個類文件 TimeService.cs ,其代碼如下:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace TestService
{
    public class TimeService:ServiceBase
    {
        public DateTime ServerTime()
        {
            return DateTime.Now;
        }
    }
}

注意:今天我們這個MSF服務類TimeService 集成的不是前一篇說的IService接口,而是 ServiceBase 抽象類,實際上它也是實現了IService接口的類,這樣可以讓我們的服務類代碼更簡單。

別忘了,在IOC配置文件 IOCConfig.xml 注冊我們新添加的服務:

<IOC Name="TestService">
     <Add Key="TestTimeService" InterfaceName="IService" FullClassName="TestService.TimeService" Assembly="TestService" />
     <!-- 其它略 --> 
</IOC>

該配置需要注意3點:

  1. 雖然TimeService 繼承的是ServiceBase 對象,但在這里配置 InterfaceName的時候,仍然使用 IService
  2. Key="TestTimeService" 而不是 Key="TimeService" ,實際上這里配置的Key 可以是任意名字,只要跟配置文件中其它Key的值不重復即可
  3. 調用服務的時候,ServiceRequest 對象的 ServiceName 屬性指定的服務名稱,是這里配置的Key的值,而不是MSF服務類的類名

 

1.2,在TestClient 項目添加訂閱服務的代碼:

在訂閱前,我們可以直接請求下上面的【服務器時間】服務,測試下服務是否可行:

DateTime serverTime = client.RequestServiceAsync<DateTime>("Service://TestTimeService/ServerTime/", 
                PWMIS.EnterpriseFramework.Common.DataType.DateTime).Result;
Console.WriteLine("MSF Get Server Time:{0}", serverTime);

 測試成功,下面繼續編寫訂閱模式的代碼:

            ServiceRequest request3 = new ServiceRequest();
            request3.ServiceName = "TestTimeService";
            request3.MethodName = "ServerTime";
            int count = 0;
            client.Subscribe<DateTime>(request3, 
                PWMIS.EnterpriseFramework.Common.DataType.DateTime, 
                s => 
                {
                    if (s.Succeed)
                    {
                        Console.WriteLine("MSF Server Time:{0}", s.Result);
                      
                    }
                    else
                    {
                        Console.WriteLine("MSF Server Error:{0}", s.ErrorMessage);
                    }
                    count++;
                    if (count > 10)
                    {
                        client.Close();
                        Console.WriteLine("訂閱【服務器時鍾服務】結束。按回車鍵繼續。");
                    }
                });

與請求模式不同,客戶端要使用訂閱模式,只需要將服務代理類的 RequestService 方法替換成 Subscribe 方法,該方法的第一個泛型參數類型表示訂閱的結果的類型。

由於是訂閱模式, Subscribe 不提供Async的同名方法,因為服務器會多次向客戶端推送訂閱的結果,何時訂閱結束,可以由客戶端來決定,在客戶端提供的服務端回調方法內來關閉訂閱的連接即可。所以Subscribe 方法的下一行代碼會立即執行,無法實現RequestServiceAsync 這種“同步”效果。

在當前示例中,服務端會向客戶端推送10次服務器時間,然后客戶端會關閉訂閱連接。假如客戶端不關閉訂閱連接,服務器會一直向客戶端推送訂閱結果,每秒推送一次。

下面是這個示例的運行結果:

MSF Server Time:2017-10-11 10:33:48
MSF Server Time:2017-10-11 10:33:49
MSF Server Time:2017-10-11 10:33:50
MSF Server Time:2017-10-11 10:33:51
MSF Server Time:2017-10-11 10:33:52
MSF Server Time:2017-10-11 10:33:53
MSF Server Time:2017-10-11 10:33:54
MSF Server Time:2017-10-11 10:33:55
MSF Server Time:2017-10-11 10:33:56
MSF Server Time:2017-10-11 10:33:58
MSF Server Time:2017-10-11 10:33:59
訂閱【服務器時鍾服務】結束。按回車鍵繼續。

 1.3,改變推送頻率

默認情況下,定時推送模式是每秒推送一次,你可以在定義方法中調用基類的方法來修改它,具體代碼略。

二、事件推送模式

有時候我們並不需要固定間隔時間(例如每秒)調用服務方法然后將處理結果推送給客戶端,而是在某個特定的時間才向客戶端推送訂閱的服務結果,這個需求可以在服務端實現一個定時器,在時間到了后才推送,或者,進行某項業務處理過程,滿足某項業務條件后,觸發一個業務事件,在這個業務事件中,將訂閱的結果推送給客戶端。

定時器處理的是它觸發的事件,業務處理過程也可以觸發某種業務操作事件,所以這種推送模式,就是“事件推送模式”,跟前面的“定時推送模式”是完全不同的模式,在事件推送模式中,看起來是將服務端的事件,推送到客戶端訂閱的方法里面去了,事件的實際處理,到了客戶端,因此,事件推送模式,也是一種“分布式事件”處理模式。

下面我們來實現一個“鬧鈴服務”,客戶端訂閱此鬧鈴服務,指定響鈴的時間和響鈴的次數,服務端的鬧鈴到了指定時間,就會向客戶端推送“鬧鈴服務”:“鬧鈴響了”,一直推送到客戶端指定的次數為止。

與定時推送不同的是,事件推送模式,要求被訂閱的方法,返回 ServiceEventSource 類型,它表示一個事件源對象,請看下面的鬧鍾服務示例。

2.1,編寫鬧鍾服務

在TestService項目添加鬧鍾服務類文件 AlarmClockService.cs,其代碼如下:

 public class AlarmClockService:ServiceBase
    {
        System.Timers.Timer timer;
        DateTime AlarmTime;
        int AlarmCount;
        int MaxAlarmCount;

        public event EventHandler Alarming;

        public AlarmClockService()
        {
            timer = new System.Timers.Timer();
            timer.Interval = 10000;
            timer.Elapsed += timer_Elapsed;
        }

        void timer_Elapsed(object sender, System.Timers.ElapsedEventArgs e)
        {
            if (e.SignalTime >= this.AlarmTime)
            {
                if (Alarming != null)
                    Alarming(this, new EventArgs());

                base.CurrentContext.PublishData(DateTime.Now); //e.SignalTime
                AlarmCount++;
                Console.WriteLine("AlarmClockService Publish Count:{0}", AlarmCount);
            }
            else
            {
                Console.WriteLine("Alarm Time:{0},AlarmClock waiting...",this.AlarmTime);
            }
            if (AlarmCount > MaxAlarmCount)
            {
                timer.Stop();
                //推送一個結束標記值:1900-1-1
                base.CurrentContext.PublishData(new DateTime(1900, 1, 1));
                Console.WriteLine("[{0}] AlarmClockService Timer Stoped. ", new DateTime(1900,1,1));
                base.CurrentContext.PublishEventSource.DeActive();
            }
        }


        public ServiceEventSource SetAlarmTime(AlarmClockParameter para)
        {
            this.MaxAlarmCount = para.AlarmCount;
            this.AlarmTime = para.AlarmTime;
            return new ServiceEventSource(timer, 2, () =>
            {
                //要初始化執行的代碼或者方法
                AlarmCount = 0;
                timer.Start();
                //如果上面的代碼是一個執行時間比較長的方法,但又不知道何時執行完成,
                //並且不想等待超時回收服務對象,而是在執行完成后立即回收服務對象,可以調用下面的代碼:
                //CurrentContext.PublishEventSource.DeActive();
                //注意:調用DeActive 方法后將會停止事件推送,所以請注意此方法調用的時機。

                //下面代碼僅做測試,查看服務事件源對象的活動生命周期
                //在 ActiveLife 時間之后,一直沒有事件推送,則事件源對象被視為非活動狀態,發布工作線程會被回收。
                //在本例中,ActiveLife 為ServiceEventSource 構造函數的第二個參數,值為 2分鍾,可以通過下面一行代碼證實:
                int life = base.CurrentContext.PublishEventSource.ActiveLife;

                //如果上面執行的是一個執行時間比較長的方法,並且有返回值,想將返回值也推送給訂閱端,可以再次執行CurrentContext.PublishData
                //CurrentContext.PublishData(DateTime.Now);

                //如果事件推送結束,需要設置事件源為非活動狀態,否則,需要等待 ActiveLife 時間之后自然過期成為非活動狀態。
                //如果你無法確定事件推送何時結束,請不要調用下面的方法
                //CurrentContext.PublishEventSource.DeActive();
            });
        }
    }

注意:

跟上面一樣,不要忘記了在IOCConfig.xml文件注冊此鬧鍾服務。


鬧鍾服務的類中有一個定時器對象,當訂閱鬧鍾服務的 SetAlarmTime 方法的時候,會給鬧鍾服務傳入必要的參數以便鬧鍾工作,參數類AlarmClockParameter 定義在 TestDto項目中,其代碼如下:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace TestDto
{
    public class AlarmClockParameter
    {
        /// <summary>
        /// 響鈴時間
        /// </summary>
        public DateTime AlarmTime { get; set; }
        /// <summary>
        /// 響鈴次數
        /// </summary>
        public int AlarmCount { get; set; }
    }
}

2.2,編寫鬧鈴服務訂閱客戶端

            AlarmClockParameter acp = new AlarmClockParameter();
            acp.AlarmCount = 10;
            acp.AlarmTime = alarmTime;

            ServiceRequest request4 = new ServiceRequest();
            request4.ServiceName = "AlarmClockService";
            request4.MethodName = "SetAlarmTime";
            request4.Parameters = new object[] { acp };

            client.Subscribe<DateTime>(request4,
                  PWMIS.EnterpriseFramework.Common.DataType.DateTime, 
                  s =>
                  {
                      if (s.Succeed)
                      {
                          Console.WriteLine("鬧鍾響了,現在時間:{0}", s.Result);
                          if (s.Result == new DateTime(1900, 1, 1))
                          {
                              client.Close();
                              Console.WriteLine("鬧鈴服務結束,按回車鍵繼續。");
                          }
                      }
                      else
                      {
                          Console.WriteLine("MSF Server Error:{0}", s.ErrorMessage);
                          client.Close();
                      }
              });

這個訂閱客戶端,像前面訂閱服務器時間一樣,沒有區別,這里不多解釋。

2.3,注冊iMSF服務方法的參數類

運行此服務端和客戶端,發現客戶端輸出了下面的異常信息:

---處理服務時錯誤:系統不能處理當前類型的參數:TestDto.AlarmClockParameter

這個消息是前面服務代理類的錯誤處理事件輸出的結果:

  Proxy client = new Proxy();
   client.ErrorMessage += client_ErrorMessage;

   static void client_ErrorMessage(object sender, MessageSubscriber.MessageEventArgs e)
        {
            Console.WriteLine("---處理服務時錯誤:{0}",e.MessageText);
        }

現在我們去看MSF Host控制台輸出的相信錯誤信息:

[2017-10-11 09:12:23.736]訂閱消息-- From: 127.0.0.1:57822
[2017-10-11 09:12:23.752]正在處理服務請求--From: 127.0.0.1:57822,Identity:WMI2114256838
>>[PMID:1]Publish://AlarmClockService/SetAlarmTime/TestDto.AlarmClockParameter=TestDto.AlarmClockParameter
[2017-10-11 09:12:23]處理服務的時候發生異常:執行服務方法錯誤:
源錯誤信息:系統不能處理當前類型的參數:TestDto.AlarmClockParameter,
請求的Uri:
Publish://AlarmClockService/SetAlarmTime/TestDto.AlarmClockParameter=TestDto.AlarmClockParameter,
127.0.0.1:57822,WMI2114256838

錯誤發生時的異常對象調用堆棧:
System.ArgumentException: 系統不能處理當前類型的參數:TestDto.AlarmClockParameter
[2017-10-11 09:12:23.767]請求處理完畢(15.6339ms)--To: 127.0.0.1:57822,Identity:WMI2114256838
>>[PMID:1]消息長度:63字節 -------
result:Service_Execute_Error:系統不能處理當前類型的參數:TestDto.AlarmClockParameter
Publish Message OK.

這說明MSF服務端不識別當前調用的服務方法上的參數類型 TestDto.AlarmClockParameter ,這里需要將這個自定義的參數類型注冊到MSF的IOC配置文件上:

 <IOC Name="ServiceModel">
      <Add Key="AlarmClockParameter"  InterfaceName=""  FullClassName="TestDto.AlarmClockParameter" Assembly="TestDto" />
      <!-- 其它略-->
 </IOC>

 注意:服務訪問需要的自定義參數類型,必須注冊在 ServiceModel 節點下。

2.4,運行訂閱服務

如果前面的配置都正確了,我們重新生成項目,啟動MS Host 和TestClient,就可以看到客戶端輸出的結果了:

請輸入鬧鈴響鈴時間(示例輸入格式 11:54) >>11:55
訂閱鬧鍾服務,鬧鍾將在 11:55 響鈴...
鬧鍾響了,現在時間:2017-10-11 11:55:09
鬧鍾響了,現在時間:2017-10-11 11:55:19
鬧鍾響了,現在時間:2017-10-11 11:55:29
鬧鍾響了,現在時間:2017-10-11 11:55:39
鬧鍾響了,現在時間:2017-10-11 11:55:49
鬧鍾響了,現在時間:2017-10-11 11:55:59
鬧鍾響了,現在時間:2017-10-11 11:56:09
鬧鍾響了,現在時間:2017-10-11 11:56:19
鬧鍾響了,現在時間:2017-10-11 11:56:29
鬧鍾響了,現在時間:2017-10-11 11:56:39
鬧鍾響了,現在時間:2017-10-11 11:56:49
鬧鍾響了,現在時間:1900-1-1 0:00:00
鬧鈴服務結束,按回車鍵繼續。

在客戶端控制台輸入鬧鈴時間,我們看到在時間到了后,服務器才向客戶端推送了“響鈴通知”消息,客戶端處理這個事件將結果打印在屏幕上。

三、iMSF的Actor模式

在MSF的入門篇介紹中,我們說MSF具有實現Actor編程模型的能力,在MSF中,每一個被訂閱的服務,它本質上都是一個分布式的Actor對象,這些Actor對象在第一次被訂閱的時候激活,一直到超過一定時間沒有任何消息推送的時候為止,在Actor生存期間沒有任何客戶端訂閱的情況下也會繼續工作。

對於同一個MSF服務類下的服務方法,當我們以訂閱的方式激活此Actor的時候,是以被訂閱的服務方法的參數來區分的,簡單說,就是訂閱的服務方法參數一樣,那么多個客戶端訂閱的都是同一個MSF的服務對象實例。

這個現象,可以通過本篇的“鬧鍾服務”訂閱過程來驗證,在第一個客戶端訂閱鬧鍾服務后,啟動第二個TestClient程序,也來訂閱鬧鍾服務,注意,2個進程訂閱的鬧鍾服務,它的鬧鈴時間設置為一樣。訂閱后,我們發現,即使第一個訂閱客戶端已經開始收到服務器的“鬧鈴消息”推送,第二個訂閱客戶端加入進來后,可以馬上收到同樣的消息推送,這說明,兩個客戶端訂閱的是同一個MSF的服務對象,也就是同一個Actor對象。我們注意觀察 MSF Host的屏幕輸出,也能驗證這個結果,它會提示消息發送給了2個客戶端,具體過程,大家可以去仔細看看,本篇不再說明。下面是效果圖:

 

---------------------------分界線------------------------------------------------------------------------

歡迎加入我們的QQ群討論MSF框架的使用,群號:敏思(PWMIS) .NET 18215717,加群請注明:PDF.NET技術交流,否則可能被拒。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM