JMS學習之路(一):整合activeMQ到SpringMVC


 

JMS的全稱是Java Message Service,即Java消息服務。它主要用於在生產者和消費者之間進行消息傳遞,生產者負責產生消息,而消費者負責接收消息。把它應用到實際的業務需求中的話我們可以在特定的時候利用生產者生成一消息,並進行發送,對應的消費者在接收到對應的消息后去完成對應的業務邏輯。對於消息的傳遞有兩種類型,一種是點對點的,即一個生產者和一個消費者一一對應;另一種是發布/訂閱模式,即一個生產者產生消息並進行發送后,可以由多個消費者進行接收。

 

整合activeMQ到springmvc項目也很簡單

只需要增加如下的maven依賴即可,初學者請直接添加all這個jar,否則如果jar包沖突會影響信心的

<dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
            <version>5.13.2</version>
        </dependency>
        <dependency>  
            <groupId>org.springframework</groupId>  
            <artifactId>spring-jms</artifactId>  
            <version>${spring.version}</version>  
        </dependency>

 

開發步驟:

① 添加好maven依賴后,請到 http://activemq.apache.org/ 下載最新版的activeMQ, 我這里下載的是5.13.2,下載解壓后執行bin中的activemq.bat進行啟動

② 理解JMS工作原理

  1.首先配置鏈接信息

         和操作數據庫一樣,我們要根據數據庫地址和鏈接信息,來配置datasource一樣,activemq類同,底層首先需要由activemq廠商提供的驅動,根據具體地址,封裝一個ConnectionFactory, 這是最基本的配置, 同時由於是由Spring進行統一管理, 所以需要將ConnectionFactory注入到sping。jms中的ConnectionFactory。這樣鏈接就配置好了。

<!-- 真正可以產生Connection的ConnectionFactory,由對應的 JMS服務廠商提供 -->
    <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://localhost:61616" />
    </bean>

    <!-- Spring用於管理真正的ConnectionFactory的ConnectionFactory -->
    <bean id="connectionFactory"
        class="org.springframework.jms.connection.SingleConnectionFactory">
        <!-- 目標ConnectionFactory對應真實的可以產生JMS Connection的ConnectionFactory -->
        <property name="targetConnectionFactory" ref="targetConnectionFactory" />
    </bean>

     同理 spring.jms 提供了 SingleConnectionFactory和CachingConnectionFactory,這里我們使用的SingleConnectionFactory,根據意思也知道得八九不離十了,

SingleConnectionFactory就是每次請求都返回同一個鏈接,從啟動開始就一直打開,不會關閉。 而CachingConnectionFactory就和數據庫的鏈接池一樣,可以緩存很多信息,在用戶很多的情況下,同等情況下會增加效率。

     同時activeMQ也提供了PooledConnectionFactory,這樣也可以緩存很多信息,減少資源的使用,配置如下

<!-- 使用pool進行鏈接 -->
    <!-- 真正可以產生Connection的ConnectionFactory,由對應的 JMS服務廠商提供-->
<!--     <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> -->
<!--         <property name="brokerURL" value="tcp://localhost:61616"/> -->
<!--     </bean> -->
    
<!--     <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory"> -->
<!--         <property name="connectionFactory" ref="targetConnectionFactory"/> -->
<!--         <property name="maxConnections" value="10"/> -->
<!--     </bean> -->
    
<!--     <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory"> -->
<!--         <property name="targetConnectionFactory" ref="pooledConnectionFactory"/> -->
<!--     </bean> -->

      2。配置好ConnectionFactory之后我們就需要配置生產者。生產者負責產生消息並發送到JMS服務器,這通常對應的是我們的一個業務邏輯服務實現類。但是我們的服務實現類是怎么進行消息的發送的呢?這通常是利用Spring為我們提供的JmsTemplate類來實現的,所以配置生產者其實最核心的就是配置進行消息發送的JmsTemplate。對於消息發送者而言,它在發送消息的時候要知道自己該往哪里發,為此,我們在定義JmsTemplate的時候需要往里面注入一個Spring提供的ConnectionFactory對象。

<!-- Spring提供的JMS工具類,它可以進行消息發送、接收等 -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <!-- 這個connectionFactory對應的是我們定義的Spring提供的那個ConnectionFactory對象 -->
        <property name="connectionFactory" ref="connectionFactory" />
    </bean>

      在發送消息的時候,除了知道activemq的地址外,還需要讓發送者知道具體發給誰?

      activemq提供了兩種模式,一個是點對點的 ,一個是發送給多個訂閱者這種訂閱模式

    <!--這個是隊列目的地 -->
    <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg>
            <value>queue</value>
        </constructor-arg>
    </bean>
    
    <!--這個是主題目的地,一對多的-->
    <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
        <constructor-arg value="topic"/>
    </bean>

 

這樣發送者相關配置就已經完成了。

③ 發送者發送消息的服務類: 很簡單,調用上面配置好的jmsTemplate。send方法就好了

package xiaochangwei.zicp.net.jms;

import javax.annotation.Resource;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
 

import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Service;
 
@Service
public class ProducerServiceImpl implements ProducerService {
 
    @Resource
    private JmsTemplate jmsTemplate;
    
    public void sendMessage(Destination destination, final String message) {
        jmsTemplate.send(destination, new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage(message);
            }
        });
    } 
}

 

至此發送者相關的都已經全部完成了,下面配置接受者

④  接收者配置,接收者和發送者的鏈接信息都是一樣的,不然收不到信息是不, 除了鏈接信息,要知道什么時候有消息發送過來,接收者這邊就要實現一個消息監聽器,    

     當監聽到消息后,進行相應的業務處理,每個目的地都有一個MessageListenerContainer,配置MessageListenerContainer需要鏈接信息,目的地信息,和接受者的消息監聽器

<!-- 消息監聽器 -->
    <bean id="consumerMessageListener"
        class="xiaochangwei.zicp.net.jms.ConsumerMessageListener" />
    <!-- 消息監聽容器 -->
    <bean id="jmsContainer"
        class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="destination" ref="queueDestination" />
        <property name="messageListener" ref="consumerMessageListener" />
    </bean>

 

 

這里我們進行一個業務模擬

A系統通過activemq發送命令串給B系統, B系統收到命令后,根據約定的規則進行解析,並進行業務處理

 

A系統調用ProducerServiceImpl中的 sendMessage 進行消息發送,為減少傳遞量,均采用json發送,代碼如下:

package xiaochangwei.zicp.net.web.controller;

import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

import javax.jms.Destination;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;

import xiaochangwei.zicp.net.common.tools.JsonUtil;
import xiaochangwei.zicp.net.entity.TestEntity;
import xiaochangwei.zicp.net.jms.ProducerService;

@Controller
@RequestMapping("jms")
public class JmsTestController {

    @Autowired
    private ProducerService producerService;
    @Autowired
    @Qualifier("queueDestination")
    private Destination destination;

    @RequestMapping("test")
    public @ResponseBody String testSend() throws Exception {
        
        //系統業務需要, 需要更新用戶表中信息,根據id更新name
        List<TestEntity> list = new LinkedList<TestEntity>();
        
        TestEntity en = new TestEntity();
        en.setId(100);
        en.setName("name1");
        list.add(en);
        
        TestEntity en2 = new TestEntity();
        en2.setId(1002);
        en2.setName("name2");
        list.add(en2);
        
        Map<String,Object> mapEntity = new HashMap<String, Object>();
        mapEntity.put("user", list);
        
        Map<String,Object> map = new HashMap<String, Object>();
        map.put("update", mapEntity);
        
        System.out.println("發送方發送內容為:" + JsonUtil.object2String(map));
        //發送更新數據請求 
        producerService.sendMessage(destination, JsonUtil.object2String(map));
        
        return "jms exute complete";
    }
}

 

接收者根據約定,知道收到的消息都是json字符串,所以直接按text處理

package xiaochangwei.zicp.net.jms;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

public class ConsumerMessageListener implements MessageListener {
 
    public void onMessage(Message message) {
        TextMessage textMsg = (TextMessage) message;
        try {
            System.out.println("接收者受到消息:" + textMsg.getText());
            System.out.println("開始進行解析並調用service執行....");
            
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
 
}

 

這里沒有具體的去解析並執行具體操作,只是打印出來而已,只確認發送方發送的內容和接受方接收到的內容一致即可。

可以看到發送方和接收方內容一直

解釋下這段json想表達的意思: 更新(update)用戶信息(user),把id=100的用戶 name更新為name1 ........

 

整合就講到這里,后續會講到消息監聽器,轉換器等進階知識,感興趣的朋友可以持續關注


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM