Apache activeMQ消息隊列


ActiveMQ簡介

  ActiveMQ 是Apache出品,最流行的,能力強勁的開源消息總線。ActiveMQ 是一個完全支持JMS1.1和J2EE 1.4規范的 JMS Provider實現

  JMS:Java Message Service    java消息服務     

  ActiveMQ:實現JMS規范

  JMS只給出接口,具體的實現由中間件完成,AcitveMQ為其中的一種

  其它的消息隊列產品:ActiveMQ、RabbitMQ、Kafka、MetaMQ等

  消息隊列中間件是分布式系統中的重要組件,主要解決應用耦合,異步消息,流量削鋒等問題,  實現高性能,高可用,可伸縮和最終一致性的架構

ActiveMQ下載

  官網:http://activemq.apache.org/

  

目錄結構

啟動ActiveMQ

  進入bin目錄啟動服務。
  http://localhost:8161/admin/queues.jsp
  http端口8161:web頁面訪問端口
  Tcp端口連接服務端口:61616
  默認登陸用戶名,密碼:admin

ActiveMQ操作界面 

常用術語

    Provider/MessageProvider:生產者
    Consumer/MessageConsumer:消費者
    PTP:Point To Point,點對點通信消息模型
    Pub/Sub:Publish/Subscribe,發布訂閱消息模型
    Queue:隊列,目標類型之一,和PTP結合
    Topic:主題,目標類型之一,和Pub/Sub結合
    ConnectionFactory:連接工廠,JMS用它創建連接
    Connnection:JMS Client到JMS Provider的連接
    Destination:消息目的地,由Session創建
    Session:會話,由Connection創建,實質上就是發送、接受消息的一個線程,因此生產者、消費者都是Session創建的
View Code

spring整合activeMQ應用

配置生產者

第一步:創建maven,導入spring和activeMQ的坐標或web工程,導入相應activeMQjar包和與spring整合的jar包

<dependencies>
      <dependency>
          <groupId>org.apache.activemq</groupId>
          <artifactId>activemq-all</artifactId>
          <version>5.2.0</version>
      </dependency>
      <dependency>
          <groupId>org.springframework</groupId>
          <artifactId>spring-jms</artifactId>
          <version>4.2.4.RELEASE</version>
      </dependency>
      <dependency>
          <groupId>junit</groupId>
          <artifactId>junit</artifactId>
          <version>4.9</version>
      </dependency>
      <dependency>
          <groupId>org.springframework</groupId>
          <artifactId>spring-test</artifactId>
          <version>4.2.4.RELEASE</version>
      </dependency>
  <dependency>
        <groupId>org.apache.xbean</groupId>
        <artifactId>xbean-spring</artifactId>
        <version>4.2</version>
    </dependency>
  </dependencies>
pom.xml

第二步:提供spring配置文件(配置生產者相關)引入amq,jms名稱空間

第三步:配置連接工廠(緩存session工廠),配置模板對象

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
    xmlns:aop="http://www.springframework.org/schema/aop"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:jdbc="http://www.springframework.org/schema/jdbc" 
    xmlns:tx="http://www.springframework.org/schema/tx"
    xmlns:jpa="http://www.springframework.org/schema/data/jpa" 
    xmlns:task="http://www.springframework.org/schema/task"
    xmlns:amq="http://activemq.apache.org/schema/core"
    xsi:schemaLocation="
                        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
                        http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd
                        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
                        http://www.springframework.org/schema/jdbc http://www.springframework.org/schema/jdbc/spring-jdbc.xsd
                        http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd
                        http://www.springframework.org/schema/data/jpa 
                        http://www.springframework.org/schema/data/jpa/spring-jpa.xsd
                        http://activemq.apache.org/schema/core
                        http://activemq.apache.org/schema/core/activemq-core-5.8.0.xsd">               
   <!-- 配置連接工廠對象:產生Connection  方式一:通過amq名稱空間創建連接工廠  方式二:可以通過bean標簽創建對象-->
    <!-- <amq:connectionFactory 
        id="connectionFactory" 
        userName="admin" password="admin" 
        brokerURL="tcp://localhost:61616">
    </amq:connectionFactory> -->
    
    <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <constructor-arg index="0" value="admin"></constructor-arg>
        <constructor-arg index="1" value="admin"></constructor-arg>
        <constructor-arg index="2" value="tcp://localhost:61616"></constructor-arg>
    </bean>
    
    
    <!-- spring提供優化緩存session對象 -->
    <bean id="cachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
        <property name="targetConnectionFactory" ref="connectionFactory"></property>
        <property name="sessionCacheSize" value="10"></property>
    </bean>
    
    
    <!-- spring提供模板對象jmsTemplate:向mq服務器寫入消息(p2p,pub/sub) -->
    
    <!-- 發送點對點消息 -->
    <bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="cachingConnectionFactory"></property>
        <!-- 通過屬性pubSubDomain指定消息模式:默認值false  -->
        <property name="pubSubDomain" value="false"></property>
    </bean>
    
    <!-- 發送主題模式消息 -->
    <bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="cachingConnectionFactory"></property>
        <property name="pubSubDomain" value="true"></property>
    </bean>
</beans>

第四步:編寫單元測試方法,在類中注入模板對象JmsTemplate。通過此對象發送消息到隊列

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration("classpath:applicationContext.xml")
public class ProduceTest {

    //注入模板對象
    @Autowired
    @Qualifier("jmsQueueTemplate")
    private JmsTemplate jmsTemplate;

    @Test
    public void test() {
        jmsTemplate.send("test_spring", new MessageCreator() {

            // 創建對象
            public Message createMessage(Session session) throws JMSException {
                MapMessage mapMessage = session.createMapMessage();
                mapMessage.setString("tel", "1311111111");
                mapMessage.setString("code", "MSXX88sdfsdf");
                return mapMessage;
            }
        });
    }

    /*public static void main(String[] args) {
        ClassPathXmlApplicationContext classPathXmlApplicationContext = new ClassPathXmlApplicationContext("classpath:applicationContext.xml");
        JmsTemplate jmsTemplate = (JmsTemplate) classPathXmlApplicationContext.getBean("jmsQueueTemplate");
        //發送消息
        jmsTemplate.send("test_spring", new MessageCreator() {
            
            //創建對象
            public Message createMessage(Session session) throws JMSException {
                MapMessage mapMessage = session.createMapMessage();
                mapMessage.setString("tel", "1311111111");
                mapMessage.setString("code", "MSXX88sdfsdf");
                return mapMessage;
            }
        });
    }*/
}

配置消費者

第一步:開發一個類,監聽消息隊列

@Component("consumerListener")
public class ConsumerListener implements MessageListener{

    //如果注冊了消息監聽器,一旦消息到達,將自動調用監聽器的onMessage方法
    public void onMessage(Message message) {
        try {
            MapMessage mapMessage = (MapMessage) message;
            String tel = mapMessage.getString("tel");
            String code = mapMessage.getString("code");
            System.out.println(tel+"**********"+code);
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

}

第二步:配置spring 配置文件,注冊監聽器

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
    xmlns:aop="http://www.springframework.org/schema/aop"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:jdbc="http://www.springframework.org/schema/jdbc" 
    xmlns:tx="http://www.springframework.org/schema/tx"
    xmlns:jpa="http://www.springframework.org/schema/data/jpa" 
    xmlns:task="http://www.springframework.org/schema/task"
    xmlns:amq="http://activemq.apache.org/schema/core"
    xmlns:jms="http://www.springframework.org/schema/jms"
    xsi:schemaLocation="
                        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
                        http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd
                        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
                        http://www.springframework.org/schema/jdbc http://www.springframework.org/schema/jdbc/spring-jdbc.xsd
                        http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd
                        http://www.springframework.org/schema/data/jpa 
                        http://www.springframework.org/schema/data/jpa/spring-jpa.xsd
                        http://activemq.apache.org/schema/core
                        http://activemq.apache.org/schema/core/activemq-core-5.8.0.xsd
                        http://www.springframework.org/schema/jms
                        http://www.springframework.org/schema/jms/spring-jms.xsd">
                        
    <!-- 配置連接工廠對象:產生Connection  方式一:通過amq名稱空間創建連接工廠  方式二:可以通過bean標簽創建對象-->
    <!-- <amq:connectionFactory 
        id="connectionFactory" 
        userName="admin" password="admin" 
        brokerURL="tcp://localhost:61616">
    </amq:connectionFactory> -->
    
       <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
     <!-- 構造方法賦值 -->
<constructor-arg index="0" value="admin"></constructor-arg>

<constructor-arg index="1" value="admin"></constructor-arg>
 <constructor-arg index="2" value="tcp://localhost:61616"></constructor-arg> </bean> <!-- spring提供優化緩存session對象 --> <bean id="cachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory"> <property name="targetConnectionFactory" ref="connectionFactory"></property> <property name="sessionCacheSize" value="10"></property> </bean> <context:component-scan base-package="cn.itcast"></context:component-scan> <!-- 在監聽器容器中注冊監聽器對象 acknowledge:設置應答模式 auto自動應答 destination-type:隊列類型(queue,topic) connection-factory:注入連接工廠 jms:listener:節點注入監聽器對象 --> <jms:listener-container acknowledge="auto" destination-type="queue" connection-factory="cachingConnectionFactory"> <!-- destination:監聽哪個隊列 --> <jms:listener destination="test_spring" ref="consumerListener"/> </jms:listener-container> </beans>

測試代碼

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration("classpath:applicationContext.xml")
public class ConsumerTest {


    @Test
    public void test() {
    //保證spring工廠不關閉,web項目啟動,tomcat不停止,監聽器會自動監聽隊列中消息
while(true){ } } }

啟動生產者,消費者服務,消息生產者把將消息發送到服務器,將消息存放在隊列或主題中,消息服務器會將消息轉發給接受者,ActiveMQ的異步消息使得消息的發送與接受無必然聯系,只要將消息發出,消息發出端繼續執行代碼,無需等待消息消費端



 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM