現有項目是利用C#的socket與PLC進行實時通訊,PLC有兩種通訊模式——常規采集&高頻采集。
其中常規采集大概在10ms左右發送一次數據,高頻采集大概在2ms左右發送一次數據。
現有代碼框架:在與PLC進行連接時,通過建立委托並創建線程的方式,來循環讀取數據
//創建委托 public delegate void PLC_HD_Receive(byte[] recv_data); public PLC_HD_Receive PLC_Recv_Delegate_HD; //給委托綁定方法 PLC_Recv_Delegate_HD = new PLC_HD_Receive(PLC_Receive_Callback_HD);
//創建線程 PLC_Thread_HD = new Thread(new ThreadStart(PLC_ReadThread_HD));
PLC_Thread_HD.IsBackground = true; PLC_Thread_HD.Start();
//在線程內調用委托 this.BeginInvoke(this.PLC_Recv_Delegate_HD, new Object[] { recv_buffer_hd });
只要連接PLC成功后,會一直在后台讀取PLC發送來的數據,並解析數據
現有問題:實時性和數據完整性不夠,有些操作會導致socket斷掉連接。
計划:改寫現有代碼框架,加深對通訊的理解,和對實時數據流的處理。 2019-5-22
**************************************************************************************************************************************************
思路:原有框架讀取數據使用的是同步通信,出錯時反饋TimeOut錯誤,先准備改成異步通信
1 SocketError socket_error; 2 3 while (total_length < recv_buffer_len_hd) 4 { 5 //同步接收數據 6 ret_length = m_socket_hd.Receive(recv_buffer_hd, total_length, data_left, SocketFlags.None, out socket_error); 7 if (socket_error == SocketError.TimedOut || socket_error == SocketError.Shutdown || socket_error == SocketError.ConnectionAborted || ret_length == 0) 8 { 9 // 網絡不正常,委托退出接收線程 10 thread_id = 1; 11 this.Invoke(this.PLC_ExitThread_Delegate_HD, new Object[] { thread_id }); 12 return; 13 } 14 total_length += ret_length; 15 data_left -= ret_length; 16 }
控制台異步輸出數據
首先搭建一個簡單的winform窗口demo,實現控制台異步輸出數據
此處參考鏈接:https://blog.csdn.net/smartsmile2012/article/details/71172450 異步接收
但網上搜到的大部分都是服務器接收,項目上的應用是客戶端接收,做了一點修改
搭建的過程中遇到了winform無法直接控制台輸出,需要引用AllocConsole()和FreeConsole()
此處參考鏈接:https://blog.csdn.net/b510030/article/details/52621312 WinForm添加Console
在反復點擊按鈕的過程中發現,AllocConsole()最好在窗口構造函數中使用,否則多次調用AllocConsole()會導致Console.Readkey()報錯
1 public partial class Form1 : Form 2 { 3 //winform調用console窗口 4 [DllImport("Kernel32.dll")] 5 public static extern Boolean AllocConsole(); 6 7 [DllImport("Kernel32.dll")] 8 public static extern Boolean FreeConsole(); 9 //socket模塊 10 IPAddress ip; 11 Socket m_sokcet; 12 IPEndPoint local_endpoint; 13 byte[] buffer;
19 public Form1() 20 { 21 buffer = new byte[8]; 22 InitializeComponent();
25 AllocConsole(); 26 } 27 28 private void button1_Click(object sender, EventArgs e) 29 { 30 ip = IPAddress.Parse("127.0.0.1"); 31 local_endpoint = new IPEndPoint(ip, 60000); 32 m_sokcet = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); 33 m_sokcet.Connect(local_endpoint); 34 m_sokcet.BeginReceive(buffer, 0, buffer.Length, SocketFlags.None, new AsyncCallback(ReceiveCallback), m_sokcet); 35 Console.ReadKey(); 36 } 71 void ReceiveCallback(IAsyncResult result) 72 { 73 Socket m_sokcet = (Socket)result.AsyncState; 74 m_sokcet.EndReceive(result); 75 result.AsyncWaitHandle.Close();
91 Console.WriteLine("收到消息:{0}", Encoding.ASCII.GetString(buffer));
94 //清空數據,重新開始異步接收 95 buffer = new byte[buffer.Length]; 96 m_sokcet.BeginReceive(buffer, 0, buffer.Length, SocketFlags.None, new AsyncCallback(ReceiveCallback), m_sokcet); 97 } 98 }
服務端利用socketTool調試工具,發送數據后看控制台窗口的刷新情況,測試結果如下:
測試結果OK
**************************************************************************************************************************************************
流式數據框架構思
此處參考鏈接:https://www.csdn.net/article/2014-06-12/2820196-Storm 實時計算
二. 實時計算的相關技術
主要分為三個階段(大多是日志流):
數據的產生與收集階段、傳輸與分析處理階段、存儲對對外提供服務階段
鏈接內說的是大數據和流式處理框架Storm,項目上還遠遠達不到大數據級別,所以只是參考一下思路。
數據的產生:PLC,數據的接受:Socket,數據的存儲:隊列,數據的分析處理:解析數據,數據的對外服務:刷新UI
框架思路有了,接下來就是具體實現
**************************************************************************************************************************************************
生產者-消費者模式和隊列
此處參考鏈接:https://www.cnblogs.com/samgk/p/4772806.html 隊列
1 //隊列模塊 2 readonly static object _locker = new object(); 3 Queue<byte[]> _tasks = new Queue<byte[]>(); 4 EventWaitHandle _wh = new AutoResetEvent(false); 5 Thread _worker; 6 7 //窗口初始化時開始消費者線程 8 public Form1() 9 { 10 buffer = new byte[8]; 11 InitializeComponent(); 12 _worker = new Thread(Work); 13 _worker.Start(); 14 AllocConsole(); 15 } 16 17 //加了鎖和信號量 18 void Work() 19 { 20 while (true) 21 { 22 byte[] work = null; 23 lock (_locker) 24 { 25 if (_tasks.Count > 0) 26 { 27 work = _tasks.Dequeue(); // 有任務時,出列任務 28 29 if (work == null) // 退出機制:當遇見一個null任務時,代表任務結束 30 return; 31 } 32 } 33 34 if (work != null) 35 SaveData(work); // 任務不為null時,處理並保存數據 36 else 37 _wh.WaitOne(); // 沒有任務了,等待信號 38 } 39 } 40 41 //在異步接收的方法中把控制台輸出修改為加入隊列 42 void EnqueueTask(byte[] task) 43 { 44 lock (_locker) 45 _tasks.Enqueue(task); // 向隊列中插入任務 46 47 _wh.Set(); // 給工作線程發信號 48 }
1 //TODO 將收到的數據放入隊列 2 EnqueueTask(buffer); 3 Thread.Sleep(10); 4 //Console.WriteLine("收到消息:{0}", Encoding.ASCII.GetString(buffer)); 5 //
1 void SaveData(byte[] buffer) 2 { 3 //從隊列中取出數據 4 Console.WriteLine("收到消息:{0}", Encoding.ASCII.GetString(buffer)); 5 }
這樣就把數據先存入隊列,再取出數據,通過控制台輸出數據,實現了生產者-消費者模式和隊列存儲數據 2019-5-23
**************************************************************************************************************************************************
解析數據&刷新UI
項目真正的業務需求是解析數據和刷新UI,所以我們需要把SaveData方法改造一下
PLC會源源不斷的輸出數據,我們需要在接收到數據后對數據進行處理和刷新UI,不可能對每一個數據都進行處理
而且項目不是大數據級別的,不使用數據庫存放數據,純粹的實時處理,我們需要定義一下處理數據的采集時間和UI的刷新時間
原有框架的常規采集是16ms,高頻采集是2ms,所以在測試階段定義10ms采集一次,UI刷新500ms一次
邏輯是在最后解析&刷新時間記錄時間戳,和SaveData當前執行時間戳比較,大於10ms則解析,大於500ms則刷新
1 int count_UI = 0; 2 int count_Data = 0; 3 float time_UI = 0F; 4 float time_Data = 0F; 5 float time_over_UI = 0F; 6 float time_over_Data = 0F; 7 /// <summary>處理保存</summary> 8 bool SaveData(byte[] buffer) 9 { 11 //從隊列中取出數據,解析並刷新UI 13 //解析數據 14 time_Data = Environment.TickCount - time_over_Data; 15 time_UI = Environment.TickCount - time_over_UI; 16 //if (time_Data > 10)//解析數據——10ms一次 17 //{ 18 //解析數據函數 19 count_Data++;
23 Console.WriteLine("解析成功:{0},耗時{1}ms,序號:{2}", Encoding.ASCII.GetString(buffer), time_Data.ToString(), count_Data.ToString()); 24 time_over_Data = Environment.TickCount; 25 //} 26 27 //刷新UI——500ms一次 28 if (time_UI > 500) 29 { 30 //刷新UI函數 31 count_UI++;
33 Console.WriteLine("刷新UI:{0},耗時{1}ms,序號:{2}", Encoding.ASCII.GetString(buffer), time_UI.ToString(), count_UI.ToString()); 34 time_over_UI = Environment.TickCount; 35 } 36 Thread.Sleep(200);// 模擬數據保存
37 return true; 38 }
使用SockeTool發送數據100次,會看到數據被過濾到了一部分
測試到這里我對時間片有一點疑惑,查閱了一些資料和做了一些實際測試
此處參考鏈接:https://zhidao.baidu.com/question/1051646628145878899.html 時間片
socket處理數據流的速度非常快,如果不加10ms的過濾則每一條數據都會顯示在控制台頁面,如果加了10ms的過濾則只顯示一部分,至於為什么大部分情況下是16ms,和線程調度有關
我們現在把解析數據的函數和UI調用的函數放在指定的地方就可以實測了。
**************************************************************************************************************************************************
socket粘包&服務端斷開連接異常&異步接收檢測socket通斷
1、粘包——在測試過程中發現,如果buffer的大小與每次發送的數據不一致,會發生粘包現象。
項目上PLC發送的數據固定為4096字節,所以和服務端保持一致即可。
2、服務端連接斷開——測試的另一個問題是如果服務端斷開連接,客戶端無法有效監測,回調函數會一直執行。
3、監測通斷——網上查了很多資料,利用select方法和poll方法的,試了一下沒有效果,最后采用flag的方式成功在連接異常后終止回調函數
EndReceive方法會反饋當前獲取到的字節數,否則沒有數據則為0,如果重復接收20次,每次延時100ms都沒有為0,則判斷為連接已斷。
項目是和PLC連接,和其他互聯網應用有一定的差異。
1 int flag_connect = 0; 2 void ReceiveCallback(IAsyncResult result) 3 { 11 Socket m_sokcet = (Socket)result.AsyncState; 12 int a = m_sokcet.EndReceive(result); 13 result.AsyncWaitHandle.Close(); 14 if (a == 0) 15 { 16 if (flag_connect == 20) 17 { 18 flag_connect = 0; 19 return; 20 } 21 flag_connect++; 22 Thread.Sleep(100); 23 } 24 else 25 { 27 EnqueueTask(buffer);32 } 33 //清空數據,重新開始異步接收 34 buffer = new byte[buffer.Length]; 35 m_sokcet.BeginReceive(buffer, 0, buffer.Length, SocketFlags.None, new AsyncCallback(ReceiveCallback), m_sokcet); 36 }
**************************************************************************************************************************************************
解析數據連接PLC實測
上面的測試都是筆記本電腦上利用socketTool測試的,現在開始連接PLC做真實的數據解析測試
1、測試遇到的問題是,如何斷開解析數據線程和異步接收回調函數
一開始直接使用的是Abort方法,但是效果不好,沒有辦法再次連接
查詢相關資料后,使用flag的方式來退出線程,使用信號量的方式來結束回調函數
另外考慮到PLC是無限的數據流,對隊列的最大數量做了一個限制,如果超過1000個則停止接收
此處參考鏈接:https://blog.csdn.net/pc0de/article/details/52841458 Abort異常
此處參考鏈接:https://blog.csdn.net/shizhibuyi1234/article/details/78202647 結束線程
還有兩個線程相關的,Mark一下日后學習
https://www.cnblogs.com/doforfuture/p/6293926.html 線程池相關
https://www.cnblogs.com/wjcnet/p/6955756.html Task
2、連接斷開過程中的,隊列內的數據處理。經過測試,最后還是采用信號量的方式
在隊列達到最大數量1000時,異步接收回調函數等待。
在隊列為空時,解析數據線程給異步接收回調函數發信號。
另外,實測Queue為空時,調用Dequeue會報錯隊列為空。
完整代碼:
1 public partial class Form1 : Form 2 { 3 //winform調用console窗口 4 [DllImport("Kernel32.dll")] 5 public static extern Boolean AllocConsole(); 6 7 [DllImport("Kernel32.dll")] 8 public static extern Boolean FreeConsole(); 9 //socket模塊 10 IPAddress ip; 11 Socket m_sokcet; 12 IPEndPoint local_endpoint; 13 byte[] buffer; 14 //隊列模塊 15 readonly static object _locker = new object(); 16 Queue<byte[]> _tasks = new Queue<byte[]>(); 17 EventWaitHandle _wh; 18 EventWaitHandle _recieve_call; 19 Thread _worker; 20 public Form1() 21 { 22 buffer = new byte[256]; 23 InitializeComponent(); 24 AllocConsole(); 25 } 26 27 private void button1_Click(object sender, EventArgs e) 28 { 29 connect_status = true; 30 if (_wh == null)//隊列信號量 31 _wh = new AutoResetEvent(false); 32 if (_recieve_call == null)//隊列滿或空信號量 33 _recieve_call = new AutoResetEvent(false); 34 _worker = new Thread(Work); 35 _worker.Start(); 36 if (m_sokcet == null) 37 { 38 ip = IPAddress.Parse("169.254.11.22");//TODO IP修改 39 local_endpoint = new IPEndPoint(ip, 2001);//TODO 端口修改 40 m_sokcet = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); 41 m_sokcet.Connect(local_endpoint); 42 } 43 m_sokcet.BeginReceive(buffer, 0, buffer.Length, SocketFlags.None, new AsyncCallback(ReceiveCallback), m_sokcet); 44 //Console.ReadKey(); 45 } 46 47 bool connect_status = false; 48 int flag_connect = 0; 49 void ReceiveCallback(IAsyncResult result) 50 { 51 if (_tasks.Count > 10000) 52 { 53 //TODO 區分當前連接狀態,執行wait還是return 54 _recieve_call.WaitOne(); 55 //return; 56 } 57 58 Socket m_sokcet = (Socket)result.AsyncState; 59 int a = m_sokcet.EndReceive(result); 60 result.AsyncWaitHandle.Close(); 61 if (a == 0)//判斷是否與服務端斷開連接 62 { 63 if (flag_connect == 20) 64 { 65 flag_connect = 0; 66 return; 67 } 68 flag_connect++; 69 Thread.Sleep(100); 70 } 71 else 72 { 73 //TODO 將收到的數據放入隊列 74 EnqueueTask(buffer); 75 //Thread.Sleep(1); 76 //Delay(1); 77 //Console.WriteLine("收到消息:{0}", Encoding.ASCII.GetString(buffer)); 78 // 79 } 80 //清空數據,重新開始異步接收 81 buffer = new byte[buffer.Length]; 82 m_sokcet.BeginReceive(buffer, 0, buffer.Length, SocketFlags.None, new AsyncCallback(ReceiveCallback), m_sokcet); 83 } 84 85 void Work() 86 { 87 bool result; 88 while (connect_status) 89 { 90 byte[] work = null; 91 lock (_locker) 92 { 93 if (_tasks.Count > 0) 94 { 95 work = _tasks.Dequeue(); // 有任務時,出列任務 96 } 97 else 98 { 99 _recieve_call.Set(); 100 //return; 101 } 102 } 103 104 if (work != null) 105 result = SaveData(work); // 任務不為null時,處理並保存數據 106 else 107 _wh.WaitOne(); // 沒有任務了,等待信號 108 } 109 } 110 111 /// <summary>插入任務</summary> 112 void EnqueueTask(byte[] task) 113 { 114 lock (_locker) 115 _tasks.Enqueue(task); // 向隊列中插入任務 116 117 _wh.Set(); // 給工作線程發信號 118 } 119 120 int count_UI = 0; 121 int count_Data = 0; 122 float time_UI = 0F; 123 float time_Data = 0F; 124 float time_over_UI = 0F; 125 float time_over_Data = 0F; 126 /// <summary>處理保存</summary> 127 bool SaveData(byte[] buffer) 128 { 129 130 //TODO 從隊列中取出數據,解析並刷新UI 131 132 //解析數據——全部解析並保存 133 time_Data = Environment.TickCount - time_over_Data; 134 time_UI = Environment.TickCount - time_over_UI; 135 //if (time_Data > 10) 136 //{ 137 //解析數據函數 138 count_Data++; 139 bool result = PLC_Receive_Callback_HD(buffer); 140 //Console.WriteLine(count_Data.ToString() + "," + _tasks.Count.ToString() + "," + result.ToString()); 141 //Thread.Sleep(1); 142 Console.WriteLine("解析成功:{0},耗時{1}ms,序號:{2}", Encoding.ASCII.GetString(buffer), time_Data.ToString(), count_Data.ToString()); 143 time_over_Data = Environment.TickCount; 144 //} 145 146 //刷新UI——500ms刷新一次 147 if (time_UI > 500) 148 { 149 //刷新UI函數 150 count_UI++; 151 //Console.WriteLine(count_UI.ToString() + "," + _tasks.Count.ToString() + "刷新UI成功"); 152 Console.WriteLine("刷新UI:{0},耗時{1}ms,序號:{2}", Encoding.ASCII.GetString(buffer), time_UI.ToString(), count_UI.ToString()); 153 time_over_UI = Environment.TickCount; 154 } 155 return true; 156 //Thread.Sleep(200); // 模擬數據保存 157 } 158 159 private void button2_Click(object sender, EventArgs e) 160 { 161 connect_status = false; 162 if (_worker != null && _worker.IsAlive) 163 { 164 _wh.Set(); 165 //_worker.Join(); 166 } 167 }
最后加入了解析數據的函數,對4096個字節解析,但是把刷新UI全部屏蔽
實測PLC_Receive_Callback_HD內900多行代碼解析數據很快
原打算采用異步調用方式調用解析數據函數,現在看來不需要,因為不涉及數據存儲
通訊框架基本改寫完成,剩下的就是把刷新UI的函數加上去
**************************************************************************************************************************************************
總結:
參考了網上的很多資料,實現了一個簡單的異步通訊和生產者-消費者模式加隊列存儲,實際測試效果自己還是比較滿意的
果然用輪子不如造輪子,重復造輪子是提升技術的最好方法。 2019-5-24