本文介紹RabbitMQ與Spring的簡單集成以及消息的發送和接收。
在RabbitMQ的Spring配置文件中,首先需要增加命名空間。
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
其次是模式文檔,這里按1.0的來。
xsi:schemaLocation="
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd"
配置connection-factory元素。
<rabbit:connection-factory id="connectionFactory" username="mmq" password="mmq" host="192.168.1.138" port="5672" virtual-host="/vhost1" />
配置connection-factory元素實際是注冊一個org.springframework.amqp.rabbit.connection.CachingConnectionFactory實例。
參數介紹:
id:bean的id值。
host:RabbitMQ服務器地址。默認值"localhost"。
port:RabbitMQ服務端口,默認值"5672"。
virtual-host:虛擬主機,默認是"/"。
username和password就是訪問RabbitMQ服務的賬戶和密碼了。
channel-cache-size:channel的緩存數量。新版本默認是25。
消息隊列queue的配置。
<!--定義queue queueTest --> <rabbit:queue name="queueTest" durable="true" auto-delete="false" exclusive="false" />
參數介紹:
name:queue的名字。
durable:是否為持久的。默認是true,RabbitMQ重啟后queue依然存在。
auto-delete:表示消息隊列沒有在使用時將被自動刪除。默認是false。
exclusive:表示該消息隊列是否只在當前connection生效。默認false。
交換器exchange的配置。
<rabbit:direct-exchange name="exchangeTest" durable="true" auto-delete="false"> <rabbit:bindings> <rabbit:binding queue="queueTest" key="queueTestKey"></rabbit:binding> </rabbit:bindings> </rabbit:direct-exchange>
參數介紹:
name:exchange的名字。
durable:是否為持久的,默認為true,RabbitMQ重啟后exhange依然存在。
auto-delete:表示exchange在未被使用時是否自動刪除,默認是false。
key:queue在該direct-exchange中的key值。當消息發送給該direct-exchange中指定key為設置值時,消息將會轉發給queue參數指定的消息隊列。
Spring為方便使用RabbitMQ服務,提供一個操作模板類:org.springframework.amqp.rabbit.core.RabbitTemplate。
<rabbit:template id="amqpTemplate" connection-factory="connectionFactory" exchange="exchangeTest" />
配置也很簡單。template還有其他的配置項,可以自己查看xsd文件中的說明。
最后一個配置項是消息consumer。其實也可以叫做listener。簡單的配置如下。
<rabbit:listener-container connection-factory="connectionFactory"> <rabbit:listener queues="queueTest" ref="messageConsumer"/> </rabbit:listener-container>
messageConsumer是一個簡單bean類,可以用注解標識。
@Component("messageConsumer") public class MessageConsumer implements MessageListener { public void onMessage(Message message) {} }
類作為消息監聽器,必須實現接口MessageListener或者是接口ChannelAwareMessageListener。
另一種配置方式是使用method參數,指定消息處理的方法,以org.springframework.amqp.core.Message類作為方法參數。
配置完成並寫好消息監聽處理類后就可以嘗試發送消息了。
public class MessageProducer{ @Resource private AmqpTemplate amqpTemplate; public void sendMessage(){ Message message = MessageBuilder.withBody("hello rabbit".getBytes("utf-8")) .setMessageId(System.currentTimeMillis()+"") .build(); this.amqpTemplate.send("queueTestKey", message); } }
消息監聽方法。
public void onMessage(Message message){ String content = new String(message.getBody(),"utf-8"); system.out.println(content); }
消息內容發送時會被轉換為字節數組,默認以UTF-8進行編碼。如果想要發送對象信息,按照類實例的序列化和反序列化進行操作即可。