利用RabbitMQ、MySQL實現超大用戶級別的消息在/離線收發


由於RabbitMQ中只有隊列(queue)才能存儲信息,所以用RabbitMQ實現超大用戶級別(百萬計)的消息在/離線收發需要對每一個用戶創建一個永久隊列。

但是RabbitMQ節點內存有限,經測試后發現節點集群也無法滿足數百萬用戶隊列收發數據的要求,所以最終決定采用數據庫輔助實現該功能。

 

一、數據庫結構

user_list數據庫下有4張表:user_info、group_info、groupmember_info、message_info。

user_info表中含有username(主鍵,非空,VARCHAR(50))、password(非空,VARCHAR(50))、routingkey(非空,VARCHAR(50))、has_queue(INT UNSIGNED)四個字段。

group_info表中含有groupname(主鍵,非空,VARCHAR(50))、password(非空,VARCHAR(50))、creator(非空,VARCHAR(50))三個字段。

groupmember_info表中含有username(主鍵,非空,VARCHAR(50))、groupname(主鍵,非空,VARCHAR(50))兩個字段。

message_info表中含有sendtime(非空,VARCHAR(50))、body(非空,VARCHAR(300)),receiver(非空,VARCHAR(50))、sender(非空,VARCHAR(100))四個字段。

 

二、客戶端結構

1、文件夾創建以及包依賴安裝:

dotnet new console --name Client
mv Client/Program.cs Client/Client.cs
cd Client
dotnet add package RabbitMQ.Client
dotnet add package MySql.Data dotnet restore

2、項目結構

Client.cs(主程序):
using RabbitMQ.Client;
using MySql.Data.MySqlClient;

namespace Client
{
    class Client
    {
        static void Main(string[] args)
        {
            // 連接數據庫
            string connStr = "Database=user_list;datasource=127.0.0.1;port=3306;user=root;pwd=123456;";
            MySqlConnection sqlConn = new MySqlConnection(connStr);

            sqlConn.Open();

            // 連接RabbitMQ
            var factory = new ConnectionFactory() { HostName = "dev.corp.wingoht.com", VirtualHost = "cd", UserName = "ishowfun", Password = "123456" };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                // 用戶登錄,並獲取離線消息
                UserInfo user = UserLogin.Login(sqlConn);

                // 監聽在線消息
                Consumer.StartListening(channel, sqlConn, user);

                // 監聽按鈕事件,調用不同的客戶端功能
                KeyListening.StartListening(channel, sqlConn, user);

                // 退出登錄
                UserLogout.Logout(channel, sqlConn, user);
            }

            // 釋放數據庫連接
            sqlConn.Close();
        }
    }
}
View Code

 UserLogin.cs:

using System;
using MySql.Data.MySqlClient;

namespace Client
{
    class UserLogin
    {
        public static UserInfo Login(MySqlConnection conn)
        {
            UserInfo user = new UserInfo();
            
            // 用戶名輸入
            Console.Write("Please enter your username: ");
            user.Username = Console.ReadLine();
            while (user.Username.Contains(",") || user.Username.Contains(" "))
            {
                Console.WriteLine("Error: Username can not contain \",\"! and \" \"");
                Console.Write("Please enter your username again: ");
                user.Username = Console.ReadLine();
            }

            MySqlCommand cmd = new MySqlCommand("select * from user_info where username='" + user.Username + "'", conn);
            MySqlDataReader reader = cmd.ExecuteReader();

            // 判斷該用戶是否已經注冊
            if (reader.Read())
            {
                // 已注冊用戶登錄
                user.IsNewUser = true;

                // 驗證密碼
                Console.Write("Please enter your password: ");
                user.Password = reader.GetString("password");
                while (user.Password != Console.ReadLine())
                {
                    Console.WriteLine("Error: Username and password do not match!");
                    Console.Write("Please enter your password again: ");
                }
                Console.WriteLine("Welcome back, {0}!", user.Username);
                Console.WriteLine("-------------------------------------");

                // 獲取當前用戶的路由鍵
                user.RoutingKey = reader.GetString("routingkey");
                reader.Close();

                // 讀取離線消息
                cmd = new MySqlCommand("select * from message_info where receiver='" + user.Username + "'", conn);
                reader = cmd.ExecuteReader();
                Console.WriteLine("Unread Message: ");
                while (reader.Read())
                {
                    string sender = reader.GetString("sender");
                    string sendTime = reader.GetString("sendtime");
                    string content = reader.GetString("body");
                    string[] splitSender = sender.Split(" ");
                    if (splitSender.Length == 1)
                    {
                        Console.WriteLine("Sender: {0}, SendTime: {1}, Content: {2}", sender, sendTime, content);
                    }
                    else
                    {
                        sender = splitSender[0];
                        string group = splitSender[1];
                        Console.WriteLine("Sender: {0} from {1}, SendTime: {2}, Content: {3}", sender, group, sendTime, content);
                    }
                }
                reader.Close();

                // 刪除已處理的離線消息
                cmd = new MySqlCommand("delete from message_info where receiver=@re", conn);
                cmd.Parameters.AddWithValue("re", user.Username);

                cmd.ExecuteNonQuery();
            }
            else
            {
                // 新用戶注冊並登錄
                user.IsNewUser = false;
                reader.Close();
                Console.WriteLine("Welcome, new user!");

                // 設置密碼
                Console.Write("Please set your password: ");
                user.Password = Console.ReadLine();
                Console.Write("Please confirm your password: ");
                while (user.Password != Console.ReadLine())
                {
                    Console.WriteLine("Error: Confirmation failure!");
                    Console.Write("Please set your password again: ");
                    user.Password = Console.ReadLine();
                    Console.Write("Please confirm your password: ");
                }

                // 生成該用戶的加密路由鍵,並保證該路由鍵唯一
                user.RoutingKey = GenerateKey.GenerateRandomString(32);
                cmd = new MySqlCommand("select * from user_info where routingkey='" + user.RoutingKey + "'", conn);
                reader = cmd.ExecuteReader();
                while (reader.Read())
                {
                    reader.Close();
                    user.RoutingKey = GenerateKey.GenerateRandomString(32);
                    reader = cmd.ExecuteReader();
                }

                reader.Close();
            }

            Console.WriteLine("-------------------------------------");
            return user;
        }
    }
}
View Code

 UserInfo.cs:

namespace Client
{
    class UserInfo
    {
        // 唯一用戶名
        public string Username { get; set; }

        // 登錄密碼
        public string Password { get; set; }

        // 加密路由鍵
        public string RoutingKey { get; set; }

        // 是否為新用戶
        public bool IsNewUser { get; set; }
    }
}
View Code

 GenerateKey.cs: 

using System;

namespace Client
{
    class GenerateKey
    {
        // 字符串中字符的取值范圍
        private static char[] constant =
        {
            '0','1','2','3','4','5','6','7','8','9',
            'a','b','c','d','e','f','g','h','i','j','k','l','m','n','o','p','q','r','s','t','u','v','w','x','y','z',
            'A','B','C','D','E','F','G','H','I','J','K','L','M','N','O','P','Q','R','S','T','U','V','W','X','Y','Z'
        };

        // 生成指定長度的隨機字符串
        public static string GenerateRandomString(int len)
        {
            System.Text.StringBuilder newRandom = new System.Text.StringBuilder(62);
            Random rd = new Random();
            for (int i = 0; i < len; i++)
            {
                newRandom.Append(constant[rd.Next(62)]);
            }
            return newRandom.ToString();
        }
    }
}
View Code

 Consumer.cs:

using System;
using System.Text;

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using MySql.Data.MySqlClient;

namespace Client
{
    class Consumer
    {
        public static void StartListening(IModel channel, MySqlConnection conn, UserInfo user)
        {
            // 交換機聲明
            channel.ExchangeDeclare(exchange: "topic_message", type: "topic");

            // 隊列創建與綁定
            channel.QueueDeclare(queue: user.RoutingKey,
                                 durable: true,
                                 exclusive: false,
                                 autoDelete: false,
                                 arguments: null);

            channel.QueueBind(queue: user.RoutingKey,
                              exchange: "topic_message",
                              routingKey: user.RoutingKey);

            var consumer = new EventingBasicConsumer(channel);

            // 處理收到的消息
            consumer.Received += (model, ea) =>
            {
                var body = ea.Body;
                string message = Encoding.UTF8.GetString(body);
                string[] splitRes = message.Trim().Split(",");
                string sendTime = splitRes[1];
                string content = splitRes[2];
                string[] splitSender = splitRes[0].Split(" ");
                if (splitRes.Length > 3)
                {
                    for (int i = 3; i < splitRes.Length; i++)
                        content = content + "," + splitRes[i];
                }

                Console.WriteLine();
                if (splitSender.Length == 1)
                {
                    string sender = splitRes[0];
                    Console.WriteLine("Sender: {0}, SendTime: {1}, Content: {2}", sender, sendTime, content);
                }
                else
                {
                    string sender = splitSender[0];
                    string group = splitSender[1];
                    Console.WriteLine("Sender: {0} from {1}, SendTime: {2}, Content: {3}", sender, group, sendTime, content);
                }
            };

            channel.BasicConsume(queue: user.RoutingKey,
                                 autoAck: true,
                                 consumer: consumer);

            MySqlCommand cmd;

            // 更新數據庫
            if (user.IsNewUser)
            {
                // 向數據庫聲明該用戶已經擁有了隊列,可以直接在線發送
                cmd = new MySqlCommand("update user_info set has_queue=1 where username=@uid", conn);
                cmd.Parameters.AddWithValue("uid", user.Username);

                cmd.ExecuteNonQuery();
            }
            else
            {
                // 防止注入地插入新用戶數據
                cmd = new MySqlCommand("insert into user_info set username=@uid,password=@pwd,routingkey=@rk,has_queue=1", conn);
                cmd.Parameters.AddWithValue("uid", user.Username);
                cmd.Parameters.AddWithValue("pwd", user.Password);
                cmd.Parameters.AddWithValue("rk", user.RoutingKey);

                cmd.ExecuteNonQuery();
            }

            Console.WriteLine("{0} Start Listening!", user.Username);
            Console.WriteLine("-------------------------------------");
        }
    }
}
View Code

 KeyListening.cs:

using System;

using RabbitMQ.Client;
using MySql.Data.MySqlClient;

namespace Client
{
    class KeyListening
    {
        public static void StartListening(IModel channel, MySqlConnection conn, UserInfo user)
        {
            MQHelper.HelpTip();
            Console.WriteLine("-------------------------------------");
            ConsoleKeyInfo keyPressed;
            bool escFlag = false;

            while (true)
            {

                keyPressed = Console.ReadKey();
                Console.WriteLine();
                Console.WriteLine("-------------------------------------");
                switch (keyPressed.Key)
                {
                    // 0: 顯示幫助信息
                    case ConsoleKey.D0:
                        MQHelper.HelpTip();
                        break;
                    // 1: 創建群組
                    case ConsoleKey.D1:
                        MQHelper.CreateGroup(conn, user);
                        break;
                    // 2: 申請加入群組
                    case ConsoleKey.D2:
                        MQHelper.JoinGroup(conn, user);
                        break;
                    // 3: 退出群組
                    case ConsoleKey.D3:
                        MQHelper.LeaveGroup(conn, user);
                        break;
                    // 4: 顯示群組信息    
                    case ConsoleKey.D4:
                        MQHelper.ShowGroup(conn, user);
                        break;
                    // 5: 單播
                    case ConsoleKey.D5:
                        MQHelper.BasicSend(channel, conn, user);
                        break;
                    // 6: 組播
                    case ConsoleKey.D6:
                        MQHelper.GroupSend(channel, conn, user);
                        break;
                    // ESC: 退出
                    case ConsoleKey.Escape:
                        escFlag = true;
                        break;
                    // 無意義按鍵
                    default:
                        Console.WriteLine("Error: Invalid press!");
                        break;
                }

                Console.WriteLine("-------------------------------------");

                if (escFlag)
                    break;
            }
        }
    }
}
View Code

 MQHelper.cs:

using System;
using System.Text;
using System.Collections.Generic;

using RabbitMQ.Client;
using MySql.Data.MySqlClient;

namespace Client
{
    // 該類包含了按鈕監聽事件觸發的各項功能的實現
    class MQHelper
    {
        #region 顯示幫助信息
        public static void HelpTip()
        {
            // 顯示各按鍵對應功能
            Console.WriteLine("Function Press:");
            Console.WriteLine("[0]: Help");
            Console.WriteLine("[1]: Create group");
            Console.WriteLine("[2]: Join group");
            Console.WriteLine("[3]: Leave group");
            Console.WriteLine("[4]: Show groups");
            Console.WriteLine("[5]: BasicSend");
            Console.WriteLine("[6]: GroupSend");
            Console.WriteLine("[ESC]: Log out");
        }
        #endregion

        #region 創建群組
        public static void CreateGroup(MySqlConnection conn, UserInfo user)
        {
            // 設置群組名稱以及加入密碼
            Console.Write("Please set the group name: ");
            string groupName = Console.ReadLine();
            MySqlCommand groupCmd = new MySqlCommand("select * from group_info where groupname='" + groupName + "'", conn);
            MySqlDataReader groupReader = groupCmd.ExecuteReader();

            if (groupReader.Read())
            {
                groupReader.Close();
                Console.WriteLine("Error: This group name already exists!");
            }
            else
            {
                groupReader.Close();

                Console.Write("Please set the password: ");
                string pwd = Console.ReadLine();
                Console.Write("Please confirm the password: ");
                while (pwd != Console.ReadLine())
                {
                    Console.WriteLine("Error: Confirmation failure!");
                    Console.Write("Please set the password again: ");
                    pwd = Console.ReadLine();
                    Console.Write("Please confirm the password: ");
                }

                // 將群組信息插入數據庫
                groupCmd = new MySqlCommand("insert into group_info set groupname=@gid,password=@pwd,creator=@cr", conn);
                groupCmd.Parameters.AddWithValue("gid", groupName);
                groupCmd.Parameters.AddWithValue("pwd", pwd);
                groupCmd.Parameters.AddWithValue("cr", user.Username);

                groupCmd.ExecuteNonQuery();

                groupCmd = new MySqlCommand("insert into groupmember_info set groupname=@gid,username=@uid", conn);
                groupCmd.Parameters.AddWithValue("gid", groupName);
                groupCmd.Parameters.AddWithValue("uid", user.Username);

                groupCmd.ExecuteNonQuery();

                Console.WriteLine("Successfully create the group!");
            }
        }
        #endregion

        #region 申請加入群組
        public static void JoinGroup(MySqlConnection conn, UserInfo user)
        {
            // 輸入要加入的群名
            Console.Write("Please enter the group name: ");
            string groupName = Console.ReadLine();
            MySqlCommand groupCmd = new MySqlCommand("select * from group_info where groupname='" + groupName + "'", conn);
            MySqlDataReader groupReader = groupCmd.ExecuteReader();

            // 判斷該群是否存在
            if (groupReader.Read())
            {
                // 驗證加入密碼
                string pwd = groupReader.GetString("password");
                groupReader.Close();

                Console.Write("Please enter your password: ");
                if (pwd != Console.ReadLine())
                {
                    Console.WriteLine("Error: Username and password do not match!");
                }
                else
                {
                    groupCmd = new MySqlCommand("select * from groupmember_info where groupname='" + groupName + "' and username='" + user.Username + "'", conn);
                    groupReader = groupCmd.ExecuteReader();
                    
                    // 判斷該用戶是否已經在這個群中 
                    if (groupReader.Read())
                    {
                        groupReader.Close();
                        Console.WriteLine("Error: You already join the group!");
                    }
                    else
                    {
                        // 加入該群,並更新數據庫信息
                        groupReader.Close();
                        groupCmd = new MySqlCommand("insert into groupmember_info set groupname=@gid,username=@uid", conn);
                        groupCmd.Parameters.AddWithValue("gid", groupName);
                        groupCmd.Parameters.AddWithValue("uid", user.Username);

                        groupCmd.ExecuteNonQuery();
                        Console.WriteLine("Successfully join the group!");
                    }
                }
            }
            else
            {
                groupReader.Close();
                Console.WriteLine("Error: This group name does not exist!");
            }
        }
        #endregion

        #region 退出群組
        public static void LeaveGroup(MySqlConnection conn, UserInfo user)
        {
            // 輸入要退出群的名稱
            Console.Write("Please enter the group name: ");
            string groupName = Console.ReadLine();
            MySqlCommand groupCmd = new MySqlCommand("select * from groupmember_info where groupname='" + groupName + "' and username='" + user.Username + "'", conn);
            MySqlDataReader groupReader = groupCmd.ExecuteReader();

            // 判斷你是否在這個群中
            if (groupReader.Read())
            {
                // 退出該群,並更新數據庫信息
                groupReader.Close();
                groupCmd = new MySqlCommand("delete from groupmember_info where groupname=@gid and username=@uid", conn);
                groupCmd.Parameters.AddWithValue("gid", groupName);
                groupCmd.Parameters.AddWithValue("uid", user.Username);
                groupCmd.ExecuteNonQuery();
                Console.WriteLine("Successfully leave the group!");
            }
            else
            {
                groupReader.Close();
                Console.WriteLine("Error: You didn't join the group!");
            }
        }
        #endregion

        #region 顯示群組信息
        public static void ShowGroup(MySqlConnection conn, UserInfo user)
        {
            MySqlCommand groupCmd = new MySqlCommand("select * from groupmember_info where username='" + user.Username + "'", conn);
            MySqlDataReader groupReader = groupCmd.ExecuteReader();

            // 顯示加入的所有群名稱
            while (groupReader.Read())
            {
                Console.WriteLine(groupReader.GetString("groupname"));
            }
            groupReader.Close();
        }
        #endregion

        #region 單播
        public static void BasicSend(IModel channel, MySqlConnection conn, UserInfo user)
        {
            // 輸入收信人用戶名
            Console.Write("Please enter the receiver: ");
            string receiver = Console.ReadLine();
            Console.WriteLine("-------------------------------------");

            // 輸入待發送的消息
            Console.WriteLine("Please enter the message: ");
            string content = Console.ReadLine();
            Console.WriteLine("-------------------------------------");
            MySqlCommand sendCmd = new MySqlCommand("select * from user_info where username='" + receiver + "'", conn);
            MySqlDataReader sendReader = sendCmd.ExecuteReader();

            bool user_flag = sendReader.Read();

            // 判斷是否存在該收信人
            if (user_flag)
            {
                string receiverKey = sendReader.GetString("routingkey");
                int hasQueue = sendReader.GetInt32("has_queue");
                string sendTime = DateTime.Now.ToString();
                string message = user.Username + "," + sendTime + "," + content;
                var body = Encoding.UTF8.GetBytes(message);

                sendReader.Close();

                // 目標隊列存在,則直接發布信息;否則將信息數據存入數據庫
                if (hasQueue == 1)
                {
                    // 在線發布消息
                    var properties = channel.CreateBasicProperties();
                    properties.Persistent = true;

                    channel.BasicPublish(exchange: "topic_message",
                                         routingKey: receiverKey,
                                         basicProperties: properties,
                                         body: body);
                }
                else
                {
                    // 離線發布消息
                    sendCmd = new MySqlCommand("insert into message_info set sendtime=@st,body=@body,receiver=@re,sender=@se", conn);
                    sendCmd.Parameters.AddWithValue("st", sendTime);
                    sendCmd.Parameters.AddWithValue("body", content);
                    sendCmd.Parameters.AddWithValue("re", receiver);
                    sendCmd.Parameters.AddWithValue("se", user.Username);

                    sendCmd.ExecuteNonQuery();
                }
                Console.WriteLine("You sent [ {0} ] to {1} at {2}.", content, receiver, sendTime);
            }
            else
            {
                sendReader.Close();
                Console.WriteLine("Error: The receiver does not exist!");
            }
        }
        #endregion

        #region 組播
        public static void GroupSend(IModel channel, MySqlConnection conn, UserInfo user)
        {
            // 輸入在那個群組中發布消息
            Console.Write("Please enter the receiver: ");
            string group = Console.ReadLine();
            Console.WriteLine("-------------------------------------");

            // 輸入待發布消息
            Console.WriteLine("Please enter the message: ");
            string content = Console.ReadLine();
            Console.WriteLine("-------------------------------------");

            // 讀取群組中的所有成員
            MySqlCommand sendCmd = new MySqlCommand("select * from groupmember_info where groupname='" + group + "'", conn);
            MySqlDataReader sendReader = sendCmd.ExecuteReader();
            List<string> memberList = new List<string>();
            while (sendReader.Read())
            {
                memberList.Add(sendReader.GetString("username"));
            }
            sendReader.Close();

            string sender = user.Username + " " + group;
            string sendTime = DateTime.Now.ToString();

            // 交換機聲明           
            channel.ExchangeDeclare(exchange: "topic_message", type: "topic");

            var properties = channel.CreateBasicProperties();
            properties.Persistent = true;

            // 逐個處理成員,在線用戶在線發送消息,離線用戶離線發送消息
            foreach (string member in memberList)
            {
                sendCmd = new MySqlCommand("select * from user_info where username='" + member + "'", conn);
                sendReader = sendCmd.ExecuteReader();

                sendReader.Read();
                string receiverKey = sendReader.GetString("routingkey");
                int hasQueue = sendReader.GetInt32("has_queue");
                string message = sender + "," + sendTime + "," + content;
                var body = Encoding.UTF8.GetBytes(message);
                sendReader.Close();

                // 目標隊列存在,則直接發布信息;否則將信息數據存入數據庫
                if (hasQueue == 1)
                {
                    // 在線發送消息
                    channel.BasicPublish(exchange: "topic_message",
                                         routingKey: receiverKey,
                                         basicProperties: properties,
                                         body: body);
                }
                else
                {
                    // 離線發送消息
                    sendCmd = new MySqlCommand("insert into message_info set sendtime=@st,body=@body,receiver=@re,sender=@se", conn);
                    sendCmd.Parameters.AddWithValue("st", sendTime);
                    sendCmd.Parameters.AddWithValue("body", content);
                    sendCmd.Parameters.AddWithValue("re", member);
                    sendCmd.Parameters.AddWithValue("se", sender);

                    sendCmd.ExecuteNonQuery();
                }
            }

            Console.WriteLine("You sent [ {0} ] to {1} at {2}.", content, group, sendTime);
        }
        #endregion
    }
}
View Code

 UserLogout.cs:

using System;

using RabbitMQ.Client;
using MySql.Data.MySqlClient;

namespace Client
{
    class UserLogout
    {
        public static void Logout(IModel channel, MySqlConnection conn, UserInfo user)
        {
            Console.WriteLine("Goodbye, {0}!", user.Username);

            // 刪除隊列,並更新數據庫,表明后面發送的消息應該轉為離線發送
            channel.QueueDelete(user.RoutingKey);
            MySqlCommand cmd = new MySqlCommand("update user_info set has_queue=0 where username=@uid", conn);
            cmd.Parameters.AddWithValue("uid", user.Username);

            cmd.ExecuteNonQuery();
        }
    }
}
View Code

 

百度雲鏈接:https://pan.baidu.com/s/1Y93rcqnsv1cA9ZIxH2xrBw 密碼:zfc5


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM