【C#】【Demo】消息隊列RabbitMQ,和TCP監聽Demo。


 

注意:聲明隊列時,以下參數必須與服務器上已存在的同名隊列配置一致,否則聲明會失敗(PRECONDITION_FAILED):

  private string queue;//隊列名
        private bool durable;//持久化
        private bool exclusive;//獨占
        private bool autoDelete;//自動刪除

默認帳號guest只能從localhost連接,不能遠程登錄;遠程訪問請另建帳號。

默認訪問隊列的端口是5672,後台管理網站的默認端口是15672。

 

1、實現發送和接收,類RabbitMQServerT

using System;
using MQServer;
using RabbitMQ.Client;
using System.Text;
using System.Configuration;
using RabbitMQ.Client.Events;
using Newtonsoft.Json;

namespace MQServer
{
    /// <summary>
    /// Wraps one RabbitMQ queue: publishes UTF-8 text messages to it and, when a
    /// callback is supplied, consumes messages from it.
    /// </summary>
    public class RabbitMQServerT 
    {
        protected readonly Action<string, object> receive; // consume callback: (message text, pass-through state)
        private object penetrate; // state object handed back to the consume callback unchanged
        private string queue; // queue name
        private bool durable; // QueueDeclare "durable" flag
        private bool exclusive; // QueueDeclare "exclusive" flag
        private bool autoDelete; // QueueDeclare "autoDelete" flag
        private bool isBeginInvoke; // true: run the callback through BeginInvoke; false: run it inline on the consumer thread

        // Connection and channel used for consuming; null while no consumer is open.
        private IConnection connection;
        private IModel channel;

        /// <summary>
        /// True from the moment <see cref="Receive"/> starts consuming until
        /// <see cref="EndReceive"/> is called or the consumer fails to start.
        /// </summary>
        public bool IsReceive;

        private ConnectionFactory factory;
        private RabbitMQServerT()
        {
        }

        /// <summary>
        /// Creates an instance whose connection and queue settings come from appSettings
        /// (RabbitMQHostName, RabbitMQUserName, RabbitMQPassword, RabbitMQPort,
        /// RabbitMQVirtualHost, RabbitMQ_durable, RabbitMQ_exclusive, RabbitMQ_autoDelete).
        /// </summary>
        /// <param name="_receive">Consume callback; when null, <see cref="Receive"/> does nothing.</param>
        /// <param name="_queue">Queue name; can be used to separate business flows.</param>
        /// <param name="_penetrate">State object passed through to the consume callback.</param>
        /// <exception cref="InvalidOperationException">
        /// A required boolean or port setting is missing or cannot be parsed.
        /// </exception>
        public RabbitMQServerT(Action<string, object> _receive, string _queue = @"hello", object _penetrate = null)
        {
            queue = _queue;
            receive = _receive;
            penetrate = _penetrate;
            isBeginInvoke = false;

            durable = ReadBoolSetting("RabbitMQ_durable");
            exclusive = ReadBoolSetting("RabbitMQ_exclusive");
            autoDelete = ReadBoolSetting("RabbitMQ_autoDelete");

            factory = new ConnectionFactory();
            factory.HostName = ConfigurationManager.AppSettings["RabbitMQHostName"];
            factory.UserName = ConfigurationManager.AppSettings["RabbitMQUserName"];
            factory.Password = ConfigurationManager.AppSettings["RabbitMQPassword"];
            factory.Port = ReadIntSetting("RabbitMQPort");

            // An empty virtual host setting keeps the factory's own default.
            string virtualHost = ConfigurationManager.AppSettings["RabbitMQVirtualHost"];
            if (!string.IsNullOrWhiteSpace(virtualHost))
            {
                factory.VirtualHost = virtualHost;
            }
        }

        /// <summary>
        /// Creates an instance from explicitly supplied settings.
        /// </summary>
        /// <param name="_receive">Consume callback; when null, <see cref="Receive"/> does nothing.</param>
        /// <param name="_queue">Queue name; can be used to separate business flows.</param>
        /// <param name="_penetrate">State object passed through to the consume callback.</param>
        /// <param name="factory">Connection factory used for both sending and receiving.</param>
        /// <param name="durable">QueueDeclare "durable" flag.</param>
        /// <param name="exclusive">QueueDeclare "exclusive" flag.</param>
        /// <param name="autoDelete">QueueDeclare "autoDelete" flag.</param>
        /// <param name="isBeginInvoke">Run the callback through BeginInvoke; requires <paramref name="exclusive"/>.</param>
        /// <exception cref="ArgumentException">
        /// <paramref name="isBeginInvoke"/> is true while <paramref name="exclusive"/> is false.
        /// </exception>
        public RabbitMQServerT(Action<string, object> _receive, string _queue, object _penetrate, ConnectionFactory factory
            ,bool durable,bool exclusive, bool autoDelete,bool isBeginInvoke)
        {
            // Validate before touching any state. ArgumentException still derives from
            // Exception, so existing catch (Exception) handlers keep working.
            if (isBeginInvoke && !exclusive)
            {
                throw new ArgumentException("接收消息隊列對象RabbitMQServerT參數isBeginInvoke=true異步執行接收業務,如果要異步執行業務,請獨占該消息exclusive=true,否則會被其他線程重復讀取。");
            }

            queue = _queue;
            receive = _receive;
            penetrate = _penetrate;

            this.factory = factory;
            this.durable = durable;
            this.exclusive = exclusive;
            this.autoDelete = autoDelete;
            this.isBeginInvoke = isBeginInvoke;
        }

        /// <summary>
        /// Publishes a text message to the queue through the default exchange.
        /// A connection and channel are opened for this call and closed afterwards.
        /// Failures are logged and not rethrown.
        /// </summary>
        /// <param name="message">Message text; encoded as UTF-8.</param>
        public void Send(string message)
        {
            try
            {
                using (var connection = factory.CreateConnection())
                {
                    using (var channel = connection.CreateModel())
                    {
                        // Declare the queue so it exists before publishing.
                        channel.QueueDeclare(queue, durable, exclusive, autoDelete, null);
                        var body = Encoding.UTF8.GetBytes(message);
                        channel.BasicPublish("", queue, null, body);
                    }
                }
            }
            catch (Exception ex)
            {
                TLogHelper.Error(ex.Message, "發送消息隊列異常RabbitMQServerTSend:\n" + message);
            }
        }

        /// <summary>
        /// Serializes the model to JSON and publishes it with <see cref="Send(string)"/>.
        /// </summary>
        /// <param name="model">Message envelope to send.</param>
        public void Send(RabbitMQMsgModel model)
        {
            string message = JsonConvert.SerializeObject(model);
            Send(message);
        }

        /// <summary>
        /// Starts consuming the queue with manual acknowledgements and a prefetch of one.
        /// Does nothing when no consume callback was supplied. If the consumer cannot be
        /// started, the error is logged and <see cref="IsReceive"/> is reset to false so
        /// that a caller polling that flag can retry.
        /// </summary>
        public void Receive()
        {
            if (receive == null)
            {
                return;
            }

            // Close a consumer left over from an earlier call so connections never pile up.
            CloseConsumer();

            IsReceive = true;
            try
            {
                connection = factory.CreateConnection();
                IModel ch = connection.CreateModel();
                channel = ch;

                ch.QueueDeclare(queue, durable, exclusive, autoDelete, null);
                ch.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

                var consumer = new EventingBasicConsumer(ch);
                consumer.Received += (model, ea) =>
                {
                    // Copy the body right away; the handler must not rely on it later.
                    HandleDelivery(ch, ea.DeliveryTag, ea.Body.ToArray());
                };

                // autoAck must be false because the handler acknowledges explicitly.
                // Acking a delivery of an auto-ack consumer is a protocol error that
                // makes the broker close the channel.
                ch.BasicConsume(queue, false, consumer);
            }
            catch (Exception ex)
            {
                TLogHelper.Error(ex.Message, "接收消息隊列異常Receive");
                CloseConsumer();
                IsReceive = false;
            }
        }

        /// <summary>
        /// Stops consuming and releases the consumer's channel and connection.
        /// Safe to call when <see cref="Receive"/> was never started.
        /// </summary>
        public void EndReceive()
        {
            IsReceive = false;
            CloseConsumer();
        }

        /// <summary>
        /// Runs the consume callback for one delivery and acknowledges it on success.
        /// </summary>
        private void HandleDelivery(IModel ch, ulong deliveryTag, byte[] body)
        {
            try
            {
                string message = Encoding.UTF8.GetString(body);

                if (isBeginInvoke)
                {
                    // NOTE(review): delegate BeginInvoke is only supported on .NET Framework - confirm the target runtime.
                    receive.BeginInvoke(message, penetrate, ar => CompleteAsyncDelivery(ch, deliveryTag, ar), null);
                    return;
                }

                receive.Invoke(message, penetrate);
                ch.BasicAck(deliveryTag, false);
            }
            catch (Exception ex)
            {
                TLogHelper.Error(ex.Message, "接收消息隊列業務異常Received:" + queue);
                RejectDelivery(ch, deliveryTag);
            }
        }

        /// <summary>
        /// Completion callback of the BeginInvoke path: observes the callback's outcome,
        /// then acknowledges or rejects the delivery. Never lets an exception escape,
        /// because it runs on a thread-pool thread.
        /// </summary>
        private void CompleteAsyncDelivery(IModel ch, ulong deliveryTag, IAsyncResult ar)
        {
            try
            {
                // EndInvoke rethrows whatever the callback threw.
                receive.EndInvoke(ar);
                ch.BasicAck(deliveryTag, false);
            }
            catch (Exception ex)
            {
                TLogHelper.Error(ex.Message, "接收消息隊列業務異常Received:" + queue);
                RejectDelivery(ch, deliveryTag);
            }
        }

        /// <summary>
        /// Rejects a failed delivery without requeueing it. With a prefetch of one, an
        /// unanswered delivery would stop all further deliveries, and requeueing could
        /// redeliver a poison message forever.
        /// </summary>
        private void RejectDelivery(IModel ch, ulong deliveryTag)
        {
            try
            {
                if (ch.IsOpen)
                {
                    ch.BasicNack(deliveryTag, false, false);
                }
            }
            catch (Exception ex)
            {
                TLogHelper.Error(ex.Message, "RabbitMQServerT BasicNack failed:" + queue);
            }
        }

        /// <summary>
        /// Disposes the consumer's channel and connection, if any, and clears the fields.
        /// </summary>
        private void CloseConsumer()
        {
            IModel ch = channel;
            IConnection conn = connection;
            channel = null;
            connection = null;

            try
            {
                if (ch != null)
                {
                    ch.Dispose();
                }
            }
            catch (Exception ex)
            {
                TLogHelper.Error(ex.Message, "RabbitMQServerT channel dispose failed:" + queue);
            }

            try
            {
                if (conn != null)
                {
                    conn.Dispose();
                }
            }
            catch (Exception ex)
            {
                TLogHelper.Error(ex.Message, "RabbitMQServerT connection dispose failed:" + queue);
            }
        }

        /// <summary>
        /// Reads a boolean appSettings value and fails with a message that names the key.
        /// </summary>
        private static bool ReadBoolSetting(string key)
        {
            string raw = ConfigurationManager.AppSettings[key];
            bool value;
            if (raw == null || !bool.TryParse(raw, out value))
            {
                throw new InvalidOperationException("appSettings key '" + key + "' is missing or is not a boolean.");
            }
            return value;
        }

        /// <summary>
        /// Reads an integer appSettings value and fails with a message that names the key.
        /// </summary>
        private static int ReadIntSetting(string key)
        {
            string raw = ConfigurationManager.AppSettings[key];
            int value;
            if (raw == null || !int.TryParse(raw, System.Globalization.NumberStyles.Integer,
                    System.Globalization.CultureInfo.InvariantCulture, out value))
            {
                throw new InvalidOperationException("appSettings key '" + key + "' is missing or is not an integer.");
            }
            return value;
        }
    }
}
View Code

 

消息格式RabbitMQMsgModel

namespace MQServer
{
    /// <summary>
    /// Message envelope: a business name together with that business's data.
    /// </summary>
    public class RabbitMQMsgModel
    {
        /// <summary>Business name.</summary>
        public string BLLName { get; set; }

        /// <summary>Business data.</summary>
        public object Data { get; set; }
    }
}
View Code

 

2、BLL實現消息業務

BaseMQBLL

using System;

namespace MQServer
{
    /// <summary>
    /// Base class for business classes that use a queue. It owns one
    /// <see cref="RabbitMQServerT"/> and forwards every received message to
    /// <see cref="ReceiveBack"/>.
    /// </summary>
    public abstract class BaseMQBLL : IDisposable
    {
        /// <summary>Whether the underlying queue wrapper is currently receiving.</summary>
        public bool IsReceive { get { return MQ.IsReceive; } }

        protected readonly RabbitMQServerT MQ;

        private BaseMQBLL(){ }

        /// <summary>
        /// Creates the queue wrapper for <paramref name="queue"/> using the default
        /// configuration and routes received messages to <see cref="ReceiveBack"/>.
        /// </summary>
        /// <param name="queue">Queue name.</param>
        /// <param name="_penetrate">State object handed to <see cref="ReceiveBack"/> with every message.</param>
        protected BaseMQBLL(string queue,object _penetrate) 
        {
            // The wrapper stores the state object and passes it back on every message,
            // so ReceiveBack can be used as the callback directly. Exceptions from
            // ReceiveBack propagate to the wrapper unchanged.
            MQ = new MQServer.RabbitMQServerT(ReceiveBack, queue, _penetrate);
        }

        /// <summary>
        /// Starts receiving.
        /// </summary>
        public void Receive()
        {
            MQ.Receive();
        }

        /// <summary>
        /// Stops receiving.
        /// </summary>
        public void EndReceive()
        {
            MQ.EndReceive();
        }

        /// <summary>
        /// Receive callback that every derived class must implement.
        /// </summary>
        /// <param name="source">Received message text.</param>
        /// <param name="receiveO">State object supplied to the constructor.</param>
        protected abstract void ReceiveBack(string source, object receiveO);

        /// <summary>
        /// Stops receiving if it was started. Skips the teardown otherwise, because
        /// there is then no consumer to close.
        /// </summary>
        public void Dispose() 
        {
            if (MQ != null && MQ.IsReceive)
            {
                MQ.EndReceive();
            }
        }
    }
}
View Code

 

MQTestHello: BaseMQBLL

using MQServer;
using Newtonsoft.Json;

namespace BLL
{
    /// <summary>
    /// Demo business class bound to the "hello" queue.
    /// </summary>
    public class MQTestHello : BaseMQBLL
    {
        public MQTestHello()
            : base("hello", null)
        {
        }

        /// <summary>
        /// Receive callback: parses the JSON envelope and dispatches on its BLLName.
        /// </summary>
        /// <param name="source">Received message text, expected to be a JSON <see cref="RabbitMQMsgModel"/>.</param>
        /// <param name="receiveO">State object from the base class; not used here.</param>
        protected override void ReceiveBack(string source, object receiveO)
        {
            RabbitMQMsgModel model = JsonConvert.DeserializeObject<RabbitMQMsgModel>(source);

            // DeserializeObject returns null for an empty or literal "null" payload.
            if (model == null)
            {
                TLogHelper.Error(source, "MQTestHello.ReceiveBack: message is not a RabbitMQMsgModel");
                return;
            }

            switch (model.BLLName)
            {
                case "Hello":
                    Hello(model.Data);
                    break;
                default:
                    // Record unknown business names instead of dropping them silently.
                    TLogHelper.Error(source, "MQTestHello.ReceiveBack: unknown BLLName");
                    break;
            }
        }

        /// <summary>
        /// Sends a Hello message.
        /// </summary>
        /// <param name="msg">Text carried as the message data.</param>
        public void SendHello(string msg)
        {
            MQ.Send(new RabbitMQMsgModel() { BLLName = "Hello", Data = msg });
        }

        /// <summary>
        /// Handles a received Hello message by writing its data to the log.
        /// </summary>
        /// <param name="data">Data of the received message.</param>
        public void Hello(object data)
        {
            TLogHelper.Info(JsonConvert.SerializeObject(data), "讀取消息在MQTestHello");
        }
    }
}
View Code

 

3、記錄日志

TLogHelper

using Newtonsoft.Json;
using System;
using System.IO;
using System.Messaging;
using System.Text;

namespace MQServer.Log
{
    /// <summary>
    /// Minimal text-file logger. It writes one file per day (yyyyMMdd.txt) into the
    /// directory named by the "MQServerTLogPath" appSettings key.
    /// </summary>
    public class TLogHelper
    {
        // Serializes all access to the log file within this process.
        public static object _lock = new object();

        /// <summary>
        /// Logs the JSON-serialized body of a System.Messaging message under the title "MessageQueue".
        /// </summary>
        /// <param name="myMessage">Message whose Body is logged.</param>
        /// <param name="detail">Optional extra line written before the message.</param>
        public static void MQ(Message myMessage,  string detail = "") 
        {
            string msg = JsonConvert.SerializeObject(myMessage.Body);

            Write(msg, detail, "MessageQueue");
        }

        /// <summary>Logs a text entry under the title "Info".</summary>
        public static void Info(string msg, string detail = "")
        {
            Write(msg, detail, "Info");
        }

        /// <summary>Logs an object, serialized to JSON, under the title "Info".</summary>
        public static void Info(object msg, string detail = "")
        {
            Write(JsonConvert.SerializeObject(msg), detail, "Info");
        }

        /// <summary>Logs a text entry under the title "Error".</summary>
        public static void Error(string msg, string detail = "")
        {
            Write(msg, detail, "Error");
        }

        /// <summary>
        /// Appends one entry to today's log file: a blank line, a timestamp and title
        /// line, the optional detail line, then the message.
        /// </summary>
        private static void Write(string msg,string detail="", string title = "Info") 
        {
            DateTime now = DateTime.Now;
            string logDir = System.Configuration.ConfigurationManager.AppSettings["MQServerTLogPath"];

            // Path.Combine gives the same path whether or not the configured
            // directory ends with a separator.
            string logFile = Path.Combine(logDir, now.ToString("yyyyMMdd") + ".txt");

            lock (_lock)
            {
                // CreateDirectory does nothing when the directory already exists.
                Directory.CreateDirectory(logDir);

                // Append mode; using guarantees the handle is released even if a write fails.
                using (StreamWriter writer = new StreamWriter(logFile, true))
                {
                    writer.WriteLine();
                    writer.WriteLine(now.ToString("yyyyMMddHHmmssfff") + " " + title);
                    if (!string.IsNullOrWhiteSpace(detail))
                    {
                        writer.WriteLine(detail);
                    }

                    writer.WriteLine(msg);
                }
            }
        }

        /// <summary>
        /// Reads the whole of today's log file.
        /// </summary>
        /// <returns>The file content, or an empty string when nothing has been logged today.</returns>
        public static string Read() 
        {
            string logDir = System.Configuration.ConfigurationManager.AppSettings["MQServerTLogPath"];
            string logFile = Path.Combine(logDir, DateTime.Now.ToString("yyyyMMdd") + ".txt");

            lock (_lock)
            {
                if (!File.Exists(logFile))
                {
                    return "";
                }

                using (StreamReader reader = new StreamReader(logFile, Encoding.UTF8))
                {
                    return reader.ReadToEnd();
                }
            }
        }
    }
}
View Code

 

4、Form窗體測試

 RabbitMQForm : Form

using BLL;
using Newtonsoft.Json;
using System;
using System.Windows.Forms;

namespace WinFormActiveMQ
{
    /// <summary>
    /// Test form for the demo. It toggles receiving on the "hello" queue and two TCP
    /// listeners, sends test messages, and shows incoming requests in a text box.
    /// </summary>
    public partial class RabbitMQForm : Form
    {
        static MQTestHello hello;
        static TCPTestHello tcpHello;
        static TCPTestHello tcpHello2;
        private Label la_main;
        private Timer tim;
        private System.Threading.Thread thrListener;
        private System.Threading.Thread thrListener2;

        public RabbitMQForm()
        {
            InitializeComponent();

            la_main = new Label();
            la_main.Name = "la_main";
            la_main.Width = 282;
            la_main.TabIndex = 0;
            Controls.Add(la_main);

            // Watchdog: once per second make sure the enabled receivers are still running.
            tim = new Timer();
            tim.Interval = 1000;
            tim.Tick += CheckReceive;
            tim.Start();

        }

        private void Form1_Load(object sender, EventArgs e)
        {
            textBox1.ScrollBars = ScrollBars.Vertical;
            la_main.Text = "RabbitMQ消息隊列,點擊開啟接收。";
            TCPIp1.Text = "127.0.0.1";
            TCPPort1.Text = "90";
            TCPIp2.Text = "127.0.0.1";
            TCPPort2.Text = "91";

            hello = new MQTestHello(); 
        }

        /// <summary>
        /// Watchdog tick: restarts every receiver whose checkbox is checked but which
        /// reports that it is no longer running.
        /// </summary>
        private void CheckReceive(object sender, EventArgs e)
        {
            if (TestHello.Checked)
            {
                if (hello != null && !hello.IsReceive)
                {
                    hello.Receive();
                }
            }

            if (TCPHello.Checked)
            {
                if (tcpHello != null && !tcpHello.IsListening)
                {
                    thrListener = tcpHello.ListenStart(ListenClientBack);
                }
            }

            if (TCPListen2.Checked)
            {
                if (tcpHello2 != null && !tcpHello2.IsListening)
                {
                    // Listener 2 must be restarted with its own callback, the same
                    // one it is first started with.
                    thrListener2 = tcpHello2.ListenStart(ListenClientBack2);
                }
            }

        }

        /// <summary>
        /// Checked: starts receiving the Hello queue and sends one test message.
        /// Unchecked: stops receiving.
        /// </summary>
        private void TestHello_CheckedChanged(object sender, EventArgs e)
        {
            if (TestHello.Checked)
            {
                if (!hello.IsReceive)
                {
                    hello.Receive();
                }

                hello.SendHello("測試第一消息");
            }
            else
            {
                hello.EndReceive();
            }
        }

        /// <summary>
        /// Checked: starts TCP listener 1 on the entered ip and port.
        /// Unchecked: closes it.
        /// </summary>
        private void TCPHello_CheckedChanged(object sender, EventArgs e)
        {
            // The endpoint inputs are editable only while the listener is off.
            TCPIp1.Enabled = !TCPHello.Checked;
            TCPPort1.Enabled = !TCPHello.Checked;

            if (TCPHello.Checked)
            {
                int port;
                if (!int.TryParse(TCPPort1.Text, out port))
                {
                    // Unchecking re-enters this handler through its else branch.
                    TCPHello.Checked = false;
                    TCPIp1.Enabled = true;
                    TCPPort1.Enabled = true;
                    MessageBox.Show("請輸入正確端口號");
                    return;
                }

                tcpHello = new TCPTestHello(TCPIp1.Text, port, "", port);
                thrListener = tcpHello.ListenStart(ListenClientBack);
            }
            else
            {
                // tcpHello is still null when the first attempt had an invalid port.
                tcpHello?.CloseListen();
            }
        }

        /// <summary>
        /// Callback for TCP listener 1: shows the received object in the text box.
        /// A null argument signals a listener-side error; the listener is then switched off.
        /// </summary>
        /// <param name="obj">Received object, or null on error.</param>
        /// <returns>The JSON text of <paramref name="obj"/>, or an empty string on error.</returns>
        private string ListenClientBack(object obj)
        {
            if (obj == null) 
            {
                changeListenText("\r\n  監聽執行回調出錯");

                // Uncheck on the UI thread so the normal handler closes the listener.
                // Calling the handler directly would run it with the box still
                // checked, which starts a new listener instead of closing this one.
                // NOTE(review): this callback is presumably invoked from the listener thread - confirm.
                RunOnUiThread(() => TCPHello.Checked = false);
                return "";
            }

            string res = JsonConvert.SerializeObject(obj);
            changeListenText("\r\n收到終端的請求在Form:" + res);
            return res;
        }

        // Delegate used to marshal text updates onto the UI thread.
        private delegate void SetTextCallback(string sssext);

        /// <summary>
        /// Prepends text to the text box, marshalling to the UI thread when needed.
        /// </summary>
        /// <param name="text">Text to put in front of the current content.</param>
        private void changeListenText(string text)
        {
            if (this.textBox1.InvokeRequired)
            {
                // Called from another thread: re-run this method on the UI thread.
                SetTextCallback sss = new SetTextCallback(changeListenText);
                this.Invoke(sss, new object[] { text });
            }
            else
            {
                this.textBox1.Text = text + this.textBox1.Text;
            }
        }

        /// <summary>
        /// Runs an action on the UI thread without blocking the calling thread.
        /// </summary>
        private void RunOnUiThread(Action action)
        {
            if (InvokeRequired)
            {
                BeginInvoke(action);
            }
            else
            {
                action();
            }
        }

        /// <summary>
        /// Sends a test TCP request through listener 1's object.
        /// </summary>
        private void TestRequestTcp_Click(object sender, EventArgs e)
        {
            try
            {
                if (tcpHello == null) 
                {
                    MessageBox.Show("請先打開監聽");
                    return;
                }
                tcpHello.SendHello("測試發送TCP請求:" + DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss fff"));
            }
            catch (Exception ex)
            {
                // Show the failure instead of swallowing it.
                changeListenText("\r\n發送TCP請求失敗:" + ex.Message);
            }
        }

        /// <summary>
        /// Checked: starts TCP listener 2 on the entered ip and port.
        /// Unchecked: closes it.
        /// </summary>
        private void TCPListen2_CheckedChanged(object sender, EventArgs e)
        {
            // The endpoint inputs are editable only while the listener is off.
            TCPIp2.Enabled = !TCPListen2.Checked;
            TCPPort2.Enabled = !TCPListen2.Checked;

            if (TCPListen2.Checked)
            {
                int port;
                if (!int.TryParse(TCPPort2.Text, out port))
                {
                    // Unchecking re-enters this handler through its else branch.
                    TCPListen2.Checked = false;
                    TCPIp2.Enabled = true;
                    TCPPort2.Enabled = true;
                    MessageBox.Show("請輸入正確端口號");
                    return;
                }

                tcpHello2 = new TCPTestHello(TCPIp2.Text, port, "", port);
                thrListener2 = tcpHello2.ListenStart(ListenClientBack2);
            }
            else
            {
                // tcpHello2 is still null when the first attempt had an invalid port.
                tcpHello2?.CloseListen();
            }
        }

        /// <summary>
        /// Callback for TCP listener 2: shows the received object in the text box.
        /// A null argument signals a listener-side error; the listener is then switched off.
        /// </summary>
        /// <param name="obj">Received object, or null on error.</param>
        /// <returns>The JSON text of <paramref name="obj"/>, or an empty string on error.</returns>
        private string ListenClientBack2(object obj)
        {
            if (obj == null)
            {
                changeListenText("\r\n監聽2執行回調出錯");

                // Same approach as listener 1: uncheck on the UI thread to close it.
                RunOnUiThread(() => TCPListen2.Checked = false);
                return "";
            }

            string res = JsonConvert.SerializeObject(obj);
            changeListenText("\r\n收到終端2的請求在Form:" + res);
            return res;
        }

        /// <summary>
        /// Sends a test TCP request through listener 2's object.
        /// </summary>
        private void SentTCP2_Click(object sender, EventArgs e)
        {
            try
            {
                if (tcpHello2 == null)
                {
                    MessageBox.Show("請先打開監聽2");
                    return;
                }
                tcpHello2.SendHello("測試發送TCP請求222:" + DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss fff"));
            }
            catch (Exception ex)
            {
                // Show the failure instead of swallowing it.
                changeListenText("\r\n發送TCP請求2失敗:" + ex.Message);
            }
        }
    }
}
View Code

 

RabbitMQForm

namespace WinFormActiveMQ
{
    /// <summary>
    /// Designer half of the RabbitMQForm partial class: declares the form's controls
    /// and builds them in <see cref="InitializeComponent"/>.
    /// </summary>
    partial class RabbitMQForm
    {
        /// <summary>
        /// Required designer variable.
        /// </summary>
        private System.ComponentModel.IContainer components = null;

        /// <summary>
        /// Clean up any resources being used.
        /// </summary>
        /// <param name="disposing">true if managed resources should be disposed; otherwise, false.</param>
        protected override void Dispose(bool disposing)
        {
            // components stays null unless something assigns it, so guard before disposing.
            if (disposing && (components != null))
            {
                components.Dispose();
            }
            base.Dispose(disposing);
        }

        #region Windows Form Designer generated code

        /// <summary>
        /// Required method for Designer support - do not modify
        /// the contents of this method with the code editor.
        /// </summary>
        private void InitializeComponent()
        {
            this.TestHello = new System.Windows.Forms.CheckBox();
            this.TCPHello = new System.Windows.Forms.CheckBox();
            this.textBox1 = new System.Windows.Forms.TextBox();
            this.TestRequestTcp = new System.Windows.Forms.Button();
            this.TCPListen2 = new System.Windows.Forms.CheckBox();
            this.TCPIp1 = new System.Windows.Forms.TextBox();
            this.label1 = new System.Windows.Forms.Label();
            this.TCPPort1 = new System.Windows.Forms.TextBox();
            this.label2 = new System.Windows.Forms.Label();
            this.TCPIp2 = new System.Windows.Forms.TextBox();
            this.TCPPort2 = new System.Windows.Forms.TextBox();
            this.SentTCP2 = new System.Windows.Forms.Button();
            this.SuspendLayout();
            // 
            // TestHello - checkbox wired to TestHello_CheckedChanged
            // 
            this.TestHello.AutoSize = true;
            this.TestHello.Location = new System.Drawing.Point(12, 12);
            this.TestHello.Name = "TestHello";
            this.TestHello.Size = new System.Drawing.Size(129, 19);
            this.TestHello.TabIndex = 0;
            this.TestHello.Text = "接收隊列Hello";
            this.TestHello.UseVisualStyleBackColor = true;
            this.TestHello.CheckedChanged += new System.EventHandler(this.TestHello_CheckedChanged);
            // 
            // TCPHello - checkbox wired to TCPHello_CheckedChanged
            // 
            this.TCPHello.AutoSize = true;
            this.TCPHello.Location = new System.Drawing.Point(216, 77);
            this.TCPHello.Name = "TCPHello";
            this.TCPHello.Size = new System.Drawing.Size(123, 19);
            this.TCPHello.TabIndex = 1;
            this.TCPHello.Text = "監聽TCPHello";
            this.TCPHello.UseVisualStyleBackColor = true;
            this.TCPHello.CheckedChanged += new System.EventHandler(this.TCPHello_CheckedChanged);
            // 
            // textBox1 - multiline text box
            // 
            this.textBox1.Location = new System.Drawing.Point(13, 248);
            this.textBox1.Multiline = true;
            this.textBox1.Name = "textBox1";
            this.textBox1.Size = new System.Drawing.Size(427, 160);
            this.textBox1.TabIndex = 2;
            // 
            // TestRequestTcp - button wired to TestRequestTcp_Click
            // 
            this.TestRequestTcp.Location = new System.Drawing.Point(345, 73);
            this.TestRequestTcp.Name = "TestRequestTcp";
            this.TestRequestTcp.Size = new System.Drawing.Size(122, 23);
            this.TestRequestTcp.TabIndex = 3;
            this.TestRequestTcp.Text = "發送TCP請求";
            this.TestRequestTcp.UseVisualStyleBackColor = true;
            this.TestRequestTcp.Click += new System.EventHandler(this.TestRequestTcp_Click);
            // 
            // TCPListen2 - checkbox wired to TCPListen2_CheckedChanged
            // 
            this.TCPListen2.AutoSize = true;
            this.TCPListen2.Location = new System.Drawing.Point(216, 106);
            this.TCPListen2.Name = "TCPListen2";
            this.TCPListen2.Size = new System.Drawing.Size(67, 19);
            this.TCPListen2.TabIndex = 4;
            this.TCPListen2.Text = "監聽2";
            this.TCPListen2.UseVisualStyleBackColor = true;
            this.TCPListen2.CheckedChanged += new System.EventHandler(this.TCPListen2_CheckedChanged);
            // 
            // TCPIp1
            // 
            this.TCPIp1.Location = new System.Drawing.Point(13, 75);
            this.TCPIp1.Name = "TCPIp1";
            this.TCPIp1.Size = new System.Drawing.Size(114, 25);
            this.TCPIp1.TabIndex = 5;
            // 
            // label1
            // 
            this.label1.AutoSize = true;
            this.label1.Location = new System.Drawing.Point(12, 57);
            this.label1.Name = "label1";
            this.label1.Size = new System.Drawing.Size(53, 15);
            this.label1.TabIndex = 6;
            this.label1.Text = "請求ip";
            // 
            // TCPPort1
            // 
            this.TCPPort1.Location = new System.Drawing.Point(144, 74);
            this.TCPPort1.Name = "TCPPort1";
            this.TCPPort1.Size = new System.Drawing.Size(66, 25);
            this.TCPPort1.TabIndex = 7;
            // 
            // label2
            // 
            this.label2.AutoSize = true;
            this.label2.Location = new System.Drawing.Point(141, 56);
            this.label2.Name = "label2";
            this.label2.Size = new System.Drawing.Size(112, 15);
            this.label2.TabIndex = 8;
            this.label2.Text = "請求和監聽端口";
            // 
            // TCPIp2
            // 
            this.TCPIp2.Location = new System.Drawing.Point(12, 106);
            this.TCPIp2.Name = "TCPIp2";
            this.TCPIp2.Size = new System.Drawing.Size(114, 25);
            this.TCPIp2.TabIndex = 9;
            // 
            // TCPPort2
            // 
            this.TCPPort2.Location = new System.Drawing.Point(144, 106);
            this.TCPPort2.Name = "TCPPort2";
            this.TCPPort2.Size = new System.Drawing.Size(66, 25);
            this.TCPPort2.TabIndex = 10;
            // 
            // SentTCP2 - button wired to SentTCP2_Click
            // 
            this.SentTCP2.Location = new System.Drawing.Point(345, 108);
            this.SentTCP2.Name = "SentTCP2";
            this.SentTCP2.Size = new System.Drawing.Size(122, 23);
            this.SentTCP2.TabIndex = 11;
            this.SentTCP2.Text = "SentTCP2";
            this.SentTCP2.UseVisualStyleBackColor = true;
            this.SentTCP2.Click += new System.EventHandler(this.SentTCP2_Click);
            // 
            // RabbitMQForm - the form itself: size, child controls, Load handler
            // 
            this.ClientSize = new System.Drawing.Size(498, 417);
            this.Controls.Add(this.SentTCP2);
            this.Controls.Add(this.TCPPort2);
            this.Controls.Add(this.TCPIp2);
            this.Controls.Add(this.label2);
            this.Controls.Add(this.TCPPort1);
            this.Controls.Add(this.label1);
            this.Controls.Add(this.TCPIp1);
            this.Controls.Add(this.TCPListen2);
            this.Controls.Add(this.TestRequestTcp);
            this.Controls.Add(this.textBox1);
            this.Controls.Add(this.TCPHello);
            this.Controls.Add(this.TestHello);
            this.Name = "RabbitMQForm";
            this.Text = "RabbitMQForm";
            this.Load += new System.EventHandler(this.Form1_Load);
            this.ResumeLayout(false);
            this.PerformLayout();

        }

        #endregion

        // Controls created in InitializeComponent.
        private System.Windows.Forms.CheckBox TestHello;
        private System.Windows.Forms.CheckBox TCPHello;
        private System.Windows.Forms.TextBox textBox1;
        private System.Windows.Forms.Button TestRequestTcp;
        private System.Windows.Forms.CheckBox TCPListen2;
        private System.Windows.Forms.TextBox TCPIp1;
        private System.Windows.Forms.Label label1;
        private System.Windows.Forms.TextBox TCPPort1;
        private System.Windows.Forms.Label label2;
        private System.Windows.Forms.TextBox TCPIp2;
        private System.Windows.Forms.TextBox TCPPort2;
        private System.Windows.Forms.Button SentTCP2;
    }
}
View Code

 

5、默認配置

  <appSettings>
    <add key="MQServerTLogPath" value="d:/WinFormActiveMQLog/" />
    <add key="RabbitMQHostName" value="192.168.0.233" />
    <add key="RabbitMQUserName" value="admin" />
    <add key="RabbitMQPassword" value="123456" />
    <add key="RabbitMQPort" value="5672" />
    <add key="RabbitMQVirtualHost" value="" />
    <add key="RabbitMQ_durable" value="true" />
    <add key="RabbitMQ_exclusive" value="false" />
    <add key="RabbitMQ_autoDelete" value="false" />
  </appSettings>

 

TCP監聽demo

6、TCP監聽類

using MQServer;
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading.Tasks;

namespace TCPServer
{
    /// <summary>
    /// Small TCP helper bound to one ip/port pair. It can send a one-shot text message to that
    /// endpoint (<see cref="Request(string)"/>) and/or run a blocking accept loop on it
    /// (<see cref="Listen"/>). Instances are cached per endpoint and obtained via <see cref="Instance"/>.
    /// </summary>
    /// <remarks>
    /// The instance cache is not synchronized; callers that create instances from several threads
    /// must serialize their calls to <see cref="Instance"/>.
    /// </remarks>
    public class TCPServerT
    {

        /// <summary>
        /// Callback type invoked by <see cref="Listen"/> for each received message.
        /// </summary>
        /// <param name="s">
        /// An object[] whose single element is the received text, or null when the second
        /// callback is being told that processing failed.
        /// </param>
        /// <returns>Callback-defined value; this class ignores it.</returns>
        public delegate object ListenBackDelegate(object s);

        /// <summary>
        /// True while the accept loop of <see cref="Listen"/> is (or is about to be) running.
        /// </summary>
        public bool IsListening = false;

        // Instance cache; one entry per "ip:port" endpoint.
        static Dictionary<string, TCPServerT> tcps;
        string ip;
        int port;
        // Listening socket owned by the accept loop; null when not listening.
        private Socket listener;

        private TCPServerT()
        {
        }

        private TCPServerT(string ip, int port)
        {
            this.ip = ip;
            this.port = port;
        }

        /// <summary>
        /// Returns the cached instance for the given endpoint, creating it on first use.
        /// </summary>
        /// <param name="ip">IP address text; null/blank means "any local address" when listening.</param>
        /// <param name="port">TCP port.</param>
        /// <returns>The instance associated with this ip/port pair.</returns>
        public static TCPServerT Instance(string ip, int port)
        {
            if (tcps == null)
            {
                tcps = new Dictionary<string, TCPServerT>();
            }

            // The separator keeps distinct endpoints distinct: plain concatenation would map both
            // ("1.1.1.1", 1234) and ("1.1.1.11", 234) to "1.1.1.11234".
            string key = ip + ":" + port;

            TCPServerT cached;
            if (!tcps.TryGetValue(key, out cached))
            {
                cached = new TCPServerT(ip, port);
                tcps.Add(key, cached);
            }
            return cached;
        }

        /// <summary>
        /// Connects to the configured endpoint, sends <paramref name="data"/> as UTF-8 and closes
        /// the connection.
        /// </summary>
        /// <param name="data">Text to send.</param>
        /// <exception cref="Exception">The network stream is not writable.</exception>
        public void Request(string data)
        {
            // The using blocks release the client and stream even when Connect or Write throws.
            // A local is used instead of a field because cached instances may be shared.
            using (TcpClient client = new TcpClient())
            {
                client.Connect(IPAddress.Parse(ip), port);
                using (NetworkStream stream = client.GetStream())
                {
                    if (!stream.CanWrite)
                    {
                        throw new Exception("無法寫入互數據流");
                    }

                    byte[] payload = Encoding.UTF8.GetBytes(data);
                    stream.Write(payload, 0, payload.Length);
                }
            }
        }

        /// <summary>
        /// Serializes <paramref name="model"/> to JSON and sends it with <see cref="Request(string)"/>.
        /// </summary>
        /// <param name="model">Message to send.</param>
        public void Request(TCPServiceMsgModel model)
        {
            Request(JsonConvert.SerializeObject(model));
        }

        /// <summary>
        /// Binds to the configured endpoint and blocks, handling one connection at a time, until
        /// <see cref="CloseListen"/> is called (typically from another thread).
        /// </summary>
        /// <param name="back">Invoked for every received message.</param>
        /// <param name="back2">
        /// Optional. Invoked with the message after <paramref name="back"/> succeeds, or with null
        /// when either callback throws.
        /// </param>
        /// <exception cref="Exception">This instance is already listening.</exception>
        /// <remarks>
        /// Failures while parsing the address, binding or starting to listen propagate to the
        /// caller after the listening state has been reset.
        /// </remarks>
        public void Listen(ListenBackDelegate back, ListenBackDelegate back2=null)
        {
            if (IsListening)
            {
                throw new Exception("端口"+ port +"已處於監聽狀態,線程中只能存在一個Ip端口連接。");
            }

            IsListening = true;
            Socket server;
            try
            {
                IPAddress ipAddress = string.IsNullOrWhiteSpace(ip)
                    ? IPAddress.Any
                    : IPAddress.Parse(ip);

                server = new Socket(AddressFamily.InterNetwork,
                    SocketType.Stream, ProtocolType.Tcp);
                listener = server;
                server.Bind(new IPEndPoint(ipAddress, port));
                // Start listening once, up front, instead of on every loop iteration.
                server.Listen(0);
            }
            catch (Exception)
            {
                // Reset state so a later Listen call is not rejected as "already listening".
                CloseListen();
                throw;
            }

            try
            {
                while (IsListening)
                {
                    Socket socket = null;
                    try
                    {
                        socket = server.Accept();
                        HandleConnection(socket, back, back2);
                    }
                    catch (ObjectDisposedException)
                    {
                        // The listening socket was closed; nothing more can be accepted.
                        break;
                    }
                    catch (Exception ex)
                    {
                        // Accept is expected to fail when CloseListen closes the socket from
                        // another thread; only report failures that happen while still listening.
                        if (IsListening)
                        {
                            TLogHelper.Error(ex.Message, "監聽異常");
                        }
                    }
                    finally
                    {
                        if (socket != null)
                        {
                            socket.Close();
                        }
                    }
                }
            }
            finally
            {
                CloseListen();
            }
        }

        /// <summary>
        /// Reads the whole message from an accepted connection and dispatches it to the callbacks.
        /// </summary>
        private static void HandleConnection(Socket socket, ListenBackDelegate back, ListenBackDelegate back2)
        {
            object[] payload;
            // NetworkStream does not own the socket here; the caller closes it.
            using (NetworkStream stream = new NetworkStream(socket))
            using (StreamReader reader = new StreamReader(stream))
            {
                payload = new object[] { reader.ReadToEnd() };
            }

            try
            {
                back.Invoke(payload);
                if (back2 != null)
                {
                    back2.Invoke(payload);
                }
            }
            catch (Exception ex)
            {
                // Log first so the failure is recorded even if back2 itself throws.
                TLogHelper.Error(ex.Message, "回調失敗");

                // A null argument tells back2 that processing this message failed.
                if (back2 != null)
                {
                    back2.Invoke(null);
                }
            }
        }

        /// <summary>
        /// Stops the accept loop and releases the listening socket. Safe to call repeatedly and
        /// on instances that never listened.
        /// </summary>
        public void CloseListen ()
        {
            // Clear the flag before closing so the accept loop exits as soon as Accept fails.
            IsListening = false;

            Socket current = listener;
            listener = null;
            if (current == null)
            {
                return;
            }

            try
            {
                current.Shutdown(SocketShutdown.Both);
            }
            catch (SocketException)
            {
                // A listening socket is never connected, so Shutdown is expected to fail.
            }
            catch (ObjectDisposedException)
            {
                // Already closed.
            }
            current.Close();
        }
    }
}
View Code

 

數據格式類

namespace TCPServer
{
    /// <summary>
    /// Message envelope sent over TCP: a business name plus its payload.
    /// </summary>
    public class TCPServiceMsgModel
    {
        private string bllName;
        private object data;

        /// <summary>
        /// Business name.
        /// </summary>
        public string BLLName
        {
            get { return bllName; }
            set { bllName = value; }
        }

        /// <summary>
        /// Business data.
        /// </summary>
        public object Data
        {
            get { return data; }
            set { data = value; }
        }
    }
}
View Code

 

7、BLL業務

BaseTCPBLL

using System;
using System.Threading;
namespace TCPServer
{
    /// <summary>
    /// Base class for TCP business modules. It pairs one <see cref="TCPServerT"/> used for
    /// listening with one used for sending, and runs the listener on a dedicated thread.
    /// Subclasses implement <see cref="ListenClientBack"/> to process received messages.
    /// </summary>
    public abstract class BaseTCPBLL: IDisposable
    {
        /// <summary>
        /// True when the listening TCP service reports that it is listening.
        /// </summary>
        public bool IsListening
        {
            get
            {
                if (tcp == null)
                {
                    return false;
                }
                return tcp.IsListening;
            }
        }

        private TCPServerT tcp;
        private TCPServerT tcpSend;
        // Callback supplied by the caller of ListenStart; handed to tcp.Listen on the listener thread.
        private TCPServerT.ListenBackDelegate back2;
        private Thread thrListener;
        private BaseTCPBLL(){}

        /// <summary>
        /// Resolves the TCP service instances for the request and listen endpoints.
        /// </summary>
        /// <param name="requestIp">IP that requests are sent to.</param>
        /// <param name="requestPort">Port that requests are sent to.</param>
        /// <param name="listenIp">IP to listen on.</param>
        /// <param name="listenPort">Port to listen on.</param>
        protected BaseTCPBLL(string requestIp,int requestPort, string listenIp,int listenPort)
        {
            tcp = TCPServerT.Instance(listenIp, listenPort);
            tcpSend = TCPServerT.Instance(requestIp, requestPort);
        }
        #region  TCP service
        /// <summary>
        /// Starts listening for client requests on a new thread, unless listening is already
        /// running or starting.
        /// </summary>
        /// <param name="back2">
        /// Optional callback passed to the TCP service together with <see cref="ListenClientBack"/>.
        /// </param>
        /// <returns>
        /// The listener thread. When listening was already active this is the previously started
        /// thread, which may be null if this object did not start it.
        /// </returns>
        /// <remarks>
        /// Exceptions thrown by the TCP service's Listen call are not caught here and therefore
        /// propagate out of the listener thread's entry point.
        /// </remarks>
        public Thread ListenStart(TCPServerT.ListenBackDelegate back2 = null)
        {
            // IsListening only turns true once the worker thread has entered tcp.Listen, so a
            // thread that was started but has not got that far must also count as running.
            // Otherwise a quick second call would start a second listener thread.
            bool starting = thrListener != null && thrListener.IsAlive;
            if (IsListening || starting)
            {
                return thrListener;
            }

            this.back2 = back2;
            thrListener = new Thread(new ThreadStart(Listen));
            thrListener.Start();

            return thrListener;
        }

        /// <summary>
        /// Listener thread entry point: blocks inside the TCP service's accept loop.
        /// </summary>
        private void Listen()
        {
            tcp.Listen(ListenClientBack, back2);
        }

        /// <summary>
        /// Stops listening.
        /// </summary>
        public void CloseListen()
        {
            tcp.CloseListen();
        }

        /// <summary>
        /// Callback invoked when a client request has been received.
        /// </summary>
        /// <param name="obj">The received payload as delivered by the TCP service.</param>
        protected abstract string ListenClientBack(object obj);
        #endregion

        /// <summary>
        /// Sends a request to the configured request endpoint.
        /// </summary>
        /// <param name="model">Message to send.</param>
        protected void Request(TCPServiceMsgModel model)
        {
            tcpSend.Request(model);
        }

        /// <summary>
        /// Closes the listeners of both TCP service instances held by this object.
        /// </summary>
        public void Dispose() 
        {
            tcp.CloseListen();
            tcpSend.CloseListen();
        }
    }
}
View Code

 

TCPTestHello:BaseTCPBLL

using MQServer;
using Newtonsoft.Json;
using TCPServer;

namespace BLL
{
    /// <summary>
    /// Demo TCP business module that sends and receives "Hello" messages.
    /// </summary>
    public class TCPTestHello:BaseTCPBLL
    {
        /// <summary>
        /// Creates the module for the given request and listen endpoints.
        /// </summary>
        /// <param name="requestIp">Request IP; "127.0.0.1" for the local machine.</param>
        /// <param name="requestPort">Request port.</param>
        /// <param name="listenIp">Listen IP; "" for the local machine.</param>
        /// <param name="listenPort">Listen port.</param>
        public TCPTestHello(string requestIp, int requestPort, string listenIp, int listenPort)
            :base(requestIp, requestPort, listenIp, listenPort)
        {
        }
        #region  TCP service

        /// <summary>
        /// Callback invoked when a client request has been received. Extracts the message text,
        /// parses it as a <see cref="TCPServiceMsgModel"/> and dispatches on its BLLName.
        /// </summary>
        /// <param name="obj">Received payload; expected to serialize to a JSON array whose first element is the message text.</param>
        /// <returns>The message text that was parsed.</returns>
        /// <exception cref="System.Exception">The payload is not in the format this module accepts.</exception>
        protected override string ListenClientBack(object obj)
        {
            var res = JsonConvert.SerializeObject(obj);

            TCPServiceMsgModel model;
            try
            {
                res = JsonConvert.DeserializeObject<string[]>(res)[0];
                model = JsonConvert.DeserializeObject<TCPServiceMsgModel>(res);
            }
            catch (System.Exception ex)
            {
                // Keep the original failure as the inner exception for diagnostics.
                throw new System.Exception("TCP接收到對象不是本模塊接受的格式!"+ res, ex);
            }

            // An empty or "null" message deserializes to null without throwing; report it as a
            // format error instead of failing later with a NullReferenceException.
            if (model == null)
            {
                throw new System.Exception("TCP接收到對象不是本模塊接受的格式!"+ res);
            }

            // Run the business operation selected by BLLName; unknown names are ignored.
            switch (model.BLLName)
            {
                case "Hello":
                    Hello(model.Data);
                    break;
                default:
                    break;
            }
            return res;
        }
        #endregion

        /// <summary>
        /// Sends a "Hello" request carrying <paramref name="data"/>.
        /// </summary>
        /// <param name="data">Payload to send.</param>
        public void SendHello(object data)
        {
            Request(new TCPServiceMsgModel() { BLLName = "Hello", Data = data });
        }

        /// <summary>
        /// Handles a received "Hello" message by logging its payload as JSON.
        /// </summary>
        /// <param name="data">Payload of the received message.</param>
        public void Hello(object data)
        {
            TLogHelper.Info(JsonConvert.SerializeObject(data), "接收到消息在TCPTestHello");
        }

    }
}
View Code

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM