消息隊列RabbitMQ與Spring集成


1.RabbitMQ簡介

RabbitMQ是流行的開源消息隊列系統,用erlang語言開發。RabbitMQ是AMQP(高級消息隊列協議)的標准實現。
官網:http://www.rabbitmq.com/

 

2.maven配置

        <!--rabbit -->
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>3.5.1</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit</artifactId>
            <version>1.4.6.RELEASE</version>
        </dependency>

 

3.配置文件

rabbitmq.properties

mq.host=172.17.22.187
mq.username=remote_user
mq.password=123456

 

4.Spring配置

<?xml version="1.0" encoding="UTF-8"?>  
<beans xmlns="http://www.springframework.org/schema/beans"  
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"  
    xmlns:rabbit="http://www.springframework.org/schema/rabbit"  
    xsi:schemaLocation="  
        http://www.springframework.org/schema/beans   
        http://www.springframework.org/schema/beans/spring-beans-3.1.xsd  
        http://www.springframework.org/schema/context   
        http://www.springframework.org/schema/context/spring-context-3.1.xsd  
        http://www.springframework.org/schema/rabbit   
        http://www.springframework.org/schema/rabbit/spring-rabbit-1.3.xsd">  

<context:property-placeholder location="classpath:rabbitmq.properties" />

    <!--配置connection-factory,指定連接rabbit server參數-->
    <rabbit:connection-factory id="connectionFactory" host="${mq.host}" username="${mq.username}" password="${mq.password}" /> 

    <!--通過指定下面的admin信息,當前producer中的exchange和queue會在rabbitmq服務器上自動生成-->
    <rabbit:admin connection-factory="connectionFactory"/>

    <!--定義queue-->
    <rabbit:queue id="com.mj.test" name="com.mj.test" durable="true" auto-delete="false" exclusive="false"/>

    <!-- 定義direct exchange,綁定com.mj.test queue -->
    <rabbit:direct-exchange name="myChange" durable="true" auto-delete="false">
        <rabbit:bindings>
            <rabbit:binding queue="com.mj.test" key="hello"></rabbit:binding>
        </rabbit:bindings>
    </rabbit:direct-exchange>
    
    <!--定義rabbit template用於數據的接收和發送-->
    <rabbit:template id="myAmqpTemplate" connection-factory="connectionFactory" exchange="myChange" />
    
    <!-- 接受 -->
     <bean id="messageReceiver" class="com.ucs.mq.QueueListenter"></bean>
    <rabbit:listener-container connection-factory="connectionFactory">
        <rabbit:listener queues="com.mj.test" ref="messageReceiver"/>
    </rabbit:listener-container>  
    
</beans>

 

5.發送消息Producer

package com.ucs.mq;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import net.sf.json.JSONSerializer;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.springframework.test.context.transaction.TransactionConfiguration;
import org.springframework.transaction.annotation.Transactional;

import com.ucs.base.BaseTest;

@Transactional
@RunWith(SpringJUnit4ClassRunner.class) 
@TransactionConfiguration(transactionManager="transactionManager",defaultRollback=false)
@ContextConfiguration(locations={"classpath:application-mq.xml"})
public class TestMQ {
    
    @Autowired
    private AmqpTemplate amqpTemplate;

    
    @Test
    public void send() throws Exception{
         
        List<String> submobileList=new ArrayList<String>();        
        submobileList.add("1");
        submobileList.add("2");
        submobileList.add("3");
        Map<String, Object> bodyMap = new HashMap<String, Object>();
        bodyMap.put("batchNo", "遞四方速遞");    
        bodyMap.put("item", submobileList);           
        String jsonStr=JSONSerializer.toJSON(bodyMap).toString();
        amqpTemplate.convertAndSend("hello", jsonStr);   
    }
}

6.異步接收消息Consumer

package com.ucs.mq;

import net.sf.json.JSONArray;
import net.sf.json.JSONObject;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;


public class QueueListenter implements MessageListener {


    @Override
    public void onMessage(Message msg) {
        try{      
            System.out.println(new String(msg.getBody(),"UTF-8"));
            String str=new String(msg.getBody(),"UTF-8");
            JSONObject json=JSONObject.fromObject(str);

            System.out.println(json.get("batchNo").toString());
            JSONArray jSONArray=JSONArray.fromObject(json.get("item"));
            System.out.println(jSONArray.toString());
        }catch(Exception e){
            e.printStackTrace();
        }
    }   
}

 

7.運行

{"batchNo":"遞四方速遞","item":["1","2","3"]}
遞四方速遞
["1","2","3"]

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM