ActiveMQ的另一種模式就SUB/HUB即發布訂閱模式,是SUB/hub就是一拖N的USB分線器的意思。意思就是一個來源分到N個出口。還是上節的例子,當一個訂單產生后,后台N個系統需要聯動,但有一個前提是都需要收到訂單信息,那么我們就需要將一個生產者的消息發布到N個消費者。
生產者:
try { //Create the Connection Factory IConnectionFactory factory = new ConnectionFactory("tcp://localhost:61616/"); using (IConnection connection = factory.CreateConnection()) { //Create the Session using (ISession session = connection.CreateSession()) { //Create the Producer for the topic/queue IMessageProducer prod = session.CreateProducer( new Apache.NMS.ActiveMQ.Commands.ActiveMQTopic("testing")); //Send Messages int i = 0; while (!Console.KeyAvailable) { ITextMessage msg = prod.CreateTextMessage(); msg.Text = i.ToString(); Console.WriteLine("Sending: " + i.ToString()); prod.Send(msg, Apache.NMS.MsgDeliveryMode.NonPersistent, Apache.NMS.MsgPriority.Normal, TimeSpan.MinValue); System.Threading.Thread.Sleep(5000); i++; } } } Console.ReadLine(); } catch (System.Exception e) { Console.WriteLine("{0}", e.Message); Console.ReadLine(); }
假設生產者每5秒發送一次消息:
消費者:
static void Main(string[] args) { try { //Create the Connection factory IConnectionFactory factory = new ConnectionFactory("tcp://localhost:61616/"); //Create the connection using (IConnection connection = factory.CreateConnection()) { connection.ClientId = "testing listener1"; connection.Start(); //Create the Session using (ISession session = connection.CreateSession()) { //Create the Consumer IMessageConsumer consumer = session.CreateDurableConsumer(new Apache.NMS.ActiveMQ.Commands.ActiveMQTopic("testing"), "testing listener1", null, false); consumer.Listener += new MessageListener(consumer_Listener); Console.ReadLine(); } connection.Stop(); connection.Close(); } } catch (System.Exception e) { Console.WriteLine(e.Message); } } static void consumer_Listener(IMessage message) { try { ITextMessage msg = (ITextMessage)message; Console.WriteLine("Receive: " + msg.Text); } catch (System.Exception e) { Console.WriteLine(e.Message); } }
啟動一個消費者:
我們發現他是從15開始的,而不是像上節一樣從頭開始,再啟動另一個消費者:
我們發現就是從啟動時開始接受消息的,之前的消息就丟失了。
整體狀態如下:
我們觀察管理界面:
產生了一個testing的Topics,而訂閱方有2個都訂閱的是testing:
這樣只需要在需要獲取消息的地方訂閱即可及時獲得。