最近在專案中使用了Spring整合RabbitMQ,對兩者都有了更深的理解。之前一直沒怎麼用過,這次使用過程中遇到了不少細節上的小問題,現在把實現過程整理出來。
RabbitMQ的安裝步驟以及基本使用方法,用百度搜尋就能找到,這裡就不詳細說明了。
1、首先配置web.xml文件
<!-- 加載spring容器 -->
<context-param>
<param-name>contextConfigLocation</param-name>
<param-value>classpath*:applicationContext.xml</param-value>
</context-param>
<listener>
<listener-class>org.springframework.web.context.ContextLoaderListener</listener-class>
</listener>
<servlet>
<servlet-name>springmvc</servlet-name>
<servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>
<!-- contextConfigLocation配置springmvc加載的配置文件(配置處理器映射器、適配器等等)。如果不配置contextConfigLocation,默認加載的是/WEB-INF/servlet名稱-servlet.xml(即springmvc-servlet.xml) -->
<init-param>
<param-name>contextConfigLocation</param-name>
<param-value>classpath*:springmvc.xml</param-value>
</init-param>
</servlet>
<servlet-mapping>
    <servlet-name>springmvc</servlet-name>
    <!-- Map DispatcherServlet as the default servlet ("/"). With "/*" the servlet
         also captures internal forwards (for example to JSP views), so a view can
         never be rendered by the container. -->
    <url-pattern>/</url-pattern>
</servlet-mapping>
2、配置applicationContext.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:p="http://www.springframework.org/schema/p"
xmlns:mvc="http://www.springframework.org/schema/mvc"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-3.1.xsd
http://www.springframework.org/schema/mvc
http://www.springframework.org/schema/mvc/spring-mvc-3.0.xsd">
<import resource="classpath*:rabbitMq.xml" />
<import resource="classpath*:rabbitMq-consumer.xml" />
<import resource="classpath*:applicationContext-service.xml" />
<!--默認注解映射的支持 -->
<mvc:annotation-driven />
<context:component-scan base-package="com.vci.mq.controller" />
<bean id="msgQueueUtil" class="com.vci.mq.util.MsgQueueUtil"></bean>
</beans>
3、配置rabbitMq.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd">
<!-- <context:property-placeholder location="classpath:db.properties" /> -->
<!--配置connection-factory,指定連接rabbit server參數 -->
<rabbit:connection-factory id="connectionFactory"
host="localhost" username="testuser" password="testuser"
port="5672" />
<bean id="confirmCallBackListener" class="com.vci.mq.listener.ConfirmCallBackListener" />
<bean id="returnCallBackListener" class="com.vci.mq.listener.ReturnCallBackListener" />
<!--定義rabbit template用於數據的接收和發送 -->
<rabbit:template id="amqpTemplate" connection-factory="connectionFactory" />
<!-- confirm-callback="confirmCallBackListener" -->
<!-- return-callback="returnCallBackListener" -->
<!-- mandatory="true" -->
<!--通過指定下面的admin信息,當前producer中的exchange和queue會在rabbitmq服務器上自動生成 -->
<rabbit:admin connection-factory="connectionFactory" />
<!-- 設置Ack模式為手動 -->
<bean id="ackManual" class="org.springframework.beans.factory.config.FieldRetrievingFactoryBean">
<property name="staticField" value="org.springframework.amqp.core.AcknowledgeMode.MANUAL" />
</bean>
<!-- 將類自動注入,可解析msg信息 -->
<bean id="msgConverter" class="org.springframework.amqp.support.converter.SimpleMessageConverter" />
<!-- 異常處理,記錄異常信息 -->
<bean id="mqErrorHandler" class="com.vci.mq.util.MQErrorHandler"/>
</beans>
4、配置rabbitMq-consumer.xml,此文件中配置消費者,exchange,queue
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd">
<!--定義queue
durable:是否持久化
exclusive: 僅創建者可以使用的私有隊列,斷開後自動刪除
auto_delete: 當所有消費客戶端連接斷開後,是否自動刪除隊列 -->
<rabbit:queue id="queue001" name="queue001" durable="true" auto-delete="false" exclusive="false" />
<rabbit:queue id="001" name="001" durable="true" auto-delete="false" exclusive="false" />
<!-- 定義direct exchange,綁定queue001
rabbit:direct-exchange:定義exchange模式為direct,意思就是消息與一個特定的路由鍵完全匹配,才會轉發。
rabbit:binding:設置消息queue匹配的key
direct:轉發消息到 routingKey 指定的隊列。
topic:按規則轉發消息(最靈活)。
headers:(這個還沒有接觸到)
fanout:轉發消息到所有綁定隊列
-->
<rabbit:direct-exchange name="exchange001" durable="true" auto-delete="false">
<rabbit:bindings>
<rabbit:binding queue="queue001" key="queue001"></rabbit:binding>
</rabbit:bindings>
</rabbit:direct-exchange>
<!-- 消費者 -->
<bean id="channelAware" class="com.vci.mq.consumer.impl.ChannelAwareImpl"></bean>
<!-- <bean id="myMsgConverter" class="com.vci.mq.util.MQRepublishMessageRecoverer" /> -->
<!-- 適配器 -->
<bean id="channelAwareAdapter" class="org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter">
<constructor-arg ref="channelAware" />
<property name="defaultListenerMethod" value="handleTxMsg"></property>
<property name="messageConverter" ref="msgConverter"></property>
</bean>
<!-- queue listener 觀察者(監聽)模式:當有消息到達時,會通知監聽在對應隊列上的監聽對象 -->
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual">
    <!-- acknowledge="manual": the listener bean (ChannelAwareImpl) calls
         channel.basicAck itself. In the default "auto" mode the container would
         acknowledge the same delivery tag a second time after the listener returns. -->
    <!-- queues: the queue(s) to listen on, comma separated
         ref:    the listener bean -->
    <rabbit:listener queues="001" ref="channelAware"/>
</rabbit:listener-container>
<!-- 創建SimpleMessageListenerContainer的理想通道,主要實現異常事件處理邏輯 -->
<!-- <bean id="retryOperationsInterceptorFactoryBean" -->
<!-- class="org.springframework.amqp.rabbit.config.StatelessRetryOperationsInterceptorFactoryBean"> -->
<!-- <property name="messageRecoverer"> -->
<!-- <bean class="com.vci.mq.util.MQRepublishMessageRecoverer"/> -->
<!-- </property> -->
<!-- <property name="retryOperations"> -->
<!-- <bean class="org.springframework.retry.support.RetryTemplate"> -->
<!-- <property name="backOffPolicy"> -->
<!-- <bean -->
<!-- class="org.springframework.retry.backoff.ExponentialBackOffPolicy"> -->
<!-- <property name="initialInterval" value="500" /> -->
<!-- <property name="multiplier" value="10.0" /> -->
<!-- <property name="maxInterval" value="10000" /> -->
<!-- </bean> -->
<!-- </property> -->
<!-- </bean> -->
<!-- </property> -->
<!-- </bean> -->
<!-- 另一種監聽配置 -->
<!-- <bean class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer"> -->
<!-- <property name="connectionFactory" ref="connectionFactory" /> -->
<!-- <property name="acknowledgeMode" ref="ackManual" /> -->
<!-- <property name="queueNames" value="queue001, 001" /> -->
<!-- <property name="messageListener"> -->
<!-- <bean class="com.vci.mq.consumer.impl.ChannelAwareImpl" /> -->
<!-- </property> -->
<!-- <property name="concurrentConsumers" value="${rabbitmq.concurrentConsumers}" />
<property name="adviceChain" ref="retryOperationsInterceptorFactoryBean" />-->
<!-- <property name="errorHandler" ref="mqErrorHandler" /> -->
<!-- </bean> -->
</beans>
5、添加測試類TestController.java,發送信息測試
package com.vci.mq.controller;
import java.util.HashMap;
import java.util.Map;
import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.ResponseBody;
import com.vci.mq.util.DomUtil;
import com.vci.mq.util.MsgQueueUtil;
/**
 * Demo controller used to push a fixed test message onto the RabbitMQ queue "001".
 */
@Controller
@RequestMapping("/test")
public class TestController {

    @Autowired
    MsgQueueUtil msgQueueUtil;

    /**
     * Publishes a sample command message (target "001", cmd "setTestInitHis",
     * content "receiveInitHis") using routing key "001".
     *
     * @return "發送完畢" when the message was handed to the template without an
     *         exception, "發送失敗" when sending threw
     */
    @RequestMapping(value = "/sendMsg", method = {RequestMethod.GET, RequestMethod.POST})
    public @ResponseBody String testQueue() {
        try {
            // publishMsg builds the {target, cmd, content} map and sends it with
            // the target as routing key.
            msgQueueUtil.publishMsg("001", "setTestInitHis", "receiveInitHis");
        } catch (Exception e) {
            // Report the failure to the caller instead of claiming success.
            e.printStackTrace();
            return "發送失敗";
        }
        return "發送完畢";
    }
}
6、MsgQueueUtil.java,此文件中發送消息
package com.vci.mq.util;
import java.util.HashMap;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
/**
 * Thin helper around {@link AmqpTemplate} for publishing messages.
 *
 * <p>Instances are expected to be created by the Spring container so that
 * {@code amqpTemplate} is injected.
 */
public class MsgQueueUtil {

    private static final Logger logger = LoggerFactory.getLogger(MsgQueueUtil.class);

    @Autowired
    private AmqpTemplate amqpTemplate;

    /**
     * Converts {@code message} and sends it to the given exchange with the given routing key.
     *
     * @param exchange_key name of the exchange to publish to
     * @param queue_key    routing key used for the publish
     * @param message      payload handed to the template's message converter
     */
    public void sendMessageByExchange(String exchange_key, String queue_key, Object message) {
        // The "{}" placeholders are required; without them SLF4J silently drops the arguments.
        logger.info("根據exchangeKey和queueKey發送消息: exchange={}, routingKey={}, message={}",
                new Object[] {exchange_key, queue_key, message});
        amqpTemplate.convertAndSend(exchange_key, queue_key, message);
    }

    /**
     * Converts {@code message} and sends it through the template's default exchange
     * using {@code queue_key} as the routing key.
     *
     * @param queue_key routing key used for the publish
     * @param message   payload handed to the template's message converter
     */
    public void sendMessageByQueue(String queue_key, Object message) {
        logger.info("根據queueKey發送消息: routingKey={}, message={}", queue_key, message);
        amqpTemplate.convertAndSend(queue_key, message);
    }

    /**
     * Builds a command map with the keys "target", "cmd" and "content" and sends it
     * via {@link #sendMessageByQueue(String, Object)} using {@code target} as routing key.
     *
     * @param target  value stored under "target"; also used as the routing key
     * @param cmd     value stored under "cmd"
     * @param content value stored under "content"
     */
    public void publishMsg(String target, String cmd, String content) {
        Map<String, Object> payload = new HashMap<String, Object>();
        payload.put("target", target);
        payload.put("cmd", cmd);
        payload.put("content", content);
        sendMessageByQueue(target, payload);
    }
}
7、ChannelAwareImpl.java,此文件用於接收消息:通過實現ChannelAwareMessageListener接口,在onMessage方法中獲取發送過來的消息。
package com.vci.mq.consumer.impl;
import java.lang.reflect.Method;
import java.util.Map;
import org.apache.log4j.Logger;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.amqp.support.converter.MessageConversionException;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import com.rabbitmq.client.Channel;
import com.vci.mq.util.DomUtil;
import com.vci.mq.util.SpringConfigTool;
/**
 * RabbitMQ listener that dispatches each received message to a bean method
 * selected by the message's "cmd" entry.
 *
 * <p>The message body is converted with the injected {@link MessageConverter} and
 * must be a {@link Map} containing "target", "cmd" and "content". The "cmd" value
 * is looked up in service.xml to obtain a bean name ("server-name") and a method
 * name ("method"); that method is invoked reflectively with
 * {@code (target, cmd, content)}.
 */
public class ChannelAwareImpl implements ChannelAwareMessageListener {

    private static final Logger logger = Logger.getLogger(ChannelAwareImpl.class);

    /** Location of the XML file that maps a command id to a bean name and method name. */
    private static final String SERVICE_CONFIG_PATH = "D:/workspace2017/VciProject/config/service.xml";

    @Autowired
    private MessageConverter msgConverter;

    /**
     * Acknowledges the delivery, converts the payload and invokes the service
     * method configured for the payload's "cmd" value.
     *
     * <p>Messages that cannot be converted, are not a {@link Map}, carry no String
     * "cmd", or whose "cmd" has no usable mapping are logged and dropped.
     *
     * @param message the received AMQP message
     * @param channel the channel the message was delivered on; used for the manual ack
     * @throws Exception if the ack fails, or if the lookup or reflective invocation throws
     */
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        // The ack is sent before any processing, so a message whose handling fails
        // below has already been confirmed to the broker.
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        logger.info(message);

        Object payload = convert(message);
        if (payload == null) {
            return;
        }
        if (!(payload instanceof Map)) {
            logger.error("unexpected MQ payload type: " + payload.getClass().getName());
            return;
        }
        Map<?, ?> request = (Map<?, ?>) payload;

        Object cmd = request.get("cmd");
        if (!(cmd instanceof String)) {
            logger.error("MQ message has no String \"cmd\" entry: " + request);
            return;
        }

        // Resolve the receiving bean name and method name from service.xml.
        DomUtil dutil = new DomUtil();
        dutil.parserXml(SERVICE_CONFIG_PATH);
        Map<String, String> receiveMap = dutil.getElementByNode("id", (String) cmd);
        if (receiveMap == null) {
            logger.error("no service mapping found for cmd: " + cmd);
            return;
        }
        String serviceName = receiveMap.get("server-name");
        String serviceMethod = receiveMap.get("method");
        if (serviceName == null || serviceMethod == null) {
            logger.error("incomplete service mapping for cmd: " + cmd);
            return;
        }

        Object service = SpringConfigTool.init().getBean(serviceName);
        if (service == null) {
            logger.error("no bean found for service name: " + serviceName);
            return;
        }

        // Invoke serviceMethod(String, String, String) reflectively on the resolved bean.
        Method method = service.getClass()
                .getMethod(serviceMethod, String.class, String.class, String.class);
        Object res = method.invoke(service, request.get("target"), cmd, request.get("content"));
        logger.info("接收方服務方法返回值:" + res);
    }

    /** Converts the message body to an object; logs and returns null when conversion fails. */
    private Object convert(Message message) {
        try {
            return msgConverter.fromMessage(message);
        } catch (MessageConversionException e) {
            logger.error("convert MQ message error.", e);
            return null;
        }
    }
}