使用消息隊列異步化系統
前言
前期為了快速開發,項目結構較為混亂,代碼維護與功能擴展都比較困難,為了方便后續功能開發,最近對項目進行的重構,順便在重構的過程中將之前的部分操作進行了異步處理,也第一次實際接觸了JMS與消息隊列。項目中采用的消息中間件為ActiveMQ。
什么是JMS
Java消息服務(Java Message Service,JMS)應用程序接口是一個Java平台中關於面向消息中間件(MOM)的API,用於在兩個應用程序之間,或分布式系統中發送消息,進行異步通信。Java消息服務是一個與具體平台無關的API,絕大多數MOM提供商都對JMS提供支持。
Java消息服務的規范包括兩種消息模式,點對點和發布者/訂閱者。許多提供商支持這一通用框架因此,程序員可以在他們的分布式軟件中實現面向消息的操作,這些操作將具有不同面向消息中間件產品的可移植性。
Java消息服務支持同步和異步的消息處理,在某些場景下,異步消息是必要的;在其他場景下,異步消息比同步消息操作更加便利。
Java消息服務支持面向事件的方法接收消息,事件驅動的程序設計現在被廣泛認為是一種富有成效的程序設計范例,程序員們都相當熟悉。
在應用系統開發時,Java消息服務可以推遲選擇面對消息中間件產品,也可以在不同的面對消息中間件切換。——Wiki
什么是消息隊列
在計算機科學中,消息隊列(英語:Message queue)是一種進程間通信或同一進程的不同線程間的通信方式,軟件的貯列用來處理一系列的輸入,通常是來自使用者。消息隊列提供了異步的通信協議,每一個貯列中的紀錄包含詳細說明的資料,包含發生的時間,輸入裝置的種類,以及特定的輸入參數,也就是說:消息的發送者和接收者不需要同時與消息隊列互交。消息會保存在隊列中,直到接收者取回它。
目前,有很多消息隊列有很多開源的實現,包括JBoss Messaging、JORAM、Apache ActiveMQ、Sun Open Message Queue、Apache Qpid和HTTPSQS。
消息隊列本身是異步的,它允許接收者在消息發送很長時間后再取回消息,這和大多數通信協議是不同的。例如WWW中使用的HTTP協議是同步的,因為客戶端在發出請求后必須等待服務器回應。然而,很多情況下我們需要異步的通信協議。比如,一個進程通知另一個進程發生了一個事件,但不需要等待回應。但消息隊列的異步特點,也造成了一個缺點,就是接收者必須輪詢消息隊列,才能收到最近的消息。
和信號相比,消息隊列能夠傳遞更多的信息。與管道相比,消息隊列提供了有格式的數據,這可以減少開發人員的工作量。但消息隊列仍然有大小限制。——Wiki
正文
基本類圖結構如下:
說明:
AsyncWork:消息的處理類接口,定義各類型的消息的處理方式
AsyncWorkProducer:消息的生產者(JMS生產者),負責向消息隊列里面放入消息
AsyncWorkConsumer:消息的消費者(JMS消費者),負責從消息隊列中消費消息
AsyncWorkFactory:對外提供的服務的工廠類
EmailWork、PushNotificationWork、LoginLogWork...:實現AsyncWork接口,定義消息的具體處理方式
1 public class AsyncWorkProducer { 2 //ConnectionFactory :連接工廠,JMS 用它創建連接 3 private ConnectionFactory connectionFactory; 4 private String queueName = "QueueName"; 5 public AsyncWorkProducer(String queueName){ 6 this.queueName = queueName; 7 init(); 8 } 9 private void init(){ 10 connectionFactory = new ActiveMQConnectionFactory( 11 ActiveMQConnection.DEFAULT_USER, 12 ActiveMQConnection.DEFAULT_PASSWORD, 13 SystemConfiguration.getString("asyc.location")); 14 } 15 public void sendMessage(Message message){ 16 Connection connection = null; 17 try { 18 // Connection :JMS 客戶端到JMS Provider 的連接 | 構造ConnectionFactory實例對象,此處采用ActiveMq的實現jar 19 connection = connectionFactory.createConnection(); 20 //啟動 21 connection.start(); 22 // Session: 一個發送或接收消息的線程 | 獲取操作連接 23 Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE); 24 // Destination :消息的目的地;消息發送給誰. 25 Destination destination = session.createQueue(queueName); 26 // MessageProducer:消息發送者 |得到消息生成者【發送者】 27 MessageProducer producer = session.createProducer(destination); 28 //設置不持久化,實際根據項目決定 29 producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); 30 // 發送消息到目的地方 31 producer.send(message); 32 } catch (Exception e) { 33 e.printStackTrace(); 34 }finally{ 35 try { 36 if (null != connection){ 37 connection.close(); 38 } 39 } catch (Throwable ignore) { 40 } 41 } 42 } 43 }
服務工廠類,貌似作用不大:
public class AsyncWorkFactory { private static ConcurrentHashMap<String, AsyncWorkProducer> chm = new ConcurrentHashMap<String, AsyncWorkProducer>(); private AsyncWorkFactory(){} public static AsyncWorkProducer getProducer(String queueName){ AsyncWorkProducer awp = chm.get(queueName); if(awp==null){ awp = new AsyncWorkProducer(queueName); chm.put(queueName, awp); } return awp; } public static void sendMessage(Message message,String queueName){ getProducer(queueName).sendMessage(message); } }
線程監聽:
1 public class AsyncWorkConsumer implements Runnable{ 2 // ConnectionFactory :連接工廠,JMS 用它創建連接 3 private ConnectionFactory connectionFactory; 4 private AsycWork work; 5 private String queueName = "QueueName"; 6 public AsyncWorkConsumer(String queueName,AsycWork work){ 7 this.queueName = queueName; 8 this.work = work; 9 init(); 10 } 11 private void init(){ 12 connectionFactory = new ActiveMQConnectionFactory( 13 ActiveMQConnection.DEFAULT_USER, 14 ActiveMQConnection.DEFAULT_PASSWORD, 15 SystemConfiguration.getString("asyc.location")); 16 } 17 @Override 18 public void run() { 19 Connection connection = null; 20 try { 21 // Connection :JMS 客戶端到JMS Provider 的連接 | 構造ConnectionFactory實例對象,此處采用ActiveMq的實現jar 22 connection = connectionFactory.createConnection(); 23 connection.start(); 24 // Session: 一個發送或接收消息的線程 | 獲取操作連接 25 Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE); 26 // Destination :消息的目的地;消息發送給誰. 27 Destination destination = session.createQueue(queueName); 28 // MessageProducer:消息發送者 |得到消息生成者【發送者】 29 MessageConsumer consumer = session.createConsumer(destination); 30 //設置不持久化,實際根據項目決定 31 while (true) { 32 //可設置接收者接收消息的時間 consumer.recevie(xxx) 33 Message message = consumer.receive(); 34 work.execute(message); 35 } 36 } catch (Exception e) { 37 e.printStackTrace(); 38 }finally{ 39 try { 40 if (null != connection){ 41 connection.close(); 42 } 43 } catch (Throwable ignore) { 44 } 45 } 46 } 47 }
回調處理:
1 public class EmailWorker implements AsycWork { 2 private static Logger log = LoggerFactory.getLogger(EmailWorker.class); 3 @Override 4 public void execute(Message message) { 5 ActiveMQMapMessage msg = (ActiveMQMapMessage) message; 6 try { 7 String address = msg.getString("address"); 8 String title = msg.getString("title"); 9 String content = msg.getString("content"); 10 Constants.sendMail(address, title, content); 11 } catch (JMSException e) { 12 log.error("異步郵件發送異常", e); 13 } 14 } 15 }
項目啟動時執行如下代碼啟動線程:
1 Thread emailThread = new Thread(new AsyncWorkConsumer(AsycWork.EMAIL,emailWorker)); 2 emailThread.setDaemon(true); 3 emailThread.start(); 4 //啟動線程綁定各種回調 5 Thread normalLogThread = new Thread(new AsyncWorkConsumer(AsycWork.NORMAL_LOG,normalLogWork)); 6 normalLogThread.setDaemon(true); 7 normalLogThread.start(); 8 Thread loginLogThread = new Thread(new AsyncWorkConsumer(AsycWork.LOGIN_LOG,loginLogWorker)); 9 loginLogThread.setDaemon(true); 10 loginLogThread.start();
調用異步的工具類:
1 public class AsyncUtils { 2 private static Logger log = LoggerFactory.getLogger(AsyncUtils.class); 3 public static void log(String type,String operate){ 4 if(!SystemConfigFromDB.getBoolean(SystemConfigFromDB.NEED_NORMAL_LOG)){ 5 return; 6 } 7 try{ 8 User user = (User) SecurityUtils.getSubject().getSession().getAttribute("loginUser"); 9 if(user==null){ 10 return; 11 } 12 OperateLog log = new OperateLog(user.getId(), user.getName(), operate,type, user.getLastLoginIp()); 13 ActiveMQObjectMessage message = new ActiveMQObjectMessage(); 14 message.setObject(log); 15 AsyncWorkFactory.sendMessage(message, AsycWork.NORMAL_LOG); 16 }catch (Exception e) { 17 log.error("日志記錄出錯!", e); 18 } 19 } 20 public static void sendMail(String address,String title,String content){ 21 if(!SystemConfigFromDB.getBoolean(SystemConfigFromDB.NEED_SEND_MAIL)){ 22 return; 23 } 24 try{ 25 ActiveMQMapMessage message = new ActiveMQMapMessage(); 26 message.setString("address", address); 27 message.setString("title", title); 28 message.setString("content", content); 29 AsyncWorkFactory.sendMessage(message, AsycWork.EMAIL); 30 }catch (Exception e) { 31 log.error("郵件發送出錯!",e); 32 } 33 } 34 public static void loginLog(String uid,String ip,Date date){ 35 if(!SystemConfigFromDB.getBoolean(SystemConfigFromDB.NEED_LOG_CLIENTUSER_LOGINLOG)){ 36 return; 37 } 38 try{ 39 ActiveMQMapMessage message = new ActiveMQMapMessage(); 40 message.setString("uid", uid); 41 message.setString("ip", ip); 42 message.setString("date", DateUtils.formatDateTime(date, "yyyy-MM-dd HH:mm:ss")); 43 AsyncWorkFactory.sendMessage(message, AsycWork.LOGIN_LOG); 44 }catch (Exception e) { 45 log.error("郵件發送出錯!",e); 46 } 47 } 48 }
在需要異步處理的地方執行類似如下代碼:
AsyncUtils.sendMail("xxx@xxx.com", "郵件標題", "郵件內容");//異步發送郵件
這樣就可以執行異步操作了。
適用於
異步系統適用於與主要業務邏輯無關的較耗時或不需要同步操作的,失敗時不影響主業務邏輯的功能點:
比如:1.在用戶注冊的時候記錄數據做后期統計、發送注冊成功郵件等
2.系統操作的日志記錄
3.iOS消息推送
4.發送短信
...
在使用異步系統之前,用戶注冊與注冊日志記錄是在同一個事務完成的,用戶注冊失敗則不會記錄日志,但同時,日志記錄發生異常也會引起用戶注冊失敗,日志記錄本身是與用戶注冊這個邏輯不相關的工作,在日志發生異常的時候不應該使用戶注冊失敗。
在使用異步系統之后,用戶注冊邏輯執行結束后,調用異步的注冊日志記錄與異步的注冊郵件發送功能即可,不用等待日志記錄與郵件發送的返回,即可直接返回用戶注冊成功。將日志與郵件異步處理,既提高了響應速度也使邏輯更加嚴謹。在發生異常的時候,消息隊列會將消息繼續保留,留待后續處理。
PS:本文的實現方式大部分為自己摸索的,之前沒有接觸過類似的模塊,所以有些地方都是按照自己的理解處理的,通用的異步系統是不是這種結構本人不是太了解,歡迎交流。后面會介紹一下最新的實現方式,修改為了基於Spring管理的異步系統,將ActiveMQ丟給了Spring,依靠Spring發送與監聽消息,相比這個可能會更靠譜一點。