一 什么是消息隊列
我們可以把消息隊列比作是一個存放消息的容器,當我們需要使用消息的時候可以取出消息供自己使用。消息隊列是分布式系統中重要的組件,使用消息隊列主要是為了通過異步處理提高系統性能和削峰、降低系統耦合性。目前使用較多的消息隊列有ActiveMQ,RabbitMQ,Kafka,RocketMQ
二 為什么要用消息隊列
使用消息隊列主要有兩點好處:
1.通過異步處理提高系統性能(削峰、減少響應所需時間);
2.降低系統耦合性。如果在面試的時候你被面試官問到這個問題的話,一般情況是你在你的簡歷上涉及到消息隊列這方面的內容,這個時候推薦你結合你自己的項目來回答。
三 ActiveMQ
ActiveMQ 是基於 JMS 規范實現的。
JMS消息列隊有兩種消息模式,一種是點對點的消息模式,還有一種是訂閱的模式。
四 實現
ActiveMQ下載地址:http://activemq.apache.org/components/classic/download/
解壓縮apache-activemq-5.xxx-bin.zip到一個目錄
啟動ActiveMQ:運行C:\ apache-activemq-5.xxx\bin\activemq.bat
瀏覽器中輸入:http://localhost:8161/admin/ 測試啟動情況
使用點對點方式實現聊天功能
編寫消息發送類和接收類。發送類中需要連接ActiveMQ 服務器,創建隊列,發送消息;接收類中需要ActiveMQ 服務器,讀取發送者發送消息所用的隊列。接收類實現為一個單獨的線程,使用監聽器模式,每隔一段時間偵聽是否有消息到來,若有消息到來,將消息添加到輔助類消息列表中。
使用2個隊列,即對於每一個用戶來說,發送消息為一個隊列,接受消息為一個隊列。
效果如下:

1 import org.apache.activemq.ActiveMQConnectionFactory; 2 3 import javax.jms.*; 4 5 import static java.lang.Thread.sleep; 6 7 public class MessageReceiver implements Runnable{ 8 private String url; 9 private String user; 10 private String password; 11 private final String QUEUE; 12 private Boolean stop; 13 Connection connection; 14 15 public MessageReceiver(String queue, String url, String user, String password) { 16 this.url = url; 17 this.user = user; 18 this.password = password; 19 this.QUEUE = queue; 20 stop = false; 21 } 22 23 public void run() { 24 ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url); 25 try { 26 connection = connectionFactory.createConnection(); 27 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 28 Destination receiveQueue = session.createQueue(QUEUE); 29 MessageConsumer consumer = session.createConsumer(receiveQueue); 30 connection.start(); 31 while(!stop) { 32 consumer.setMessageListener(new MessageListener() { 33 @Override 34 public void onMessage(Message message) { 35 try { 36 //獲取到接收的數據 37 String text = ((TextMessage) message).getText(); 38 MessageText.setMsg(text); 39 } catch (JMSException e) { 40 e.printStackTrace(); 41 } 42 } 43 }); 44 sleep(500); 45 } 46 } catch (JMSException e) { 47 e.printStackTrace(); 48 }catch (InterruptedException e) { 49 //Thread.currentThread().interrupt(); 50 e.printStackTrace(); 51 } 52 } 53 54 public void setStop(Boolean stop) { 55 this.stop = stop; 56 } 57 58 public void closeConnection(){ 59 try { 60 connection.close(); 61 } catch (JMSException e) { 62 e.printStackTrace(); 63 } 64 } 65 66 public String getUrl() { 67 return url; 68 } 69 70 public void setUrl(String url) { 71 this.url = url; 72 } 73 74 public String getUser() { 75 return user; 76 } 77 78 public void setUser(String user) { 79 this.user = user; 80 } 81 82 public String getPassword() { 83 return password; 84 } 85 86 public void setPassword(String password) { 87 this.password = password; 88 } 89 }

1 import org.apache.activemq.ActiveMQConnectionFactory; 2 3 import javax.jms.*; 4 import java.text.DateFormat; 5 import java.text.SimpleDateFormat; 6 import java.util.Date; 7 8 public class MessageSender { 9 private String url; 10 private String user; 11 private String password; 12 private final String QUEUE; 13 private Connection connection; 14 private Session session; 15 private Destination sendQueue; 16 private MessageProducer sender; 17 private TextMessage outMessage; 18 private DateFormat df; 19 20 public MessageSender(String queue, String url, String user, String password) { 21 this.url = url; 22 this.user = user; 23 this.password = password; 24 this.QUEUE = queue; 25 } 26 27 public void init() { 28 ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url); 29 df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); 30 try { 31 connection = connectionFactory.createConnection(); 32 connection.start(); 33 session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); 34 sendQueue = session.createQueue(QUEUE); 35 sender = session.createProducer(sendQueue); 36 outMessage = session.createTextMessage(); 37 } catch (JMSException e) { 38 e.printStackTrace(); 39 } 40 } 41 42 public void sendMessage(String messageStr) { 43 try { 44 outMessage = session.createTextMessage(); 45 String sendStr = df.format(new Date()) + "\n" + QUEUE + ": " + messageStr; 46 outMessage.setText(sendStr); 47 sender.send(outMessage); 48 session.commit(); 49 MessageText.setMsg(sendStr); 50 } catch (JMSException e) { 51 e.printStackTrace(); 52 } 53 } 54 55 public void closeConnection() { 56 try { 57 sender.close(); 58 connection.close(); 59 } catch (JMSException e) { 60 e.printStackTrace(); 61 } 62 } 63 64 public String getUrl() { 65 return url; 66 } 67 68 public void setUrl(String url) { 69 this.url = url; 70 } 71 72 public String getUser() { 73 return user; 74 } 75 76 public void setUser(String user) { 77 this.user = user; 78 } 79 80 public String getPassword() { 81 return password; 82 } 83 84 public void setPassword(String password) { 85 this.password = password; 86 } 87 }