一個進程間同步和通訊的 C# 框架


轉自原文 一個進程間同步和通訊的 C# 框架

threadmsg_demo.zip ~ 41KB    下載
threadmsg_src.zip ~ 65KB    下載

 

0.背景簡介

微軟在 .NET 框架中提供了多種實用的線程同步手段,其中包括 monitor 類及 reader-writer鎖。但跨進程的同步方法還是非常欠缺。另外,目前也沒有方便的線程間及進程間傳遞消息的方法。例如C/S和SOA,又或者生產者/消費者模式中就常常需要傳遞消息。為此我編寫了一個獨立完整的框架,實現了跨線程和跨進程的同步和通訊。這框架內包含了信號量,信箱,內存映射文件,阻塞通道,及簡單消息流控制器等組件。這篇文章里提到的類同屬於一個開源的庫項目(BSD許可),你可以從這里下載到 www.cdrnet.net/projects/threadmsg/.

這個框架的目的是:

  1. 封裝性:通過MSMQ消息隊列發送消息的線程無需關心消息是發送到另一個線程還是另一台機器。
  2. 簡單性:向其他進程發送消息只需調用一個方法。

 

注意:我刪除了本文中全部代碼的XML注釋以節省空間。如果你想知道這些方法和參數的詳細信息,請參考附件中的代碼。


 

1.先看一個簡單例子

使用了這個庫后,跨進程的消息傳遞將變得非常簡單。我將用一個小例子來作示范:一個控制台程序,根據參數可以作為發送方也可以作為接收方運行。在發送程序里,你可以輸入一定的文本並發送到信箱內(返回key),接收程序將顯示所有從信箱內收到的消息。你可以運行無數個發送程序和接收程序,但是每個消息只會被具體的某一個接收程序所收到。

[Serializable]
struct Message
{
  public string Text; } class Test { IMailBox mail; public Test() { mail = new ProcessMailBox("TMProcessTest",1024); } public void RunWriter() { Console.WriteLine("Writer started"); Message msg; while(true) { msg.Text = Console.ReadLine(); if(msg.Text.Equals("exit")) break; mail.Content = msg; } } public void RunReader() { Console.WriteLine("Reader started"); while(true) { Message msg = (Message)mail.Content; Console.WriteLine(msg.Text); } } [STAThread] static void Main(string[] args) { Test test = new Test(); if(args.Length > 0) test.RunWriter(); else test.RunReader(); } }

信箱一旦創建之后(這上面代碼里是 ProcessMailBox ),接收消息只需要讀取 Content 屬性,發送消息只需要給這個屬性賦值。當沒有數據時,獲取消息將會阻塞當前線程;發送消息時如果信箱里已經有數據,則會阻塞當前線程。正是有了這個阻塞,整個程序是完全基於中斷的,並且不會過度占用CPU(不需要進行輪詢)。發送和接收的消息可以是任意支持序列化(Serializable)的類型。

然而,實際上暗地里發生的事情有點復雜:消息通過內存映射文件來傳遞,這是目前唯一的跨進程共享內存的方法,這個例子里我們只會在 pagefile 里面產生虛擬文件。對這個虛擬文件的訪問是通過 win32 信號量來確保同步的。消息首先序列化成二進制,然后再寫進該文件,這就是為什么需要聲明Serializable屬性。內存映射文件和 win32 信號量都需要調用 NT內核的方法。多得了 .NET 框架中的 Marshal 類,我們可以避免編寫不安全的代碼。我們將在下面討論更多的細節。


 

2. .NET里面的跨線程/進程同步

線程/進程間的通訊需要共享內存或者其他內建機制來發送/接收數據。即使是采用共享內存的方式,也還需要一組同步方法來允許並發訪問。

同一個進程內的所有線程都共享公共的邏輯地址空間(堆)。對於不同進程,從 win2000 開始就已經無法共享內存。然而,不同的進程可以讀寫同一個文件。WinAPI提供了多種系統調用方法來映射文件到進程的邏輯空間,及訪問系統內核對象(會話)指向的 pagefile 里面的虛擬文件。無論是共享堆,還是共享文件,並發訪問都有可能導致數據不一致。我們就這個問題簡單討論一下,該怎樣確保線程/進程調用的有序性及數據的一致性。

 

2.1 線程同步

.NET 框架和 C# 提供了方便直觀的線程同步方法,即 monitor 類和 lock 語句(本文將不會討論 .NET 框架的互斥量)。對於線程同步,雖然本文提供了其他方法,我們還是推薦使用 lock 語句。

void Work1() { NonCriticalSection1(); Monitor.Enter(this); try { CriticalSection(); } finally { Monitor.Exit(this); } NonCriticalSection2(); }
void Work2() {   NonCriticalSection1();   lock(this)   {     CriticalSection();   }   NonCriticalSection2(); } 

Work1 和 Work2 是等價的。在C#里面,很多人喜歡第二個方法,因為它更短,且不容易出錯。


 

2.2 跨線程信號量

信號量是經典的同步基本概念之一(由 Edsger Dijkstra 引入)。信號量是指一個有計數器及兩個操作的對象。它的兩個操作是:獲取(也叫P或者等待),釋放(也叫V或者收到信號)。信號量在獲取操作時如果計數器為0則阻塞,否則將計數器減一;在釋放時將計數器加一,且不會阻塞。雖然信號量的原理很簡單,但是實現起來有點麻煩。好在,內建的 monitor 類有阻塞特性,可以用來實現信號量。

public sealed class ThreadSemaphore : ISemaphore { private int counter; private readonly int max; public ThreadSemaphore() : this(0, int.Max) {} public ThreadSemaphore(int initial) : this(initial, int.Max) {} public ThreadSemaphore(int initial, int max) { this.counter = Math.Min(initial,max); this.max = max; } public void Acquire() { lock(this) { counter--; if(counter < 0 && !Monitor.Wait(this)) throw new SemaphoreFailedException(); } } public void Acquire(TimeSpan timeout) { lock(this) { counter--; if(counter < 0 && !Monitor.Wait(this,timeout)) throw new SemaphoreFailedException(); } } public void Release() { lock(this) { if(counter >= max) throw new SemaphoreFailedException(); if(counter < 0) Monitor.Pulse(this); counter++; } } }

信號量在復雜的阻塞情景下更加有用,例如我們后面將要討論的通道(channel)。你也可以使用信號量來實現臨界區的排他性(如下面的 Work3),但是我還是推薦使用內建的 lock 語句,像上面的 Work2 那樣。

請注意:如果使用不當,信號量也是有潛在危險的。正確的做法是:當獲取信號量失敗時,千萬不要再調用釋放操作;當獲取成功時,無論發生了什么錯誤,都要記得釋放信號量。遵循這樣的原則,你的同步才是正確的。Work3 中的 finally 語句就是為了保證正確釋放信號量。注意:獲取信號量( s.Acquire() )的操作必須放到 try 語句的外面,只有這樣,當獲取失敗時才不會調用釋放操作。

ThreadSemaphore s = new ThreadSemaphore(1); void Work3() { NonCriticalSection1(); s.Acquire(); try { CriticalSection(); } finally { s.Release(); } NonCriticalSection2(); }

 


 

2.3 跨進程信號量

為了協調不同進程訪問同一資源,我們需要用到上面討論過的概念。很不幸,.NET 中的 monitor 類不可以跨進程使用。但是,win32 API提供的內核信號量對象可以用來實現跨進程同步。 Robin Galloway-Lunn 介紹了怎樣將 win32 的信號量映射到 .NET 中(見 Using Win32 Semaphores in C# )。我們的實現也類似:

[DllImport("kernel32",EntryPoint="CreateSemaphore", SetLastError=true,CharSet=CharSet.Unicode)] internal static extern uint CreateSemaphore( SecurityAttributes auth, int initialCount, int maximumCount, string name); [DllImport("kernel32",EntryPoint="WaitForSingleObject", SetLastError=true,CharSet=CharSet.Unicode)] internal static extern uint WaitForSingleObject( uint hHandle, uint dwMilliseconds); [DllImport("kernel32",EntryPoint="ReleaseSemaphore", SetLastError=true,CharSet=CharSet.Unicode)] [return : MarshalAs( UnmanagedType.VariantBool )] internal static extern bool ReleaseSemaphore( uint hHandle, int lReleaseCount, out int lpPreviousCount); [DllImport("kernel32",EntryPoint="CloseHandle",SetLastError=true, CharSet=CharSet.Unicode)] [return : MarshalAs( UnmanagedType.VariantBool )] internal static extern bool CloseHandle(uint hHandle);


public class ProcessSemaphore : ISemaphore, IDisposable { private uint handle; private readonly uint interruptReactionTime; public ProcessSemaphore(string name) : this( name,0,int.MaxValue,500) {} public ProcessSemaphore(string name, int initial) : this( name,initial,int.MaxValue,500) {} public ProcessSemaphore(string name, int initial, int max, int interruptReactionTime) { this.interruptReactionTime = (uint)interruptReactionTime; this.handle = NTKernel.CreateSemaphore(null, initial, max, name); if(handle == 0) throw new SemaphoreFailedException(); } public void Acquire() { while(true) { //looped 0.5s timeout to make NT-blocked threads interruptable. uint res = NTKernel.WaitForSingleObject(handle, interruptReactionTime); try {System.Threading.Thread.Sleep(0);} catch(System.Threading.ThreadInterruptedException e) { if(res == 0) { //Rollback int previousCount; NTKernel.ReleaseSemaphore(handle,1,out previousCount); } throw e; } if(res == 0) return; if(res != 258) throw new SemaphoreFailedException(); } } public void Acquire(TimeSpan timeout) { uint milliseconds = (uint)timeout.TotalMilliseconds; if(NTKernel.WaitForSingleObject(handle, milliseconds) != 0) throw new SemaphoreFailedException(); } public void Release() { int previousCount; if(!NTKernel.ReleaseSemaphore(handle, 1, out previousCount)) throw new SemaphoreFailedException(); } #region IDisposable Member public void Dispose() { if(handle != 0) { if(NTKernel.CloseHandle(handle)) handle = 0; } } #endregion }

有一點很重要:win32中的信號量是可以命名的。這允許其他進程通過名字來創建相應信號量的句柄。為了讓阻塞線程可以中斷,我們使用了一個(不好)的替代方法:使用超時和 Sleep(0)。我們需要中斷來安全關閉線程。更好的做法是:確定沒有線程阻塞之后才釋放信號量,這樣程序才可以完全釋放資源並正確退出。

你可能也注意到了:跨線程和跨進程的信號量都使用了相同的接口。所有相關的類都使用了這種模式,以實現上面背景介紹中提到的封閉性。需要注意:出於性能考慮,你不應該將跨進程的信號量用到跨線程的場景,也不應該將跨線程的實現用到單線程的場景。


 

3. 跨進程共享內存:內存映射文件

我們已經實現了跨線程和跨進程的共享資源訪問同步。但是傳遞/接收消息還需要共享資源。對於線程來說,只需要聲明一個類成員變量就可以了。但是對於跨進程來說,我們需要使用到 win32 API 提供的內存映射文件(Memory Mapped Files,簡稱MMF)。使用 MMF和使用 win32 信號量差不多。我們需要先調用 CreateFileMapping 方法來創建一個內存映射文件的句柄:

[DllImport("Kernel32.dll",EntryPoint="CreateFileMapping", SetLastError=true,CharSet=CharSet.Unicode)] internal static extern IntPtr CreateFileMapping(uint hFile, SecurityAttributes lpAttributes, uint flProtect, uint dwMaximumSizeHigh, uint dwMaximumSizeLow, string lpName); [DllImport("Kernel32.dll",EntryPoint="MapViewOfFile", SetLastError=true,CharSet=CharSet.Unicode)] internal static extern IntPtr MapViewOfFile(IntPtr hFileMappingObject, uint dwDesiredAccess, uint dwFileOffsetHigh, uint dwFileOffsetLow, uint dwNumberOfBytesToMap); [DllImport("Kernel32.dll",EntryPoint="UnmapViewOfFile", SetLastError=true,CharSet=CharSet.Unicode)] [return : MarshalAs( UnmanagedType.VariantBool )] internal static extern bool UnmapViewOfFile(IntPtr lpBaseAddress);
public static MemoryMappedFile CreateFile(string name, FileAccess access, int size) { if(size < 0) throw new ArgumentException("Size must not be negative","size"); IntPtr fileMapping = NTKernel.CreateFileMapping(0xFFFFFFFFu,null, (uint)access,0,(uint)size,name); if(fileMapping == IntPtr.Zero) throw new MemoryMappingFailedException(); return new MemoryMappedFile(fileMapping,size,access); }

我們希望直接使用 pagefile 中的虛擬文件,所以我們用 -1(0xFFFFFFFF) 來作為文件句柄來創建我們的內存映射文件句柄。我們也指定了必填的文件大小,以及相應的名稱。這樣其他進程就可以通過這個名稱來同時訪問該映射文件。創建了內存映射文件后,我們就可以映射這個文件不同的部分(通過偏移量和字節大小來指定)到我們的進程地址空間。我們通過 MapViewOfFile 系統方法來指定:

public MemoryMappedFileView CreateView(int offset, int size, MemoryMappedFileView.ViewAccess access) { if(this.access == FileAccess.ReadOnly && access == MemoryMappedFileView.ViewAccess.ReadWrite) throw new ArgumentException( "Only read access to views allowed on files without write access", "access"); if(offset < 0) throw new ArgumentException("Offset must not be negative","size"); if(size < 0) throw new ArgumentException("Size must not be negative","size"); IntPtr mappedView = NTKernel.MapViewOfFile(fileMapping, (uint)access,0,(uint)offset,(uint)size); return new MemoryMappedFileView(mappedView,size,access); }

在不安全的代碼中,我們可以將返回的指針強制轉換成我們指定的類型。盡管如此,我們不希望有不安全的代碼存在,所以我們使用 Marshal 類來從中讀寫我們的數據。偏移量參數是用來從哪里開始讀寫數據,相對於指定的映射視圖的地址。

public byte ReadByte(int offset) { return Marshal.ReadByte(mappedView,offset); } public void WriteByte(byte data, int offset) { Marshal.WriteByte(mappedView,offset,data); } public int ReadInt32(int offset) { return Marshal.ReadInt32(mappedView,offset); } public void WriteInt32(int data, int offset) { Marshal.WriteInt32(mappedView,offset,data); } public void ReadBytes(byte[] data, int offset) { for(int i=0;i<data.Length;i++) data[i] = Marshal.ReadByte(mappedView,offset+i); } public void WriteBytes(byte[] data, int offset) { for(int i=0;i<data.Length;i++) Marshal.WriteByte(mappedView,offset+i,data[i]); }

但是,我們希望讀寫整個對象樹到文件中,所以我們需要支持自動進行序列化和反序列化的方法。

public object ReadDeserialize(int offset, int length) { byte[] binaryData = new byte[length]; ReadBytes(binaryData,offset); System.Runtime.Serialization.Formatters.Binary.BinaryFormatter formatter = new System.Runtime.Serialization.Formatters.Binary.BinaryFormatter(); System.IO.MemoryStream ms = new System.IO.MemoryStream( binaryData,0,length,true,true); object data = formatter.Deserialize(ms); ms.Close(); return data; } public void WriteSerialize(object data, int offset, int length) { System.Runtime.Serialization.Formatters.Binary.BinaryFormatter formatter = new System.Runtime.Serialization.Formatters.Binary.BinaryFormatter(); byte[] binaryData = new byte[length]; System.IO.MemoryStream ms = new System.IO.MemoryStream( binaryData,0,length,true,true); formatter.Serialize(ms,data); ms.Flush(); ms.Close(); WriteBytes(binaryData,offset); }

請注意:對象序列化之后的大小不應該超過映射視圖的大小。序列化之后的大小總是比對象本身占用的內存要大的。我沒有試過直接將對象內存流綁定到映射視圖,那樣做應該也可以,甚至可能帶來少量的性能提升。

 

4. 信箱:在線程/進程間傳遞消息

這里的信箱與 Email 及 NT 中的郵件槽(Mailslots)無關。它是一個只能保留一個對象的安全共享內存結構。信箱的內容通過一個屬性來讀寫。如果信箱內容為空,試圖讀取該信箱的線程將會阻塞,直到另一個線程往其中寫內容。如果信箱已經有了內容,當一個線程試圖往其中寫內容時將被阻塞,直到另一個線程將信箱內容讀取出去。信箱的內容只能被讀取一次,它的引用在讀取后自動被刪除。基於上面的代碼,我們已經可以實現信箱了。

4.1 跨線程的信箱

我們可以使用兩個信號量來實現一個信箱:一個信號量在信箱內容為空時觸發,另一個在信箱有內容時觸發。在讀取內容之前,線程先等待信箱已經填充了內容,讀取之后觸發空信號量。在寫入內容之前,線程先等待信箱內容清空,寫入之后觸發滿信號量。注意:空信號量在一開始時就被觸發了。

public sealed class ThreadMailBox : IMailBox { private object content; private ThreadSemaphore empty, full; public ThreadMailBox() { empty = new ThreadSemaphore(1,1); full = new ThreadSemaphore(0,1); } public object Content { get { full.Acquire(); object item = content; empty.Release(); return item; } set { empty.Acquire(); content = value; full.Release(); } } }

 

 

4.2  跨進程信箱

跨進程信箱與跨線程信箱的實現基本上一樣簡單。不同的是我們使用兩個跨進程的信號量,並且我們使用內存映射文件來代替類成員變量。由於序列化可能會失敗,我們使用了一小段異常處理來回滾信箱的狀態。失敗的原因有很多(無效句柄,拒絕訪問,文件大小問題,Serializable屬性缺失等等)。

public sealed class ProcessMailBox : IMailBox, IDisposable { private MemoryMappedFile file; private MemoryMappedFileView view; private ProcessSemaphore empty, full; public ProcessMailBox(string name,int size) { empty = new ProcessSemaphore(name+".EmptySemaphore.MailBox",1,1); full = new ProcessSemaphore(name+".FullSemaphore.MailBox",0,1); file = MemoryMappedFile.CreateFile(name+".MemoryMappedFile.MailBox", MemoryMappedFile.FileAccess.ReadWrite,size); view = file.CreateView(0,size, MemoryMappedFileView.ViewAccess.ReadWrite); } public object Content { get { full.Acquire(); object item; try {item = view.ReadDeserialize();} catch(Exception e) { //Rollback full.Release(); throw e; } empty.Release(); return item; } set { empty.Acquire(); try {view.WriteSerialize(value);} catch(Exception e) { //Rollback empty.Release(); throw e; } full.Release(); } } #region IDisposable Member public void Dispose() { view.Dispose(); file.Dispose(); empty.Dispose(); full.Dispose(); } #endregion }

到這里我們已經實現了跨進程消息傳遞(IPC)所需要的組件。你可能需要再回頭本文開頭的那個例子,看看 ProcessMailBox 應該如何使用。

 
 

5.通道:基於隊列的消息傳遞

信箱最大的限制是它們每次只能保存一個對象。如果一系列線程(使用同一個信箱)中的一個線程需要比較長的時間來處理特定的命令,那么整個系列都會阻塞。通常我們會使用緩沖的消息通道來處理,這樣你可以在方便的時候從中讀取消息,而不會阻塞消息發送者。這種緩沖通過通道來實現,這里的通道比信箱要復雜一些。同樣,我們將分別從線程和進程級別來討論通道的實現。

5.1 可靠性

信箱和通道的另一個重要的不同是:通道擁有可靠性。例如:自動將發送失敗(可能由於線程等待鎖的過程中被中斷)的消息轉存到一個內置的容器中。這意味着處理通道的線程可以安全地停止,同時不會丟失隊列中的消息。這通過兩個抽象類來實現, ThreadReliability 和 ProcessReliability。每個通道的實現類都繼承其中的一個類。

5.2 跨線程的通道

跨線程的通道基於信箱來實現,但是使用一個同步的隊列來作為消息緩沖而不是一個變量。得益於信號量,通道在空隊列時阻塞接收線程,在隊列滿時阻塞發送線程。這樣你就不會碰到由入隊/出隊引發的錯誤。為了實現這個效果,我們用隊列大小來初始化空信號量,用0來初始化滿信號量。如果某個發送線程在等待入隊的時候被中斷,我們將消息復制到內置容器中,並將異常往外面拋。在接收操作中,我們不需要做異常處理,因為即使線程被中斷你也不會丟失任何消息。注意:線程只有在阻塞狀態才能被中斷,就像調用信號量的獲取操作(Aquire)方法時。

public sealed class ThreadChannel : ThreadReliability, IChannel { private Queue queue; private ThreadSemaphore empty, full; public ThreadChannel(int size) { queue = Queue.Synchronized(new Queue(size)); empty = new ThreadSemaphore(size,size); full = new ThreadSemaphore(0,size); } public void Send(object item) { try {empty.Acquire();} catch(System.Threading.ThreadInterruptedException e) { DumpItem(item); throw e; } queue.Enqueue(item); full.Release(); } public void Send(object item, TimeSpan timeout) { try {empty.Acquire(timeout);} ... } public object Receive() { full.Acquire(); object item = queue.Dequeue(); empty.Release(); return item; } public object Receive(TimeSpan timeout) { full.Acquire(timeout); ... } protected override void DumpStructure() { lock(queue.SyncRoot) { foreach(object item in queue) DumpItem(item); queue.Clear(); } } }

5.3 跨進程通道

實現跨進程通道有點麻煩,因為你需要首先提供一個跨進程的緩沖區。一個可能的解決方法是使用跨進程信箱並根據需要將接收/發送方法加入隊列。為了避免這種方案的幾個缺點,我們將直接使用內存映射文件來實現一個隊列。MemoryMappedArray 類將內存映射文件分成幾部分,可以直接使用數組索引來訪問。 MemoryMappedQueue 類,為這個數組提供了一個經典的環(更多細節請查看附件中的代碼)。為了支持直接以 byte/integer 類型訪問數據並同時支持二進制序列化,調用方需要先調用入隊(Enqueue)/出隊(Dequeue)操作,然后根據需要使用讀寫方法(隊列會自動將數據放到正確的位置)。這兩個類都不是線程和進程安全的,所以我們需要使用跨進程的信號量來模擬互斥量(也可以使用 win32 互斥量),以此實現相互間的互斥訪問。除了這兩個類,跨進程的通道基本上和跨線程信箱一樣。同樣,我們也需要在 Send() 中處理線程中斷及序列化可能失敗的問題。

public sealed class ProcessChannel : ProcessReliability, IChannel, IDisposable { private MemoryMappedFile file; private MemoryMappedFileView view; private MemoryMappedQueue queue; private ProcessSemaphore empty, full, mutex; public ProcessChannel( int size, string name, int maxBytesPerEntry) { int fileSize = 64+size*maxBytesPerEntry; empty = new ProcessSemaphore(name+".EmptySemaphore.Channel",size,size); full = new ProcessSemaphore(name+".FullSemaphore.Channel",0,size); mutex = new ProcessSemaphore(name+".MutexSemaphore.Channel",1,1); file = MemoryMappedFile.CreateFile(name+".MemoryMappedFile.Channel", MemoryMappedFile.FileAccess.ReadWrite,fileSize); view = file.CreateView(0,fileSize, MemoryMappedFileView.ViewAccess.ReadWrite); queue = new MemoryMappedQueue(view,size,maxBytesPerEntry,true,0); if(queue.Length < size || queue.BytesPerEntry < maxBytesPerEntry) throw new MemoryMappedArrayFailedException(); } public void Send(object item) { try {empty.Acquire();} catch(System.Threading.ThreadInterruptedException e) { DumpItemSynchronized(item); throw e; } try {mutex.Acquire();} catch(System.Threading.ThreadInterruptedException e) { DumpItemSynchronized(item); empty.Release(); throw e; } queue.Enqueue(); try {queue.WriteSerialize(item,0);} catch(Exception e) { queue.RollbackEnqueue(); mutex.Release(); empty.Release(); throw e; } mutex.Release(); full.Release(); } public void Send(object item, TimeSpan timeout) { try {empty.Acquire(timeout);} ... } public object Receive() { full.Acquire(); mutex.Acquire(); object item; queue.Dequeue(); try {item = queue.ReadDeserialize(0);} catch(Exception e) { queue.RollbackDequeue(); mutex.Release(); full.Release(); throw e; } mutex.Release(); empty.Release(); return item; } public object Receive(TimeSpan timeout) { full.Acquire(timeout); ... } protected override void DumpStructure() { mutex.Acquire(); byte[][] dmp = queue.DumpClearAll(); for(int i=0;i<dmp.Length;i++) DumpItemSynchronized(dmp[i]); mutex.Release(); } #region IDisposable Member public void Dispose() { view.Dispose(); file.Dispose(); empty.Dispose(); full.Dispose(); mutex.Dispose(); } #endregion }

 


 

6. 消息路由

我們目前已經實現了線程和進程同步及消息傳遞機制(使用信箱和通道)。當你使用阻塞隊列的時候,有可能會遇到這樣的問題:你需要在一個線程中同時監聽多個隊列。為了解決這樣的問題,我們提供了一些小型的類:通道轉發器,多用復用器,多路復用解碼器和通道事件網關。你也可以通過簡單的 IRunnable 模式來實現類似的通道處理器。IRunnable模式由兩個抽象類SingleRunnable和 MultiRunnable 來提供(具體細節請參考附件中的代碼)。

6.1 通道轉發器

通道轉發器僅僅監聽一個通道,然后將收到的消息轉發到另一個通道。如果有必要,轉發器可以將每個收到的消息放到一個信封中,並加上一個數字標記,然后再轉發出去(下面的多路利用器使用了這個特性)。

public class ChannelForwarder : SingleRunnable { private IChannel source, target; private readonly int envelope; public ChannelForwarder(IChannel source, IChannel target, bool autoStart, bool waitOnStop) : base(true,autoStart,waitOnStop) { this.source = source; this.target = target; this.envelope = -1; } public ChannelForwarder(IChannel source, IChannel target, int envelope, bool autoStart, bool waitOnStop) : base(true,autoStart,waitOnStop) { this.source = source; this.target = target; this.envelope = envelope; } protected override void Run() { //NOTE: IChannel.Send is interrupt save and //automatically dumps the argument. if(envelope == -1) while(running) target.Send(source.Receive()); else { MessageEnvelope env; env.ID = envelope; while(running) { env.Message = source.Receive(); target.Send(env); } } } }

 

6.2 通道多路復用器和通道復用解碼器

通道多路復用器監聽多個來源的通道並將接收到的消息(消息使用信封來標記來源消息)轉發到一個公共的輸出通道。這樣就可以一次性地監聽多個通道。復用解碼器則是監聽一個公共的輸出通道,然后根據信封將消息轉發到某個指定的輸出通道。

public class ChannelMultiplexer : MultiRunnable { private ChannelForwarder[] forwarders; public ChannelMultiplexer(IChannel[] channels, int[] ids, IChannel output, bool autoStart, bool waitOnStop) { int count = channels.Length; if(count != ids.Length) throw new ArgumentException("Channel and ID count mismatch.","ids"); forwarders = new ChannelForwarder[count]; for(int i=0;i<count;i++) forwarders[i] = new ChannelForwarder(channels[i], output,ids[i],autoStart,waitOnStop); SetRunnables((SingleRunnable[])forwarders); } } public class ChannelDemultiplexer : SingleRunnable { private HybridDictionary dictionary; private IChannel input; public ChannelDemultiplexer(IChannel[] channels, int[] ids, IChannel input, bool autoStart, bool waitOnStop) : base(true,autoStart,waitOnStop) { this.input = input; int count = channels.Length; if(count != ids.Length) throw new ArgumentException("Channel and ID count mismatch.","ids"); dictionary = new HybridDictionary(count,true); for(int i=0;i<count;i++) dictionary.add(ids[i],channels[i]); } protected override void Run() { //NOTE: IChannel.Send is interrupt save and //automatically dumps the argument. while(running) { MessageEnvelope env = (MessageEnvelope)input.Receive(); IChannel channel = (IChannel)dictionary[env.ID]; channel.send(env.Message); } } }

6.3 通道事件網關

通道事件網關監聽指定的通道,在接收到消息時觸發一個事件。這個類對於基於事件的程序(例如GUI程序)很有用,或者在使用系統線程池(ThreadPool)來初始化輕量的線程。需要注意的是:使用 WinForms 的程序中你不能在事件處理方法中直接訪問UI控件,只能調用Invoke 方法。因為事件處理方法是由事件網關線程調用的,而不是UI線程。

public class ChannelEventGateway : SingleRunnable { private IChannel source; public event MessageReceivedEventHandler MessageReceived; public ChannelEventGateway(IChannel source, bool autoStart, bool waitOnStop) : base(true,autoStart,waitOnStop) { this.source = source; } protected override void Run() { while(running) { object c = source.Receive(); MessageReceivedEventHandler handler = MessageReceived; if(handler != null) handler(this,new MessageReceivedEventArgs(c)); } } }

 

7. 比薩外賣店的例子

萬事俱備,只欠東風。我們已經討論了這個同步及消息傳遞框架中的大部分重要的結構和技術(本文沒有討論框架中的其他類如Rendezvous及Barrier)。就像開頭一樣,我們用一個例子來結束這篇文章。這次我們用一個小型比薩外賣店來做演示。下圖展示了這個例子:四個並行進程相互之間進行通訊。圖中展示了消息(數據)是如何使用跨進程通道在四個進程中流動的,且在每個進程中使用了性能更佳的跨線程通道和信箱。


一開始,一個顧客點了一個比薩和一些飲料。他調用了顧客(customer)接口的方法,向顧客訂單(CustomerOrders)通道發送了一個下單(Order)消息。接單員,在顧客下單后,發送了兩條配餐指令(分別對應比薩和飲料)到廚師指令(CookInstruction)通道。同時他通過收銀(CashierOrder)通道將訂單轉發給收銀台。收銀台從價格中心獲取總價並將票據發給顧客,希望能提高收銀的速度 。與此同時,廚師將根據配餐指令將餐配好之后交給打包員工。打包員工處理好之后,等待顧客付款,然后將外賣遞給顧客。


 

為了運行這個例子,打開4個終端(cmd.exe),用 "PizzaDemo.exe cook" 啟動多個廚師進程(多少個都可以),用 "PizzaDemo.exe backend" 啟動后端進程,用 "PizzaDemo.exe facade" 啟動顧客接口門面(用你的程序名稱來代替 PizzaDemo )。注意:為了模擬真實情景,某些線程(例如廚師線程)會隨機休眠幾秒。按下回車鍵就會停止和退出進程。如果你在進程正在處理數據的時候退出,你將可以在內存轉存報告的結尾看到幾個未處理的消息。在真實世界的程序里面,消息一般都會被轉存到磁盤中,以便下次可以使用。

 

這個例子使用了上文中討論過的幾個機制。比如說,收銀台使用一個通道復用器(ChannelMultiplexer)來監聽顧客的訂單和支付通道,用了兩個信箱來實現價格服務。分發時使用了一個通道事件網關(ChannelEventGateway),顧客在食物打包完成之后馬上會收到通知。你也可以將這些程序注冊成 Windows NT 服務運行,也可以遠程登錄后運行。



 

8. 總結

本文已經討論了C#中如何基於服務的架構及實現跨進程同步和通訊。然后,這個不是唯一的解決方案。例如:在大項目中使用那么多的線程會引來嚴重的問題。這個框架中缺失的是事務支持及其他的通道/信箱實現(例如命名管道和TCP sockets)。這個框架中可能也有許多不足之處,請告訴我。

 

9. 參考資料

 
 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM