在分布式調度系統中,如果要實現調度服務器與多台計算節點服務器之間通信,采用socket來實現是一種實現方式,當然我們也可以通過數據存儲任務,子節點來完成任務,但是往往使用數據作為任務存儲都需要定制開發,要維護數據庫中任務記錄狀態等等。開發的東西還是有點多,而且還不夠靈活。因此,我個人是比較偏向於使用socket來實現任務的調度工作。原因:使用socket實現調度比較靈活,而且擴展性都比較好。
實現思路:調度服務器要實現調度工作,它必須與所有計算節點之間建立連接。而且他需要知道每台計算節點的任務狀況,因此服務器節點必須存儲與所有計算節點的socket連接對象。
在客戶端唯一需要知道的就是它歸屬的調度服務器的通信IP和端口,因此client是發送連接的主動方,由調度服務器監聽是否有client請求建立連接,當建立連接成功后,把該連接信息存儲到一個結合中以便監控client的存貨狀態及通信使用。
擴展:
由於server端是存儲了所有server與client的連接對象,因此我們是可以基於此demo的基礎上實現聊天系統:
* 每當一個與用戶發言時,是由server接收到的某個用戶的發言信息的,此時服務器端可以通過循環發送該用戶發送的信息給每個已經連接連接的用戶(排除發送者)。
Server端代碼(Window Console Project):
1 using System; 2 using System.Collections.Generic; 3 using System.Linq; 4 using System.Text; 5 using System.Threading; 6 using System.Net.Sockets; 7 using System.Net; 8 9 namespace SocketServerAcceptMultipleClient 10 { 11 public class SocketServer 12 { 13 // 創建一個和客戶端通信的套接字 14 static Socket socketwatch = null; 15 //定義一個集合,存儲客戶端信息 16 static Dictionary<string, Socket> clientConnectionItems = new Dictionary<string, Socket> { }; 17 18 public static void Main(string[] args) 19 { 20 //定義一個套接字用於監聽客戶端發來的消息,包含三個參數(IP4尋址協議,流式連接,Tcp協議) 21 socketwatch = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); 22 //服務端發送信息需要一個IP地址和端口號 23 IPAddress address = IPAddress.Parse("127.0.0.1"); 24 //將IP地址和端口號綁定到網絡節點point上 25 IPEndPoint point = new IPEndPoint(address, 8098); 26 //此端口專門用來監聽的 27 28 //監聽綁定的網絡節點 29 socketwatch.Bind(point); 30 31 //將套接字的監聽隊列長度限制為20 32 socketwatch.Listen(20); 33 34 //負責監聽客戶端的線程:創建一個監聽線程 35 Thread threadwatch = new Thread(watchconnecting); 36 37 //將窗體線程設置為與后台同步,隨着主線程結束而結束 38 threadwatch.IsBackground = true; 39 40 //啟動線程 41 threadwatch.Start(); 42 43 Console.WriteLine("開啟監聽。。。"); 44 Console.WriteLine("點擊輸入任意數據回車退出程序。。。"); 45 Console.ReadKey(); 46 Console.WriteLine("退出監聽,並關閉程序。"); 47 } 48 49 //監聽客戶端發來的請求 50 static void watchconnecting() 51 { 52 Socket connection = null; 53 54 //持續不斷監聽客戶端發來的請求 55 while (true) 56 { 57 try 58 { 59 connection = socketwatch.Accept(); 60 } 61 catch (Exception ex) 62 { 63 //提示套接字監聽異常 64 Console.WriteLine(ex.Message); 65 break; 66 } 67 68 //獲取客戶端的IP和端口號 69 IPAddress clientIP = (connection.RemoteEndPoint as IPEndPoint).Address; 70 int clientPort = (connection.RemoteEndPoint as IPEndPoint).Port; 71 72 //讓客戶顯示"連接成功的"的信息 73 string sendmsg = "連接服務端成功!\r\n" + "本地IP:" + clientIP + ",本地端口" + clientPort.ToString(); 74 byte[] arrSendMsg = Encoding.UTF8.GetBytes(sendmsg); 75 connection.Send(arrSendMsg); 76 77 //客戶端網絡結點號 78 string remoteEndPoint = connection.RemoteEndPoint.ToString(); 79 //顯示與客戶端連接情況 80 Console.WriteLine("成功與" + remoteEndPoint + "客戶端建立連接!\t\n"); 81 //添加客戶端信息 82 clientConnectionItems.Add(remoteEndPoint, connection); 83 84 //IPEndPoint netpoint = new IPEndPoint(clientIP,clientPort); 85 IPEndPoint netpoint = connection.RemoteEndPoint as IPEndPoint; 86 87 //創建一個通信線程 88 ParameterizedThreadStart pts = new ParameterizedThreadStart(recv); 89 Thread thread = new Thread(pts); 90 //設置為后台線程,隨着主線程退出而退出 91 thread.IsBackground = true; 92 //啟動線程 93 thread.Start(connection); 94 } 95 } 96 97 /// <summary> 98 /// 接收客戶端發來的信息,客戶端套接字對象 99 /// </summary> 100 /// <param name="socketclientpara"></param> 101 static void recv(object socketclientpara) 102 { 103 Socket socketServer = socketclientpara as Socket; 104 105 while (true) 106 { 107 //創建一個內存緩沖區,其大小為1024*1024字節 即1M 108 byte[] arrServerRecMsg = new byte[1024 * 1024]; 109 //將接收到的信息存入到內存緩沖區,並返回其字節數組的長度 110 try 111 { 112 int length = socketServer.Receive(arrServerRecMsg); 113 114 //將機器接受到的字節數組轉換為人可以讀懂的字符串 115 string strSRecMsg = Encoding.UTF8.GetString(arrServerRecMsg, 0, length); 116 117 //將發送的字符串信息附加到文本框txtMsg上 118 Console.WriteLine("客戶端:" + socketServer.RemoteEndPoint + ",time:" + GetCurrentTime() + "\r\n" + strSRecMsg + "\r\n\n"); 119 120 socketServer.Send(Encoding.UTF8.GetBytes("測試server 是否可以發送數據給client ")); 121 } 122 catch (Exception ex) 123 { 124 clientConnectionItems.Remove(socketServer.RemoteEndPoint.ToString()); 125 126 Console.WriteLine("Client Count:" + clientConnectionItems.Count); 127 128 //提示套接字監聽異常 129 Console.WriteLine("客戶端" + socketServer.RemoteEndPoint + "已經中斷連接" + "\r\n" + ex.Message + "\r\n" + ex.StackTrace + "\r\n"); 130 //關閉之前accept出來的和客戶端進行通信的套接字 131 socketServer.Close(); 132 break; 133 } 134 } 135 } 136 137 /// 138 /// 獲取當前系統時間的方法 139 /// 當前時間 140 static DateTime GetCurrentTime() 141 { 142 DateTime currentTime = new DateTime(); 143 currentTime = DateTime.Now; 144 return currentTime; 145 } 146 } 147 }
Client端代碼(Window Form Project):
1 using System; 2 using System.Collections.Generic; 3 using System.ComponentModel; 4 using System.Data; 5 using System.Drawing; 6 using System.Linq; 7 using System.Text; 8 using System.Windows.Forms; 9 using System.Threading; 10 using System.Net.Sockets; 11 using System.Net; 12 using System.Diagnostics; 13 14 namespace SocketClient 15 { 16 public partial class Main : Form 17 { 18 //創建 1個客戶端套接字 和1個負責監聽服務端請求的線程 19 Thread threadclient = null; 20 Socket socketclient = null; 21 22 public Main() 23 { 24 InitializeComponent(); 25 26 StartPosition = FormStartPosition.CenterScreen; 27 //關閉對文本框的非法線程操作檢查 28 TextBox.CheckForIllegalCrossThreadCalls = false; 29 30 this.btnSendMessage.Enabled = false; 31 this.btnSendMessage.Visible = false; 32 33 this.txtMessage.Visible = false; 34 } 35 36 private void btnConnection_Click(object sender, EventArgs e) 37 { 38 this.btnConnection.Enabled = false; 39 //定義一個套接字監聽 40 socketclient = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); 41 42 //獲取文本框中的IP地址 43 IPAddress address = IPAddress.Parse("127.0.0.1"); 44 45 //將獲取的IP地址和端口號綁定在網絡節點上 46 IPEndPoint point = new IPEndPoint(address, 8098); 47 48 try 49 { 50 //客戶端套接字連接到網絡節點上,用的是Connect 51 socketclient.Connect(point); 52 this.btnSendMessage.Enabled = true; 53 this.btnSendMessage.Visible = true; 54 this.txtMessage.Visible = true; 55 } 56 catch (Exception) 57 { 58 Debug.WriteLine("連接失敗\r\n"); 59 60 this.txtDebugInfo.AppendText("連接失敗\r\n"); 61 return; 62 } 63 64 threadclient = new Thread(recv); 65 threadclient.IsBackground = true; 66 threadclient.Start(); 67 } 68 69 // 接收服務端發來信息的方法 70 void recv() 71 { 72 int x = 0; 73 //持續監聽服務端發來的消息 74 while (true) 75 { 76 try 77 { 78 //定義一個1M的內存緩沖區,用於臨時性存儲接收到的消息 79 byte[] arrRecvmsg = new byte[1024 * 1024]; 80 81 //將客戶端套接字接收到的數據存入內存緩沖區,並獲取長度 82 int length = socketclient.Receive(arrRecvmsg); 83 84 //將套接字獲取到的字符數組轉換為人可以看懂的字符串 85 string strRevMsg = Encoding.UTF8.GetString(arrRecvmsg, 0, length); 86 if (x == 1) 87 { 88 this.txtDebugInfo.AppendText("服務器:" + GetCurrentTime() + "\r\n" + strRevMsg + "\r\n\n"); 89 Debug.WriteLine("服務器:" + GetCurrentTime() + "\r\n" + strRevMsg + "\r\n\n"); 90 } 91 else 92 { 93 this.txtDebugInfo.AppendText(strRevMsg + "\r\n\n"); 94 Debug.WriteLine(strRevMsg + "\r\n\n"); 95 x = 1; 96 } 97 } 98 catch (Exception ex) 99 { 100 Debug.WriteLine("遠程服務器已經中斷連接" + "\r\n\n"); 101 Debug.WriteLine("遠程服務器已經中斷連接" + "\r\n"); 102 break; 103 } 104 } 105 } 106 107 //獲取當前系統時間 108 DateTime GetCurrentTime() 109 { 110 DateTime currentTime = new DateTime(); 111 currentTime = DateTime.Now; 112 return currentTime; 113 } 114 115 //發送字符信息到服務端的方法 116 void ClientSendMsg(string sendMsg) 117 { 118 //將輸入的內容字符串轉換為機器可以識別的字節數組 119 byte[] arrClientSendMsg = Encoding.UTF8.GetBytes(sendMsg); 120 //調用客戶端套接字發送字節數組 121 socketclient.Send(arrClientSendMsg); 122 //將發送的信息追加到聊天內容文本框中 123 Debug.WriteLine("hello...." + ": " + GetCurrentTime() + "\r\n" + sendMsg + "\r\n\n"); 124 this.txtDebugInfo.AppendText("hello...." + ": " + GetCurrentTime() + "\r\n" + sendMsg + "\r\n\n"); 125 } 126 127 private void btnSendMessage_Click(object sender, EventArgs e) 128 { 129 //調用ClientSendMsg方法 將文本框中輸入的信息發送給服務端 130 ClientSendMsg(this.txtMessage.Text.Trim()); 131 this.txtMessage.Clear(); 132 } 133 } 134 }
測試結果截圖:
server端:

client端:

代碼下載地址
鏈接:http://pan.baidu.com/s/1kVBUOD5 密碼:16ib
