由於RabbitMQ中只有隊列(queue)才能存儲信息,所以用RabbitMQ實現超大用戶級別(百萬計)的消息在/離線收發需要對每一個用戶創建一個永久隊列。
但是RabbitMQ節點內存有限,經測試后發現節點集群也無法滿足數百萬用戶隊列收發數據的要求,所以最終決定采用數據庫輔助實現該功能。
一、數據庫結構
user_list數據庫下有4張表:user_info、group_info、groupmember_info、message_info。
user_info表中含有username(主鍵,非空,VARCHAR(50))、password(非空,VARCHAR(50))、routingkey(非空,VARCHAR(50))、has_queue(INT UNSIGNED)四個字段。
group_info表中含有groupname(主鍵,非空,VARCHAR(50))、password(非空,VARCHAR(50))、creator(非空,VARCHAR(50))三個字段。
groupmember_info表中含有username(主鍵,非空,VARCHAR(50))、groupname(主鍵,非空,VARCHAR(50))兩個字段。
message_info表中含有sendtime(非空,VARCHAR(50))、body(非空,VARCHAR(300)),receiver(非空,VARCHAR(50))、sender(非空,VARCHAR(100))四個字段。
二、客戶端結構
1、文件夾創建以及包依賴安裝:
dotnet new console --name Client mv Client/Program.cs Client/Client.cs
cd Client dotnet add package RabbitMQ.Client
dotnet add package MySql.Data dotnet restore
2、項目結構
Client.cs(主程序):

using RabbitMQ.Client; using MySql.Data.MySqlClient; namespace Client { class Client { static void Main(string[] args) { // 連接數據庫 string connStr = "Database=user_list;datasource=127.0.0.1;port=3306;user=root;pwd=123456;"; MySqlConnection sqlConn = new MySqlConnection(connStr); sqlConn.Open(); // 連接RabbitMQ var factory = new ConnectionFactory() { HostName = "dev.corp.wingoht.com", VirtualHost = "cd", UserName = "ishowfun", Password = "123456" }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { // 用戶登錄,並獲取離線消息 UserInfo user = UserLogin.Login(sqlConn); // 監聽在線消息 Consumer.StartListening(channel, sqlConn, user); // 監聽按鈕事件,調用不同的客戶端功能 KeyListening.StartListening(channel, sqlConn, user); // 退出登錄 UserLogout.Logout(channel, sqlConn, user); } // 釋放數據庫連接 sqlConn.Close(); } } }
UserLogin.cs:

using System; using MySql.Data.MySqlClient; namespace Client { class UserLogin { public static UserInfo Login(MySqlConnection conn) { UserInfo user = new UserInfo(); // 用戶名輸入 Console.Write("Please enter your username: "); user.Username = Console.ReadLine(); while (user.Username.Contains(",") || user.Username.Contains(" ")) { Console.WriteLine("Error: Username can not contain \",\"! and \" \""); Console.Write("Please enter your username again: "); user.Username = Console.ReadLine(); } MySqlCommand cmd = new MySqlCommand("select * from user_info where username='" + user.Username + "'", conn); MySqlDataReader reader = cmd.ExecuteReader(); // 判斷該用戶是否已經注冊 if (reader.Read()) { // 已注冊用戶登錄 user.IsNewUser = true; // 驗證密碼 Console.Write("Please enter your password: "); user.Password = reader.GetString("password"); while (user.Password != Console.ReadLine()) { Console.WriteLine("Error: Username and password do not match!"); Console.Write("Please enter your password again: "); } Console.WriteLine("Welcome back, {0}!", user.Username); Console.WriteLine("-------------------------------------"); // 獲取當前用戶的路由鍵 user.RoutingKey = reader.GetString("routingkey"); reader.Close(); // 讀取離線消息 cmd = new MySqlCommand("select * from message_info where receiver='" + user.Username + "'", conn); reader = cmd.ExecuteReader(); Console.WriteLine("Unread Message: "); while (reader.Read()) { string sender = reader.GetString("sender"); string sendTime = reader.GetString("sendtime"); string content = reader.GetString("body"); string[] splitSender = sender.Split(" "); if (splitSender.Length == 1) { Console.WriteLine("Sender: {0}, SendTime: {1}, Content: {2}", sender, sendTime, content); } else { sender = splitSender[0]; string group = splitSender[1]; Console.WriteLine("Sender: {0} from {1}, SendTime: {2}, Content: {3}", sender, group, sendTime, content); } } reader.Close(); // 刪除已處理的離線消息 cmd = new MySqlCommand("delete from message_info where receiver=@re", conn); cmd.Parameters.AddWithValue("re", user.Username); cmd.ExecuteNonQuery(); } else { // 新用戶注冊並登錄 user.IsNewUser = false; reader.Close(); Console.WriteLine("Welcome, new user!"); // 設置密碼 Console.Write("Please set your password: "); user.Password = Console.ReadLine(); Console.Write("Please confirm your password: "); while (user.Password != Console.ReadLine()) { Console.WriteLine("Error: Confirmation failure!"); Console.Write("Please set your password again: "); user.Password = Console.ReadLine(); Console.Write("Please confirm your password: "); } // 生成該用戶的加密路由鍵,並保證該路由鍵唯一 user.RoutingKey = GenerateKey.GenerateRandomString(32); cmd = new MySqlCommand("select * from user_info where routingkey='" + user.RoutingKey + "'", conn); reader = cmd.ExecuteReader(); while (reader.Read()) { reader.Close(); user.RoutingKey = GenerateKey.GenerateRandomString(32); reader = cmd.ExecuteReader(); } reader.Close(); } Console.WriteLine("-------------------------------------"); return user; } } }
UserInfo.cs:

namespace Client { class UserInfo { // 唯一用戶名 public string Username { get; set; } // 登錄密碼 public string Password { get; set; } // 加密路由鍵 public string RoutingKey { get; set; } // 是否為新用戶 public bool IsNewUser { get; set; } } }
GenerateKey.cs:

using System; namespace Client { class GenerateKey { // 字符串中字符的取值范圍 private static char[] constant = { '0','1','2','3','4','5','6','7','8','9', 'a','b','c','d','e','f','g','h','i','j','k','l','m','n','o','p','q','r','s','t','u','v','w','x','y','z', 'A','B','C','D','E','F','G','H','I','J','K','L','M','N','O','P','Q','R','S','T','U','V','W','X','Y','Z' }; // 生成指定長度的隨機字符串 public static string GenerateRandomString(int len) { System.Text.StringBuilder newRandom = new System.Text.StringBuilder(62); Random rd = new Random(); for (int i = 0; i < len; i++) { newRandom.Append(constant[rd.Next(62)]); } return newRandom.ToString(); } } }
Consumer.cs:

using System; using System.Text; using RabbitMQ.Client; using RabbitMQ.Client.Events; using MySql.Data.MySqlClient; namespace Client { class Consumer { public static void StartListening(IModel channel, MySqlConnection conn, UserInfo user) { // 交換機聲明 channel.ExchangeDeclare(exchange: "topic_message", type: "topic"); // 隊列創建與綁定 channel.QueueDeclare(queue: user.RoutingKey, durable: true, exclusive: false, autoDelete: false, arguments: null); channel.QueueBind(queue: user.RoutingKey, exchange: "topic_message", routingKey: user.RoutingKey); var consumer = new EventingBasicConsumer(channel); // 處理收到的消息 consumer.Received += (model, ea) => { var body = ea.Body; string message = Encoding.UTF8.GetString(body); string[] splitRes = message.Trim().Split(","); string sendTime = splitRes[1]; string content = splitRes[2]; string[] splitSender = splitRes[0].Split(" "); if (splitRes.Length > 3) { for (int i = 3; i < splitRes.Length; i++) content = content + "," + splitRes[i]; } Console.WriteLine(); if (splitSender.Length == 1) { string sender = splitRes[0]; Console.WriteLine("Sender: {0}, SendTime: {1}, Content: {2}", sender, sendTime, content); } else { string sender = splitSender[0]; string group = splitSender[1]; Console.WriteLine("Sender: {0} from {1}, SendTime: {2}, Content: {3}", sender, group, sendTime, content); } }; channel.BasicConsume(queue: user.RoutingKey, autoAck: true, consumer: consumer); MySqlCommand cmd; // 更新數據庫 if (user.IsNewUser) { // 向數據庫聲明該用戶已經擁有了隊列,可以直接在線發送 cmd = new MySqlCommand("update user_info set has_queue=1 where username=@uid", conn); cmd.Parameters.AddWithValue("uid", user.Username); cmd.ExecuteNonQuery(); } else { // 防止注入地插入新用戶數據 cmd = new MySqlCommand("insert into user_info set username=@uid,password=@pwd,routingkey=@rk,has_queue=1", conn); cmd.Parameters.AddWithValue("uid", user.Username); cmd.Parameters.AddWithValue("pwd", user.Password); cmd.Parameters.AddWithValue("rk", user.RoutingKey); cmd.ExecuteNonQuery(); } Console.WriteLine("{0} Start Listening!", user.Username); Console.WriteLine("-------------------------------------"); } } }
KeyListening.cs:

using System; using RabbitMQ.Client; using MySql.Data.MySqlClient; namespace Client { class KeyListening { public static void StartListening(IModel channel, MySqlConnection conn, UserInfo user) { MQHelper.HelpTip(); Console.WriteLine("-------------------------------------"); ConsoleKeyInfo keyPressed; bool escFlag = false; while (true) { keyPressed = Console.ReadKey(); Console.WriteLine(); Console.WriteLine("-------------------------------------"); switch (keyPressed.Key) { // 0: 顯示幫助信息 case ConsoleKey.D0: MQHelper.HelpTip(); break; // 1: 創建群組 case ConsoleKey.D1: MQHelper.CreateGroup(conn, user); break; // 2: 申請加入群組 case ConsoleKey.D2: MQHelper.JoinGroup(conn, user); break; // 3: 退出群組 case ConsoleKey.D3: MQHelper.LeaveGroup(conn, user); break; // 4: 顯示群組信息 case ConsoleKey.D4: MQHelper.ShowGroup(conn, user); break; // 5: 單播 case ConsoleKey.D5: MQHelper.BasicSend(channel, conn, user); break; // 6: 組播 case ConsoleKey.D6: MQHelper.GroupSend(channel, conn, user); break; // ESC: 退出 case ConsoleKey.Escape: escFlag = true; break; // 無意義按鍵 default: Console.WriteLine("Error: Invalid press!"); break; } Console.WriteLine("-------------------------------------"); if (escFlag) break; } } } }
MQHelper.cs:

using System; using System.Text; using System.Collections.Generic; using RabbitMQ.Client; using MySql.Data.MySqlClient; namespace Client { // 該類包含了按鈕監聽事件觸發的各項功能的實現 class MQHelper { #region 顯示幫助信息 public static void HelpTip() { // 顯示各按鍵對應功能 Console.WriteLine("Function Press:"); Console.WriteLine("[0]: Help"); Console.WriteLine("[1]: Create group"); Console.WriteLine("[2]: Join group"); Console.WriteLine("[3]: Leave group"); Console.WriteLine("[4]: Show groups"); Console.WriteLine("[5]: BasicSend"); Console.WriteLine("[6]: GroupSend"); Console.WriteLine("[ESC]: Log out"); } #endregion #region 創建群組 public static void CreateGroup(MySqlConnection conn, UserInfo user) { // 設置群組名稱以及加入密碼 Console.Write("Please set the group name: "); string groupName = Console.ReadLine(); MySqlCommand groupCmd = new MySqlCommand("select * from group_info where groupname='" + groupName + "'", conn); MySqlDataReader groupReader = groupCmd.ExecuteReader(); if (groupReader.Read()) { groupReader.Close(); Console.WriteLine("Error: This group name already exists!"); } else { groupReader.Close(); Console.Write("Please set the password: "); string pwd = Console.ReadLine(); Console.Write("Please confirm the password: "); while (pwd != Console.ReadLine()) { Console.WriteLine("Error: Confirmation failure!"); Console.Write("Please set the password again: "); pwd = Console.ReadLine(); Console.Write("Please confirm the password: "); } // 將群組信息插入數據庫 groupCmd = new MySqlCommand("insert into group_info set groupname=@gid,password=@pwd,creator=@cr", conn); groupCmd.Parameters.AddWithValue("gid", groupName); groupCmd.Parameters.AddWithValue("pwd", pwd); groupCmd.Parameters.AddWithValue("cr", user.Username); groupCmd.ExecuteNonQuery(); groupCmd = new MySqlCommand("insert into groupmember_info set groupname=@gid,username=@uid", conn); groupCmd.Parameters.AddWithValue("gid", groupName); groupCmd.Parameters.AddWithValue("uid", user.Username); groupCmd.ExecuteNonQuery(); Console.WriteLine("Successfully create the group!"); } } #endregion #region 申請加入群組 public static void JoinGroup(MySqlConnection conn, UserInfo user) { // 輸入要加入的群名 Console.Write("Please enter the group name: "); string groupName = Console.ReadLine(); MySqlCommand groupCmd = new MySqlCommand("select * from group_info where groupname='" + groupName + "'", conn); MySqlDataReader groupReader = groupCmd.ExecuteReader(); // 判斷該群是否存在 if (groupReader.Read()) { // 驗證加入密碼 string pwd = groupReader.GetString("password"); groupReader.Close(); Console.Write("Please enter your password: "); if (pwd != Console.ReadLine()) { Console.WriteLine("Error: Username and password do not match!"); } else { groupCmd = new MySqlCommand("select * from groupmember_info where groupname='" + groupName + "' and username='" + user.Username + "'", conn); groupReader = groupCmd.ExecuteReader(); // 判斷該用戶是否已經在這個群中 if (groupReader.Read()) { groupReader.Close(); Console.WriteLine("Error: You already join the group!"); } else { // 加入該群,並更新數據庫信息 groupReader.Close(); groupCmd = new MySqlCommand("insert into groupmember_info set groupname=@gid,username=@uid", conn); groupCmd.Parameters.AddWithValue("gid", groupName); groupCmd.Parameters.AddWithValue("uid", user.Username); groupCmd.ExecuteNonQuery(); Console.WriteLine("Successfully join the group!"); } } } else { groupReader.Close(); Console.WriteLine("Error: This group name does not exist!"); } } #endregion #region 退出群組 public static void LeaveGroup(MySqlConnection conn, UserInfo user) { // 輸入要退出群的名稱 Console.Write("Please enter the group name: "); string groupName = Console.ReadLine(); MySqlCommand groupCmd = new MySqlCommand("select * from groupmember_info where groupname='" + groupName + "' and username='" + user.Username + "'", conn); MySqlDataReader groupReader = groupCmd.ExecuteReader(); // 判斷你是否在這個群中 if (groupReader.Read()) { // 退出該群,並更新數據庫信息 groupReader.Close(); groupCmd = new MySqlCommand("delete from groupmember_info where groupname=@gid and username=@uid", conn); groupCmd.Parameters.AddWithValue("gid", groupName); groupCmd.Parameters.AddWithValue("uid", user.Username); groupCmd.ExecuteNonQuery(); Console.WriteLine("Successfully leave the group!"); } else { groupReader.Close(); Console.WriteLine("Error: You didn't join the group!"); } } #endregion #region 顯示群組信息 public static void ShowGroup(MySqlConnection conn, UserInfo user) { MySqlCommand groupCmd = new MySqlCommand("select * from groupmember_info where username='" + user.Username + "'", conn); MySqlDataReader groupReader = groupCmd.ExecuteReader(); // 顯示加入的所有群名稱 while (groupReader.Read()) { Console.WriteLine(groupReader.GetString("groupname")); } groupReader.Close(); } #endregion #region 單播 public static void BasicSend(IModel channel, MySqlConnection conn, UserInfo user) { // 輸入收信人用戶名 Console.Write("Please enter the receiver: "); string receiver = Console.ReadLine(); Console.WriteLine("-------------------------------------"); // 輸入待發送的消息 Console.WriteLine("Please enter the message: "); string content = Console.ReadLine(); Console.WriteLine("-------------------------------------"); MySqlCommand sendCmd = new MySqlCommand("select * from user_info where username='" + receiver + "'", conn); MySqlDataReader sendReader = sendCmd.ExecuteReader(); bool user_flag = sendReader.Read(); // 判斷是否存在該收信人 if (user_flag) { string receiverKey = sendReader.GetString("routingkey"); int hasQueue = sendReader.GetInt32("has_queue"); string sendTime = DateTime.Now.ToString(); string message = user.Username + "," + sendTime + "," + content; var body = Encoding.UTF8.GetBytes(message); sendReader.Close(); // 目標隊列存在,則直接發布信息;否則將信息數據存入數據庫 if (hasQueue == 1) { // 在線發布消息 var properties = channel.CreateBasicProperties(); properties.Persistent = true; channel.BasicPublish(exchange: "topic_message", routingKey: receiverKey, basicProperties: properties, body: body); } else { // 離線發布消息 sendCmd = new MySqlCommand("insert into message_info set sendtime=@st,body=@body,receiver=@re,sender=@se", conn); sendCmd.Parameters.AddWithValue("st", sendTime); sendCmd.Parameters.AddWithValue("body", content); sendCmd.Parameters.AddWithValue("re", receiver); sendCmd.Parameters.AddWithValue("se", user.Username); sendCmd.ExecuteNonQuery(); } Console.WriteLine("You sent [ {0} ] to {1} at {2}.", content, receiver, sendTime); } else { sendReader.Close(); Console.WriteLine("Error: The receiver does not exist!"); } } #endregion #region 組播 public static void GroupSend(IModel channel, MySqlConnection conn, UserInfo user) { // 輸入在那個群組中發布消息 Console.Write("Please enter the receiver: "); string group = Console.ReadLine(); Console.WriteLine("-------------------------------------"); // 輸入待發布消息 Console.WriteLine("Please enter the message: "); string content = Console.ReadLine(); Console.WriteLine("-------------------------------------"); // 讀取群組中的所有成員 MySqlCommand sendCmd = new MySqlCommand("select * from groupmember_info where groupname='" + group + "'", conn); MySqlDataReader sendReader = sendCmd.ExecuteReader(); List<string> memberList = new List<string>(); while (sendReader.Read()) { memberList.Add(sendReader.GetString("username")); } sendReader.Close(); string sender = user.Username + " " + group; string sendTime = DateTime.Now.ToString(); // 交換機聲明 channel.ExchangeDeclare(exchange: "topic_message", type: "topic"); var properties = channel.CreateBasicProperties(); properties.Persistent = true; // 逐個處理成員,在線用戶在線發送消息,離線用戶離線發送消息 foreach (string member in memberList) { sendCmd = new MySqlCommand("select * from user_info where username='" + member + "'", conn); sendReader = sendCmd.ExecuteReader(); sendReader.Read(); string receiverKey = sendReader.GetString("routingkey"); int hasQueue = sendReader.GetInt32("has_queue"); string message = sender + "," + sendTime + "," + content; var body = Encoding.UTF8.GetBytes(message); sendReader.Close(); // 目標隊列存在,則直接發布信息;否則將信息數據存入數據庫 if (hasQueue == 1) { // 在線發送消息 channel.BasicPublish(exchange: "topic_message", routingKey: receiverKey, basicProperties: properties, body: body); } else { // 離線發送消息 sendCmd = new MySqlCommand("insert into message_info set sendtime=@st,body=@body,receiver=@re,sender=@se", conn); sendCmd.Parameters.AddWithValue("st", sendTime); sendCmd.Parameters.AddWithValue("body", content); sendCmd.Parameters.AddWithValue("re", member); sendCmd.Parameters.AddWithValue("se", sender); sendCmd.ExecuteNonQuery(); } } Console.WriteLine("You sent [ {0} ] to {1} at {2}.", content, group, sendTime); } #endregion } }
UserLogout.cs:

using System; using RabbitMQ.Client; using MySql.Data.MySqlClient; namespace Client { class UserLogout { public static void Logout(IModel channel, MySqlConnection conn, UserInfo user) { Console.WriteLine("Goodbye, {0}!", user.Username); // 刪除隊列,並更新數據庫,表明后面發送的消息應該轉為離線發送 channel.QueueDelete(user.RoutingKey); MySqlCommand cmd = new MySqlCommand("update user_info set has_queue=0 where username=@uid", conn); cmd.Parameters.AddWithValue("uid", user.Username); cmd.ExecuteNonQuery(); } } }
百度雲鏈接:https://pan.baidu.com/s/1Y93rcqnsv1cA9ZIxH2xrBw 密碼:zfc5