spring集成RabbitMQ配置文件詳解(生產者和消費者)


1,首先引入配置文件org.springframework.amqp,如下:

<dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit</artifactId>
            <version>1.7.1.RELEASE</version>
        </dependency>

2,准備工作:安裝好rabbitmq,並在項目中增加配置文件   rabbit.properties 內容如下:

rmq.ip=192.188.113.114   
rmq.port=5672
rmq.producer.num=20
rmq.manager.user=admin
rmq.manager.password=admin

3,rabbitmq屬性介紹:

概念解釋:

Brocker:消息隊列服務器實體。

Exchange:消息交換機,指定消息按什么規則,路由到哪個隊列。

Queue:消息隊列,每個消息都會被投入到一個或者多個隊列里。

Binding:綁定,它的作用是把exchange和queue按照路由規則binding起來。

Routing Key:路由關鍵字,exchange根據這個關鍵字進行消息投遞。

Virtual Host: 虛擬主機,一個broker里可以開設多個vhost,用作不用用戶的權限分離。每個virtual host本質上都是一個RabbitMQ Server(但是一個server中可以有多個virtual host),擁有它自己若干的個Exchange、Queue和bings rule等等。其實這是一個虛擬概念,類似於權限控制組。Virtual Host是權限控制的最小粒度。

Producer:消息生產者,就是投遞消息的程序。

Consumer:消息消費者,就是接受消息的程序。

Connection: 就是一個TCP的連接。Producer和Consumer都是通過TCP連接到RabbitMQ Server的。接下來的實踐案例中我們就可以看到,producer和consumer與exchange的通信的前提是先建立TCP連接。僅僅創建了TCP連接,producer和consumer與exchange還是不能通信的。我們還需要為每一個Connection創建Channel。

Channel: 它是建立在上述TCP連接之上的虛擬連接。數據傳輸都是在Channel中進行的。AMQP協議規定只有通過Channel才能執行AMQP的命令。一個Connection可以包含多個Channel。有人要問了,為什么要使用Channel呢,直接用TCP連接不就好了么?對於一個消息服務器來說,它的任務是處理海量的消息,當有很多線程需要從RabbitMQ中消費消息或者生產消息,那么必須建立很多個connection,也就是許多個TCP連接。然而對於操作系統而言,建立和關閉TCP連接是非常昂貴的開銷,而且TCP的連接數也有限制,頻繁的建立關閉TCP連接對於系統的性能有很大的影響,如果遇到高峰,性能瓶頸也隨之顯現。RabbitMQ采用類似NIO的做法,選擇TCP連接服用,不僅可以減少性能開銷,同時也便於管理。在TCP連接中建立Channel是沒有上述代價的,可以復用TCP連接。對於Producer或者Consumer來說,可以並發的使用多個Channel進行Publish或者Receive。有實驗表明,在Channel中,1秒可以Publish10K的數據包。對於普通的Consumer或者Producer來說,這已經足夠了。除非有非常大的流量時,一個connection可能會產生性能瓶頸,此時就需要開辟多個connection。

消息隊列的使用過程大概如下:
消息接收

客戶端連接到消息隊列服務器,打開一個channel。
客戶端聲明一個exchange,並設置相關屬性。
客戶端聲明一個queue,並設置相關屬性。
客戶端使用routing key,在exchange和queue之間建立好綁定關系。

消息發布

客戶端投遞消息到exchange。
exchange接收到消息后,就根據消息的key和已經設置的binding,進行消息路由,將消息投遞到一個或多個隊列里。

AMQP 里主要要說兩個組件:

Exchange 和 Queue
綠色的X就是Exchange ,紅色的是Queue,這兩者都在Server端,又稱作Broker
這部分是RabbitMQ實現的,而藍色的則是客戶端,通常有Producer和Consumer兩種類型。

 

4,配置spring-rabbitmq.xml,內容如下:

<!-- 公共部分 -->
<!-- 創建連接類 連接安裝好的 rabbitmq -->
<bean id="connectionFactory"  class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
    <constructor-arg value="localhost" />    
    <!-- username,訪問RabbitMQ服務器的賬戶,默認是guest -->
    <property name="username" value="${rmq.manager.user}" />
    <!-- username,訪問RabbitMQ服務器的密碼,默認是guest -->   
    <property name="password" value="${rmq.manager.password}" />
    <!-- host,RabbitMQ服務器地址,默認值"localhost" -->   
    <property name="host" value="${rmq.ip}" />   
    <!-- port,RabbitMQ服務端口,默認值為5672 -->
    <property name="port" value="${rmq.port}" />
    <!-- channel-cache-size,channel的緩存數量,默認值為25 -->
    <property name="channel-cache-size" value="50" />
    <!-- cache-mode,緩存連接模式,默認值為CHANNEL(單個connection連接,連接之后關閉,自動銷毀) -->
    <property name="cache-mode" value="CHANNEL" />   
</bean>
<!--或者這樣配置,connection-factory元素實際就是注冊一個org.springframework.amqp.rabbit.connection.CachingConnectionFactory實例
<rabbit:connection-factory id="connectionFactory" host="${rmq.ip}" port="${rmq.port}"
username="${rmq.manager.user}" password="${rmq.manager.password}" />-->
<rabbit:admin connection-factory="connectionFactory"/>

<!--定義消息隊列,durable:是否持久化,如果想在RabbitMQ退出或崩潰的時候,不會失去所有的queue和消息,需要同時標志隊列(queue)和交換機(exchange)是持久化的,即rabbit:queue標簽和rabbit:direct-exchange中的durable=true,而消息(message)默認是持久化的可以看類org.springframework.amqp.core.MessageProperties中的屬性public static final MessageDeliveryMode DEFAULT_DELIVERY_MODE = MessageDeliveryMode.PERSISTENT;exclusive: 僅創建者可以使用的私有隊列,斷開后自動刪除;auto_delete: 當所有消費客戶端連接斷開后,是否自動刪除隊列 -->
<rabbit:queue name="spittle.alert.queue.1" id="queue_1" durable="true" auto-delete="false" exclusive="false" />
<rabbit:queue name="spittle.alert.queue.2" id="queue_2" durable="true" auto-delete="false" exclusive="false" />
<rabbit:queue name="spittle.alert.queue.3" id="queue_3" durable="true" auto-delete="false" exclusive="false" />

<!--綁定隊列,rabbitmq的exchangeType常用的三種模式:direct,fanout,topic三種,我們用direct模式,即rabbit:direct-exchange標簽,Direct交換器很簡單,如果是Direct類型,就會將消息中的RoutingKey與該Exchange關聯的所有Binding中的BindingKey進行比較,如果相等,則發送到該Binding對應的Queue中。有一個需要注意的地方:如果找不到指定的exchange,就會報錯。但routing key找不到的話,不會報錯,這條消息會直接丟失,所以此處要小心,auto-delete:自動刪除,如果為Yes,則該交換機所有隊列queue刪除后,自動刪除交換機,默認為false -->
<rabbit:direct-exchange id="spittle.fanout" name="spittle.fanout" durable="true" auto-delete="false">
    <rabbit:bindings>
        <rabbit:binding queue="spittle.alert.queue.1" key="{alert.queue.1}"></rabbit:binding>
        <rabbit:binding queue="spittle.alert.queue.2" key="{alert.queue.2}"></rabbit:binding>
        <rabbit:binding queue="spittle.alert.queue.3" key="{alert.queue.3}"></rabbit:binding>
    </rabbit:bindings>
</rabbit:fanout-exchange>

<!-- 生產者部分 -->
<!-- 發送消息的producer類,也就是生產者 -->
<bean id="msgProducer" class="com.asdf.sdf.ClassA">
    <!-- value中的值就是producer中的的routingKey,也就是隊列名稱,它與上面的rabbit:bindings標簽中的key必須相同 -->
    <property name="queueName" value="{alert.queue.1}"/>
</bean>

<!-- spring amqp默認的是jackson 的一個插件,目的將生產者生產的數據轉換為json存入消息隊列,由於fastjson的速度快於jackson,這里替換為fastjson的一個實現 -->
<bean id="jsonMessageConverter" class="com.jy.utils.FastJsonMessageConverter"></bean>
<!-- 或者配置jackson -->
<!--
<bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" />
-->

<rabbit:template exchange="test-exchange" id="rabbitTemplate" connection-factory="connectionFactory" message-converter="jsonMessageConverter" />

<!-- 消費者部分 -->
<!-- 自定義接口類 -->
<bean id="testHandler" class="com.rabbit.TestHandler"></bean>

<!-- 用於消息的監聽的代理類MessageListenerAdapter -->
<bean id="testQueueListenerAdapter" class="org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter" >
        <!-- 類名 -->
    <constructor-arg ref="testHandler" />
        <!-- 方法名 -->
    <property name="defaultListenerMethod" value="handlerTest"></property>
                <property name="messageConverter" ref="jsonMessageConverter"></property>
</bean>

<!-- 配置監聽acknowledeg="manual"設置手動應答,它能夠保證即使在一個worker處理消息的時候用CTRL+C來殺掉這個worker,或者一個consumer掛了(channel關閉了、connection關閉了或者TCP連接斷了),也不會丟失消息。因為RabbitMQ知道沒發送ack確認消息導致這個消息沒有被完全處理,將會對這條消息做re-queue處理。如果此時有另一個consumer連接,消息會被重新發送至另一個consumer會一直重發,直到消息處理成功,監聽容器acknowledge="auto" concurrency="30"設置發送次數,最多發送30次 -->
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto" concurrency="20">
        <rabbit:listener queues="spittle.alert.queue.1" ref="testQueueListenerAdapter" />
    <rabbit:listener queues="spittle.alert.queue.2" ref="testQueueListenerAdapter" />
    <rabbit:listener queues="spittle.alert.queue.2" ref="testQueueListenerAdapter" />
</rabbit:listener-container>

 

5,生產者(發送端)代碼:

@Resource  
private RabbitTemplate rabbitTemplate;
private String queueName;  
public void sendMessage(CommonMessage msg){
         try {  
              logger.error("發送信息開始");
              System.out.println(rabbitTemplate.getConnectionFactory().getHost());  
             //發送信息  queueName交換機,就是上面的routingKey msg.getSource() 為 test_key 
             rabbitTemplate.convertAndSend(queueName,msg.getSource(), msg);
             //如果是普通字符串消息需要先序列化,再發送消息
             //rabbitTemplate.convertAndSend(queueName,msg.getSource(), SerializationUtils.serialize(msg));
             logger.error("發送信息結束");
         } catch (Exception e) {  
             e.printStackTrace();
         }
    }

public void setQueueName(String queueName) {
    this.queueName = queueName;
}

 

6,消費端代碼:TestHandler 類

public class TestHandler  {
    @Override
    public void handlerTest(CommonMessage commonMessage) {
        System.out.println("DetailQueueConsumer: " + new String(message.getBody()));
    }
}

其他exchangeType介紹:

fanOut:

<!-- Fanout 扇出,顧名思義,就是像風扇吹面粉一樣,吹得到處都是。如果使用fanout類型的exchange,那么routing key就不重要了。因為凡是綁定到這個exchange的queue,都會受到消息。 -->
<rabbit:fanout-exchange name="delayed_message_exchange" durable="true" auto-delete="false" id="delayed_message_exchange">  
        <rabbit:bindings>  
            <rabbit:binding queue="test_delay_queue"/>  
        </rabbit:bindings>  
</rabbit:fanout-exchange>

topic:如果說direct是將消息放到exchange綁定的一個queue里(一對一);fanout是將消息放到exchange綁定的所有queue里(一對所有);那么topic類型的exchange就可以實現(一對部分),應用場景就是打印不同級別的錯誤日志,我們的系統出錯后會根據不同的錯誤級別生成error_levelX.log日志,我們在后台首先要把所有的error保存在一個總的queue(綁定了一個*.error的路由鍵)里,然后再按level分別存放在不同的queue。

<!-- 發送端不是按固定的routing key發送消息,而是按字符串“匹配”發送,接收端同樣如此 -->
<rabbit:topic-exchange name="message-exchange" durable="true" auto-delete="false" id="message-exchange">
        <rabbit:bindings>
            <rabbit:binding queue="Q1" pattern="error.*.log" />
            <rabbit:binding queue="Q2" pattern="error.level1.log" />
            <rabbit:binding queue="Q3" pattern="error.level2.log" />
        </rabbit:bindings>
</rabbit:topic-exchange>

routing key綁定如下圖:

 

本文轉自:

https://blog.csdn.net/nandao158/article/details/81065892

https://www.cnblogs.com/LipeiNet/p/6079427.html

https://www.toutiao.com/a6598154241037042189/?tt_from=mobile_qq&utm_campaign=client_share&timestamp=1536277584&app=news_article&utm_source=mobile_qq&iid=43157585039&utm_medium=toutiao_android&group_id=6598154241037042189


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM