之前看wcf服務的時候看到wcf有支持管道通信協議,之前不知道,最近剛好有用到這個,這里寫個簡單實例
.net有已經封裝好的pip通信的對象NamedPipeServerStream 和NamedPipeClientStream對象,底層應該還是調用C++實現的api實現的
對服務端和客戶端做個簡單的封裝方便調用:
server:
public class PipServer:Log { public Action<string> ReceiveEvent; NamedPipeServerStream m_pipServer; AutoResetEvent monitor = new AutoResetEvent(false); Thread m_thread; bool run = true; string servname; public PipServer(string name) { m_pipServer = new NamedPipeServerStream(name,PipeDirection.InOut, 1, PipeTransmissionMode.Byte, PipeOptions.Asynchronous); servname = name; } public void Listen() { try { m_thread = new Thread(() => { WaitConnect(); }); m_thread.Start(); } catch (Exception ex) { P(ex, "[PipServer.WaitForConnect]"); } } void WaitConnect() { AsyncCallback callback = null; callback = new AsyncCallback(ar => { var pipeServer = (NamedPipeServerStream)ar.AsyncState; pipeServer.EndWaitForConnection(ar); Accept(); pipeServer.Disconnect(); m_pipServer.BeginWaitForConnection(callback, m_pipServer); }); m_pipServer.BeginWaitForConnection(callback, m_pipServer); } void Accept() { try { var res = Read(); if(!string.IsNullOrEmpty(res)) ReceiveEvent?.Invoke(res); } catch(Exception ex) { P(ex, "[PipServer.Accept]"); } } public bool Send(string msg) { try { var buf = Encoding.UTF8.GetBytes(msg); if (m_pipServer.CanWrite) { m_pipServer.Write(buf, 0, buf.Length); m_pipServer.Flush(); return true; } return false; } catch (Exception ex) { P(ex, "[PipServer.Send]"); return false; } } public string Read() { try { if (m_pipServer.CanRead) { int count = 0; List<byte> data = new List<byte>(); byte[] buf = new byte[1024]; do { count=m_pipServer.Read(buf, 0, buf.Length); if (count == buf.Length) { data.AddRange(buf); } else { var dst = new byte[count]; Buffer.BlockCopy(buf, 0, dst, 0, count); data.AddRange(dst); } } while (count > 0&&m_pipServer.CanRead); var res = Encoding.UTF8.GetString(data.ToArray()); return res; } return null; } catch (Exception ex) { P(ex, "[PipServer.Read]"); return null; } } public void Close() { run = false; m_thread.Join(); if (m_pipServer.IsConnected) { m_pipServer.Close(); } } }
client:
public class PipClient:Log { string serv; public PipClient(string server) { serv = server; } public bool Send(string msg) { try { var buf = Encoding.UTF8.GetBytes(msg); NamedPipeClientStream pipclient = new NamedPipeClientStream(serv); pipclient.Connect(3000); if (pipclient.CanWrite) { pipclient.Write(buf, 0, buf.Length); pipclient.Flush(); pipclient.Close(); return true; } return false; } catch (Exception ex) { P(ex, "[PipClient.Send]"); return false; } } }
log類寫了一個簡單日志打印類,集成下方便打印日志,可以直接去掉繼承,吧日志打印去掉:
public class Log { public void L(string msg) { Console.WriteLine(msg); } public void L(string format, params string[] data) { Console.WriteLine(string.Format(format,data)); } public void P(Exception ex, string format, params string[] data) { var msg = string.Format(format, data); Console.WriteLine(string.Format("{0}:{1},{1}", msg, ex.Message, ex.StackTrace)); } }
調用實例:
static void PipTest() { Thread thread = new Thread(() => { PipServer pip = new PipServer("TEST_PIP"); pip.ReceiveEvent += s => { w(string.Format("receive:{0}",s)); }; pip.Listen(); }); thread.Start(); bool send = true; int count = 0; AutoResetEvent monitor = new AutoResetEvent(false); Thread client = new Thread(() => { PipClient ct = new PipClient("TEST_PIP"); while (send) { string msg = string.Format("這是第{0}條數據", count); w(msg); ct.Send(msg); count++; if (monitor.WaitOne(1000)) { break; } } }); client.Start(); while (true) { var input = Console.ReadLine(); if (input == "q" || input == "Q") { send = false; monitor.Set(); break; } } }
運行時,是客戶端向服務端每隔一秒發送一次數據
有幾個要注意的點:
1 要注意編碼方式,怎么編碼就怎么解碼,最好是要有固定編碼,不要直接寫string,因為如果是不同的語言和不同平台實現的類,可能default對應的編碼方式是不一樣的,這樣會造成讀取亂碼
2 這里可以用streamreader來讀取,但是不要用readend這種寫法,如果發送方不及時調用close方法,這樣寫會一直卡住,調用flush也沒用
3 這里初始化只傳入了servername,實際底層的地址是\\\\.\\pipe\\TEST_PIP,調試的時候下個斷點可以看到的,如果用C++寫的話,直接調用API傳入的地址就是全名,到C#這邊會自動被解析
4 可以再傳入的信息上做一些文章,加上ID,發送方和接收方,這樣可以實現類似回調的功能,這個是支持雙向通信的,這里只有單向
5 類庫是支持同步和異步的,這里是異步的等待連接,同步的讀取,但是貌似沒有直接支持4.5await寫法的方法,只有AsyncCallback的寫法