RabbitMQ與Spring集成配置


1.引入相關jar包

//RabbitMQ

compile group: 'org.springframework.amqp', name: 'spring-rabbit', version: '1.6.6.RELEASE'
compile group: 'org.springframework.integration', name: 'spring-integration-amqp', version: '4.3.5.RELEAS

 

 

 

生產者配置

2.實現一個消息處理器,繼承自org.springframework.amqp.core.MessageListener

public class AmqpMsgListener implements MessageListener {

@Override
 public void onMessage(Message message)
{
System.out.println(message.toString());
   }
}

 

 

 

 

 

 

3.rabbit-producer.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 xmlns:p="http://www.springframework.org/schema/p" xmlns:int="http://www.springframework.org/schema/integration"
 xmlns:rabbit="http://www.springframework.org/schema/rabbit"
 xmlns:int-amqp="http://www.springframework.org/schema/integration/amqp"
 xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/integration
  http://www.springframework.org/schema/integration/spring-integration.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd
  http://www.springframework.org/schema/integration/amqp
  http://www.springframework.org/schema/integration/amqp/spring-integration-amqp.xsd">
 <!--連接工廠-->
 <rabbit:connection-factory id="connectionFactory" host="{ip地址}" port="{端口}" username="{用戶名}" password="{密碼}" publisher-confirms="true"/>
 <!--創建隊列-->
 <rabbit:queue name="queue.test" durable="true" exclusive="false" auto-delete="false" />
 <!--創建分發交換器-->
 <rabbit:direct-exchange name="exchange.directTest" durable="true">
<rabbit:bindings>
<rabbit:binding key="foo.bar" queue="queue.test"></rabbit:binding>
</rabbit:bindings>
</rabbit:direct-exchange>

<rabbit:template id="amqpTemplate" connection-factory="connectionFactory" exchange="exchange.directTest" routing-key="foo.bar" message-converter="jsonMessageConverter" confirm-callback=""/>
<rabbit:admin connection-factory="connectionFactory" id="adminId"/>
 <!-- 配置exchange,不同的exchange會影響消息分發策略 -->
<!-- 消息對象json轉換類 -->
 <bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" />

</beans>

 

消費者配置

4.rabbit-customer.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 xmlns:rabbit="http://www.springframework.org/schema/rabbit"
 xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
  http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
 <!-- rabbitmq連接配置 -->
 <rabbit:connection-factory id="connectionFactory1" host="{ip地址}" port="{端口}" username="{用戶名}" password="{密碼}" publisher-confirms="true"/>

<rabbit:admin connection-factory="connectionFactory1"/>
 <!--按項目需求配置 -->
 <rabbit:listener-container connection-factory="connectionFactory1" acknowledge="auto">
<rabbit:listener ref="amqpMsgListener" queues="queue.test" />
</rabbit:listener-container>
<bean id="amqpMsgListener" class="com.nxin.farm.test.AmqpMsgListener"/>
</beans>

 

 

 

 

 

 

 

 

 

5.創建測試類

public class MqTest extends BaseJunit {
@Autowired
 private RabbitTemplate amqpTemplate;

 @Test
 public void testSend()
{
try {
for(int i=0;i<100;i++){
amqpTemplate.convertAndSend("nx.farm.exchange.directTest", "foo.bar", "Hello, world! send by xxxxx");
 }
Thread.sleep(1000*1000);
 } catch (AmqpException e) {
e.printStackTrace();
 } catch (InterruptedException e) {
e.printStackTrace();
 }
}
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM