在前面講到了RabbitMQ高可用集群的搭建,但是我們知道只是集群的高可用並不能保證應用在使用消息隊列時完全沒有問題,例如如果應用連接的RabbitMQ集群突然宕機了,雖然這個集群時可以使用的,但是應用訂閱的連接就斷開了,如果有個機房外網出口帶寬被挖掘機弄斷了,那集群依然是不可用的。所以我們后面會介紹應用APP如何與連接集群來保證兩者配合默契,以及如何實現跨機房的集群復制。
應用連接集群高可用
前面講到應用服務器通過一個負載均衡服務將連接的流量分發到指定服務器,如果連接的節點宕機怎么辦呢。應用服務器連接集群主要做兩件事,訂閱和發布,所以如果是發布消息每次都會重新初始化連接所以連接節點的切換對整個系統的可用性影響不大。如果是訂閱消息就沒有真么簡單了。首先我們要做到如果連接出現問題應該是拋出異常而不是終止腳本,並且這時應該重新連接連接。
好了不廢話了,代碼如下:
ColonyProduct

using System; using System.Text; using RabbitMQ.Client; using RabbitMQ.Client.Events; namespace ColonyProduct { class Program { static void Main(string[] args) { String exchangeName = "wytExchange"; String queueName = "wytQueue"; ConnectionFactory factory = new ConnectionFactory(); factory.HostName = "192.168.63.130"; factory.Port = 5672; factory.VirtualHost = "/wyt"; factory.UserName = "wyt"; factory.Password = "wyt"; using (IConnection connection=factory.CreateConnection()) { using (IModel channel=connection.CreateModel()) { channel.ExchangeDeclare(exchange: exchangeName, type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null); channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null); channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: String.Empty, arguments: null); IBasicProperties properties = channel.CreateBasicProperties(); properties.Persistent = true; for (int i = 0; i < 10; i++) { Byte[] body = Encoding.UTF8.GetBytes("Hello World -- "+i); channel.BasicPublish(exchange: exchangeName, routingKey: String.Empty, basicProperties: properties, body: body); } } } Console.WriteLine("發送完成"); Console.ReadKey(); } } }
ColonyConsumer

using RabbitMQ.Client; using RabbitMQ.Client.Events; using System; using System.Text; namespace ColonyConsumer { class Program { static void Main(string[] args) { String exchangeName = "wytExchange"; String queueName = "wytQueue"; ConnectionFactory factory = new ConnectionFactory(); factory.HostName = "192.168.63.133"; factory.Port = 5672; factory.VirtualHost = "/wyt"; factory.UserName = "wyt"; factory.Password = "wyt"; using (IConnection connection=factory.CreateConnection()) { using (IModel channel=connection.CreateModel()) { channel.ExchangeDeclare(exchange: exchangeName, type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null); channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null); channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: String.Empty, arguments: null); EventingBasicConsumer consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); var routingKey = ea.RoutingKey; Console.WriteLine(" [x] Received '{0}':'{1}'", routingKey, message); channel.BasicAck(ea.DeliveryTag, multiple: false); }; channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer); Console.WriteLine("等待接收消息"); Console.ReadKey(); } } } } }
所以通過以上的代碼就可以保證服務器某節點宕機后訂閱的連接自動重連切換。
RabbitMQ集群異地復制
基於warren的共享存儲模式
這種方式其實並不是跨地區的遠程復制,並且需要共享存儲,如果感興趣的同學可以百度下。
基於Shovel的遠程復制
如果直接基於WAN來組建異地的集群的話,集群間大量的數據通訊會產生高昂的費用,另外Erlang也不允許這么高延遲的通訊。
Shovel是RabbitMQ自帶插件(2.7.0后),自帶插件的好處就是可以在RabbitMQ服務啟動時自動啟動Shovel和自定義復制關系。
Shovel運行的原理其實非常簡單。通過定義RabbitMQ上一個隊列和另外一個RabbitMQ上的交換機之間的復制關系來實現遠程復制。也就是說它會在主服務上建立一個隊列來監聽交換機,所以這是到交換機所以的消息會投遞到該隊列,並且在從服務中訂閱這個隊列,使隊列中的消息復制到從服務的交換機中。RabbitMQ是一個比較全面的消息隊列解決方案,我們公司並沒有用到該功能,只是在這提下,感興趣的同學可以搜下。