消息中間件利用高效可靠的消息傳遞機制進行平台無關的數據交流,並基於數據通信來進行分布式系統的集成,通過提供消息傳遞和消息排隊模型,它可以在分布式環境下拓展進程間的通信,對於消息中間件,常見的角色大致也就有Producer(生產者).Consumer(消費者)
MQ 消息中間件 消息隊列
Message Queue簡稱MQ
種類:
1.Apache ActiveMQ
ActiveMQ是Apache出品,最流行的,能力強勁的開源消息總線.ActiveMQ是一個完全支持JMS1.1和J2EE1.4規范的JMS Provider實現.我們在本次課程中介紹ActiveMQ的使用.
2.阿里 RocketMQ
3.Pivotal 開發RabbitMQ
AMQP協議的領導實現,支持多種場景.淘寶的Mysql集群內部有使用它進行通訊,OpenStack開源雲平台的通信組件,最先在金融行業得到運用.
ZeroMQ
史上最快的消息隊列系統.
kafka:
Apache下的一個子項目.特點:高吞吐,在一台普通的服務器上既可以達到10W/s的吞吐速率;完全的分布式系統.適合處理海量數據.
使用場景(為什么使用MQ?):
//TODO
JMS簡介
什么是JMS?
JMS(Java Messaging Service)是java平台上有關面向消息中間件的技術規范(可以使用jmsTemplate),它便於消息系統中的java應用程序進行消息交換,並且通過提供標准的產生,發送,接收消息的接口庫簡化企業應用的開發.
JMS本身只定義了一系列的接口規范,是一種與廠商無關的API.用來訪問消息收發系統.它類似於JDBC(java Database Connectivity);這里,JDBC是可以用來訪問許多不同關系數據庫的的API.而JMS則提供同樣與廠商無關的訪問方法,以訪問消息收發服務.許多廠商目前都支持JMS,包括IBM的MQseries.BEA的Weblogic.JMSservice和Progress的SonicMQ,這只是幾個例子.JMS使您能夠通過消息收發服務(有時稱為消息中介程序或者路由器)從一個JMS客戶機向另一個JMS客戶機發送消息,消息是JMS中的一種類型對象,由兩部分組成:報頭和消息主體.
報頭由路由信息以及有關該消息的元數據組成.
消息主體則攜帶着應用程序的數據或有效負載.
JMS定義了五種不同的消息正文格式,以及調用的消息類型.允許你發送並接收以一些不同形式的數據,提供現有消息格式的一些級別的兼容性.
TextMessage ---一個字符串對象
MapMessage---一套名稱-值對
ObjectMessage--一個序列化的java對象
ByteMessage -- 一個字節的數據流
StreamMessage --Java原始值的數據流
JMS消息傳遞類型
對於消息的傳遞有兩種類型:
一種是點對點,即一個生產者和一個消費者一一對應.
另一種是發布/訂閱模式,即一個生產者產生消息並進行發送后,可以由多個消費者進行接收.
JMS入門小Demo
現在是點對點模式:
點對點的模式主要建立在一個隊列上面,當連接一個列隊的時候,發送端不需要知道接收端是否正在接收,可以直接行ActiveMQ發送消息,發送的消息,將會先進入隊列中,如果有接收端在監聽,則會發現接收端,如果沒有接收端接收,多個接收端,但是一條消息,只會被一個接收端給接收到,那個接收端先連上ActiveMQ,則會先接收到,而后來的接收端則接收不到那條消息.
創建的是一個沒有使用任何骨架的java工程
引入的依賴為:
<dependencies> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-client</artifactId> <version>5.13.4</version> </dependency> </dependencies> <build> <plugins> <!-- java編譯插件 --> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.2</version> <configuration> <source>1.8</source> <target>1.8</target> <encoding>UTF-8</encoding> </configuration> </plugin> </plugins> </build>
代碼
/** * @Auther:qingmu * @Description:腳踏實地,只為出人頭地 * @Date:Created in 16:16 2019/4/23 */ import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; /** * 點對點模式 * 生產者 */ public class QueueProducer { public static void main(String[] args) throws Exception { //1.創建連接工廠 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.200.128:61616"); //2.獲取連接 Connection connection = connectionFactory.createConnection(); //3.啟動連接 connection.start(); //4.獲取session (參數1:是否啟動事務,參數2:消息確認模式) /** * AUTO_ACKNOWLEDGE = 1 自動確認 • CLIENT_ACKNOWLEDGE = 2 客戶端手動確認 • DUPS_OK_ACKNOWLEDGE = 3 自動批量確認 • SESSION_TRANSACTED = 0 事務提交並確認 */ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5.創建隊列對象 Queue queue = session.createQueue("test-queue"); //6.創建消息生產者 MessageProducer producer = session.createProducer(queue); //7.創建消息 TextMessage textMessage = session.createTextMessage("歡迎來到神奇的"); //8.發送消息 producer.send(textMessage); //9.關閉資源 producer.close(); session.close(); connection.close(); } }
直接運行這個main方法,后可以mq中查看到:
消費者的代碼為:
/** * @Auther:qingmu * @Description:腳踏實地,只為出人頭地 * @Date:Created in 16:23 2019/4/23 */ import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; /** * 點對點模式 * 消息消費者 */ public class QueueConsumer { public static void main(String[] args) throws Exception{ //1.創建連接工廠 ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.200.128:61616"); //2.獲取連接 Connection connection = connectionFactory.createConnection(); //3.啟動連接 connection.start(); //4.獲取session (參數1:是否啟動事務,參數2:消息確認模式) Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5.創建隊列對象 Queue queue = session.createQueue("test-queue"); //6.創建消息消費 MessageConsumer consumer = session.createConsumer(queue); //7.監聽消息 consumer.setMessageListener(new MessageListener() { public void onMessage(Message message) { TextMessage textMessage=(TextMessage)message; try { System.out.println("接收到消息:"+textMessage.getText()); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }); //8.等待鍵盤輸入 System.in.read(); //9.關閉資源 consumer.close(); session.close(); connection.close(); } }
通過上面定義的監聽器,可以獲取到生產者生產的信息.在控制台上
在進行測試的時候,開啟兩個以上的消費者,開啟一個生產者,然后可以觀察到只能在一個消費者的控制台上進行顯示,而在另一個消費者的控制台上不能進行打印.
發布和訂閱者模式
消費生產者:
import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; /** * @Auther:qingmu * @Description:腳踏實地,只為出人頭地 * @Date:Created in 16:32 2019/4/23 */ public class TopicProducer { public static void main(String[] args) throws Exception{ //1.創建連接工廠 ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.200.128:61616"); //2.獲取連接 Connection connection = connectionFactory.createConnection(); //3.啟動連接 connection.start(); //4.獲取session (參數1:是否啟動事務,參數2:消息確認模式) Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5.創建主題對象 Topic topic = session.createTopic("test-topic"); //6.創建消息生產者 MessageProducer producer = session.createProducer(topic); //7.創建消息 TextMessage textMessage = session.createTextMessage("歡迎來到神奇的世界"); //8.發送消息 producer.send(textMessage); //9.關閉資源 producer.close(); session.close(); connection.close(); } }
運行完以后的結果為:
消費者為:
/** * @Auther:qingmu * @Description:腳踏實地,只為出人頭地 * @Date:Created in 16:36 2019/4/23 */ import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; /** * 訂閱 * 多對多 */ public class TopicConsumer { public static void main(String[] args) throws Exception{ //1.創建連接工廠 ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.200.128:61616"); //2.獲取連接 Connection connection = connectionFactory.createConnection(); //3.啟動連接 connection.start(); //4.獲取session (參數1:是否啟動事務,參數2:消息確認模式) Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5.創建主題對象 //Queue queue = session.createQueue("test-queue"); Topic topic = session.createTopic("test-topic"); //6.創建消息消費 MessageConsumer consumer = session.createConsumer(topic); //7.監聽消息 consumer.setMessageListener(new MessageListener() { public void onMessage(Message message) { TextMessage textMessage=(TextMessage)message; try { System.out.println("接收到消息:"+textMessage.getText()); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }); //8.等待鍵盤輸入 System.in.read(); //9.關閉資源 consumer.close(); session.close(); connection.close(); } }
運行測試:
同時開啟2個以上的消費者,再次運行生產者,觀察每一個消費者控制台的輸出,會發現每個消費者會接收到消息.