1.RabbitMQ簡介
RabbitMQ是流行的開源消息隊列系統,用erlang語言開發。RabbitMQ是AMQP(高級消息隊列協議)的標准實現。
官網:http://www.rabbitmq.com/
2.Spring集成RabbitMQ
2.1pom.xml文件:
<!--rabbitmq依賴 --> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit</artifactId> <version>1.7.1.RELEASE</version> </dependency> <!-- 如果沒有這段,上面也會將其拉進來 --> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-amqp</artifactId> <version>1.7.1.RELEASE</version> </dependency> <!-- 這個必要的 --> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>3.6.6</version> </dependency>
2.2 spring-mq.properties
rmq.host = 192.168.XX.XX //rabbitmq服務器ip地址 rmq.port = 5672 //端口 rmq.producer.num = 20 //發消息生產者的最大數,沒有這個需求可以不寫 rmq.user = admin //用戶名 rmq.password = admin //密碼
2.3 spring-mq.xml配置
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/spring-context.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit-1.6.xsd">
<rabbit:connection-factory id="connectionFactory"
host="${rmq.host}" username="${rmq.user}" password="${rmq.password}"
port="${rmq.port}" />
<!--通過指定下面的admin信息,當前producer中的exchange和queue會在rabbitmq服務器上自動生成 -->
<rabbit:admin connection-factory="connectionFactory" />
<!--隊列 -->
<!-- 說明: durable:是否持久化 exclusive: 僅創建者可以使用的私有隊列,斷開后自動刪除 auto_delete: 當所有消費客戶端連接斷開后,是否自動刪除隊列 -->
<rabbit:queue name="my_first_queue" auto-declare="true" durable="true" />
<!-- 任務下發交換機 -->
<!-- 說明: rabbit:direct-exchange:定義exchange模式為direct,意思就是消息與一個特定的路由鍵完全匹配,才會轉發。
rabbit:binding:設置消息queue匹配的key -->
<rabbit:direct-exchange name="mq-exchange" durable="true" auto-delete="false">
<rabbit:bindings>
<rabbit:binding key="my_first_queue" queue="my_first_queue" />
</rabbit:bindings>
</rabbit:direct-exchange>
<!-- 消息轉換器聲明 消息對象json轉換類 -->
<bean id="jsonMessageConverter"
class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" />
<!-- 消費者 -->
<bean id="myCustomer" class="com.ln.mq.Customer">
<!-- 消費者方法要有相應的set方法 -->
<property name="converter" ref="jsonMessageConverter" />
</bean>
<!-- queue litener 觀察 監聽模式 當有消息到達時會通知監聽在對應的隊列上的監聽對象 -->
<rabbit:listener-container
connection-factory="connectionFactory" acknowledge="manual">
<rabbit:listener queues="my_first_queue" ref="myCustomer" />
</rabbit:listener-container>
<!-- spring template聲明 -->
<rabbit:template id="amqpTemplate" exchange="mq-exchange" connection-factory="connectionFactory" message-converter="jsonMessageConverter" />
</beans>
說明:
<rabbit:queue id="test_queue_key" name="test_queue_key" durable="true" auto-delete="false" exclusive="false" />
durable:是否持久化
exclusive: 僅創建者可以使用的私有隊列,斷開后自動刪除
auto_delete: 當所有消費客戶端連接斷開后,是否自動刪除隊列
<rabbit:direct-exchange name="test-mq-exchange" durable="true" auto-delete="false" id="test-mq-exchange">
<rabbit:bindings>
<rabbit:binding queue="test_queue_key" key="test_queue_key"/>
</rabbit:bindings>
</rabbit:direct-exchange>
rabbit:direct-exchange:定義exchange模式為direct,意思就是消息與一個特定的路由鍵完全匹配,才會轉發。
rabbit:binding:設置消息queue匹配的key
2.4 web.xml
<!-- 加載spring容器 --> <context-param> <param-name>contextConfigLocation</param-name> <param-value> classpath*:applicationContext.xml classpath*:spring-mq.xml </param-value> </context-param>
3.測試
3.1 生產者測試類
package com.ln.mq;
import javax.annotation.Resource;
import org.junit.Test;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import com.ln.web.controller.TestBase;
/**
* 生產者
*
*/
public class Producer extends TestBase{
private static final String MY_FITST_QUEUE="my_first_queue";
@Resource
private AmqpTemplate amqpTemplate;
@Test
public void sendMessage(){
System.out.println("*******生產者********");
String message="hello my first queue";
amqpTemplate.convertAndSend(MY_FITST_QUEUE, message);
}
}
3.2 消費者測試類
package com.ln.mq;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.amqp.support.converter.MessageConverter;
import com.rabbitmq.client.Channel;
/**
* 消費者
*
*/
public class Customer implements ChannelAwareMessageListener{
protected MessageConverter converter;
public void setConverter(MessageConverter converter) {
this.converter = converter;
}
@Override
public void onMessage(Message message, Channel channel) throws Exception {
Object fromMessage = converter.fromMessage(message);
System.out.println("***********消費者********");
System.out.println("***********接收到的Message:"+fromMessage.toString());
}
}
3.3 運行
運行生產者Producer測試junit
*******生產者********
*******消費者********
*******接收到的Message:hello my first queue
3.4 rabbitmq視圖
可以查看隊列消息情況
地址:http://localhost:15672
我的是本地的服務,如果要鏈接遠程服務,localhost換成服務器ip。
