1,首先引入配置文件org.springframework.amqp,如下:
<dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit</artifactId> <version>1.7.1.RELEASE</version> </dependency>
2,准備工作:安裝好rabbitmq,並在項目中增加配置文件 rabbit.properties 內容如下:
rmq.ip=192.188.113.114
rmq.port=5672
rmq.producer.num=20
rmq.manager.user=admin
rmq.manager.password=admin
3,rabbitmq屬性介紹:
概念解釋:
Brocker:消息隊列服務器實體。
Exchange:消息交換機,指定消息按什么規則,路由到哪個隊列。
Queue:消息隊列,每個消息都會被投入到一個或者多個隊列里。
Binding:綁定,它的作用是把exchange和queue按照路由規則binding起來。
Routing Key:路由關鍵字,exchange根據這個關鍵字進行消息投遞。
Virtual Host: 虛擬主機,一個broker里可以開設多個vhost,用作不用用戶的權限分離。每個virtual host本質上都是一個RabbitMQ Server(但是一個server中可以有多個virtual host),擁有它自己若干的個Exchange、Queue和bings rule等等。其實這是一個虛擬概念,類似於權限控制組。Virtual Host是權限控制的最小粒度。
Producer:消息生產者,就是投遞消息的程序。
Consumer:消息消費者,就是接受消息的程序。
Connection: 就是一個TCP的連接。Producer和Consumer都是通過TCP連接到RabbitMQ Server的。接下來的實踐案例中我們就可以看到,producer和consumer與exchange的通信的前提是先建立TCP連接。僅僅創建了TCP連接,producer和consumer與exchange還是不能通信的。我們還需要為每一個Connection創建Channel。
Channel: 它是建立在上述TCP連接之上的虛擬連接。數據傳輸都是在Channel中進行的。AMQP協議規定只有通過Channel才能執行AMQP的命令。一個Connection可以包含多個Channel。有人要問了,為什么要使用Channel呢,直接用TCP連接不就好了么?對於一個消息服務器來說,它的任務是處理海量的消息,當有很多線程需要從RabbitMQ中消費消息或者生產消息,那么必須建立很多個connection,也就是許多個TCP連接。然而對於操作系統而言,建立和關閉TCP連接是非常昂貴的開銷,而且TCP的連接數也有限制,頻繁的建立關閉TCP連接對於系統的性能有很大的影響,如果遇到高峰,性能瓶頸也隨之顯現。RabbitMQ采用類似NIO的做法,選擇TCP連接服用,不僅可以減少性能開銷,同時也便於管理。在TCP連接中建立Channel是沒有上述代價的,可以復用TCP連接。對於Producer或者Consumer來說,可以並發的使用多個Channel進行Publish或者Receive。有實驗表明,在Channel中,1秒可以Publish10K的數據包。對於普通的Consumer或者Producer來說,這已經足夠了。除非有非常大的流量時,一個connection可能會產生性能瓶頸,此時就需要開辟多個connection。
消息隊列的使用過程大概如下:
消息接收
客戶端連接到消息隊列服務器,打開一個channel。
客戶端聲明一個exchange,並設置相關屬性。
客戶端聲明一個queue,並設置相關屬性。
客戶端使用routing key,在exchange和queue之間建立好綁定關系。
消息發布
客戶端投遞消息到exchange。
exchange接收到消息后,就根據消息的key和已經設置的binding,進行消息路由,將消息投遞到一個或多個隊列里。
AMQP 里主要要說兩個組件:
Exchange 和 Queue
綠色的X就是Exchange ,紅色的是Queue,這兩者都在Server端,又稱作Broker
這部分是RabbitMQ實現的,而藍色的則是客戶端,通常有Producer和Consumer兩種類型。
4,配置spring-rabbitmq.xml,內容如下:
<!-- 公共部分 --> <!-- 創建連接類 連接安裝好的 rabbitmq --> <bean id="connectionFactory" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory"> <constructor-arg value="localhost" /> <!-- username,訪問RabbitMQ服務器的賬戶,默認是guest --> <property name="username" value="${rmq.manager.user}" /> <!-- username,訪問RabbitMQ服務器的密碼,默認是guest --> <property name="password" value="${rmq.manager.password}" /> <!-- host,RabbitMQ服務器地址,默認值"localhost" --> <property name="host" value="${rmq.ip}" /> <!-- port,RabbitMQ服務端口,默認值為5672 --> <property name="port" value="${rmq.port}" /> <!-- channel-cache-size,channel的緩存數量,默認值為25 --> <property name="channel-cache-size" value="50" /> <!-- cache-mode,緩存連接模式,默認值為CHANNEL(單個connection連接,連接之后關閉,自動銷毀) --> <property name="cache-mode" value="CHANNEL" /> </bean> <!--或者這樣配置,connection-factory元素實際就是注冊一個org.springframework.amqp.rabbit.connection.CachingConnectionFactory實例 <rabbit:connection-factory id="connectionFactory" host="${rmq.ip}" port="${rmq.port}" username="${rmq.manager.user}" password="${rmq.manager.password}" />--> <rabbit:admin connection-factory="connectionFactory"/> <!--定義消息隊列,durable:是否持久化,如果想在RabbitMQ退出或崩潰的時候,不會失去所有的queue和消息,需要同時標志隊列(queue)和交換機(exchange)是持久化的,即rabbit:queue標簽和rabbit:direct-exchange中的durable=true,而消息(message)默認是持久化的可以看類org.springframework.amqp.core.MessageProperties中的屬性public static final MessageDeliveryMode DEFAULT_DELIVERY_MODE = MessageDeliveryMode.PERSISTENT;exclusive: 僅創建者可以使用的私有隊列,斷開后自動刪除;auto_delete: 當所有消費客戶端連接斷開后,是否自動刪除隊列 --> <rabbit:queue name="spittle.alert.queue.1" id="queue_1" durable="true" auto-delete="false" exclusive="false" /> <rabbit:queue name="spittle.alert.queue.2" id="queue_2" durable="true" auto-delete="false" exclusive="false" /> <rabbit:queue name="spittle.alert.queue.3" id="queue_3" durable="true" auto-delete="false" exclusive="false" /> <!--綁定隊列,rabbitmq的exchangeType常用的三種模式:direct,fanout,topic三種,我們用direct模式,即rabbit:direct-exchange標簽,Direct交換器很簡單,如果是Direct類型,就會將消息中的RoutingKey與該Exchange關聯的所有Binding中的BindingKey進行比較,如果相等,則發送到該Binding對應的Queue中。有一個需要注意的地方:如果找不到指定的exchange,就會報錯。但routing key找不到的話,不會報錯,這條消息會直接丟失,所以此處要小心,auto-delete:自動刪除,如果為Yes,則該交換機所有隊列queue刪除后,自動刪除交換機,默認為false --> <rabbit:direct-exchange id="spittle.fanout" name="spittle.fanout" durable="true" auto-delete="false"> <rabbit:bindings> <rabbit:binding queue="spittle.alert.queue.1" key="{alert.queue.1}"></rabbit:binding> <rabbit:binding queue="spittle.alert.queue.2" key="{alert.queue.2}"></rabbit:binding> <rabbit:binding queue="spittle.alert.queue.3" key="{alert.queue.3}"></rabbit:binding> </rabbit:bindings> </rabbit:fanout-exchange> <!-- 生產者部分 --> <!-- 發送消息的producer類,也就是生產者 --> <bean id="msgProducer" class="com.asdf.sdf.ClassA"> <!-- value中的值就是producer中的的routingKey,也就是隊列名稱,它與上面的rabbit:bindings標簽中的key必須相同 --> <property name="queueName" value="{alert.queue.1}"/> </bean> <!-- spring amqp默認的是jackson 的一個插件,目的將生產者生產的數據轉換為json存入消息隊列,由於fastjson的速度快於jackson,這里替換為fastjson的一個實現 --> <bean id="jsonMessageConverter" class="com.jy.utils.FastJsonMessageConverter"></bean> <!-- 或者配置jackson --> <!-- <bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" /> --> <rabbit:template exchange="test-exchange" id="rabbitTemplate" connection-factory="connectionFactory" message-converter="jsonMessageConverter" /> <!-- 消費者部分 --> <!-- 自定義接口類 --> <bean id="testHandler" class="com.rabbit.TestHandler"></bean> <!-- 用於消息的監聽的代理類MessageListenerAdapter --> <bean id="testQueueListenerAdapter" class="org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter" > <!-- 類名 --> <constructor-arg ref="testHandler" /> <!-- 方法名 --> <property name="defaultListenerMethod" value="handlerTest"></property> <property name="messageConverter" ref="jsonMessageConverter"></property> </bean> <!-- 配置監聽acknowledeg="manual"設置手動應答,它能夠保證即使在一個worker處理消息的時候用CTRL+C來殺掉這個worker,或者一個consumer掛了(channel關閉了、connection關閉了或者TCP連接斷了),也不會丟失消息。因為RabbitMQ知道沒發送ack確認消息導致這個消息沒有被完全處理,將會對這條消息做re-queue處理。如果此時有另一個consumer連接,消息會被重新發送至另一個consumer會一直重發,直到消息處理成功,監聽容器acknowledge="auto" concurrency="30"設置發送次數,最多發送30次 --> <rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto" concurrency="20"> <rabbit:listener queues="spittle.alert.queue.1" ref="testQueueListenerAdapter" /> <rabbit:listener queues="spittle.alert.queue.2" ref="testQueueListenerAdapter" /> <rabbit:listener queues="spittle.alert.queue.2" ref="testQueueListenerAdapter" /> </rabbit:listener-container>
5,生產者(發送端)代碼:
@Resource private RabbitTemplate rabbitTemplate; private String queueName; public void sendMessage(CommonMessage msg){ try { logger.error("發送信息開始"); System.out.println(rabbitTemplate.getConnectionFactory().getHost()); //發送信息 queueName交換機,就是上面的routingKey msg.getSource() 為 test_key rabbitTemplate.convertAndSend(queueName,msg.getSource(), msg); //如果是普通字符串消息需要先序列化,再發送消息 //rabbitTemplate.convertAndSend(queueName,msg.getSource(), SerializationUtils.serialize(msg)); logger.error("發送信息結束"); } catch (Exception e) { e.printStackTrace(); } } public void setQueueName(String queueName) { this.queueName = queueName; }
6,消費端代碼:TestHandler 類
public class TestHandler { @Override public void handlerTest(CommonMessage commonMessage) { System.out.println("DetailQueueConsumer: " + new String(message.getBody())); } }
其他exchangeType介紹:
fanOut:
<!-- Fanout 扇出,顧名思義,就是像風扇吹面粉一樣,吹得到處都是。如果使用fanout類型的exchange,那么routing key就不重要了。因為凡是綁定到這個exchange的queue,都會受到消息。 --> <rabbit:fanout-exchange name="delayed_message_exchange" durable="true" auto-delete="false" id="delayed_message_exchange"> <rabbit:bindings> <rabbit:binding queue="test_delay_queue"/> </rabbit:bindings> </rabbit:fanout-exchange>
topic:如果說direct是將消息放到exchange綁定的一個queue里(一對一);fanout是將消息放到exchange綁定的所有queue里(一對所有);那么topic類型的exchange就可以實現(一對部分),應用場景就是打印不同級別的錯誤日志,我們的系統出錯后會根據不同的錯誤級別生成error_levelX.log日志,我們在后台首先要把所有的error保存在一個總的queue(綁定了一個*.error的路由鍵)里,然后再按level分別存放在不同的queue。
<!-- 發送端不是按固定的routing key發送消息,而是按字符串“匹配”發送,接收端同樣如此 --> <rabbit:topic-exchange name="message-exchange" durable="true" auto-delete="false" id="message-exchange"> <rabbit:bindings> <rabbit:binding queue="Q1" pattern="error.*.log" /> <rabbit:binding queue="Q2" pattern="error.level1.log" /> <rabbit:binding queue="Q3" pattern="error.level2.log" /> </rabbit:bindings> </rabbit:topic-exchange>
routing key綁定如下圖:
本文轉自:
https://blog.csdn.net/nandao158/article/details/81065892