Spring整合activeMQ消息隊列


1.配置JMS

  <!-- The real ConnectionFactory that produces JMS connections; supplied by the JMS vendor (ActiveMQ here) -->
    <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://localhost:61616"/>
    </bean>

    <!-- Spring's ConnectionFactory, which manages the real ConnectionFactory above -->
    <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
        <!-- The target is the vendor ConnectionFactory that actually creates JMS connections -->
        <property name="targetConnectionFactory" ref="targetConnectionFactory"/>
    </bean>

    <!-- Spring's JMS helper class, used for sending and receiving messages -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <!-- Wired to the Spring-managed ConnectionFactory defined above, not to the vendor one -->
        <property name="connectionFactory" ref="connectionFactory"/>
    </bean>

發送信息到activeMQ

@Override
    public void addNotifyCashToMq(final String notifyUrl, final String cashId, final String reqSn, final String callResult,int count) {
        //發送的參數final String callBackUrl = SuperAppConstant.TRANSACTION_CALLBACK_PREFIX_URL + notify_url_notifyCash
                    + notifyUrl + "&cashId=" + cashId + "&reqSn=" + reqSn + "&callResult=" + callResult + "&count="
                    + _count;
        //發送消息到queue_notifuCash_serial消息隊列 jmsTemplate.send(queue_notifyCash_serial,
new MessageCreator() { @Override public Message createMessage(Session session) throws JMSException { if (logger.isDebugEnabled()) { logger.debug("notifyUrl=" + notifyUrl + ",cashId=" + cashId + ",reqSn=" + reqSn + ",callResult=" + callResult + ",_count=" + _count); } HashMap map = new HashMap(); map.put("callBackUrl", callBackUrl); ObjectMessage objectMessage = session.createObjectMessage();//創建消息 objectMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay);//延時,delay為延時時長,以毫秒為單位
return objectMessage; } }); }

xml配置信息

    <!-- ActiveMQ connection factory; the broker URL comes from the jms.broker_url placeholder -->
    <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="${jms.broker_url}" />
    </bean>

    <!-- Spring caching connection factory wrapping the ActiveMQ factory above -->
    <bean id="cachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
        <property name="targetConnectionFactory" ref="connectionFactory" />
        <property name="sessionCacheSize" value="10" />
    </bean>

    <!-- Spring JMS Template, wired to the caching connection factory -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="cachingConnectionFactory" />
    </bean>

2.destination消息隊列定義

<description>Queue定義</description>
    <!-- ActiveMQ queue destination whose physical name is queue_callback_serial -->
    <bean id="queue_callback_serial" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg value="queue_callback_serial" />
    </bean>

3.監聽器BatchJob

3.1 jms.xml

<description>JMS簡單應用配置</description>

    <!-- ActiveMQ connection factory; the broker URL comes from the jms.broker_url placeholder -->
    <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="${jms.broker_url}" />
    </bean>

    <!-- Spring caching connection factory wrapping the ActiveMQ factory above -->
    <bean id="cachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
        <property name="targetConnectionFactory" ref="connectionFactory" />
        <property name="sessionCacheSize" value="10" />
    </bean>

    <!-- Queue definition: destination named order.queue.producer -->
    <bean id="orderQueueProducer" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg value="order.queue.producer" />
    </bean>
    <!-- Spring JMS Template; sends to orderQueueProducer when no destination is given -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="cachingConnectionFactory" />
        <property name="defaultDestination" ref="orderQueueProducer" />
    </bean>

    <!-- Message producer that uses the Spring JmsTemplate -->
    <bean id="orderProducerJmsService" class="com.gmall88.server.jms.order.impl.OrderProducerJmsServiceImpl">
        <property name="jmsTemplate" ref="jmsTemplate" />
    </bean>
    
    <!-- Message queue definition: destination named order.queue.listener -->
    <bean id="orderQueueListener" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg>
            <value>order.queue.listener</value>
        </constructor-arg>
    </bean>

3.2 監聽器impl

import java.util.Map;

import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.gmall88.server.wxpay.RF;

import net.sf.json.JSONObject;

public class NotifyCashManagerImpl implements MessageListener {
    private Logger logger = LoggerFactory.getLogger(getClass());
    @Override
    public void onMessage(Message message) {
        if(logger.isDebugEnabled()){
            logger.debug("new callback start..");
        }
        if(message !=null){
            if(message instanceof ObjectMessage){
                ObjectMessage objectMessage = (ObjectMessage) message;//監聽消息 try {
                    Map param  = (Map)objectMessage.getObject();
                    String callBackUrl = (String)param.get("callBackUrl");//取出消息里的參數 if (logger.isInfoEnabled()) {
                        logger.info("callBackUrl=" + callBackUrl);
                    }
                    JSONObject jsonObject = RF.httpsRequestJson(callBackUrl, "POST", "");//通過http回調方法 if(jsonObject != null){
                        logger.info("code:"+jsonObject.getString("code"));
                        logger.info("message="+jsonObject.getString("message"));
                    }
                } catch (Exception e) {
                    logger.error(e.getMessage(),e);
                }
            }else{
                logger.error("Unknown message, type=" + message.getClass().getName());
            }
        }else{
            logger.error("message is null");
        }
    }

}

回調方法:

@RequestMapping(value = "/notifyCash", method = RequestMethod.POST)
    @ResponseBody
    /**
     * POST /notifyCash: forwards a cash notification to the business system.
     *
     * <p>The request is first passed to recordRequestCheck under the key
     * "superApp_notifyOrder" + cashId; a non-null result from that check is returned as-is
     * (presumably a duplicate/in-flight guard - verify against recordRequestCheck).
     * Otherwise superAppServerManager.notifyCash is called; if it throws, the error is logged
     * and the notification is handed to addNotifyCashToMq for a delayed retry.
     * recordRequestEnd is always called once the check has passed.
     *
     * @return the result of recordRequestCheck when it is non-null, otherwise a ReturnResult
     *         that carries error details when a GmallException or other exception escaped
     */
    public Object notifyCash(String notifyUrl, String cashId, String reqSn, String cashResult,int count) {
        ReturnResult returnResult = new ReturnResult();
        final String clientId = "superApp_notifyOrder" + cashId;
        try {
            ReturnResult checkOutcome = recordRequestCheck(clientId);
            if (checkOutcome != null) {
                return checkOutcome;
            }
            returnResult = new ReturnResult();

            try {
                // Call back the business system.
                superAppServerManager.notifyCash(notifyUrl, cashId, reqSn, cashResult);
            } catch (Exception notifyFailure) {
                // The callback failed: schedule a delayed callback through the message queue.
                logger.error(notifyFailure.getMessage(), notifyFailure);
                superAppServerManager.addNotifyCashToMq(notifyUrl, cashId, reqSn, cashResult, count);
            } finally {
                recordRequestEnd(clientId);
            }
        } catch (GmallException businessError) {
            returnResult.setCodeNum(businessError.getCode());
            returnResult.setMessage(businessError.getMessage());
        } catch (Exception unexpected) {
            logger.error(unexpected.getMessage(), unexpected);
            returnResult.setCode(ReturnCodeType.FAILURE)
                    .setMessage(unexpected.getMessage());
        }
        logger.info("called..");
        return returnResult;
    }

整理了一下整個流程如圖所示:

 4.ActiveMQ持久化(這裡只考慮持久化為MySQL方式)

把MySQL的驅動放到ActiveMQ的lib目錄下,如:mysql-connector-java-5.0.4-bin.jar

配置文件添加:

<persistenceAdapter>
       <!-- JDBC-backed persistence; "#derby-ds" refers to the data source bean whose id is derby-ds -->
       <jdbcPersistenceAdapter dataDirectory="${activemq.base}/data" dataSource="#derby-ds"/>
    </persistenceAdapter>

ActiveMq連接數據庫相關配置

 <!-- Pooled DBCP data source for the "activemq" MySQL database on localhost.
      The bean id is still "derby-ds", but the driver and URL configured here are MySQL. -->
 <bean id="derby-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
    <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
    <property name="url" value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/>
    <property name="username" value="activemq"/>
    <property name="password" value="activemq"/>
    <property name="maxActive" value="200"/>
    <property name="poolPreparedStatements" value="true"/>
  </bean>

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM