最近,項目中使用到了ActiveMQ獲取第三方推送過來的數據。具體背景是:公司需要監控全國各地車輛實時運行的GPS數據,但監控本身不是公司做的,而是交給第三方公司做,第三方采集GPS數據后推送給我們。全國各地,近萬台車輛,每台車輛每隔幾秒就發送一次GPS位置數據,如果我們提供API給第三方公司去調用,顯然無論是第三方還是我們這邊,服務器都是是扛不住的,這么做也是不合理的,於是,便采取了消息隊列,第三方采集到的數據直接推送到消息隊列代理服務器,而己方從消息隊列服務器取數據處理。以下對項目實踐及其中遇到的一些問題及解決進行概要總結。
1、ActiveMQ NMS簡介
關於NMS,這里主要談兩點。
NMS API:ActiveMQ定義的一套API接口規范,你可以理解為一個API的接口,它指明了生產者或消費者如何與消息隊列服務器通信。
NMS Providers:NMS API的具體實現,基於Windows或ActiveMQ下的各種協議,提供了各種實現,目前提供了ActiveMQ、STOMP、MSMQ、EMS、WCF、AMQP、MQTT、XMS幾種實現。具體項目中,我采取的是ActiveMQ實現。
至於消息隊列涉及到的其他概念,什么Broker、Queue、Topic、Producer、Consumer,這里不做介紹,各位可以自己查資料,這些 概念本身也不難理解的。
2、關於異常下的Broker重連
這個異常,可能是由於網絡異常,也可能是長時間沒有通信,Broker把Client給斷掉了,不去管它。起初,這個項目是從一位離職員工的手頭接過來的,給的說法是只需要維護就夠了,基本上不用調整。當時雖然說是做了重連,后來發現,就跟沒做一樣。發現這個,是起源於第三方頻繁通知,MQ隊列有積壓,通知我們盡快處理。項目拿到手一看,我勒個去,直接起了一個Timer在那兒定時監控Connection狀態,如果狀態不對立刻重新打開連接。先不說Socket連接的浪費情況,及Timer這個.NET中近乎Bug的一個東西,這種做法實際中行之無效,因為連接異常情況下再打開,往往是打開失敗的,比如上次異常連接沒有關閉,狀態不對,或者ClientID暫時沒被Broker釋放等。
於是,針對重連,開始做第一次優化。查了IConnection元數據,發現有個ExceptionListener可用,於是便想到利用這個事件來監聽並重連。改完上線,可第二天一大早過來,發現MQ又擠壓了,重連時效了,打開日志看到,記錄了ExceptionListener事件日志,但重連沒有成功,具體原因,我想可能和優化前是一樣的吧。這折騰前后完全沒區別。這時候,我想,不能在現有做法里邊去整了,必須回到NMS本身去整,堂堂Apache開源項目,一定有更好的重連機制,放着不用自己整,是不是傻。。。
於是,打開官網,如願以償,找到了failover這個東西:
根據描述,鏈接異常時,隨機從配置的Broker列表總選取一個進行重連。這是個好東西,於是,Broker的鏈接配置,由tcp://183.56.131.224:61616調整為failover:(tcp://183.56.131.224:61616)。這里沒有多個Broker,只有一個,所以配置一個也是沒有問題的,我們重點是利用failover。與此同時,OpenWire上發現了maxInactivityDuration這個配置項,官網描述如下:
這個也不錯,配置Connection閑置多久被Broker斷掉。我這里比較狠,反正Broker那個隊列出了第三方往里邊推,就我這兒一個人在消費,直接配置0算了,永不被殺,出問題了重連,豈不爽哉。於是,Broker進一步調整為:failover:(tcp://183.56.131.224:61616)?wireFormat.maxInactivityDuration=0。此機制我也自己寫Demo驗證過,無論是Broker突然停掉再開啟,還是Producer停掉再開啟,Consumer均能成功重連的。至此,MQ的可靠重連問題算是解決了。
3、進程重啟導致Consumer鏈接失敗
具體情境是這樣的:MQ消費者進程是寄宿在Windows服務中的,運維那邊做測試或維護,會在MQ運行正常的情況下直接重啟服務,有時候會重啟失敗,過陣子啟動,又成功了。我過去,打開Windows事件日志,說是ClientID被占用,也就是說瞬間重啟時候,Broker端暫時沒來得及斷開或者釋放該ClientID對應的Connection,而我們系統中ClientID是配置死的。我又驗證了下,正常運行下,先關閉服務,過幾秒再開啟,就沒這問題,也印證了自己的推斷。問題是找到了,但總不能告訴運維,每次先停止服務,再打開,不能用重啟吧,哪個開發要是這樣跟我說,那他媽也太不靠譜了。
解決方案就是,ClientID動態生成,每次啟動都不一樣,這個ClientID僅僅是Broker用來標識一個連接端的,隨便什么都無所謂,只要跟上次不一樣。項目中取的是當前時分秒字符串。如下:
_connection.ClientId = DateTime.Now.ToString("yyyyMMddHHmmss");
調整完上線,再試驗,那問題再無復現。
4、服務啟動時間過長的問題
隨着各種奇葩情況繼續出現,我這里繼續被操。具體場景是:鑒於是跟第三方合作,各種第三方服務器宕機,各種網絡不靠譜,你懂的。如果是消費者進程已經啟動成功了,那第三方或者網絡不靠譜了,我們利用2中的重連機制就已經可以了,無非就是等他們靠譜了我們自動重連上就是了。可問題是,如果第三方不靠譜,或者網絡不靠譜時,我們在啟動消費者Windows服務,那會出現什么情況呢?給大家實際演示下:
目前,我我的服務安裝后,是這樣的:
假設正常鏈接配置是這樣的:
failover:(tcp://183.56.131.224:61616)?wireFormat.maxInactivityDuration=0
為了模擬外界異常或不可達的情況,我手動設置為如下:
failover:(tcp://183.56.131.200:61616)?wireFormat.maxInactivityDuration=0
大家注意,那個Broker地址是不可達的。
開啟服務,其結果如下:
這個啟動界面,你就等着吧,等個兩三分鍾,結果如下:
更要命的是,點擊確定后,服務啟動結果如下:
這就比較操蛋了,你啟動失敗就失敗把,別給我整成啟動狀態啊,這不誤導人么。
一般,這種情況,就屬於啟動進程一直卡主,當服務啟動超時時,就會出現這種情況, 啟動強行被Windows終止,但那個標記為啟動狀態這個就不好理解,也比較坑了。這是必須要處理的事情,否則極易造成誤導。對於Windows服務本身啟動機制,你是沒辦法做任何事情的,那只能從MQ鏈接機制去干事情。最終, 經過查詢官網文檔, 再次如願以償,找到了以下兩個配置項:
這兩個配置項分別代表,啟動時最大重連嘗試次數,默認值0,代表無限重連,我們出問題就出現在這里,鏈接不上時無限重試,無限重試無限連接不上,無限鏈接不上再無限重試。。。然后,進程阻塞,阻塞到一定時間,Windows服務重啟失敗。這個我也在Connection open時候打斷點調試過,確實阻塞了。那么第二個配置項代表一項操作超時時間。問題找到了,那么自然也就有解決方案了,現把鏈接配置為如下:
failover:(tcp://183.56.131.200:61616)?wireFormat.maxInactivityDuration=0&transport.startupMaxReconnectAttempts=3&transport.timeout=3000
這里有兩點要注意:
1)原本,這里配置應該是failover:(tcp://183.56.131.224:61616)?wireFormat.maxInactivityDuration=0&transport.startupMaxReconnectAttempts=3&transport.timeout=3000,但在配置文件中, &符號是不支持的,必須轉義或替換,這里采取了實體替換,具體的是&這個鬼實體符;
2)NMS.ActiveMQ v1.4.0以上版本和以前以及其他語言實現版本不大相同,1.4以上版本配置這兩項參數時必須有transport前綴。這里當時也是吃過虧的。
配置調整完畢后,我們再用 這個無效地址啟動服務,在經過60S以內的啟動時間,畫風變成了這樣:
點擊確定:
這個時間,和transport.timeout、transport.initialReconnectDelay、transport.startupMaxReconnectAttempts等幾項配置有關。但起碼時間不會像之前那樣很久,並且最終Windows服務狀態顯示為啟動了。
5、總結
鑒於這是公司實際運作項目,就不上傳代碼了,如果是自己的Demo,一定毫不保留,望各位見諒。實際上,也沒什么特別的,大家平時遇到這種難纏的問題,多查官網文檔,官網文檔搞不定,再查源碼,配合動手實踐,一般都不會是問題的。幸運的是,雖然很多官網文檔都是英文,但絕大部分都通俗易懂,我們看上去,也都不費事兒的。
附:ActiveMQ生產者及消費者示例代碼
生產者:
/// <summary> /// 生產者啟動器 /// </summary> public class ProducerBootstrap { #region Private Fields private readonly IConnectionFactory _connectionFactory = null; private IConnection _connection = null; private IMessageProducer _producer = null; #endregion #region Constructors public ProducerBootstrap() { _connectionFactory = new ConnectionFactory("tcp://localhost:61616"); } #endregion #region Public Methods public void Start() { _connection = _connectionFactory.CreateConnection(); _connection.ExceptionListener += _connection_ExceptionListener; _connection.Start(); ISession sesison = _connection.CreateSession(); _producer = sesison.CreateProducer(new ActiveMQQueue("guokun")); } public void Stop() { _connection.Stop(); _connection.Close(); _connection.Dispose(); } public void SendMessage() { while (true) { ITextMessage message = _producer.CreateTextMessage(); message.Text = string.Format("數據:{0}", DateTime.Now); _producer.Send(message); Thread.Sleep(1000); Console.WriteLine(message.Text); } } #endregion #region Private Methods private void _connection_ExceptionListener(Exception exception) { Console.WriteLine("生產者發生異常:{0}", exception); } #endregion }
消費者:
public class ConsumerBootstrap { #region Private Fields private readonly IConnectionFactory _connectionFactory = null; private IConnection _connection = null; private IMessageConsumer _consumer = null; #endregion #region Constructors public ConsumerBootstrap() { _connectionFactory = new ConnectionFactory("failover:(tcp://localhost:61616)?wireFormat.maxInactivityDuration=0&transport.timeout=3000&transport.startupMaxReconnectAttempts=2"); } #endregion #region Public Methods public void Start() { _connection = _connectionFactory.CreateConnection(); _connection.ClientId = "guokun"; _connection.ExceptionListener += _connection_ExceptionListener; _connection.Start(); ISession session = _connection.CreateSession(); _consumer = session.CreateConsumer(new ActiveMQQueue("guokun")); _consumer.Listener += _consumer_Listener; Console.WriteLine("消費者啟動成功..."); } public void Stop() { _connection.Stop(); _connection.Close(); _connection.Dispose(); } #endregion #region Private Methods /// <summary> /// 消息監聽處理 /// </summary> /// <param name="message"></param> private void _consumer_Listener(IMessage message) { ITextMessage textMessage = message as ITextMessage; Console.WriteLine("{0}-{1}", DateTime.Now, textMessage.Text); } private void _connection_ExceptionListener(Exception exception) { Console.WriteLine("生產者發生異常:{0}", exception); } #endregion }