注意:連接隊列服務器並聲明隊列時,下列參數必須和服務器上該隊列已有的配置一致,否則聲明隊列會失敗:
private string queue;//隊列名
private bool durable;//持久化
private bool exclusive;//獨占
private bool autoDelete;//自動刪除
默認帳號guest只能從本機(localhost)連接,不能用於遠程連接;遠程訪問需要另建帳號。
隊列服務(AMQP)的默認端口是5672,Web管理后台的默認端口是15672。
1、實現發送和接收,類RabbitMQServerT

using System;
using MQServer;
using MQServer.Log;
using RabbitMQ.Client;
using System.Text;
using System.Configuration;
using RabbitMQ.Client.Events;
using Newtonsoft.Json;

namespace MQServer
{
    /// <summary>
    /// Wrapper around a single RabbitMQ queue: publishes UTF-8 text messages to it and,
    /// when a callback is supplied, consumes messages from it.
    /// </summary>
    public class RabbitMQServerT
    {
        /// <summary>Consume callback (message text, pass-through value); null disables consuming.</summary>
        protected readonly Action<string, object> receive;

        private object penetrate;   // value handed back to the callback as its second argument
        private string queue;       // queue name
        private bool durable;       // QueueDeclare "durable" flag
        private bool exclusive;     // QueueDeclare "exclusive" flag
        private bool autoDelete;    // QueueDeclare "autoDelete" flag
        private bool isBeginInvoke; // true: run the callback via BeginInvoke instead of inline

        // Connection/channel owned by the consumer side (Send opens its own short-lived pair).
        private IConnection connection;
        private IModel channel;

        /// <summary>True while a consumer is believed to be running; reset when starting it fails.</summary>
        public bool IsReceive;

        private ConnectionFactory factory;

        private RabbitMQServerT() { }

        /// <summary>
        /// Creates an instance configured from appSettings (RabbitMQ_durable, RabbitMQ_exclusive,
        /// RabbitMQ_autoDelete, RabbitMQHostName, RabbitMQUserName, RabbitMQPassword, RabbitMQPort
        /// and the optional RabbitMQVirtualHost). The callback always runs synchronously.
        /// </summary>
        /// <param name="_receive">Consume callback; null means this instance only sends.</param>
        /// <param name="_queue">Queue name.</param>
        /// <param name="_penetrate">Value passed through to the callback unchanged.</param>
        /// <exception cref="ConfigurationErrorsException">A required appSettings key is missing.</exception>
        public RabbitMQServerT(Action<string, object> _receive, string _queue = @"hello", object _penetrate = null)
        {
            queue = _queue;
            receive = _receive;
            penetrate = _penetrate;
            isBeginInvoke = false;

            durable = bool.Parse(RequireSetting("RabbitMQ_durable"));
            exclusive = bool.Parse(RequireSetting("RabbitMQ_exclusive"));
            autoDelete = bool.Parse(RequireSetting("RabbitMQ_autoDelete"));

            factory = new ConnectionFactory();
            factory.HostName = ConfigurationManager.AppSettings["RabbitMQHostName"];
            factory.UserName = ConfigurationManager.AppSettings["RabbitMQUserName"];
            factory.Password = ConfigurationManager.AppSettings["RabbitMQPassword"];
            factory.Port = int.Parse(RequireSetting("RabbitMQPort"));

            // A blank virtual host keeps the factory's own default.
            string virtualHost = ConfigurationManager.AppSettings["RabbitMQVirtualHost"];
            if (!string.IsNullOrWhiteSpace(virtualHost))
            {
                factory.VirtualHost = virtualHost;
            }
        }

        /// <summary>
        /// Creates an instance from explicitly supplied settings.
        /// </summary>
        /// <param name="_receive">Consume callback; null means this instance only sends.</param>
        /// <param name="_queue">Queue name.</param>
        /// <param name="_penetrate">Value passed through to the callback unchanged.</param>
        /// <param name="factory">Connection factory used for every connection.</param>
        /// <param name="durable">QueueDeclare "durable" flag.</param>
        /// <param name="exclusive">QueueDeclare "exclusive" flag.</param>
        /// <param name="autoDelete">QueueDeclare "autoDelete" flag.</param>
        /// <param name="isBeginInvoke">Run the callback asynchronously; requires <paramref name="exclusive"/>.</param>
        /// <exception cref="ArgumentException">Asynchronous handling was requested without an exclusive queue.</exception>
        public RabbitMQServerT(Action<string, object> _receive, string _queue, object _penetrate, ConnectionFactory factory, bool durable, bool exclusive, bool autoDelete, bool isBeginInvoke)
        {
            // Asynchronous handling is only allowed on an exclusive queue.
            if (isBeginInvoke && !exclusive)
            {
                throw new ArgumentException("接收消息隊列對象RabbitMQServerT參數isBeginInvoke=true異步執行接收業務,如果要異步執行業務,請獨占該消息exclusive=true,否則會被其他線程重復讀取。");
            }

            queue = _queue;
            receive = _receive;
            penetrate = _penetrate;
            this.factory = factory;
            this.durable = durable;
            this.exclusive = exclusive;
            this.autoDelete = autoDelete;
            this.isBeginInvoke = isBeginInvoke;
        }

        /// <summary>
        /// Publishes a text message to the queue over a short-lived connection.
        /// Failures are logged and not rethrown.
        /// </summary>
        /// <param name="message">Message text, encoded as UTF-8.</param>
        public void Send(string message)
        {
            try
            {
                using (var sendConnection = factory.CreateConnection())
                using (var sendChannel = sendConnection.CreateModel())
                {
                    // Make sure the queue exists before publishing to it.
                    sendChannel.QueueDeclare(queue, durable, exclusive, autoDelete, null);
                    var body = Encoding.UTF8.GetBytes(message);
                    sendChannel.BasicPublish("", queue, null, body);
                }
            }
            catch (Exception ex)
            {
                TLogHelper.Error(ex.Message, "發送消息隊列異常RabbitMQServerTSend:\n" + message);
            }
        }

        /// <summary>
        /// Serializes the model to JSON and publishes it.
        /// </summary>
        /// <param name="model">Message envelope to send.</param>
        public void Send(RabbitMQMsgModel model)
        {
            Send(JsonConvert.SerializeObject(model));
        }

        /// <summary>
        /// Starts consuming the queue. Does nothing when no callback was supplied or when a
        /// consumer channel is already open. On failure the error is logged and
        /// <see cref="IsReceive"/> is reset so a caller polling it can retry.
        /// </summary>
        public void Receive()
        {
            if (receive == null)
            {
                return;
            }
            if (IsReceive && channel != null && channel.IsOpen)
            {
                return; // already consuming; a second consumer would only duplicate connections
            }

            IsReceive = true;
            try
            {
                ReleaseConsumer(); // drop any stale connection left by an earlier attempt

                connection = factory.CreateConnection();
                channel = connection.CreateModel();
                channel.QueueDeclare(queue, durable, exclusive, autoDelete, null);
                channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

                IModel consumerChannel = channel;
                var consumer = new EventingBasicConsumer(consumerChannel);
                consumer.Received += (model, ea) => OnMessage(consumerChannel, ea);

                // autoAck must be false: the handler acknowledges explicitly once the work is done.
                // The consumer stays subscribed, so nothing has to be re-registered per message.
                channel.BasicConsume(queue, false, consumer);
            }
            catch (Exception ex)
            {
                TLogHelper.Error(ex.Message, "接收消息隊列異常Receive");
                ReleaseConsumer();
                IsReceive = false;
            }
        }

        /// <summary>
        /// Stops consuming and releases the consumer connection. Safe to call when
        /// <see cref="Receive"/> was never called or failed.
        /// </summary>
        public void EndReceive()
        {
            IsReceive = false;
            ReleaseConsumer();
        }

        /// <summary>Handles one delivery: runs the callback, then acks on success or rejects on failure.</summary>
        private void OnMessage(IModel consumerChannel, BasicDeliverEventArgs ea)
        {
            try
            {
                // Copy the body before leaving the handler; the async path uses it later.
                var message = Encoding.UTF8.GetString(ea.Body.ToArray());

                if (isBeginInvoke)
                {
                    receive.BeginInvoke(message, penetrate, ar =>
                    {
                        try
                        {
                            receive.EndInvoke(ar); // surfaces any exception thrown by the callback
                            consumerChannel.BasicAck(ea.DeliveryTag, false);
                        }
                        catch (Exception ex)
                        {
                            Reject(consumerChannel, ea.DeliveryTag, ex);
                        }
                    }, null);
                }
                else
                {
                    receive.Invoke(message, penetrate);
                    consumerChannel.BasicAck(ea.DeliveryTag, false);
                }
            }
            catch (Exception ex)
            {
                Reject(consumerChannel, ea.DeliveryTag, ex);
            }
        }

        /// <summary>
        /// Logs a failed delivery and rejects it without requeueing, so a message that always
        /// fails cannot block the queue (prefetch is 1) or be redelivered in a tight loop.
        /// </summary>
        private void Reject(IModel consumerChannel, ulong deliveryTag, Exception failure)
        {
            TLogHelper.Error(failure.Message, "接收消息隊列業務異常Received:" + queue);
            try
            {
                consumerChannel.BasicNack(deliveryTag, false, false);
            }
            catch (Exception ex)
            {
                // The channel may already be closed; the broker then handles the unacked delivery.
                TLogHelper.Error(ex.Message, "接收消息隊列業務異常Received:" + queue);
            }
        }

        /// <summary>Disposes the consumer channel and connection, if any, logging disposal errors.</summary>
        private void ReleaseConsumer()
        {
            IModel oldChannel = channel;
            IConnection oldConnection = connection;
            channel = null;
            connection = null;

            try
            {
                if (oldChannel != null)
                {
                    oldChannel.Dispose();
                }
            }
            catch (Exception ex)
            {
                TLogHelper.Error(ex.Message, "接收消息隊列異常Receive");
            }

            try
            {
                if (oldConnection != null)
                {
                    oldConnection.Dispose();
                }
            }
            catch (Exception ex)
            {
                TLogHelper.Error(ex.Message, "接收消息隊列異常Receive");
            }
        }

        /// <summary>Reads a mandatory appSettings value, failing with the key name when it is absent.</summary>
        private static string RequireSetting(string key)
        {
            string value = ConfigurationManager.AppSettings[key];
            if (value == null)
            {
                throw new ConfigurationErrorsException("Missing appSettings key: " + key);
            }
            return value;
        }
    }
}
消息格式RabbitMQMsgModel

namespace MQServer
{
    /// <summary>
    /// Envelope for a queue message: a business name plus its payload.
    /// </summary>
    public class RabbitMQMsgModel
    {
        /// <summary>
        /// Business name.
        /// </summary>
        public string BLLName { get; set; }

        /// <summary>
        /// Business data.
        /// </summary>
        public object Data { get; set; }
    }
}
2、BLL實現消息業務
BaseMQBLL

using System;

namespace MQServer
{
    /// <summary>
    /// Base class for business logic that talks to one queue through <see cref="RabbitMQServerT"/>.
    /// Derived classes implement <see cref="ReceiveBack"/> to handle incoming messages.
    /// </summary>
    public abstract class BaseMQBLL : IDisposable
    {
        /// <summary>Whether the underlying queue wrapper reports an active consumer.</summary>
        public bool IsReceive { get { return MQ.IsReceive; } }

        /// <summary>Queue wrapper used for both sending and receiving.</summary>
        protected readonly RabbitMQServerT MQ;

        private BaseMQBLL() { }

        /// <summary>
        /// Binds this instance to a queue using the default (appSettings) connection settings.
        /// </summary>
        /// <param name="queue">Queue name.</param>
        /// <param name="_penetrate">Value handed back to <see cref="ReceiveBack"/> with every message.</param>
        protected BaseMQBLL(string queue, object _penetrate)
        {
            // The pass-through value travels via the queue wrapper and comes back as the
            // callback's second argument. Exceptions from ReceiveBack propagate to the wrapper.
            MQ = new MQServer.RabbitMQServerT(
                (string source, object o) => ReceiveBack(source, o),
                queue,
                _penetrate);
        }

        /// <summary>
        /// Starts receiving.
        /// </summary>
        public void Receive()
        {
            MQ.Receive();
        }

        /// <summary>
        /// Stops receiving.
        /// </summary>
        public void EndReceive()
        {
            MQ.EndReceive();
        }

        /// <summary>
        /// Receive callback every derived class must implement.
        /// </summary>
        /// <param name="source">Message text.</param>
        /// <param name="receiveO">Pass-through value supplied to the constructor.</param>
        protected abstract void ReceiveBack(string source, object receiveO);

        /// <summary>
        /// Stops receiving if it was started. Never throws, so it is safe in a using block
        /// even when <see cref="Receive"/> was never called.
        /// </summary>
        public void Dispose()
        {
            if (MQ == null || !MQ.IsReceive)
            {
                return; // nothing was started, so there is nothing to release
            }

            try
            {
                EndReceive();
            }
            catch (Exception ex)
            {
                // Dispose must not throw; record the failure for diagnostics instead.
                System.Diagnostics.Trace.TraceWarning("BaseMQBLL.Dispose: " + ex);
            }
        }
    }
}
MQTestHello: BaseMQBLL

using MQServer;
using MQServer.Log;
using Newtonsoft.Json;

namespace BLL
{
    /// <summary>
    /// Sample business class bound to the "hello" queue.
    /// </summary>
    public class MQTestHello : BaseMQBLL
    {
        public MQTestHello() : base("hello", null)
        {
        }

        /// <summary>
        /// Receive callback: parses the JSON envelope and dispatches on its BLLName.
        /// Messages that deserialize to nothing, or carry an unknown BLLName, are ignored.
        /// </summary>
        /// <param name="source">JSON text of a <see cref="RabbitMQMsgModel"/>.</param>
        /// <param name="receiveO">Pass-through value (unused here).</param>
        protected override void ReceiveBack(string source, object receiveO)
        {
            RabbitMQMsgModel model = JsonConvert.DeserializeObject<RabbitMQMsgModel>(source);
            if (model == null)
            {
                return; // empty or literal "null" payload: nothing to dispatch
            }

            switch (model.BLLName)
            {
                case "Hello":
                    Hello(model.Data);
                    break;
                default:
                    break;
            }
        }

        /// <summary>
        /// Sends a Hello message.
        /// </summary>
        /// <param name="msg">Payload placed in the envelope's Data.</param>
        public void SendHello(string msg)
        {
            MQ.Send(new RabbitMQMsgModel() { BLLName = "Hello", Data = msg });
        }

        /// <summary>
        /// Handles a received Hello message by logging its payload as JSON.
        /// </summary>
        /// <param name="data">Payload from the envelope.</param>
        public void Hello(object data)
        {
            TLogHelper.Info(JsonConvert.SerializeObject(data), "讀取消息在MQTestHello");
        }
    }
}
3、記錄日志
TLogHelper

using Newtonsoft.Json;
using System;
using System.IO;
using System.Messaging;
using System.Text;

namespace MQServer.Log
{
    /// <summary>
    /// Minimal text-file logger. Entries are appended to one file per day
    /// (yyyyMMdd.txt) inside the directory named by the MQServerTLogPath app setting.
    /// </summary>
    public class TLogHelper
    {
        /// <summary>Lock that serializes all reads and writes of the log file.</summary>
        public static object _lock = new object();

        /// <summary>
        /// Logs the body of a message-queue message, serialized as JSON, under the "MessageQueue" title.
        /// </summary>
        public static void MQ(Message myMessage, string detail = "")
        {
            string msg = JsonConvert.SerializeObject(myMessage.Body);
            Write(msg, detail, "MessageQueue");
        }

        /// <summary>Logs a text entry under the "Info" title.</summary>
        public static void Info(string msg, string detail = "")
        {
            Write(msg, detail, "Info");
        }

        /// <summary>Logs an object, serialized as JSON, under the "Info" title.</summary>
        public static void Info(object msg, string detail = "")
        {
            Write(JsonConvert.SerializeObject(msg), detail, "Info");
        }

        /// <summary>Logs a text entry under the "Error" title.</summary>
        public static void Error(string msg, string detail = "")
        {
            Write(msg, detail, "Error");
        }

        /// <summary>
        /// Appends one entry: a blank line, "timestamp title", the optional detail line, then the message.
        /// </summary>
        private static void Write(string msg, string detail = "", string title = "Info")
        {
            DateTime now = DateTime.Now;
            string directory = GetLogDirectory();
            string filePath = Path.Combine(directory, now.ToString("yyyyMMdd") + ".txt");

            lock (_lock)
            {
                // CreateDirectory is a no-op when the directory already exists.
                Directory.CreateDirectory(directory);

                // FileMode.Append positions at the end; the using blocks release the handle
                // even when a write fails.
                using (var stream = new FileStream(filePath, FileMode.Append, FileAccess.Write))
                using (var writer = new StreamWriter(stream))
                {
                    writer.WriteLine();
                    writer.WriteLine(now.ToString("yyyyMMddHHmmssfff") + " " + title);
                    if (!string.IsNullOrWhiteSpace(detail))
                    {
                        writer.WriteLine(detail);
                    }
                    writer.WriteLine(msg);
                }
            }
        }

        /// <summary>
        /// Returns the full text of today's log file.
        /// </summary>
        /// <returns>File content decoded as UTF-8.</returns>
        /// <exception cref="FileNotFoundException">Nothing has been logged today.</exception>
        public static string Read()
        {
            string filePath = Path.Combine(GetLogDirectory(), DateTime.Now.ToString("yyyyMMdd") + ".txt");

            lock (_lock)
            {
                using (var reader = new StreamReader(filePath, Encoding.UTF8))
                {
                    return reader.ReadToEnd();
                }
            }
        }

        /// <summary>Reads the log directory from configuration, failing clearly when it is not set.</summary>
        private static string GetLogDirectory()
        {
            string directory = System.Configuration.ConfigurationManager.AppSettings["MQServerTLogPath"];
            if (string.IsNullOrWhiteSpace(directory))
            {
                throw new System.Configuration.ConfigurationErrorsException("Missing appSettings key: MQServerTLogPath");
            }
            return directory;
        }
    }
}
4、Form窗體測試
RabbitMQForm : Form

using BLL;
using Newtonsoft.Json;
using System;
using System.Windows.Forms;

namespace WinFormActiveMQ
{
    /// <summary>
    /// Test form: toggles RabbitMQ consuming and two TCP listeners, sends test messages,
    /// and shows what the listeners received.
    /// </summary>
    public partial class RabbitMQForm : Form
    {
        static MQTestHello hello;
        static TCPTestHello tcpHello;
        static TCPTestHello tcpHello2;
        private Label la_main;
        private Timer tim;
        private System.Threading.Thread thrListener;
        private System.Threading.Thread thrListener2;

        public RabbitMQForm()
        {
            InitializeComponent();

            la_main = new Label();
            la_main.Name = "la_main";
            la_main.Width = 282;
            la_main.TabIndex = 0;
            Controls.Add(la_main);

            // Watchdog: once a second, restart whatever is ticked but no longer running.
            tim = new Timer();
            tim.Interval = 1000;
            tim.Tick += CheckReceive;
            tim.Start();
        }

        private void Form1_Load(object sender, EventArgs e)
        {
            textBox1.ScrollBars = ScrollBars.Vertical;
            la_main.Text = "RabbitMQ消息隊列,點擊開啟接收。";
            TCPIp1.Text = "127.0.0.1";
            TCPPort1.Text = "90";
            TCPIp2.Text = "127.0.0.1";
            TCPPort2.Text = "91";
            hello = new MQTestHello();
        }

        /// <summary>
        /// Stops the watchdog and every receiver/listener so their threads and sockets
        /// do not outlive the form.
        /// </summary>
        protected override void OnFormClosed(FormClosedEventArgs e)
        {
            tim.Stop();
            tim.Dispose();

            StopQuietly(() => { if (hello != null && hello.IsReceive) { hello.EndReceive(); } });
            StopQuietly(() => { if (tcpHello != null) { tcpHello.CloseListen(); } });
            StopQuietly(() => { if (tcpHello2 != null) { tcpHello2.CloseListen(); } });

            base.OnFormClosed(e);
        }

        /// <summary>
        /// Watchdog tick: for every ticked checkbox, restarts the matching receiver or
        /// listener if it is not running.
        /// </summary>
        private void CheckReceive(object sender, EventArgs e)
        {
            if (TestHello.Checked && hello != null && !hello.IsReceive)
            {
                hello.Receive();
            }

            if (TCPHello.Checked && tcpHello != null && !tcpHello.IsListening)
            {
                thrListener = tcpHello.ListenStart(ListenClientBack);
            }

            if (TCPListen2.Checked && tcpHello2 != null && !tcpHello2.IsListening)
            {
                thrListener2 = tcpHello2.ListenStart(ListenClientBack);
            }
        }

        /// <summary>
        /// Ticked: start consuming the Hello queue and send one test message.
        /// Unticked: stop consuming.
        /// </summary>
        private void TestHello_CheckedChanged(object sender, EventArgs e)
        {
            if (hello == null)
            {
                return;
            }

            if (TestHello.Checked)
            {
                if (!hello.IsReceive)
                {
                    hello.Receive();
                }
                hello.SendHello("測試第一消息");
                return;
            }

            try
            {
                hello.EndReceive();
            }
            catch (Exception ex)
            {
                // Stopping a receiver that never connected must not crash the UI.
                changeListenText("\r\n" + ex.Message);
            }
        }

        /// <summary>
        /// Ticked: start TCP listener 1 on the entered port. Unticked: stop it.
        /// </summary>
        private void TCPHello_CheckedChanged(object sender, EventArgs e)
        {
            // IP/port inputs are editable only while not listening.
            TCPIp1.Enabled = !TCPHello.Checked;
            TCPPort1.Enabled = !TCPHello.Checked;

            if (!TCPHello.Checked)
            {
                // tcpHello is still null when this runs because an invalid port un-ticked the box.
                if (tcpHello != null)
                {
                    tcpHello.CloseListen();
                }
                return;
            }

            int port;
            if (!TryReadPort(TCPPort1.Text, out port))
            {
                TCPHello.Checked = false; // re-enters this handler through the branch above
                TCPIp1.Enabled = true;
                TCPPort1.Enabled = true;
                MessageBox.Show("請輸入正確端口號");
                return;
            }

            tcpHello = new TCPTestHello(TCPIp1.Text, port, "", port);
            thrListener = tcpHello.ListenStart(ListenClientBack);
        }

        /// <summary>
        /// Listener 1 callback: shows the received object as JSON; null signals a failed callback.
        /// </summary>
        /// <returns>The JSON shown, or an empty string on failure.</returns>
        private string ListenClientBack(object obj)
        {
            if (obj == null)
            {
                changeListenText("\r\n 監聽執行回調出錯");
                // Un-tick on the UI thread; the CheckedChanged handler then closes the listener.
                UncheckOnUiThread(TCPHello);
                return "";
            }

            string res = JsonConvert.SerializeObject(obj);
            changeListenText("\r\n收到終端的請求在Form:" + res);
            return res;
        }

        /// <summary>
        /// Prepends text to the output box, marshalling to the UI thread when called from a listener thread.
        /// </summary>
        /// <param name="text">Text to prepend.</param>
        private void changeListenText(string text)
        {
            if (IsDisposed || !IsHandleCreated)
            {
                return; // the form is gone; there is nowhere to show the text
            }

            if (this.textBox1.InvokeRequired)
            {
                this.Invoke(new Action<string>(changeListenText), new object[] { text });
            }
            else
            {
                this.textBox1.Text = text + this.textBox1.Text;
            }
        }

        /// <summary>
        /// Sends a test TCP request through listener 1's endpoint.
        /// </summary>
        private void TestRequestTcp_Click(object sender, EventArgs e)
        {
            if (tcpHello == null)
            {
                MessageBox.Show("請先打開監聽");
                return;
            }

            try
            {
                tcpHello.SendHello("測試發送TCP請求:" + DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss fff"));
            }
            catch (Exception ex)
            {
                // Report the failure instead of dropping it silently.
                changeListenText("\r\n發送TCP請求失敗:" + ex.Message);
            }
        }

        /// <summary>
        /// Ticked: start TCP listener 2 on the entered port. Unticked: stop it.
        /// </summary>
        private void TCPListen2_CheckedChanged(object sender, EventArgs e)
        {
            TCPIp2.Enabled = !TCPListen2.Checked;
            TCPPort2.Enabled = !TCPListen2.Checked;

            if (!TCPListen2.Checked)
            {
                if (tcpHello2 != null)
                {
                    tcpHello2.CloseListen();
                }
                return;
            }

            int port;
            if (!TryReadPort(TCPPort2.Text, out port))
            {
                TCPListen2.Checked = false; // re-enters this handler through the branch above
                TCPIp2.Enabled = true;
                TCPPort2.Enabled = true;
                MessageBox.Show("請輸入正確端口號");
                return;
            }

            tcpHello2 = new TCPTestHello(TCPIp2.Text, port, "", port);
            thrListener2 = tcpHello2.ListenStart(ListenClientBack2);
        }

        /// <summary>
        /// Listener 2 callback: shows the received object as JSON; null signals a failed callback.
        /// </summary>
        /// <returns>The JSON shown, or an empty string on failure.</returns>
        private string ListenClientBack2(object obj)
        {
            if (obj == null)
            {
                changeListenText("\r\n監聽2執行回調出錯");
                UncheckOnUiThread(TCPListen2);
                return "";
            }

            string res = JsonConvert.SerializeObject(obj);
            changeListenText("\r\n收到終端2的請求在Form:" + res);
            return res;
        }

        /// <summary>
        /// Sends a test TCP request through listener 2's endpoint.
        /// </summary>
        private void SentTCP2_Click(object sender, EventArgs e)
        {
            if (tcpHello2 == null)
            {
                MessageBox.Show("請先打開監聽2");
                return;
            }

            try
            {
                tcpHello2.SendHello("測試發送TCP請求222:" + DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss fff"));
            }
            catch (Exception ex)
            {
                changeListenText("\r\n發送TCP請求失敗:" + ex.Message);
            }
        }

        /// <summary>Parses a TCP port, accepting only 1-65535.</summary>
        private static bool TryReadPort(string text, out int port)
        {
            return int.TryParse(text, out port) && port >= 1 && port <= 65535;
        }

        /// <summary>
        /// Clears a checkbox from any thread. BeginInvoke is used so a listener thread
        /// never blocks waiting for the UI thread.
        /// </summary>
        private void UncheckOnUiThread(CheckBox box)
        {
            if (IsDisposed || !IsHandleCreated)
            {
                return;
            }

            if (InvokeRequired)
            {
                BeginInvoke(new Action(() => box.Checked = false));
            }
            else
            {
                box.Checked = false;
            }
        }

        /// <summary>Runs a shutdown step, tracing rather than propagating failures while the form closes.</summary>
        private static void StopQuietly(Action stop)
        {
            try
            {
                stop();
            }
            catch (Exception ex)
            {
                System.Diagnostics.Trace.TraceWarning("RabbitMQForm shutdown: " + ex);
            }
        }
    }
}
RabbitMQForm

namespace WinFormActiveMQ
{
    // Designer half of the RabbitMQForm partial class: declares the controls,
    // lays them out and wires their events to handlers defined in the other half.
    partial class RabbitMQForm
    {
        /// <summary>
        /// Required designer variable.
        /// </summary>
        private System.ComponentModel.IContainer components = null;

        /// <summary>
        /// Clean up any resources being used.
        /// </summary>
        /// <param name="disposing">true if managed resources should be disposed; otherwise, false.</param>
        protected override void Dispose(bool disposing)
        {
            if (disposing && (components != null))
            {
                components.Dispose();
            }
            base.Dispose(disposing);
        }

        #region Windows Form Designer generated code

        /// <summary>
        /// Required method for Designer support - do not modify
        /// the contents of this method with the code editor.
        /// </summary>
        private void InitializeComponent()
        {
            this.TestHello = new System.Windows.Forms.CheckBox();
            this.TCPHello = new System.Windows.Forms.CheckBox();
            this.textBox1 = new System.Windows.Forms.TextBox();
            this.TestRequestTcp = new System.Windows.Forms.Button();
            this.TCPListen2 = new System.Windows.Forms.CheckBox();
            this.TCPIp1 = new System.Windows.Forms.TextBox();
            this.label1 = new System.Windows.Forms.Label();
            this.TCPPort1 = new System.Windows.Forms.TextBox();
            this.label2 = new System.Windows.Forms.Label();
            this.TCPIp2 = new System.Windows.Forms.TextBox();
            this.TCPPort2 = new System.Windows.Forms.TextBox();
            this.SentTCP2 = new System.Windows.Forms.Button();
            this.SuspendLayout();
            // 
            // TestHello (checkbox: toggles receiving from the Hello queue)
            // 
            this.TestHello.AutoSize = true;
            this.TestHello.Location = new System.Drawing.Point(12, 12);
            this.TestHello.Name = "TestHello";
            this.TestHello.Size = new System.Drawing.Size(129, 19);
            this.TestHello.TabIndex = 0;
            this.TestHello.Text = "接收隊列Hello";
            this.TestHello.UseVisualStyleBackColor = true;
            this.TestHello.CheckedChanged += new System.EventHandler(this.TestHello_CheckedChanged);
            // 
            // TCPHello (checkbox: toggles TCP listener 1)
            // 
            this.TCPHello.AutoSize = true;
            this.TCPHello.Location = new System.Drawing.Point(216, 77);
            this.TCPHello.Name = "TCPHello";
            this.TCPHello.Size = new System.Drawing.Size(123, 19);
            this.TCPHello.TabIndex = 1;
            this.TCPHello.Text = "監聽TCPHello";
            this.TCPHello.UseVisualStyleBackColor = true;
            this.TCPHello.CheckedChanged += new System.EventHandler(this.TCPHello_CheckedChanged);
            // 
            // textBox1 (multi-line output box)
            // 
            this.textBox1.Location = new System.Drawing.Point(13, 248);
            this.textBox1.Multiline = true;
            this.textBox1.Name = "textBox1";
            this.textBox1.Size = new System.Drawing.Size(427, 160);
            this.textBox1.TabIndex = 2;
            // 
            // TestRequestTcp (button: sends a test TCP request)
            // 
            this.TestRequestTcp.Location = new System.Drawing.Point(345, 73);
            this.TestRequestTcp.Name = "TestRequestTcp";
            this.TestRequestTcp.Size = new System.Drawing.Size(122, 23);
            this.TestRequestTcp.TabIndex = 3;
            this.TestRequestTcp.Text = "發送TCP請求";
            this.TestRequestTcp.UseVisualStyleBackColor = true;
            this.TestRequestTcp.Click += new System.EventHandler(this.TestRequestTcp_Click);
            // 
            // TCPListen2 (checkbox: toggles TCP listener 2)
            // 
            this.TCPListen2.AutoSize = true;
            this.TCPListen2.Location = new System.Drawing.Point(216, 106);
            this.TCPListen2.Name = "TCPListen2";
            this.TCPListen2.Size = new System.Drawing.Size(67, 19);
            this.TCPListen2.TabIndex = 4;
            this.TCPListen2.Text = "監聽2";
            this.TCPListen2.UseVisualStyleBackColor = true;
            this.TCPListen2.CheckedChanged += new System.EventHandler(this.TCPListen2_CheckedChanged);
            // 
            // TCPIp1 (text box: IP for listener/request 1)
            // 
            this.TCPIp1.Location = new System.Drawing.Point(13, 75);
            this.TCPIp1.Name = "TCPIp1";
            this.TCPIp1.Size = new System.Drawing.Size(114, 25);
            this.TCPIp1.TabIndex = 5;
            // 
            // label1 (caption for the IP inputs)
            // 
            this.label1.AutoSize = true;
            this.label1.Location = new System.Drawing.Point(12, 57);
            this.label1.Name = "label1";
            this.label1.Size = new System.Drawing.Size(53, 15);
            this.label1.TabIndex = 6;
            this.label1.Text = "請求ip";
            // 
            // TCPPort1 (text box: port for listener/request 1)
            // 
            this.TCPPort1.Location = new System.Drawing.Point(144, 74);
            this.TCPPort1.Name = "TCPPort1";
            this.TCPPort1.Size = new System.Drawing.Size(66, 25);
            this.TCPPort1.TabIndex = 7;
            // 
            // label2 (caption for the port inputs)
            // 
            this.label2.AutoSize = true;
            this.label2.Location = new System.Drawing.Point(141, 56);
            this.label2.Name = "label2";
            this.label2.Size = new System.Drawing.Size(112, 15);
            this.label2.TabIndex = 8;
            this.label2.Text = "請求和監聽端口";
            // 
            // TCPIp2 (text box: IP for listener/request 2)
            // 
            this.TCPIp2.Location = new System.Drawing.Point(12, 106);
            this.TCPIp2.Name = "TCPIp2";
            this.TCPIp2.Size = new System.Drawing.Size(114, 25);
            this.TCPIp2.TabIndex = 9;
            // 
            // TCPPort2 (text box: port for listener/request 2)
            // 
            this.TCPPort2.Location = new System.Drawing.Point(144, 106);
            this.TCPPort2.Name = "TCPPort2";
            this.TCPPort2.Size = new System.Drawing.Size(66, 25);
            this.TCPPort2.TabIndex = 10;
            // 
            // SentTCP2 (button: sends a test TCP request via endpoint 2)
            // 
            this.SentTCP2.Location = new System.Drawing.Point(345, 108);
            this.SentTCP2.Name = "SentTCP2";
            this.SentTCP2.Size = new System.Drawing.Size(122, 23);
            this.SentTCP2.TabIndex = 11;
            this.SentTCP2.Text = "SentTCP2";
            this.SentTCP2.UseVisualStyleBackColor = true;
            this.SentTCP2.Click += new System.EventHandler(this.SentTCP2_Click);
            // 
            // RabbitMQForm (the form itself: size, child controls, Load handler)
            // 
            this.ClientSize = new System.Drawing.Size(498, 417);
            this.Controls.Add(this.SentTCP2);
            this.Controls.Add(this.TCPPort2);
            this.Controls.Add(this.TCPIp2);
            this.Controls.Add(this.label2);
            this.Controls.Add(this.TCPPort1);
            this.Controls.Add(this.label1);
            this.Controls.Add(this.TCPIp1);
            this.Controls.Add(this.TCPListen2);
            this.Controls.Add(this.TestRequestTcp);
            this.Controls.Add(this.textBox1);
            this.Controls.Add(this.TCPHello);
            this.Controls.Add(this.TestHello);
            this.Name = "RabbitMQForm";
            this.Text = "RabbitMQForm";
            this.Load += new System.EventHandler(this.Form1_Load);
            this.ResumeLayout(false);
            this.PerformLayout();
        }

        #endregion

        // Control fields created in InitializeComponent.
        private System.Windows.Forms.CheckBox TestHello;
        private System.Windows.Forms.CheckBox TCPHello;
        private System.Windows.Forms.TextBox textBox1;
        private System.Windows.Forms.Button TestRequestTcp;
        private System.Windows.Forms.CheckBox TCPListen2;
        private System.Windows.Forms.TextBox TCPIp1;
        private System.Windows.Forms.Label label1;
        private System.Windows.Forms.TextBox TCPPort1;
        private System.Windows.Forms.Label label2;
        private System.Windows.Forms.TextBox TCPIp2;
        private System.Windows.Forms.TextBox TCPPort2;
        private System.Windows.Forms.Button SentTCP2;
    }
}
5、默認配置
<!-- Default settings read through ConfigurationManager.AppSettings by the classes above. -->
<appSettings>
  <!-- Directory where TLogHelper writes its daily yyyyMMdd.txt log files. -->
  <add key="MQServerTLogPath" value="d:/WinFormActiveMQLog/" />
  <!-- RabbitMQ connection settings used by RabbitMQServerT's default constructor. -->
  <add key="RabbitMQHostName" value="192.168.0.233" />
  <add key="RabbitMQUserName" value="admin" />
  <add key="RabbitMQPassword" value="123456" />
  <add key="RabbitMQPort" value="5672" />
  <!-- Applied to the connection factory only when not blank. -->
  <add key="RabbitMQVirtualHost" value="" />
  <!-- Flags passed to QueueDeclare; parsed with bool.Parse. -->
  <add key="RabbitMQ_durable" value="true" />
  <add key="RabbitMQ_exclusive" value="false" />
  <add key="RabbitMQ_autoDelete" value="false" />
</appSettings>
TCP監聽demo
6、TCP監聽類

using MQServer;
using MQServer.Log;
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading.Tasks;

namespace TCPServer
{
    /// <summary>
    /// TCP helper bound to one ip/port pair: sends one-shot text requests to it, or
    /// listens on it and hands each received payload to callbacks.
    /// Instances are cached per ip/port through <see cref="Instance"/>.
    /// </summary>
    public class TCPServerT
    {
        /// <summary>
        /// Callback type invoked for each received payload.
        /// </summary>
        /// <param name="s">Received data (an object[] holding the text), or null to signal failure.</param>
        /// <returns>Callback-defined result; not used by this class.</returns>
        public delegate object ListenBackDelegate(object s);

        /// <summary>
        /// True while <see cref="Listen"/> is running.
        /// </summary>
        public bool IsListening = false;

        static Dictionary<string, TCPServerT> tcps; // one instance per ip:port

        string ip;
        int port;
        private Socket listener;

        private TCPServerT() { }

        private TCPServerT(string ip, int port)
        {
            this.ip = ip;
            this.port = port;
        }

        /// <summary>
        /// Returns the cached instance for the ip/port pair, creating it on first use.
        /// </summary>
        /// <param name="ip">IP address text; blank means "any address" when listening.</param>
        /// <param name="port">TCP port.</param>
        public static TCPServerT Instance(string ip, int port)
        {
            if (tcps == null)
            {
                tcps = new Dictionary<string, TCPServerT>();
            }

            // The separator keeps e.g. ("1.1.1.1", 23) and ("1.1.1.12", 3) apart.
            string key = ip + ":" + port;

            TCPServerT cached;
            if (tcps.TryGetValue(key, out cached))
            {
                return cached;
            }

            TCPServerT created = new TCPServerT(ip, port);
            tcps.Add(key, created);
            return created;
        }

        /// <summary>
        /// Connects to the configured endpoint, writes the text as UTF-8 and closes the connection.
        /// </summary>
        /// <param name="data">Text to send.</param>
        /// <exception cref="IOException">The network stream is not writable.</exception>
        public void Request(string data)
        {
            // using blocks release the client and stream even when Connect or Write throws.
            using (var client = new TcpClient())
            {
                client.Connect(IPAddress.Parse(ip), port);
                using (NetworkStream stream = client.GetStream())
                {
                    if (!stream.CanWrite)
                    {
                        throw new IOException("無法寫入互數據流");
                    }

                    byte[] payload = Encoding.UTF8.GetBytes(data);
                    stream.Write(payload, 0, payload.Length);
                }
            }
        }

        /// <summary>
        /// Serializes the model to JSON and sends it.
        /// </summary>
        /// <param name="model">Message envelope to send.</param>
        public void Request(TCPServiceMsgModel model)
        {
            Request(JsonConvert.SerializeObject(model));
        }

        /// <summary>
        /// Binds to the configured endpoint and blocks, handling one connection at a time,
        /// until <see cref="CloseListen"/> is called. Each connection is read to the end and
        /// the text is passed to the callbacks wrapped in an object[].
        /// </summary>
        /// <param name="back">Primary callback, invoked for every payload.</param>
        /// <param name="back2">Optional second callback: receives the payload after
        /// <paramref name="back"/> succeeds, or null when a callback threw.</param>
        /// <exception cref="InvalidOperationException">This instance is already listening.</exception>
        public void Listen(ListenBackDelegate back, ListenBackDelegate back2 = null)
        {
            if (IsListening)
            {
                throw new InvalidOperationException("端口" + port + "已處於監聽狀態,線程中只能存在一個Ip端口連接。");
            }

            IsListening = true;
            try
            {
                IPAddress ipAddress = string.IsNullOrWhiteSpace(ip) ? IPAddress.Any : IPAddress.Parse(ip);

                listener = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
                listener.Bind(new IPEndPoint(ipAddress, port));
                listener.Listen(0); // once is enough; the socket stays in the listening state

                while (IsListening)
                {
                    try
                    {
                        using (Socket socket = listener.Accept())
                        using (var stream = new NetworkStream(socket))
                        using (var reader = new StreamReader(stream))
                        {
                            var objs = new object[] { reader.ReadToEnd() };
                            Dispatch(back, back2, objs);
                        }
                    }
                    catch (ObjectDisposedException)
                    {
                        break; // CloseListen disposed the listener while Accept was blocked
                    }
                    catch (Exception ex)
                    {
                        // Errors raised by a deliberate close are expected; anything else is recorded.
                        if (IsListening)
                        {
                            TLogHelper.Error(ex.Message, "回調失敗");
                        }
                    }
                }
            }
            finally
            {
                // Also runs when Parse/Bind/Listen throws, so the socket is released and
                // IsListening does not stay stuck at true.
                CloseListen();
            }
        }

        /// <summary>
        /// Stops listening and releases the listening socket. Safe to call repeatedly or
        /// when <see cref="Listen"/> was never started.
        /// </summary>
        public void CloseListen()
        {
            // Clear the flag first so the accept loop treats the resulting error as a shutdown.
            IsListening = false;

            Socket current = listener;
            if (current != null)
            {
                current.Close();
            }
        }

        /// <summary>
        /// Runs the callbacks for one payload. A callback failure is logged and reported to
        /// <paramref name="back2"/> as null; the accept loop then carries on.
        /// </summary>
        private static void Dispatch(ListenBackDelegate back, ListenBackDelegate back2, object[] objs)
        {
            try
            {
                back.Invoke(objs);
                if (back2 != null)
                {
                    back2.Invoke(objs);
                }
            }
            catch (Exception ex)
            {
                // Log before notifying, so the failure is recorded even if back2 throws too.
                TLogHelper.Error(ex.Message, "回調失敗");
                if (back2 != null)
                {
                    back2.Invoke(null);
                }
            }
        }
    }
}
數據格式類

namespace TCPServer
{
    /// <summary>
    /// Envelope for a TCP message: a business name plus its payload.
    /// </summary>
    public class TCPServiceMsgModel
    {
        /// <summary>
        /// Business name.
        /// </summary>
        public string BLLName { get; set; }

        /// <summary>
        /// Business data.
        /// </summary>
        public object Data { get; set; }
    }
}
7、BLL業務
BaseTCPBLL

using System;
using System.Threading;

namespace TCPServer
{
    /// <summary>
    /// Base class for business logic that listens on one TCP endpoint and sends requests
    /// to another. Derived classes implement <see cref="ListenClientBack"/>.
    /// </summary>
    public abstract class BaseTCPBLL : IDisposable
    {
        /// <summary>Whether the listening endpoint reports that it is listening.</summary>
        public bool IsListening
        {
            get { return tcp != null && tcp.IsListening; }
        }

        private TCPServerT tcp;     // endpoint this instance listens on
        private TCPServerT tcpSend; // endpoint this instance sends requests to
        private TCPServerT.ListenBackDelegate back2; // caller-supplied callback, set by ListenStart
        private Thread thrListener;

        private BaseTCPBLL() { }

        /// <summary>
        /// Resolves the shared TCP helper instances for the listen and request endpoints.
        /// </summary>
        /// <param name="requestIp">IP requests are sent to.</param>
        /// <param name="requestPort">Port requests are sent to.</param>
        /// <param name="listenIp">IP to listen on.</param>
        /// <param name="listenPort">Port to listen on.</param>
        protected BaseTCPBLL(string requestIp, int requestPort, string listenIp, int listenPort)
        {
            tcp = TCPServerT.Instance(listenIp, listenPort);
            tcpSend = TCPServerT.Instance(requestIp, requestPort);
        }

        #region TCP service

        /// <summary>
        /// Starts listening on a new thread unless listening is already active or starting.
        /// </summary>
        /// <param name="back2">Optional callback: receives the payload on success, null on failure.</param>
        /// <returns>The listener thread held by this instance (may be null when another
        /// object already started listening on the shared endpoint).</returns>
        public Thread ListenStart(TCPServerT.ListenBackDelegate back2 = null)
        {
            // The IsAlive check covers the window between Thread.Start and the moment the
            // thread sets IsListening, so two quick calls cannot start two threads.
            if (IsListening || (thrListener != null && thrListener.IsAlive))
            {
                return thrListener;
            }

            this.back2 = back2;
            thrListener = new Thread(new ThreadStart(Listen));
            thrListener.Start();
            return thrListener;
        }

        /// <summary>
        /// Listener thread body. A start-up failure (for example the port is in use) is
        /// traced and reported through back2(null) instead of escaping the thread.
        /// </summary>
        private void Listen()
        {
            try
            {
                tcp.Listen(ListenClientBack, back2);
            }
            catch (Exception ex)
            {
                System.Diagnostics.Trace.TraceError("BaseTCPBLL.Listen: " + ex);

                TCPServerT.ListenBackDelegate notify = back2;
                if (notify != null)
                {
                    try
                    {
                        notify.Invoke(null);
                    }
                    catch (Exception callbackEx)
                    {
                        System.Diagnostics.Trace.TraceError("BaseTCPBLL.Listen callback: " + callbackEx);
                    }
                }
            }
        }

        /// <summary>
        /// Stops listening.
        /// </summary>
        public void CloseListen()
        {
            tcp.CloseListen();
        }

        /// <summary>
        /// Callback invoked for each payload received from a client.
        /// </summary>
        protected abstract string ListenClientBack(object obj);

        #endregion

        /// <summary>
        /// Sends a request to the request endpoint.
        /// </summary>
        protected void Request(TCPServiceMsgModel model)
        {
            tcpSend.Request(model);
        }

        /// <summary>
        /// Stops this instance's listener. The send endpoint is left alone: it is only used
        /// for requests here, and its instance is shared per ip/port with other objects.
        /// </summary>
        public void Dispose()
        {
            if (tcp != null)
            {
                tcp.CloseListen();
            }
        }
    }
}
TCPTestHello:BaseTCPBLL

using MQServer;
using MQServer.Log;
using Newtonsoft.Json;
using TCPServer;

namespace BLL
{
    /// <summary>
    /// Sample TCP business class handling the "Hello" message.
    /// </summary>
    public class TCPTestHello : BaseTCPBLL
    {
        /// <summary>
        /// Creates the sample bound to a request endpoint and a listen endpoint.
        /// </summary>
        /// <param name="requestIp">IP requests are sent to, e.g. "127.0.0.1" for this machine.</param>
        /// <param name="requestPort">Port requests are sent to.</param>
        /// <param name="listenIp">IP to listen on; "" for this machine.</param>
        /// <param name="listenPort">Port to listen on.</param>
        public TCPTestHello(string requestIp, int requestPort, string listenIp, int listenPort)
            : base(requestIp, requestPort, listenIp, listenPort)
        {
        }

        #region TCP service

        /// <summary>
        /// Callback for a received payload: extracts the first string of the received array,
        /// parses it as a <see cref="TCPServiceMsgModel"/> and dispatches on its BLLName.
        /// </summary>
        /// <param name="obj">Received data; expected to serialize to a JSON string array.</param>
        /// <returns>The JSON text of the envelope.</returns>
        /// <exception cref="System.Exception">The payload is not in the expected format.</exception>
        protected override string ListenClientBack(object obj)
        {
            var res = JsonConvert.SerializeObject(obj);

            TCPServiceMsgModel model;
            try
            {
                res = JsonConvert.DeserializeObject<string[]>(res)[0];
                model = JsonConvert.DeserializeObject<TCPServiceMsgModel>(res);
            }
            catch (System.Exception ex)
            {
                // Keep the original failure as InnerException for diagnosis.
                throw new System.Exception("TCP接收到對象不是本模塊接受的格式!" + res, ex);
            }

            // An empty or literal "null" payload deserializes to null without throwing.
            if (model == null)
            {
                throw new System.Exception("TCP接收到對象不是本模塊接受的格式!" + res);
            }

            switch (model.BLLName)
            {
                case "Hello":
                    Hello(model.Data);
                    break;
                default:
                    break;
            }

            return res;
        }

        #endregion

        /// <summary>
        /// Sends a Hello request.
        /// </summary>
        /// <param name="data">Payload placed in the envelope's Data.</param>
        public void SendHello(object data)
        {
            Request(new TCPServiceMsgModel() { BLLName = "Hello", Data = data });
        }

        /// <summary>
        /// Handles a received Hello message by logging its payload as JSON.
        /// </summary>
        /// <param name="data">Payload from the envelope.</param>
        public void Hello(object data)
        {
            TLogHelper.Info(JsonConvert.SerializeObject(data), "接收到消息在TCPTestHello");
        }
    }
}