一、 為什么使用消息中間件?
假設用戶登錄系統 傳統方式 用戶登錄 調用短息服務 積分服務 日志服務等各種服務 如果短息服務出現問題就無法發送短信而且用戶登錄成功必須所有調用全部完成返回給用戶登錄系統一條用戶登錄成功信息。從整體業務上講 用戶只是要登錄系統 並不關心短信服務 日志服務怎么樣就想登錄成功就好 這種操作讓用戶等待了時間。
2)通過消息中間件解耦服務調用
用戶登錄系統會將登錄消息發送給消息中間件 ---消息中間件會將用戶登錄消息異步一條一條推送給---短息服務 日志服務等其他相關服務 用戶就不需要等待其他服務處理完成在給我返回結果。
二、 消息中間件的好處:
1)系統解耦
2)異步
3)橫向擴展
4)安全可靠 消息中間件會把我們的消息進行保存 如果其他業務系統出現問題 或者業務系統沒有對消息進行消費 業務系統可以下一次繼續對消息進行消費
5)順序保存
三、中間件是什么:
中間件作用在業務系統之間 不是操作系統軟件 還不是業務軟件,用戶不能直接使用的軟件同一叫法。
四、消息中間件:
用於數據接收和發送,利用高效可靠的異步消息傳遞機制集成分布式系統
五、JMS (java Message Service )
Java消息服務 java消息中間件的API,用於在兩個應用程序之間或者分布式系統中發送消息,進行異步通信的規范。
六、AMQP
提供統一消息服務的應用層標准協議,遵循這個協議客戶端與消息中間件可以傳遞消息,不會受到客戶端和中間件不同產品,是不同開發語言影響 只要遵循這種協議就可以傳遞消息。
七、常見消息中間件
activeMQ 是一個完全支持JMS1.1和J2EE1.4規范的
rabbitMQ 是一個開源的AMQP實現,用於分布式系統中存儲轉發消息
kafka 是一個高吞吐量的分布式發布訂閱消息系統,是一個分布式的,分區的,可靠的分布式日志存儲服務。(不是一個嚴格消息中間件 )
1)高吞吐量:即使非常普通的硬件kafka也可以支持每秒數百萬的消息
八、JMS規范
提供者:實現JMS規范的消息中間件服務器
客戶端:接收或發送消息的應用程序
生產者/發布者:創建並發送消息的客戶端
消費者/訂閱者:接收並處理消息的客戶端
消息:應用程序之間傳遞的數據內容
消息模式:在客戶端之間傳遞消息的方式,JMS中定義了主題和隊列兩種模式
九、JMS消息模式
1)隊列模式
客戶端包括生產者和消費者
隊列中的消息只能被一個消費者消費
消費者可以隨時消費隊列的消息
舉例:生產者 應用1 應用2 向JMS隊列中發送消息 應用1發送 1 3 5 應用2 發送2 4 6 JMS消息隊列中會存在 1 2 3 4 5 6 消息 時存在消費者 應用3 應用4 應用3與JMS 有兩個鏈接 應用4有一個鏈接 在消費消息的時候 三個鏈接會平均分配6各消息
2)主題模式
客戶端:包括發布者和訂閱者
主題中的消息被所有訂閱者消費
消費者不能消費訂閱之前就發送到主題中的消息(消費者要消費隊列中的消息要先訂閱在消費 如果不提前訂閱是接收不到消息的)
舉例:應用3 與應用4 向隊列中訂閱消息 應用3建立了兩個鏈接 應用4建立了一個鏈接 發布者 應用1 應用2 向隊列中發布消息 123456 當訂閱者消費消息的時候三個鏈接都消費了6個消息
十、JMS編碼接口
ConnectionFactory 用於創建鏈接到消息中間件的鏈接工廠
Connection 代表可應用程序和消息服務器之間的通信鏈路
Destination (目的地) 指消息發布和接收的地點,包括隊列和主題
Session 表示一個單線程的上下文,用於發送和接收消息
MessageConsumer 由會話創建,用於接收發送到目標的消息
MessageProducer 由會話創建,用於發送消息到目標
Message 是在消費者和生產者之間傳送的對象,消息頭,一組消息屬性,一個消息體

十一:JMS代碼演示
1)使用JMS接口規范鏈接activeMQ 隊列模式
引入activemq依賴jar 注意:引入相關jar 必須與相應的jdk匹配否則會報異常
1 java.lang.UnsupportedClassVersionError: org/apache/lucene/store/Directory : Unsupported major.minor version 51.0
2 at java.lang.ClassLoader.defineClass1(Native Method)
3 at java.lang.ClassLoader.defineClassCond(ClassLoader.java:631)
4 at java.lang.ClassLoader.defineClass(ClassLoader.java:615)
5 at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:141)
6 at java.net.URLClassLoader.defineClass(URLClassLoader.java:283)
7 at java.net.URLClassLoader.access$000(URLClassLoader.java:58) 8 at java.net.URLClassLoader$1.run(URLClassLoader.java:197)
1 <dependencies> 2 <dependency> 3 <groupId>org.apache.activemq</groupId> 4 <artifactId>activemq-all</artifactId> 5 <version>5.14.0</version> 6 </dependency> 7 </dependencies>
創建消費提供方(主題模式消息發布方)
1 public class JmsProduce { 2 3 //聲明服務器地址 4 private static final String url = "tcp://127.0.0.1:61616"; 5 //聲明隊列名稱 6 //private static final String queue = "queue_test"; 7 private static final string topic = "topic_test"; 8 public static void main(String []args)throws Exception{ 9 10 //創建連接工廠 由消息服務商提供 11 ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url); 12 //根據消息工廠創建連接 13 Connection connection = factory.createConnection(); 14 //開啟連接 15 connection.start(); 16 //根據連接創建會話 參數一 是否使用事務 參數二 應答模式 17 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 18 19 //創建目標 也就是隊列 20 // Destination destination = session.createQueue(JmsProduce.queue); 21 //創建主題目標 22 Destination destination = session.createTopic(topic); 23 //創建一個生產者 24 MessageProducer producer = session.createProducer(destination); 25 // 26 for (int i=0;i<100;i++){ 27 //創建消息 28 TextMessage textMessage = session.createTextMessage("test" + i); 29 //生產者將消息發送給隊列 30 producer.send(textMessage); 31 System.out.println("生產者"+textMessage); 32 } 33 connection.close(); 34 35 } 36 }
消息消費方(主題模式訂閱者)
1 public class JmsConsumer { 2 3 private static final String url="tcp://127.0.0.1:61616"; 4 //private static final String queue = "queue_test"; 5 private static final String topic = "topic_test"; 6 public static void main(String [] args) throws JMSException { 7 8 //創建連接工廠 由消息服務商提供 9 ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url); 10 //根據消息工廠創建連接 11 Connection connection = factory.createConnection(); 12 //開啟連接 13 connection.start(); 14 //根據連接創建會話 參數一 是否使用事務 參數二 應答模式 15 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 16 //創建目標 也就是隊列 17 //Destination destination = session.createQueue(queue); 18 //創建主題目標 19 Destination destination = session.createTopic(topic); 20 //創建消費者 21 MessageConsumer consumer = session.createConsumer(destination); 22 //創建一個監聽器 23 consumer.setMessageListener(new MessageListener() { 24 public void onMessage(Message message) { 25 TextMessage message1 = (TextMessage) message; 26 System.out.println("接收消息"+message1); 27 } 28 }); 29 } 30 }
隊列模式是點對點形式
主題模式 消費者需要先對主題進行訂閱 然后發布者在發布過程中消費者才能消費消息
Spring 整合JMS ActiveMq
創建一個maven項目
pom.xml
1 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 2 <modelVersion>4.0.0</modelVersion> 3 <groupId>cn.ac.bcc</groupId> 4 <artifactId>Jms-Activemq</artifactId> 5 <version>0.0.1-SNAPSHOT</version> 6 <packaging>war</packaging> 7 <properties> 8 <spring.version>4.1.3.RELEASE</spring.version> 9 </properties> 10 <dependencies> 11 <dependency> 12 <groupId>junit</groupId> 13 <artifactId>junit</artifactId> 14 <version>4.11</version> 15 <scope>test</scope> 16 </dependency> 17 <dependency> 18 <groupId>org.springframework</groupId> 19 <artifactId>spring-context</artifactId> 20 <version>${spring.version}</version> 21 </dependency> 22 <dependency> 23 <groupId>org.springframework</groupId> 24 <artifactId>spring-jms</artifactId> 25 <version>${spring.version}</version> 26 </dependency> 27 <dependency> 28 <groupId>org.springframework</groupId> 29 <artifactId>spring-test</artifactId> 30 <version>${spring.version}</version> 31 </dependency> 32 <dependency> 33 <groupId>org.apache.activemq</groupId> 34 <artifactId>activemq-core</artifactId> 35 <version>5.7.0</version> 36 <exclusions> 37 <exclusion> 38 <artifactId>spring-context</artifactId> 39 <groupId>org.springframework</groupId> 40 </exclusion> 41 </exclusions> 42 </dependency> 43 </dependencies> 44 <build> 45 <plugins> 46 <plugin> 47 <groupId>org.apache.maven.plugins</groupId> 48 <artifactId>maven-compiler-plugin</artifactId> 49 <version>3.2</version> 50 <configuration> 51 <source>1.7</source> 52 <target>1.7</target> 53 <encoding>UTF-8</encoding> 54 </configuration> 55 </plugin> 56 </plugins> 57 </build> 58 </project>
消息提供方實現
1)定義消息服務方接口
1 package cn.ac.bcc.jms.service; 2 3 public interface ProducerService { 4 //定義發送消息的方法 5 public void sendMessage(String message); 6 }
2)配置公共common.xml文件
1 <beans xmlns="http://www.springframework.org/schema/beans" 2 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:mvc="http://www.springframework.org/schema/mvc" 3 xmlns:context="http://www.springframework.org/schema/context" 4 xsi:schemaLocation="http://www.springframework.org/schema/beans 5 http://www.springframework.org/schema/beans/spring-beans-4.0.xsd 6 http://www.springframework.org/schema/context 7 http://www.springframework.org/schema/context/spring-context-4.0.xsd 8 http://www.springframework.org/schema/jms 9 http://www.springframework.org/schema/jms/spring-jms-4.0.xsd"> 10 11 12 <!--開啟注解掃描 --> 13 <context:annotation-config/> 14 <!--配置activemq連接工廠 在spring提供的連接工廠中需要提供activemq提供的工廠 --> 15 <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> 16 <!-- 配置activemq服務器地址 通過地址創建連接 --> 17 <property name="brokerURL" value="tcp://localhost:61616"/> 18 </bean> 19 <!-- 配置spring jms 提供的連接工廠 --> 20 <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory"> 21 <property name="targetConnectionFactory" ref="targetConnectionFactory"/> 22 </bean> 23 <!-- 配置activeMq目的地 --> 24 <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue"> 25 <!-- 指定隊列名稱 通過構造方式 --> 26 <constructor-arg value="queue-test"/> 27 </bean> 28 29 </beans>
3)配置spring配置文件 producer.xml
1 <beans xmlns="http://www.springframework.org/schema/beans" 2 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 3 xmlns:context="http://www.springframework.org/schema/context" 4 xsi:schemaLocation="http://www.springframework.org/schema/beans 5 http://www.springframework.org/schema/beans/spring-beans-4.0.xsd 6 http://www.springframework.org/schema/context 7 http://www.springframework.org/schema/context/spring-context-4.0.xsd 8 http://www.springframework.org/schema/jms 9 http://www.springframework.org/schema/jms/spring-jms-4.0.xsd" > 10 11 <!--引入公共配置文件 --> 12 <import resource="common.xml" /> 13 <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> 14 <!--注入連接工廠 --> 15 <property name="connectionFactory" ref="connectionFactory" /> 16 </bean> 17 <bean class="cn.ac.bcc.jms.service.impl.ProducerServiceImpl"></bean> 18 </beans>
4)實現消息發送接口
1 package cn.ac.bcc.jms.service.impl; 2 3 import javax.annotation.Resource; 4 import javax.jms.Destination; 5 import javax.jms.JMSException; 6 import javax.jms.Message; 7 import javax.jms.Session; 8 import javax.jms.TextMessage; 9 10 import org.springframework.beans.factory.annotation.Autowired; 11 import org.springframework.jms.core.JmsTemplate; 12 import org.springframework.jms.core.MessageCreator; 13 import org.springframework.stereotype.Service; 14 15 import cn.ac.bcc.jms.service.ProducerService; 16 17 @Service 18 public class ProducerServiceImpl implements ProducerService { 19 20 @Autowired 21 private JmsTemplate jmsTemplate; 22 @Resource(name = "queueDestination") 23 private Destination destination; 24 25 @Override 26 public void sendMessage(final String message) { 27 28 //通過jmsTemplate 模板發送消息 傳遞兩個參數 消息的目的地 也就是activemq服務 參數2 創建一個消息體 封裝消息信息 29 jmsTemplate.send(destination, new MessageCreator() { 30 31 @Override 32 public Message createMessage(Session session) throws JMSException { 33 34 TextMessage textMessage = session.createTextMessage(message); 35 System.out.println("發送消息" + textMessage.getText()); 36 return textMessage; 37 } 38 }); 39 40 } 41 42 }
5)測試類
1 package cn.ac.bcc.test; 2 3 import org.springframework.context.support.ClassPathXmlApplicationContext; 4 5 import cn.ac.bcc.jms.service.ProducerService; 6 7 public class JmsProducer { 8 9 public static void main(String[] args) { 10 11 ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("producer.xml"); 12 // 獲取提供者接口實例 13 ProducerService producerService = context.getBean(ProducerService.class); 14 for (int i = 0; i < 100; i++) { 15 // 調用發送消息方法 16 producerService.sendMessage("消息發送來了" + i); 17 } 18 //關閉連接 19 context.close(); 20 } 21 22 }
消息消費方實現
1)自定義消息消費方監聽實現spring提供的MessageListener監聽
1 package cn.ac.bcc.jms.listener; 2 3 import javax.jms.JMSException; 4 import javax.jms.Message; 5 import javax.jms.MessageListener; 6 import javax.jms.TextMessage; 7 8 public class ConsumerListener implements MessageListener { 9 10 @Override 11 public void onMessage(Message message) { 12 TextMessage textMessage = (TextMessage)message; 13 try { 14 System.out.println("消息消費"+textMessage.getText()); 15 } catch (JMSException e) { 16 // TODO Auto-generated catch block 17 e.printStackTrace(); 18 } 19 } 20 21 }
2)配置消費方spring 配置文件 consumer.xml
1 <beans xmlns="http://www.springframework.org/schema/beans" 2 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 3 xmlns:context="http://www.springframework.org/schema/context" 4 xsi:schemaLocation="http://www.springframework.org/schema/beans 5 http://www.springframework.org/schema/beans/spring-beans-4.0.xsd 6 http://www.springframework.org/schema/context 7 http://www.springframework.org/schema/context/spring-context-4.0.xsd 8 http://www.springframework.org/schema/jms 9 http://www.springframework.org/schema/jms/spring-jms-4.0.xsd"> 10 <!--引入公共配置文件 --> 11 <import resource="common.xml"/> 12 <!--創建自定義監聽 --> 13 <bean id = "consumerListener" class="cn.ac.bcc.jms.listener.ConsumerListener"></bean> 14 <!--配置jms監聽器 --> 15 <bean id="jmsLisener" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> 16 <property name="connectionFactory" ref="connectionFactory"></property> 17 <property name="destination" ref="queueDestination"></property> 18 <property name="messageListener" ref="consumerListener"></property> 19 </bean> 20 </beans>
3)消費方測試實現
1 package cn.ac.bcc.test; 2 3 import org.springframework.context.support.ClassPathXmlApplicationContext; 4 5 public class ConsumerText { 6 7 public static void main(String[] args) { 8 9 ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("consumer.xml"); 10 11 } 12 13 }
以上為spring整合JMS 實現消息接收發送隊列模式實現 在消息接收與發送過程中要啟動activemq
//設置消息的有效期 當24小時內消息接收為有效期
jmsTemplate.setTimeToLive(86400000)
