1 消息队列的优点
消息队列本质是生产者——消费者模式。也有很多使用方式。那么它有什么优点呢?
以日常生活中邮寄信件这个行为举例,
当只有1个寄信人,1个邮递员的时候。寄信人想要寄信,到指定地点(邮局),直接将信件交给邮递员即可。
当有50个寄信人,1个邮递员的时候。这50个寄信人就要依次排队等待邮递员处理信件。
可以增加邮递员的数量,但是依然会有忙闲不均的问题存在。
我们现在增加一个邮筒(也就是数据缓冲区)
在这个例子中,寄信人就是生产者,邮递员是消费者。而邮筒就是一个消息队列。这个邮筒解决了以下问题:
1.1 解除耦合
实现了时间上解耦,也实现了对象间解耦。
之前邮递员隶属于A邮局,寄信人想要寄信,到指定地点,直接将信件交给邮递员即可。如果因为实际需求,以后由B邮局的快递员负责寄信业务。那么寄信人就要去另一个地点寄信。
这就是由于耦合产生的问题。
现在不管信件是由A邮局还是其他邮局负责,寄信人只管将信件投递进邮筒就行了。解除了寄信人和邮递员的耦合性。
1.2 实现异步处理
之前寄信将信件直接交给邮递员,可能要等待邮递员要确认很多信息(比如寄件人信息)之后,长辄几分钟,才能结束本次寄信的行为。
而现在将信件直接投递到邮箱里,只要不到1S,就能结束寄信的行为。
1.3 支持并发操作
解决同步处理的阻塞问题。
之前所有寄信人需要排队等待上一个人寄信完毕,才能开始寄信。
现在所有寄信人都把信件投递进邮筒即可。
1.4 实现流量削峰
可以根据邮递员方的处理能力,调节邮筒的容量。超过这个容量后,邮筒就放不下(拒绝)信件了。
即能根据下游的处理能力自由调节流量,实现削峰。
2 安装erlang和RabbitMQ
2.1 安装erlang
由于RabbitMQ是基于erlang开发的,需要先安装erlang。
确认自己要安装的RabbitMQ依赖的erlang的最低版本。
erlang:https://www.erlang.org/downloads
安装后添加环境变量。
在系统变量中添加:
变量名:ERLANG_HOME
变量值:C:\Program Files\erl-24.0(安装ERLANG的文件夹)
然后在用户变量的PATH中添加:%ERLANG_HOME%\bin
添加完环境变量之后可能需要重启。
然后打开CMD,运行erl,出现版本号为成功。
2.2 安装RabbitMQ
RabbitMQ:https://github.com/rabbitmq/rabbitmq-server/releases/
安装成功后会自动创建RabbitMQ服务并且启动
可以在任务管理器中确认:
2.3 安装RabbitMQ的Web管理插件
在命令行中CD到安装目录下,执行
rabbitmq-plugins.bat enable rabbitmq_management
成功后进入浏览器,输入:http://localhost:15672
初始账号和密码:guest/guest
3 理解消息队列中的基本概念
消息队列中有Exchange、Connection、Channel、Queue等概念
3.1 Exchange(交换机)
是生产者和消息队列的一个中介,负责将生产者的消息分发给消息队列。如果使用简单模式(只有一个生产者,一个消费者,一对一)时,不配置Exchange,实际上使用的是默认的Exchange。
3.2 Connection(连接)
是连接到MQ的TCP连接。为了方便理解,可以将Connection想象成一个光纤电缆。
3.3 Channel(通道)
一个Connection中存在多个Channel。可以把Channel理解为光纤电缆中的光纤。
3.4 Queue(消息队列)
一个Channel中可以存在多个Queue。
3.5 Broker(代理交换节点)
一个Broker就是一个RabbitMQ服务节点,包含多个Exchange(交换机)和Queue(消息队列)。
3.6 其他
因为建立和销毁 TCP 连接是非常昂贵的开销,所以一般维持Connection。在Connection之上,操作channel。
Channel的其中一个作用就是,屏蔽Connection的TCP层面的细节,方便开发,同时达到TCP连接复用的效果。
4 尝试消息队列的简单模式(一对一)
特点:一个生产者对应一个消费者。最简单的模式。
场景:一对一私聊。
新建一个解决方案,包含两个控制台程序,分别是生产者和消费者。
右键解决方案,设置多项目启动。
4.1 生产者代码
/// <summary>
/// 生产者
/// </summary>
internal class Program
{
private static void Main(string[] args)
{
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory
{
UserName = "guest",//用户名
Password = "guest",//密码
HostName = "localhost"//rabbitmq ip
};
//创建RabbitMQ的TCP长连接(可以比喻成一个光纤电缆)
//因为建立和销毁 TCP 连接是非常昂贵的开销,所以一般维持连接(TCP连接复用)。在连接之上,建立和销毁channel。
var connection = factory.CreateConnection();
//创建通道(可以比喻成光纤电缆中的"一根"光纤)
var channel = connection.CreateModel();
/*声明一个队列:实现通道与队列的绑定
* 5个参数:
* queue:被绑定的消息队列名,当该消息队列不存在时,将新建该消息队列
* durable:是否使用持久化
* exclusive:该通道是否独占该队列
* autoDelete:消费完成时是否删除队列, 该删除操作在消费者彻底断开连接之后进行。
* args:其他配置参数
*/
channel.QueueDeclare("hello", false, false, false, null);
Console.WriteLine("\nRabbitMQ连接成功,生产者已启动,请输入消息,输入exit退出!");
string input;
do
{
input = Console.ReadLine();
var sendBytes = Encoding.UTF8.GetBytes(input);
//发布消息
channel.BasicPublish("", "hello", null, sendBytes);
}
while (input.Trim().ToLower() != "exit");
channel.Close();
connection.Close();
}
}
4.2 消费者代码
/// <summary>
/// 消费者
/// </summary>
internal class Program
{
private static void Main(string[] args)
{
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory
{
UserName = "guest",//用户名
Password = "guest",//密码
HostName = "localhost"//rabbitmq ip
};
//创建连接
var connection = factory.CreateConnection();
//创建通道
var channel = connection.CreateModel();
//事件基本消费者
EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
//接收到消息事件
consumer.Received += (ch, ea) =>
{
string message = Encoding.Default.GetString(ea.Body.ToArray());
Console.WriteLine($"收到消息: {message}");
//确认该消息已被消费
channel.BasicAck(ea.DeliveryTag, false);
};
//启动消费者 设置为手动应答消息
channel.BasicConsume("hello", false, consumer);
Console.WriteLine("消费者已启动");
Console.ReadKey();
channel.Dispose();
connection.Close();
}
}
4.3 测试
两个项目一起启动之后
在生产者对应的控制台输入文字后,添加到消息队列中,由消费者进行消费,显示在消费者控制台上。
5 尝试消息队列的WORK模式
特点:争夺消息,能者多劳。每个消费者获得的消息具有唯一性。
场景:抢红包。抢单。
5.1 生产者的代码
为了代码逻辑清晰,将各种模式的代码从Main函数中提出来单独封装成函数。Main函数中使用Switch来方便之后的测试。
/// <summary>
/// 生产者
/// </summary>
internal static class Program
{
private static void Main(string[] args)
{
//选择的模式类型
string ModeNumber = "2";
switch (ModeNumber)
{
case "1":
SignalMode();
break;
case "2":
WorkMode();
break;
default:
break;
}
}
/// <summary>
/// 简单模式
/// </summary>
private static void SignalMode()
{
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory
{
UserName = "guest",//用户名
Password = "guest",//密码
HostName = "localhost"//rabbitmq ip
};
//创建RabbitMQ的TCP长连接(可以比喻成一个光纤电缆)
//因为建立和销毁 TCP 连接是非常昂贵的开销,所以一般维持连接(TCP连接复用)。在连接之上,建立和销毁channel。
var connection = factory.CreateConnection();
//创建通道(可以比喻成光纤电缆中的"一根"光纤)
var channel = connection.CreateModel();
/*声明一个队列:实现通道与队列的绑定
* 5个参数:
* queue:被绑定的消息队列名,当该消息队列不存在时,将新建该消息队列
* durable:是否使用持久化
* exclusive:该通道是否独占该队列
* autoDelete:消费完成时是否删除队列, 该删除操作在消费者彻底断开连接之后进行。
* args:其他配置参数
*/
channel.QueueDeclare("队列A", false, false, false, null);
Console.WriteLine("\nRabbitMQ连接成功,生产者已启动,请输入消息,输入exit退出!");
string input;
do
{
input = Console.ReadLine();
var sendBytes = Encoding.UTF8.GetBytes(input);
//发布消息
channel.BasicPublish("", "hello", null, sendBytes);
}
while (input.Trim().ToLower() != "exit");
channel.Close();
connection.Close();
}
/// <summary>
/// Work模式
/// </summary>
private static void WorkMode()
{
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory
{
UserName = "guest",//用户名
Password = "guest",//密码
HostName = "localhost"//rabbitmq ip
};
var connection = factory.CreateConnection();
var channel = connection.CreateModel();
channel.QueueDeclare("队列A", false, false, false, null);
for (int i = 0; i < 50; i++)
{
String message = "" + i;
var sendBytes = Encoding.UTF8.GetBytes(message);
channel.BasicPublish("", "队列A", null, sendBytes);
Console.WriteLine(" [x] Sent '" + message + "'");
Thread.Sleep(i * 10);
}
channel.Close();
connection.Close();
}
}
5.2 消费者的代码
为了模拟多个消费者争夺消息,将之前的消费者项目重命名为"RabbitMQ_Consumer01",并新建项目"RabbitMQ_Consumer02"。在work模式中,消费者01和消费者02的代码是相同的。
并将生产者、消费者01、消费者02同时设为启动项(由于消费者代码相同,只贴一个)。
/// <summary>
/// 消费者01
/// </summary>
internal static class Program
{
private static void Main(string[] args)
{
//选择的模式类型
string ModeNumber = "2";
switch (ModeNumber)
{
case "1":
SignalMode();
break;
case "2":
WorkMode();
break;
default:
break;
}
}
/// <summary>
/// 简单模式
/// </summary>
private static void SignalMode()
{
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory
{
UserName = "guest",//用户名
Password = "guest",//密码
HostName = "localhost"//rabbitmq ip
};
//创建连接
var connection = factory.CreateConnection();
//创建通道
var channel = connection.CreateModel();
//事件基本消费者
EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
//接收到消息事件
consumer.Received += (model, ea) =>
{
string message = Encoding.Default.GetString(ea.Body.ToArray());
Console.WriteLine($@"收到消息: {message}");
//确认该消息已被消费
channel.BasicAck(ea.DeliveryTag, false);
};
//启动消费者 设置为手动应答消息
channel.BasicConsume("队列A", false, consumer);
Console.WriteLine($@"消费者已启动");
Console.ReadKey();
channel.Dispose();
connection.Close();
}
/// <summary>
/// Work模式
/// </summary>
private static void WorkMode()
{
ConnectionFactory factory = new ConnectionFactory
{
UserName = "guest",//用户名
Password = "guest",//密码
HostName = "localhost"//rabbitmq ip
};
var connection = factory.CreateConnection();
var channel = connection.CreateModel();
channel.QueueDeclare("队列A", false, false, false, null);
/** 设置限流机制
* param1: prefetchSize,消息本身的大小 如果设置为0 那么表示对消息本身的大小不限制
* param2: prefetchCount,告诉rabbitmq,不要一次性给消费者推送大于N个消息(一旦有N个消息没有Ack,此消费者不再获取消息,直到有消息Ack为止)
* param3:global,是否将上面的设置应用于整个通道,false表示只应用于当前消费者
*/
channel.BasicQos(0, 1, false);
// 定义队列的消费者
EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
//启动消费者
//false为手动应答,true为自动应答
channel.BasicConsume("队列A", false, consumer);
consumer.Received += (model, ea) =>
{
string message = Encoding.Default.GetString(ea.Body.ToArray());
Console.WriteLine($@"收到消息: {message}");
//确认该消息已被消费,手动返回完成
channel.BasicAck(ea.DeliveryTag, false);
};
Console.WriteLine($@"消费者01已启动");
Console.ReadKey();
channel.Dispose();
connection.Close();
}
}
5.3 测试处理能力相同的情况
启动项目后,可以看到消费者向队列推送消息1-50。而消费者01和消费者02对队列中的消息进行争抢,并且获取的消息具有唯一性。
可以看到由于消费者01、02的处理能力相同,争抢消息的数量也是平均的。
5.4 测试处理能力不相同的情况
那如何体现“能者多劳”这个特点呢。
在消费者01获取消息后,通过Thread.Sleep(1000);模拟消费者01的对消息的处理速度比较慢
可以看到,由于消费者01的处理速度慢,争抢到的消息也比较少
6 消费者端的限流配置(QOS)
6.1 配置限流参数
消费者中的这句代码就是在配置限流参数:
channel.BasicQos(0, 1, false);
param1: prefetchSize,消息本身的大小 如果设置为0 那么表示对消息本身的大小不限制
param2: prefetchCount,告诉rabbitmq,不要一次性给消费者推送大于N个消息(简单点说就是:一旦有N个消息没有Ack,此消费者不再获取消息,直到有消息Ack为止)
param3: global,是否将上面的设置应用于整个通道,false表示只应用于当前消费者
6.2 测试限流功能
我们将消费者02的限流数量设置为1,同时注释掉手动ACK的语句。
这样消费者02获取消息后,由于不会进行ACK操作,会导致消费者02的阻塞。我们设置的限流数量是1,所以消费者02由始至终只会获得一条消息。
7 RabbitMQ的确认机制(ACK)
ACK是acknowledgment的缩写。
队列接收到消费者的ACK信息后,才会将对应的消息进行删除操作。
RabbitMQ的确认方式分为自动ACK和手动ACK。
7.1 自动ACK和手动ACK的区别
自动ACK:消费者获取到消息后,会自动进行ACK操作。
手动ACK:可以自定义调用ACK操作的位置。
选择自动ACK,如果消费者处理时出现问题,或者中途退出没有处理。但队列已经接收到自动ACK把消息删除了,可能导致对消息处理出错。
选择手动ACK,可以将ACK的时机放在消费者正确将消息处理完毕之后。如果消费者中途退出,消息会由另一个消费者获取到进行操作。
7.2 如何配置ACK模式
消费者中这行代码的第二个参数就是在配置ACK模式(bool AutoAck)
channel.BasicConsume("队列A", false, consumer);
如果选择手动ACK,就要选择时机执行channel.BasicAck()函数。
7.3 测试手动ACK模式
在消费者02,接收到消息的事件委托函数中,增加以下代码。
当消费者02首次获取到大于10的数时,模拟消费失败,消费者02退出的的场景。
可以看到,消费者02获取到消息"12"之后,在进行ACK操作之前就退出了。消息再次由消费者01获得。
8 尝试消息队列的发布/订阅模式
特点:每个消费者有各自的队列,获取的消息相同。
场景:群聊天。
生产者向交换机发送消息,交换机将消息广播到各个队列。
8.1 交换机的类型
交换机有四种类型,分别是fanout、direct、topic、headers。
其中fanout(扇形交换)就是发布/订阅模式需要用到的。
(其实fanout交换机是Direct交换机的简化版,对于Direct先不进行讨论)
8.2 生产者的代码
Main函数中增加发布订阅类型,并选择
private static void Main(string[] args)
{
//选择的模式类型
string ModeNumber = "3";
switch (ModeNumber)
{
case "1":
SignalMode();
break;
case "2":
WorkMode();
break;
case "3":
PubSubMode();
break;
default:
break;
}
}
增加发布/订阅类型的函数
/// <summary>
/// 发布/订阅模式
/// </summary>
private static void PubSubMode()
{
//声明接受消息的交换机的名称
string myExangeName = "ExangeTypeF";
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory
{
UserName = "guest",//用户名
Password = "guest",//密码
HostName = "localhost"//rabbitmq ip
};
var connection = factory.CreateConnection();
var channel = connection.CreateModel();
/** 声明交换机
* param1: 接受消息的交换机名称
* param2: 交换机模式
*/
channel.ExchangeDeclare(myExangeName, "fanout");
Console.WriteLine("生产者已启动");
for (int i = 0; i < 50; i++)
{
string message = "" + i;
var sendBytes = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(myExangeName, "", null, sendBytes);
Console.WriteLine(" [x] Sent '" + message + "'");
Thread.Sleep(i * 10);
}
channel.Close();
connection.Close();
Console.ReadKey();
}
8.3 消费者01的代码
Main函数中增加发布订阅类型,并选择
private static void Main(string[] args)
{
//选择的模式类型
string ModeNumber = "3";
switch (ModeNumber)
{
case "1":
SignalMode();
break;
case "2":
WorkMode();
break;
case "3":
PubSubMode();
break;
default:
break;
}
}
增加发布/订阅类型的函数
/// <summary>
/// 发布/订阅模式
/// </summary>
private static void PubSubMode()
{
//声明接受消息的交换机的名称
string myExangeName = "ExangeTypeF";
//声明监听的队列
string myQueueName = "Queue01";
ConnectionFactory factory = new ConnectionFactory
{
UserName = "guest",//用户名
Password = "guest",//密码
HostName = "localhost"//rabbitmq ip
};
var connection = factory.CreateConnection();
var channel = connection.CreateModel();
//声明队列
channel.QueueDeclare(myQueueName, false, false, false, null);
//绑定队列到交换机
channel.QueueBind(myQueueName, myExangeName, "");
channel.BasicQos(0, 1, false);
// 定义队列的消费者
EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
//启动消费者
//false为手动应答,true为自动应答
channel.BasicConsume(myQueueName, false, consumer);
consumer.Received += (model, ea) =>
{
string message = Encoding.Default.GetString(ea.Body.ToArray());
Console.WriteLine($@"收到消息: {message}");
//确认该消息已被消费,手动返回完成
channel.BasicAck(ea.DeliveryTag, false);
};
Console.WriteLine($@"消费者01已启动");
Console.ReadKey();
channel.Dispose();
connection.Close();
}
8.4 消费者02的代码
和消费者01基本的一致。注意监听的队列名称要和消费者01监听的队列区分开。
否则两个消费者监听同一个队列,又变成WORK模式了。
string myQueueName = "Queue02";
8.5 测试
项目启动后,可以看到,两个消费者监听的两个队列,都接受到了交换机发送的消息广播。
9 尝试消息队列的路由模式
特点:在发布/订阅模式的基础上,增加路由键值(routingkey),达到选择性向队列发送消息的目的。
9.1 交换机类型
实现路由模式的交换机类型为Direct(直连交换机)。交换机将消息推送到绑定着对应路由键值的队列中。
9.2 生产者的代码
Main函数中增加路由类型,并选择
private static void Main(string[] args)
{
//选择的模式类型
string ModeNumber = "4";
switch (ModeNumber)
{
case "1":
SignalMode();
break;
case "2":
WorkMode();
break;
case "3":
PubSubMode();
break;
case "4":
RoutingMode();
break;
default:
break;
}
}
增加路由模式相关代码,并设定区分路由键值的逻辑。
如果数字大于15且小于30,路由键值为X1;否则,路由键值为X2。
/// <summary>
/// 路由模式
/// </summary>
private static void RoutingMode()
{
//声明接受消息的交换机的名称
string myExangeName = "ExangeTypeR";
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory
{
UserName = "guest",//用户名
Password = "guest",//密码
HostName = "localhost"//rabbitmq ip
};
var connection = factory.CreateConnection();
var channel = connection.CreateModel();
/** 声明交换机
* param1: 接受消息的交换机名称
* param2: 交换机模式
*/
channel.ExchangeDeclare(myExangeName, "direct");
Console.WriteLine("生产者已启动");
for (int i = 0; i < 50; i++)
{
string message = "" + i;
var sendBytes = Encoding.UTF8.GetBytes(message);
//区分设定路由键值
//如果大于15且小于30,路由键值为X1;否则,路由键值为X2
if (i > 15 && i < 30)
{
/** 发布消息
* param1: 接受消息的交换机名称
* param2: 路由键值
* param3: 其他参数(暂时用不到)
* param4: 二进制的消息体
*/
channel.BasicPublish(myExangeName, "X1", null, sendBytes);
}
else
{
channel.BasicPublish(myExangeName, "X2", null, sendBytes);
}
Console.WriteLine(" [x] Sent '" + message + "'");
Thread.Sleep(i * 10);
}
channel.Close();
connection.Close();
Console.ReadKey();
}
9.3 消费者01的代码
Main函数中增加路由类型,并选择(和生产者的Main函数一样,不粘贴了)
增加路由模式相关代码:
/// <summary>
/// 路由模式
/// </summary>
private static void RoutingMode()
{
//声明接受消息的交换机的名称
string myExangeName = "ExangeTypeR";
//声明监听的队列
string myQueueName = "Queue01";
ConnectionFactory factory = new ConnectionFactory
{
UserName = "guest",//用户名
Password = "guest",//密码
HostName = "localhost"//rabbitmq ip
};
var connection = factory.CreateConnection();
var channel = connection.CreateModel();
//声明队列
channel.QueueDeclare(myQueueName, false, false, false, null);
//绑定队列到交换机
channel.QueueBind(myQueueName, myExangeName, "X1");
channel.BasicQos(0, 1, false);
// 定义队列的消费者
EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
//启动消费者
//false为手动应答,true为自动应答
channel.BasicConsume(myQueueName, false, consumer);
consumer.Received += (model, ea) =>
{
string message = Encoding.Default.GetString(ea.Body.ToArray());
Console.WriteLine($@"收到消息: {message}");
//确认该消息已被消费,手动返回完成
channel.BasicAck(ea.DeliveryTag, false);
};
Console.WriteLine($@"消费者01已启动");
Console.ReadKey();
channel.Dispose();
connection.Close();
}
注意这句话最后一个参数,绑定了队列匹配的路由键值
//绑定队列到交换机
channel.QueueBind(myQueueName, myExangeName, "X1");
9.4 消费者02的代码
Main函数中增加路由类型,并选择(和生产者的Main函数一样,不粘贴了)。
新增路由模式的代码:
和消费者01的代码相同,只修改了两处。
监听的队列为“Queue02”:
//声明监听的队列
string myQueueName = "Queue02";
队列绑定的路由键值为“X2”:
//绑定队列到交换机
channel.QueueBind(myQueueName, myExangeName, "X2");
9.5 测试
可以看到路由键值为X1的消息(大于15且小于30),都被直连交换机转发到匹配着“X1”路由键值的“Queue1”队列中,发送给消费者01。
10 尝试消息队列的主题模式
主题模式是路由模式的进化型。
如果说路由模式中交换机发送消息的依据是匹配着路由键值的队列,
那么主题模式中发送消息的依据则是根据通配符找到符合条件的队列,进行消息发送。
有些类似于抖音或微博中的#,话题功能。
说的简单一点,
路由模式是“全字匹配”路由键值,
主题模式是根据规则“模糊查询”路由键值。
10.1 交换机的类型
实现主题模式的交换机类型为topic(主题交换机)。
10.2 星花*与井号#的效果
主题模式中的路由键值是由多个主题组成,由"."进行分割。
例如"it.computer.cpu"。
消费者进行匹配的规则有两种,星花*与井号#。
星花*的效果是只能忽略一个主题进行匹配。
井号#的效果是可以忽略多个主题进行匹配。
当一个队列的路由键值为"it.*"时,是"接收不到"消息的。
当一个队列的路由键值为"it.computer.*"时,是"可以收到"消息的。
当一个队列的路由键值为"it.#"时,是"可以收到"消息的。
10.3 生产者的代码
Main函数中增加发布主题类型,并选择
private static void Main(string[] args)
{
//选择的模式类型
string ModeNumber = "5";
switch (ModeNumber)
{
case "1":
SignalMode();
break;
case "2":
WorkMode();
break;
case "3":
PubSubMode();
break;
case "4":
RoutingMode();
break;
case "5":
TopicMode();
break;
default:
break;
}
}
添加主题模式的代码
/// <summary>
/// 主题模式
/// </summary>
private static void TopicMode()
{
//声明接受消息的交换机的名称
string myExangeName = "ExangeTypeT";
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory
{
UserName = "guest",//用户名
Password = "guest",//密码
HostName = "localhost"//rabbitmq ip
};
var connection = factory.CreateConnection();
var channel = connection.CreateModel();
channel.ExchangeDeclare(myExangeName, "topic");
Console.WriteLine("生产者已启动");
for (int i = 0; i < 50; i++)
{
string message = "" + i;
var sendBytes = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(myExangeName, "it.computer.cpu", null, sendBytes);
Console.WriteLine(" [x] Sent '" + message + "'");
Thread.Sleep(i * 10);
}
channel.Close();
connection.Close();
Console.ReadKey();
}
和路由模式的主要区别在于:
第二个参数路由键值,填写的是多个单词(话题)拼接而成的字符串,用“.”做分隔。
channel.BasicPublish(myExangeName, "it.computer.cpu", null, sendBytes);
10.4 消费者01的代码
Main函数中增加主题模式,并选择(和生产者的Main函数一样,不粘贴了)。
新增主题模式相关代码
/// <summary>
/// 主题模式
/// </summary>
private static void TopicMode()
{
//声明接受消息的交换机的名称
string myExangeName = "ExangeTypeT";
//声明监听的队列
string myQueueName = "Queue01";
ConnectionFactory factory = new ConnectionFactory
{
UserName = "guest",//用户名
Password = "guest",//密码
HostName = "localhost"//rabbitmq ip
};
var connection = factory.CreateConnection();
var channel = connection.CreateModel();
//声明队列
channel.QueueDeclare(myQueueName, false, false, false, null);
//绑定队列到交换机
channel.QueueBind(myQueueName, myExangeName, "it.computer.*");
channel.BasicQos(0, 1, false);
// 定义队列的消费者
EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
//启动消费者
//false为手动应答,true为自动应答
channel.BasicConsume(myQueueName, false, consumer);
consumer.Received += (model, ea) =>
{
string message = Encoding.Default.GetString(ea.Body.ToArray());
Console.WriteLine($@"收到消息: {message}");
//确认该消息已被消费,手动返回完成
channel.BasicAck(ea.DeliveryTag, false);
};
Console.WriteLine($@"消费者01已启动");
Console.ReadKey();
channel.Dispose();
connection.Close();
}
10.5 消费者02的代码
Main函数中增加主题模式,并选择(和生产者的Main函数一样,不粘贴了)。
新增主题模式相关代码
/// <summary>
/// 主题模式
/// </summary>
private static void TopicMode()
{
//声明接受消息的交换机的名称
string myExangeName = "ExangeTypeT";
//声明监听的队列
string myQueueName = "Queue02";
ConnectionFactory factory = new ConnectionFactory
{
UserName = "guest",//用户名
Password = "guest",//密码
HostName = "localhost"//rabbitmq ip
};
var connection = factory.CreateConnection();
var channel = connection.CreateModel();
//声明队列
channel.QueueDeclare(myQueueName, false, false, false, null);
//绑定队列到交换机
channel.QueueBind(myQueueName, myExangeName, "it.#");
channel.BasicQos(0, 1, false);
// 定义队列的消费者
EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
//启动消费者
//false为手动应答,true为自动应答
channel.BasicConsume(myQueueName, false, consumer);
consumer.Received += (model, ea) =>
{
string message = Encoding.Default.GetString(ea.Body.ToArray());
Console.WriteLine($@"收到消息: {message}");
//确认该消息已被消费,手动返回完成
channel.BasicAck(ea.DeliveryTag, false);
};
Console.WriteLine($@"消费者02已启动");
Console.ReadKey();
channel.Dispose();
connection.Close();
}
消费者01和消费者02,监听的主题是不同的。
消费者01:channel.QueueBind(myQueueName, myExangeName, "it.computer.*");
消费者02:channel.QueueBind(myQueueName, myExangeName, "it.#");
10.6 测试
可以看到消费者01和消费者02都可以接收到消息。
11 实现消息的持久化
11.1 消息持久化的作用
消息持久化的目的是在服务器端保存未消费的消息,防止服务器宕机或者消息队列服务因故关闭导致的消息丢失,和手动ACK机制一样,都可以用来提高消息队列服务的可靠性。
RabbitMQ中为了最终达到持久化的目的,需要将3个部分都设置为持久化。分别是交换机持久化、队列持久化、消息持久化。
(测试消息持久化,基于生产者和消费者01的主题模式代码进行修改。不涉及消费者02)
11.2 生产者的代码
交换机持久化:
声明交换机时,将第三个可选参数durable设为true(默认为false),实现交换机持久化:
//开启交换机持久化
channel.ExchangeDeclare(myExangeName, "topic", true);
消息持久化:
调用接口新建基础参数类,将DeliveryMode设为2。
发送消息时,将基础参数类作为参数传入:
IBasicProperties properties = channel.CreateBasicProperties();
properties.DeliveryMode = 2;
channel.BasicPublish(myExangeName, "it.computer.cpu", properties, sendBytes);
11.3 消费者01的代码
队列持久化:
声明队列时将第二个参数durable设为true,实现队列持久化:
//声明队列
//开启队列持久化
channel.QueueDeclare(myQueueName, true, false, false, null);
11.4 重启服务相关指令
Windos下RabbitMQ关闭和启动的指令分别为:
rabbitmq-service start 开始服务
rabbitmq-service stop 停止服务
11.5 测试持久化
减慢消费速度
为了队列中的消息不会太快被消费,我们在消费者01中,将消费的速度减慢。
增加代码Thread.Sleep(6000);
设置生产者和消费者01同时启动
开始生产消息
当生产者将所有消息都发送后,消费者01没有全部消费完之前,关闭程序。
(注:截图后消费者01消费到消息17了,没有截取到)
重启RabbitMQ服务
rabbitmq-service stop 停止服务
rabbitmq-service start 开始服务
测试持久化效果
将解决方案设置为消费者01单项目启动
启动消费者01,此时由于消息持久化成功,消费者01会继续消费关闭服务前未被消费的消息。
12 保证消息的幂等性
12.1 什么是消息的幂等性
如果同一个消息,因为各种原因,不慎被消费了多次(例如多次点按按钮),和只消费一次得到的数据是相同的。就可以说保持了幂等性。
RabbitMQ是没有办法自己解决幂等性问题的,甚至某些情况下会造成消息重复消费的问题。
例如在第7.3节中,我们尝试了RabbitMQ的ACK机制。
可以看到,消费者02获取到消息"12"之后,其实已经将消息消费完了(输出在控制台),只是在进行ACK操作之前模拟了消费者出错退出。导致消息再次由消费者01获得并消费。
最终,这条消息就被消费了两次。
如果我们不人为保证消息的幂等性,数据就会出错。
关于消息的等幂性,请看另一篇文章:
13 防止消息丢失
当通过RabbitMQ传送消息时,如何防止消息半路丢失?
消费者导致消息丢失
详见第七节,我们可以使用消费者的手动ACK功能。当一个消费者接受到消息后没有进行ACK操作就掉线,交换机会将这条消息发送到另一个消费者监听的队列中。
RabbitMQ导致消息丢失
详见第十一节,我们可以通过启用RabbitMQ的持久化功能,保证消息服务宕掉重启后,未消费完成的消息依然存在,并向消费者发送。
生产者导致丢失
生产者在发送消息后,是无法得知消息是否正确发送到队列服务。
我们可以通过使用RabbitMQ的事务或通道(Channel)的Confirm功能预防这种情况。
Confirm类似于消费者的ACK,是队列服务提供给生产者的ACK。
官方文档:https://www.rabbitmq.com/tutorials/tutorial-seven-dotnet.html
13.1 WaitForConfirms()
使用channel.WaitForConfirms()函数来获取Borker的确认消息。
确认消息有两种结果,ack'd(接收到)和nack'd(丢失)。官方文档对nack'd的解释是:"meaning the broker could not take care of it for some reason"。
如果状态为ack'd,WaitForConfirms()的返回值为true。
如果状态为nack'd,或者在规定时间内没有收到确认消息,则返回false。
WaitForConfirms()的参数为自己规定的等待时间,并且有两种重载。
13.2 单独Confirm模式
在每次发送消息之后等待确认消息,对于生产者自身而言,是一种同步方法。
修改生产者WORK模式的代码
for (int i = 0; i < 50; i++)
{
string message = "" + i;
var sendBytes = Encoding.UTF8.GetBytes($@"消息{message}");
channel.BasicPublish("", "队列A", null, sendBytes);
Console.WriteLine($@" [x] Sent 消息{message}");
//等待Broker的确认消息
if (channel.WaitForConfirms(new TimeSpan(0, 0, 5)))
{
Console.WriteLine($@"接收到了确认信息");
Console.WriteLine();
}
Thread.Sleep(i * 10);
}
每发送一条消息后,生产者都会等待Borker的确认消息,再发送下一条消息。
优点:操作简单
缺点:对确认消息的等待会阻止后续发送消息的操作,大大减慢发送速度
13.3 批量Confirm模式
为了改进单独Confirm的缺点,避免为每一个发送的消息等待Broker的确认信息。可以使用批量Confirm的方法。
一次性发送N条消息,并在一次WaitForConfirms中获取这N条消息的确认信息。
修改生产者Work模式的代码
Console.WriteLine("生产者已启动");
//定义每批发送消息的数量,每批发10条
int batchSize = 10;
//消息计数
int pubCount = 0;
for (int i = 0; i < 50; i++)
{
string message = "" + i;
var sendBytes = Encoding.UTF8.GetBytes($@"消息{message}");
channel.BasicPublish("", "队列A", null, sendBytes);
Console.WriteLine($@" [x] Sent 消息{message}");
pubCount++;
if (pubCount == batchSize)
{
//等待Broker的确认消息
if (channel.WaitForConfirms(new TimeSpan(0, 0, 5)))
{
Console.WriteLine($@"接收到了确认信息");
Console.WriteLine();
}
//计数还原
pubCount = 0;
}
Thread.Sleep(i * 10);
}
定义每批消息的数量,并定义一个消息的计数器。
可以看到,每10个消息为一批,统一接受到Borker的Confirm消息。
优点:与单独Confirm相比,有效提高了信息的吞吐量。
缺点:依然是同步操作,对确认信息的等待会阻塞后续操作。而且由于是按批获取确认消息,如果出现问题无法得知具体是哪一条消息丢失。
13.4 异步Confirm模式
异步获取每一条消息的确认信息。不需要使用WaitForConfirms()方法,只需要注册两个回调函数即可。
channel.BasicAcks:获取的确认消息为ack时执行的回调函数
channel.BasicNacks:获取的确认消息为nack时执行的回调函数
修改生产者WORK模式的代码
/// <summary>
/// Work模式
/// </summary>
private static void WorkMode()
{
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory
{
UserName = "guest",//用户名
Password = "guest",//密码
HostName = "localhost"//rabbitmq ip
};
var connection = factory.CreateConnection();
var channel = connection.CreateModel();
//开启channel的Confirm模式
channel.ConfirmSelect();
//ACK回调函数(model指自己:生产者;ea:EventArgs参数)
channel.BasicAcks += (model, ea) =>
{
Console.WriteLine($@"接收到了的ACK,Tag为{ea.DeliveryTag}");
Console.WriteLine($@"Ea的Multiple:{ea.Multiple}");
Console.WriteLine();
};
//Nack回调函数
channel.BasicNacks += (model, ea) =>
{
Console.WriteLine($@"接收到了的NACK,Tag为{ea.DeliveryTag}");
Console.WriteLine($@"Ea的Multiple:{ea.Multiple}");
Console.WriteLine();
};
channel.QueueDeclare("队列A", false, false, false, null);
Console.WriteLine("生产者已启动");
for (int i = 0; i < 50; i++)
{
string message = "" + i;
var sendBytes = Encoding.UTF8.GetBytes($@"消息{message}");
channel.BasicPublish("", "队列A", null, sendBytes);
Console.WriteLine($@" [x] Sent 消息{message}");
//等待Broker的确认消息
channel.WaitForConfirms(new TimeSpan(0, 0, 5));
Thread.Sleep(i * 10);
}
channel.Close();
connection.Close();
Console.ReadKey();
}
主要添加三部分,分别是:开启Confirm,接受Confirm,编写Confirm为ACK或NACK时的回调函数
两个回调函数的参数都是相同的,其中ea有两个参数:DeliveryTag和Multiple。
DeliveryTag(发送标签):是发送消息之前为消息打上的序号(从1开始)。
Multiple(bool):本条Confirm对应一条还是多条。
优点:异步获取信息,不会造成阻塞。并且可以利用TAG与 消息的关联,在调用成功与失败的回调函数中,多做一些事情。
缺点:稍微有些复杂,也没那么复杂。