C#通訊框架改寫


現有項目是利用C#的socket與PLC進行實時通訊,PLC有兩種通訊模式——常規采集&高頻采集。

其中常規采集大概在10ms左右發送一次數據,高頻采集大概在2ms左右發送一次數據。

現有代碼框架:在與PLC進行連接時,通過建立委托並創建線程的方式,來循環讀取數據

//創建委托
public delegate void PLC_HD_Receive(byte[] recv_data);

public PLC_HD_Receive PLC_Recv_Delegate_HD;

//給委托綁定方法
PLC_Recv_Delegate_HD = new PLC_HD_Receive(PLC_Receive_Callback_HD);

//創建線程 PLC_Thread_HD
= new Thread(new ThreadStart(PLC_ReadThread_HD));
PLC_Thread_HD.IsBackground
= true; PLC_Thread_HD.Start();
//在線程內調用委托
this.BeginInvoke(this.PLC_Recv_Delegate_HD, new Object[] { recv_buffer_hd });

 只要連接PLC成功后,會一直在后台讀取PLC發送來的數據,並解析數據

現有問題:實時性和數據完整性不夠,有些操作會導致socket斷掉連接。

計划:改寫現有代碼框架,加深對通訊的理解,和對實時數據流的處理。                  2019-5-22

**************************************************************************************************************************************************

思路:原有框架讀取數據使用的是同步通信,出錯時反饋TimeOut錯誤,先准備改成異步通信

 1                     SocketError socket_error;
 2 
 3                     while (total_length < recv_buffer_len_hd)
 4                     {
 5                        //同步接收數據
 6                         ret_length = m_socket_hd.Receive(recv_buffer_hd, total_length, data_left, SocketFlags.None, out socket_error);
 7                         if (socket_error == SocketError.TimedOut || socket_error == SocketError.Shutdown || socket_error == SocketError.ConnectionAborted || ret_length == 0)
 8                         {
 9                             // 網絡不正常,委托退出接收線程
10                             thread_id = 1;
11                             this.Invoke(this.PLC_ExitThread_Delegate_HD, new Object[] { thread_id });
12                             return;
13                         }
14                         total_length += ret_length;
15                         data_left -= ret_length;
16                     }  

控制台異步輸出數據

首先搭建一個簡單的winform窗口demo,實現控制台異步輸出數據

此處參考鏈接:https://blog.csdn.net/smartsmile2012/article/details/71172450 異步接收

但網上搜到的大部分都是服務器接收,項目上的應用是客戶端接收,做了一點修改

搭建的過程中遇到了winform無法直接控制台輸出,需要引用AllocConsole()和FreeConsole()

此處參考鏈接:https://blog.csdn.net/b510030/article/details/52621312 WinForm添加Console

在反復點擊按鈕的過程中發現,AllocConsole()最好在窗口構造函數中使用,否則多次調用AllocConsole()會導致Console.Readkey()報錯

  1     public partial class Form1 : Form
  2     {
  3         //winform調用console窗口
  4         [DllImport("Kernel32.dll")]
  5         public static extern Boolean AllocConsole();
  6 
  7         [DllImport("Kernel32.dll")]
  8         public static extern Boolean FreeConsole();
  9         //socket模塊
 10         IPAddress ip;
 11         Socket m_sokcet;
 12         IPEndPoint local_endpoint;
 13         byte[] buffer;
19 public Form1() 20 { 21 buffer = new byte[8]; 22 InitializeComponent();
25
AllocConsole(); 26 } 27 28 private void button1_Click(object sender, EventArgs e) 29 { 30 ip = IPAddress.Parse("127.0.0.1"); 31 local_endpoint = new IPEndPoint(ip, 60000); 32 m_sokcet = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); 33 m_sokcet.Connect(local_endpoint); 34 m_sokcet.BeginReceive(buffer, 0, buffer.Length, SocketFlags.None, new AsyncCallback(ReceiveCallback), m_sokcet); 35 Console.ReadKey(); 36 } 71 void ReceiveCallback(IAsyncResult result) 72 { 73 Socket m_sokcet = (Socket)result.AsyncState; 74 m_sokcet.EndReceive(result); 75 result.AsyncWaitHandle.Close();
91
Console.WriteLine("收到消息:{0}", Encoding.ASCII.GetString(buffer));
94 //清空數據,重新開始異步接收 95 buffer = new byte[buffer.Length]; 96 m_sokcet.BeginReceive(buffer, 0, buffer.Length, SocketFlags.None, new AsyncCallback(ReceiveCallback), m_sokcet); 97 } 98 }

服務端利用socketTool調試工具,發送數據后看控制台窗口的刷新情況,測試結果如下:

測試結果OK

**************************************************************************************************************************************************

 流式數據框架構思

 此處參考鏈接:https://www.csdn.net/article/2014-06-12/2820196-Storm 實時計算

二.  實時計算的相關技術
主要分為三個階段(大多是日志流):
數據的產生與收集階段、傳輸與分析處理階段、存儲對對外提供服務階段

 

鏈接內說的是大數據和流式處理框架Storm,項目上還遠遠達不到大數據級別,所以只是參考一下思路。

數據的產生:PLC,數據的接受:Socket,數據的存儲:隊列,數據的分析處理:解析數據,數據的對外服務:刷新UI

框架思路有了,接下來就是具體實現

**************************************************************************************************************************************************

生產者-消費者模式和隊列

此處參考鏈接:https://www.cnblogs.com/samgk/p/4772806.html 隊列

 1         //隊列模塊
 2         readonly static object _locker = new object();
 3         Queue<byte[]> _tasks = new Queue<byte[]>();
 4         EventWaitHandle _wh = new AutoResetEvent(false);
 5         Thread _worker;
 6 
 7         //窗口初始化時開始消費者線程
 8         public Form1()
 9         {
10             buffer = new byte[8];
11             InitializeComponent();
12             _worker = new Thread(Work);
13             _worker.Start();
14             AllocConsole();
15         }
16 
17        //加了鎖和信號量
18         void Work()
19         {
20             while (true)
21             {
22                 byte[] work = null;
23                 lock (_locker)
24                 {
25                     if (_tasks.Count > 0)
26                     {
27                         work = _tasks.Dequeue(); // 有任務時,出列任務
28 
29                         if (work == null)  // 退出機制:當遇見一個null任務時,代表任務結束
30                             return;
31                     }
32                 }
33 
34                 if (work != null)
35                     SaveData(work);  // 任務不為null時,處理並保存數據
36                 else
37                     _wh.WaitOne();   // 沒有任務了,等待信號
38             }
39         }
40  
41         //在異步接收的方法中把控制台輸出修改為加入隊列
42         void EnqueueTask(byte[] task)
43         {
44             lock (_locker)
45                 _tasks.Enqueue(task);  // 向隊列中插入任務 
46 
47             _wh.Set();  // 給工作線程發信號
48         }
1                 //TODO 將收到的數據放入隊列
2                 EnqueueTask(buffer);
3                 Thread.Sleep(10);
4                 //Console.WriteLine("收到消息:{0}", Encoding.ASCII.GetString(buffer));
5                 //
1          void SaveData(byte[] buffer)
2          {
3             //從隊列中取出數據           
4             Console.WriteLine("收到消息:{0}", Encoding.ASCII.GetString(buffer));
5          }

 

這樣就把數據先存入隊列,再取出數據,通過控制台輸出數據,實現了生產者-消費者模式和隊列存儲數據            2019-5-23

**************************************************************************************************************************************************

 解析數據&刷新UI

 項目真正的業務需求是解析數據和刷新UI,所以我們需要把SaveData方法改造一下

 PLC會源源不斷的輸出數據,我們需要在接收到數據后對數據進行處理和刷新UI,不可能對每一個數據都進行處理

 而且項目不是大數據級別的,不使用數據庫存放數據,純粹的實時處理,我們需要定義一下處理數據的采集時間和UI的刷新時間

 原有框架的常規采集是16ms,高頻采集是2ms,所以在測試階段定義10ms采集一次,UI刷新500ms一次

邏輯是在最后解析&刷新時間記錄時間戳,和SaveData當前執行時間戳比較,大於10ms則解析,大於500ms則刷新

 1         int count_UI = 0;
 2         int count_Data = 0;
 3         float time_UI = 0F;
 4         float time_Data = 0F;
 5         float time_over_UI = 0F;
 6         float time_over_Data = 0F;
 7         /// <summary>處理保存</summary>
 8         bool SaveData(byte[] buffer)
 9         {
11             //從隊列中取出數據,解析並刷新UI
13             //解析數據
14             time_Data = Environment.TickCount - time_over_Data;
15             time_UI = Environment.TickCount - time_over_UI;
16             //if (time_Data > 10)//解析數據——10ms一次
17             //{
18             //解析數據函數
19             count_Data++;
23 Console.WriteLine("解析成功:{0},耗時{1}ms,序號:{2}", Encoding.ASCII.GetString(buffer), time_Data.ToString(), count_Data.ToString()); 24 time_over_Data = Environment.TickCount; 25 //} 26 27 //刷新UI——500ms一次 28 if (time_UI > 500) 29 { 30 //刷新UI函數 31 count_UI++;
33 Console.WriteLine("刷新UI:{0},耗時{1}ms,序號:{2}", Encoding.ASCII.GetString(buffer), time_UI.ToString(), count_UI.ToString()); 34 time_over_UI = Environment.TickCount; 35 } 36 Thread.Sleep(200);// 模擬數據保存
37 return true; 38 }

 

使用SockeTool發送數據100次,會看到數據被過濾到了一部分

 

測試到這里我對時間片有一點疑惑,查閱了一些資料和做了一些實際測試

 

 此處參考鏈接:https://zhidao.baidu.com/question/1051646628145878899.html 時間片

 

socket處理數據流的速度非常快,如果不加10ms的過濾則每一條數據都會顯示在控制台頁面,如果加了10ms的過濾則只顯示一部分,至於為什么大部分情況下是16ms,和線程調度有關

 

我們現在把解析數據的函數和UI調用的函數放在指定的地方就可以實測了。

**************************************************************************************************************************************************

socket粘包&服務端斷開連接異常&異步接收檢測socket通斷

1、粘包——在測試過程中發現,如果buffer的大小與每次發送的數據不一致,會發生粘包現象。

 

項目上PLC發送的數據固定為4096字節,所以和服務端保持一致即可。

 

2、服務端連接斷開——測試的另一個問題是如果服務端斷開連接,客戶端無法有效監測,回調函數會一直執行。

3、監測通斷——網上查了很多資料,利用select方法和poll方法的,試了一下沒有效果,最后采用flag的方式成功在連接異常后終止回調函數

 

EndReceive方法會反饋當前獲取到的字節數,否則沒有數據則為0,如果重復接收20次,每次延時100ms都沒有為0,則判斷為連接已斷。

項目是和PLC連接,和其他互聯網應用有一定的差異。

 1         int flag_connect = 0;
 2         void ReceiveCallback(IAsyncResult result)
 3         {
11             Socket m_sokcet = (Socket)result.AsyncState;
12             int a = m_sokcet.EndReceive(result);
13             result.AsyncWaitHandle.Close();
14             if (a == 0)
15             {
16                 if (flag_connect == 20)
17                 {
18                     flag_connect = 0;
19                     return;
20                 }
21                 flag_connect++;
22                 Thread.Sleep(100);
23             }
24             else
25             {
27                 EnqueueTask(buffer);32             }
33             //清空數據,重新開始異步接收
34             buffer = new byte[buffer.Length];
35             m_sokcet.BeginReceive(buffer, 0, buffer.Length, SocketFlags.None, new AsyncCallback(ReceiveCallback), m_sokcet);
36         }

 

**************************************************************************************************************************************************

解析數據連接PLC實測

 

上面的測試都是筆記本電腦上利用socketTool測試的,現在開始連接PLC做真實的數據解析測試

1、測試遇到的問題是,如何斷開解析數據線程和異步接收回調函數

一開始直接使用的是Abort方法,但是效果不好,沒有辦法再次連接

查詢相關資料后,使用flag的方式來退出線程,使用信號量的方式來結束回調函數

另外考慮到PLC是無限的數據流,對隊列的最大數量做了一個限制,如果超過1000個則停止接收

此處參考鏈接:https://blog.csdn.net/pc0de/article/details/52841458 Abort異常

此處參考鏈接:https://blog.csdn.net/shizhibuyi1234/article/details/78202647 結束線程

還有兩個線程相關的,Mark一下日后學習

https://www.cnblogs.com/doforfuture/p/6293926.html 線程池相關
https://www.cnblogs.com/wjcnet/p/6955756.html  Task

2、連接斷開過程中的,隊列內的數據處理。經過測試,最后還是采用信號量的方式

在隊列達到最大數量1000時,異步接收回調函數等待。

在隊列為空時,解析數據線程給異步接收回調函數發信號。

另外,實測Queue為空時,調用Dequeue會報錯隊列為空。

完整代碼: 

  1     public partial class Form1 : Form
  2     {
  3         //winform調用console窗口
  4         [DllImport("Kernel32.dll")]
  5         public static extern Boolean AllocConsole();
  6 
  7         [DllImport("Kernel32.dll")]
  8         public static extern Boolean FreeConsole();
  9         //socket模塊
 10         IPAddress ip;
 11         Socket m_sokcet;
 12         IPEndPoint local_endpoint;
 13         byte[] buffer;
 14         //隊列模塊
 15         readonly static object _locker = new object();
 16         Queue<byte[]> _tasks = new Queue<byte[]>();
 17         EventWaitHandle _wh;
 18         EventWaitHandle _recieve_call;
 19         Thread _worker;
 20         public Form1()
 21         {
 22             buffer = new byte[256];
 23             InitializeComponent();
 24             AllocConsole();
 25         }
 26 
 27         private void button1_Click(object sender, EventArgs e)
 28         {
 29             connect_status = true;
 30             if (_wh == null)//隊列信號量  31                 _wh = new AutoResetEvent(false);
 32             if (_recieve_call == null)//隊列滿或空信號量  33                 _recieve_call = new AutoResetEvent(false);
 34             _worker = new Thread(Work);
 35             _worker.Start();
 36             if (m_sokcet == null)
 37             {
 38                 ip = IPAddress.Parse("169.254.11.22");//TODO IP修改
 39                 local_endpoint = new IPEndPoint(ip, 2001);//TODO 端口修改
 40                 m_sokcet = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
 41                 m_sokcet.Connect(local_endpoint);
 42             }
 43             m_sokcet.BeginReceive(buffer, 0, buffer.Length, SocketFlags.None, new AsyncCallback(ReceiveCallback), m_sokcet);
 44             //Console.ReadKey();
 45         }
 46 
 47         bool connect_status = false;
 48         int flag_connect = 0;
 49         void ReceiveCallback(IAsyncResult result)
 50         {
 51             if (_tasks.Count > 10000)
 52             {
 53                 //TODO 區分當前連接狀態,執行wait還是return
 54                 _recieve_call.WaitOne();
 55                 //return;
 56             }
 57 
 58             Socket m_sokcet = (Socket)result.AsyncState;
 59             int a = m_sokcet.EndReceive(result);
 60             result.AsyncWaitHandle.Close();
 61             if (a == 0)//判斷是否與服務端斷開連接  62             {
 63                 if (flag_connect == 20)
 64                 {
 65                     flag_connect = 0;
 66                     return;
 67                 }
 68                 flag_connect++;
 69                 Thread.Sleep(100);
 70             }
 71             else
 72             {
 73                 //TODO 將收到的數據放入隊列
 74                 EnqueueTask(buffer);
 75                 //Thread.Sleep(1);
 76                 //Delay(1);
 77                 //Console.WriteLine("收到消息:{0}", Encoding.ASCII.GetString(buffer));
 78                 //
 79             }
 80             //清空數據,重新開始異步接收
 81             buffer = new byte[buffer.Length];
 82             m_sokcet.BeginReceive(buffer, 0, buffer.Length, SocketFlags.None, new AsyncCallback(ReceiveCallback), m_sokcet);
 83         }
 84 
 85         void Work()
 86         {
 87             bool result;
 88             while (connect_status)
 89             {
 90                 byte[] work = null;
 91                 lock (_locker)
 92                 {
 93                     if (_tasks.Count > 0)
 94                     {
 95                         work = _tasks.Dequeue(); // 有任務時,出列任務
 96                     }
 97                     else
 98                     {
 99                         _recieve_call.Set();
100                         //return;
101                     }
102                 }
103 
104                 if (work != null)
105                     result = SaveData(work);  // 任務不為null時,處理並保存數據
106                 else
107                     _wh.WaitOne();   // 沒有任務了,等待信號
108             }
109         }
110 
111         /// <summary>插入任務</summary>
112         void EnqueueTask(byte[] task)
113         {
114             lock (_locker)
115                 _tasks.Enqueue(task);  // 向隊列中插入任務 
116 
117             _wh.Set();  // 給工作線程發信號
118         }
119 
120         int count_UI = 0;
121         int count_Data = 0;
122         float time_UI = 0F;
123         float time_Data = 0F;
124         float time_over_UI = 0F;
125         float time_over_Data = 0F;
126         /// <summary>處理保存</summary>
127         bool SaveData(byte[] buffer)
128         {
129 
130             //TODO 從隊列中取出數據,解析並刷新UI
131 
132             //解析數據——全部解析並保存
133             time_Data = Environment.TickCount - time_over_Data;
134             time_UI = Environment.TickCount - time_over_UI;
135             //if (time_Data > 10)
136             //{
137             //解析數據函數
138             count_Data++;
139             bool result = PLC_Receive_Callback_HD(buffer);
140             //Console.WriteLine(count_Data.ToString() + "," + _tasks.Count.ToString() + "," + result.ToString());
141             //Thread.Sleep(1);
142             Console.WriteLine("解析成功:{0},耗時{1}ms,序號:{2}", Encoding.ASCII.GetString(buffer), time_Data.ToString(), count_Data.ToString());
143             time_over_Data = Environment.TickCount;
144             //}
145 
146             //刷新UI——500ms刷新一次
147             if (time_UI > 500)
148             {
149                 //刷新UI函數
150                 count_UI++;
151                 //Console.WriteLine(count_UI.ToString() + "," + _tasks.Count.ToString() + "刷新UI成功");
152                 Console.WriteLine("刷新UI:{0},耗時{1}ms,序號:{2}", Encoding.ASCII.GetString(buffer), time_UI.ToString(), count_UI.ToString());
153                 time_over_UI = Environment.TickCount;
154             }
155             return true;
156             //Thread.Sleep(200);  // 模擬數據保存
157         }
158 
159         private void button2_Click(object sender, EventArgs e)
160         {
161             connect_status = false;
162             if (_worker != null && _worker.IsAlive)
163             {
164                 _wh.Set();
165                 //_worker.Join();
166             }
167         }

 最后加入了解析數據的函數,對4096個字節解析,但是把刷新UI全部屏蔽

實測PLC_Receive_Callback_HD內900多行代碼解析數據很快

原打算采用異步調用方式調用解析數據函數,現在看來不需要,因為不涉及數據存儲

 

 

通訊框架基本改寫完成,剩下的就是把刷新UI的函數加上去

**************************************************************************************************************************************************

總結:

 

參考了網上的很多資料,實現了一個簡單的異步通訊和生產者-消費者模式加隊列存儲,實際測試效果自己還是比較滿意的

果然用輪子不如造輪子,重復造輪子是提升技術的最好方法。                                                                                        2019-5-24


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM