轉載請注明出處
0.目錄
RabbitMQ-從基礎到實戰(1)— Hello RabbitMQ
1.簡介
原計划這章應該講RabbitMQ的RPC調用的,后來想想,這個場景應該用的不多,現在比較火的微服務,要么用dubbo,要么用spring cloud,用RabbitMQ做RPC比較少見,所以就先跳過了,有需要再補充。
其實網上RabbitMQ和Spring集成的教程有不少,我也大致看了看,大部分都是言簡意賅,代碼配置一貼,然后就可以用了,而我希望我的教程能多和大家一起探討一些“為什么”。
2.Spring AMQP
Spring AMQP中有兩個單詞,Spring都知道,那AMQP是什么?
中文意思是,高級消息隊列協議,然后用蹩腳的英語猜一下,advance message queue protocol,差不多了,advance變成形容詞高級的-advanced,queue變成queuing(排隊論,學術一點),所以,AMQP就是Advanced Message Queuing Protocal。
AMQP 0-9-1 是RabbitMQ支持的協議之一,0-9-1是個版本號,正常情況下推薦使用,它所表現出的形式,就是前面幾張介紹的內容
RabbitMQ還支持其他版本的協議,具體可以參考這里
Spring AMQP的定義如下
The Spring AMQP project applies core Spring concepts to the development of AMQP-based messaging solutions. It provides a "template" as a high-level abstraction for sending and receiving messages. It also provides support for Message-driven POJOs with a "listener container". These libraries facilitate management of AMQP resources while promoting the use of dependency injection and declarative configuration
意外的發現谷歌翻譯的很通順
Spring AMQP項目將核心Spring概念應用於基於AMQP的消息傳遞解決方案的開發。 它提供了一個“模板”作為發送和接收消息的高級抽象。 它還通過“偵聽器容器”為消息驅動的POJO提供支持。 這些庫促進AMQP資源的管理,同時促進使用依賴注入和聲明性配置
3.main方法集成
先不用Spring,只在Main方法中集成Spring AMQP,方便分析。
在pom文件中加入如下配置,不需要再引入RabbitMQ的包,Spring AMQP已經包含了4.0.x的客戶端
Spring AMQP now uses the new 4.0.x version of the
amqp-clientlibrary provided by the RabbitMQ team. This client has auto recovery configured by default
<dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit</artifactId> <version>1.7.1.RELEASE</version> </dependency> <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-classic</artifactId> <version>1.2.1</version> </dependency>
新建一個類,
1 package com.liyang.ticktock.rabbitmq; 2 3 import org.springframework.amqp.core.BindingBuilder; 4 import org.springframework.amqp.core.Queue; 5 import org.springframework.amqp.core.TopicExchange; 6 import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; 7 import org.springframework.amqp.rabbit.connection.ConnectionFactory; 8 import org.springframework.amqp.rabbit.core.RabbitAdmin; 9 import org.springframework.amqp.rabbit.core.RabbitTemplate; 10 import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; 11 import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter; 12 13 14 public class App 15 { 16 public static void main( String[] args ) throws InterruptedException 17 { 18 //獲取一個連接工廠,用戶默認是guest/guest(只能使用部署在本機的RabbitMQ) 19 //是Spring實現的對com.rabbitmq.client.Connection的包裝 20 ConnectionFactory cf = new CachingConnectionFactory("localhost"); 21 22 //對AMQP 0-9-1的實現 23 RabbitAdmin admin = new RabbitAdmin(cf); 24 //聲明一個隊列 25 Queue queue = new Queue("myQueue"); 26 admin.declareQueue(queue); 27 //聲明一個exchange 28 TopicExchange exchange = new TopicExchange("myExchange"); 29 admin.declareExchange(exchange); 30 //綁定隊列到exchange,加上routingKey foo.* 31 admin.declareBinding(BindingBuilder.bind(queue).to(exchange).with("foo.*")); 32 33 //監聽容器 34 SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(cf); 35 //監聽者對象 36 Object listener = new Object() { 37 @SuppressWarnings("unused") 38 public void handleMessage(String foo) { 39 System.out.println(foo); 40 } 41 }; 42 //通過這個適配器代理listener 43 MessageListenerAdapter adapter = new MessageListenerAdapter(listener); 44 //把適配器(listener)設置給Container 45 container.setMessageListener(adapter); 46 //設置該容器監聽的隊列名,可以傳多個,public void setQueueNames(String... queueName) { 47 container.setQueueNames("myQueue"); 48 //開始監聽 49 container.start(); 50 51 //發送模版,設置上連接工廠 52 RabbitTemplate template = new RabbitTemplate(cf); 53 //發送消息 54 template.convertAndSend("myExchange", "foo.bar", "Hello, world!"); 55 56 Thread.sleep(1000); 57 container.stop(); 58 } 59 }
運行結果如下:

以上代碼中
ConnectionFactory包裝了所有物理連接信息,然后傳遞給RabbitAdmin創建了RabbitMQ支持協議的連接(AMQP 0-9-1);
聲明隊列和交換中心后,通過BindingBuilder把隊列的綁定關系聲明到admin上;
創建一個消息處理類,用一個適配器(MessageListenerAdapter)包裝它,並注冊到監聽容器中,啟動監聽;
最后通過連接信息創建一個Rabbit模板,調用發送方法。
這里引申出一個問題,為什么要用監聽容器(SimpleMessageListenerContainer),我們點開它的outline,如下,可以看到,它是對監聽這個動作的抽象,一個容器可以有多個Consumer,並且可以控制如超時時間等配置。

可以看出,比起直接使用RabbitMQ客戶端,以上代碼已經簡化了一部分,最明顯的部分就是,不需要手動去關Channel、Connection了,對以上概念的介紹都穿插在前幾章,這里不再贅述。
這些代碼用Spring的方式注入將會更加簡潔
4.傳統Spring方式集成
首先,POM文件中要把Spring引進來
<dependency> <groupId>org.springframework</groupId> <artifactId>spring-context</artifactId> <version>4.3.7.RELEASE</version> </dependency>
然后增加applicatioContext.xml
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.2.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.6.xsd "> <context:annotation-config/> <context:component-scan base-package="com.liyang.ticktock.rabbitmq.listener"/> <!-- 配置ConnectionFactory --> <rabbit:connection-factory id="connectionFactory" host="127.0.0.1" username="guest" password="guest" port="5672" /> <!-- 等同new一個RabbitAdmin --> <rabbit:admin connection-factory="connectionFactory" /> <!-- 聲明一個隊列 --> <rabbit:queue name="myQueue" /> <!-- 聲明一個topic類型的exchange,並把上面聲明的隊列綁定在上面,routingKey="foo.*" --> <rabbit:topic-exchange name="myExchange"> <rabbit:bindings> <rabbit:binding queue="myQueue" pattern="foo.*" /> <!-- 這里還可以繼續綁定其他隊列 --> </rabbit:bindings> </rabbit:topic-exchange> <!-- 聲明一個rabbitTemplate,指定連接信息,發送消息到myExchange上,routingKey在程序中設置,此處的配置在程序中可以用set修改 --> <rabbit:template id="amqpTemplate" connection-factory="connectionFactory" exchange="myExchange"/> <!-- 配置監聽容器,指定消息處理類,處理方法,還可以配置自動確認等--> <rabbit:listener-container connection-factory="connectionFactory"> <rabbit:listener ref="myListener" method="listen" queue-names="myQueue" /> <!-- 可以繼續注冊監聽 --> </rabbit:listener-container> </beans>
這個配置文件中,我們使用了context的自動掃描裝配,不用配bean了,還使用了rabbit的命名空間 xmlns:rabbit
這個東西可能有些童鞋不知道怎么配的,其實沒什么神秘的,我們打開根路徑看看里面有什么“http://www.springframework.org/schema/”

和本地目錄一樣,就是個存東西的地方,rabbit命名空間,在這個路徑后面加上rabbit對應的目錄就行了
再進入rabbit目錄,發現里面是一些不同版本的xsd文件,所以,在schemaLocation中,配上使用的具體版本的xsd就可以了

言歸正傳,上面的配置文件,把大量原來在代碼中實現的東西挪到了xml中,一目了然,方便修改。注意注釋加粗的三處,現在我們可以用Spring實現我們原來用原生RabbitMQ客戶端實現的所有功能了
在發送消息之前,還需要寫一個類用來處理消息,並且這個類要有linsten方法,和配置文件中聲明的 method="listen" 對應
1 package com.liyang.ticktock.rabbitmq.listener; 2 3 import org.slf4j.Logger; 4 import org.slf4j.LoggerFactory; 5 import org.springframework.stereotype.Component; 6 7 @Component 8 public class MyListener { 9 Logger logger = LoggerFactory.getLogger(MyListener.class); 10 11 public void listen(String message){ 12 logger.debug("received:"+message); 13 } 14 15 }
剩下在java中的代碼就非常簡潔了,監聽在Spring環境啟動后就自動開始了,我們只需要發消息就行,代碼如下
直接一個main方法,省的還要捉一只湯姆貓
1 public static void main(String[] args) throws InterruptedException { 2 //啟動Spring環境 3 AbstractApplicationContext ctx = new ClassPathXmlApplicationContext("applicationContext.xml"); 4 //假裝是Autowired的 5 RabbitTemplate template = ctx.getBean(RabbitTemplate.class); 6 //設置routingKey 7 template.setRoutingKey("foo.bar"); 8 //發送,exchange,routingKey什么的都配好了 9 template.convertAndSend("Hello, world!"); 10 11 //關掉環境 12 Thread.sleep(1000); 13 ctx.destroy(); 14 }
運行結果如下:

5.采用Spring Boot集成
不是很重要,先欠着,有空補上
6.結束語
最近實在太忙,這篇博客利用工作的碎片時間,寫了有一個多星期,囧。
在寫這篇系列教程的同時,我也獲益良多,后面會繼續為大家介紹一些RabbitMQ實用的高級特性,請多多支持
