RabbitMQ(三)五種工作模式和Exchange交換機


 

前言 

先來了解RabbitMQ一個重要的概念:Exchange交換機

1. Exchange概念

  • Exchange:接收消息,並根據路由鍵轉發消息所綁定的隊列。

  

藍色框:客戶端發送消息至交換機,通過路由鍵路由至指定的隊列。
黃色框:交換機和隊列通過路由鍵有一個綁定的關系。
綠色框:消費端通過監聽隊列來接收消息。

 

2. 交換機屬性

 Name交換機名稱
 Type交換機類型——direct、topic、fanout、headers、sharding(此篇不講)
 Durability是否需要持久化,true為持久化
 Auto Delete當最后一個綁定到Exchange上的隊列刪除后,自動刪除該Exchange
 Internal當前Exchange是否用於RabbitMQ內部使用,默認為false
 Arguments擴展參數,用於擴展AMQP協議自定制化使用

 

一、簡單模式(Hello Word)

P代表生產者,C代表消費者,紅色代碼消息隊列。P將消息發送到消息隊列,C對消息進行處理。

生產者:

@Controller 
public class ProducerDemo {

    @Autowired
    private AmqpTemplate rabbitTemplate;

    @RequestMapping("/send")
    @ResponseBody
    public String send() {
        String context = "hello==========" + new Date();
        log.info("Sender : " + context);
        //生產者,正在往hello這個路由規則中發送,由於沒有交換機,所以路由規則就是隊列名稱
        this.rabbitTemplate.convertAndSend("hello", context);
        return "success";
    }
}

消費者:

@Component
//監聽hello這個隊列
@RabbitListener(queues = "hello")
public class ConsumerDemo {

    @RabbitHandler
    public void process(String hello) {
        System.out.println("Receiver  ===================: " + hello);
    }
}

 

二、工作模式

一個隊列有兩個消費者。一個隊列中一條消息,只能被一個消費者消費。

在上面的基礎中,添加一個消費者就OK了。

消費者:

//第1個消費者
@Component @RabbitListener(queues
= "hello")//監聽hello這個隊列 public class ConsumerDemo1{ @RabbitHandler public void process(String hello) { System.out.println("Receiver ===================: " + hello); } }
//第2個消費者 @Component @RabbitListener(queues = "hello")//監聽hello這個隊列 public class ConsumerDemo2{ @RabbitHandler public void process(String hello) { System.out.println("Receiver ===================: " + hello); } }

當兩個消費者同時監聽一個隊列時,他們並不能同時消費一條消息,而是隨機消費消息。1,2,3,4,5,6消息來了,consumer1消費了1,3,5;consumer2消費了2,4,6。

這個數據是隨機的哦,別理解為奇偶數。可以自己測試一下。

 

三、訂閱與發布模式(Fanout)

  • 不處理路由鍵,只需要簡單的將隊里綁定到交換機上

  • 生產者將消息不是直接發送到隊列,而是發送到X交換機,發送到交換機的消息都會被轉發到與該交換機綁定的所有隊列上

  • Fanout交換機轉發消息是最快的

定義一個訂閱模式的交換機:FanoutExchange交換機。然后創建2個隊列helloA,helloB,然后將這兩個隊列綁定到交換機上面。

@Configuration
public class RabbitMQConfig {

     @Bean
    public Queue queueA() {
        return new Queue("helloA", true);
    }

    @Bean
    public Queue queueB() {
        return new Queue("helloB", true);
    }

    //創建一個fanoutExchange交換機
    @Bean
    FanoutExchange fanoutExchange() {
        return new FanoutExchange("fanoutExchange");
    }

    //將queueA隊列綁定到fanoutExchange交換機上面
    @Bean
    Binding bindingExchangeMessageFanoutA(Queue queueA, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(queueA).to(fanoutExchange);
    }

    //將queueB隊列綁定到fanoutExchange交換機上面
    @Bean
    Binding bindingExchangeMessageFanoutB(Queue queueB, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(queueB).to(fanoutExchange);
    }

}

注意一個細節:bindingExchangeMessageFanoutA這種參數中的queueA與創建隊列的方法queueA()名字要相同哦。這樣才知道queueA綁定了該交換機哦。

交換機的名稱也同樣。fanoutExchange參數的名字和fanoutExchange()名字要一樣哦。

生產者:

this.rabbitTemplate.convertAndSend("fanoutExchange","", context);

消費者:

//第1個消費者
@Component
@RabbitListener(queues = "hello")//監聽hello這個隊列
public class ConsumerDemo1{

    @RabbitHandler
    public void process(String hello) {
        System.out.println("Receiver  ===================: " + hello);
    }
}

//第2個消費者
@Component
@RabbitListener(queues = "hello")//監聽hello這個隊列
public class ConsumerDemo2{

    @RabbitHandler
    public void process(String hello) {
        System.out.println("Receiver  ===================: " + hello);
    }
}

現在生產者發送了一條消息,會發現consumer1,2都會收到。之前不是說過一個隊列里面的一條消息,只能被一個消費者消費嗎?怎么現在一條消息被兩個消費者消費了。

要知道這里對於生產者來說是只生產了一條消息,但是它發送給了交換機,交換機會根據綁定的隊列來發送。

現在綁定了queueA,queueB隊列,所以兩個隊列里面都有消息了。而消費者關注的也是兩個隊列,就看到了一條消息被兩個消費者消費的情況了。

 

四、路由模式(Direct)

  • 所有發送到Direct Exchange的消息被轉發到RouteKey中指定的Queue。
  • 消息傳遞時,RouteKey必須完全匹配才會被隊列接收,否則該消息會被拋棄。
@Configuration
public class RabbitMQConfig {
    public static final String DIRECT_EXCHANGE = "directExchange";
    public static final String QUEUE_DIRECT_A = "direct.A";
    public static final String QUEUE_DIRECT_B = "direct.B";

    //創建一個direct交換機
    @Bean
    DirectExchange directExchange() {
        return new DirectExchange(DIRECT_EXCHANGE);
    }

    //創建隊列A  
    @Bean
    Queue queueDirectNameA() {
        return new Queue(QUEUE_DIRECT_A);
    }

    //創建隊列B
    @Bean
    Queue queueDirectNameB() {
        return new Queue(QUEUE_DIRECT_B);
    }

    //將direct.A隊列綁定到directExchange交換機中,使用direct.a.key作為路由規則
    @Bean
    Binding bindingExchangeMessageDirectA(Queue queueDirectNameA, DirectExchange directExchange) {
        return BindingBuilder.bind(queueDirectNameA).to(directExchange).with("direct.a.key");
    }

    //將direct.B隊列綁定到directExchange交換機中,使用direct.b.key作為路由規則
    @Bean
    Binding bindingExchangeMessageDirectB(Queue queueDirectNameB, DirectExchange directExchange) {
        return BindingBuilder.bind(queueDirectNameB).to(directExchange).with("direct.b.key");
    }
}

消費者:

@Component
public class ConsumerDemo {

    @RabbitListener(queues = "direct.A")
    @RabbitHandler
    public void processtopicA(String hello) {
        System.out.println("Receiver Exchanges direct.A  ===================: " + hello);
    }

    @RabbitListener(queues = "direct.B")
    @RabbitHandler
    public void processtopicB(String hello) {
        System.out.println("Receiver Exchanges direct.B  ===================: " + hello);
    }
} 

生產者:

// 往directExchange交換機中發送消息,使用direct.a.key作為路由規則。
rabbitTemplate.convertAndSend(RabbitMQConfig.DIRECT_EXCHANGE, "direct.a.key", context);

direct.A,direct.B 兩個隊列都綁定了交換機directExchange,但他們的路由規則不同,a隊列用了direct.a.key,b隊列用了direct.b.key,

這種情況下,生產者使用direct.a.key作為路由規則,就只有a隊列能收到消息,b隊列則收不到消息。

 

五、主題模式(Topic)

  • 所有發送到Topic Exchange的消息被轉發到所有管線RouteKey中指定Topic的Queue上

  • Exchange將RouteKey和某Topic進行模糊匹配,此時隊列需要綁定一個Topic

@Configuration
public class RabbitMQConfig {

  public static final String TOPIC_EXCHANGE = "topicExchange";

    public static final String TOPIC_KEY_A = "topic.#";

    public static final String TOPIC_KEY_B = "topic.b.key";

    public static final String QUEUE_TOPIC_A = "topic.A";

    public static final String QUEUE_TOPIC_B = "topic.B";


    //創建一個topic交換機
    @Bean
    TopicExchange topicExchange() {
        return new TopicExchange(TOPIC_EXCHANGE);
    }

    //創建隊列A
    @Bean
    Queue queueTopicNameA() {
        return new Queue(QUEUE_TOPIC_A);
    }

    //創建隊列B
    @Bean
    Queue queueTopicNameB() {
        return new Queue(QUEUE_TOPIC_B);
    }

    //隊列topic.A綁定交換機並且關聯了topic.#正則路由規則。就是說只要topic.開頭的,topic.A隊列都將收到消息
    @Bean
    Binding bindingExchangeMessageTopicA(Queue queueTopicNameA, TopicExchange topicExchange) {
        return BindingBuilder.bind(queueTopicNameA).to(topicExchange).with(TOPIC_KEY_A);
    }

    //隊列topic.B綁定交換機並且關聯了topic.b.key正則路由規則。就是說必須是topic.b.key,topic.B隊列才能收到消息,和directExchange類型一樣了。
    @Bean
    Binding bindingExchangeMessageTopicB(Queue queueTopicNameB, TopicExchange topicExchange) {
        return BindingBuilder.bind(queueTopicNameB).to(topicExchange).with(TOPIC_KEY_B);
    }

}

消費者:

@Component
public class ConsumerDemo {

    @RabbitListener(queues = "topic.A")
    @RabbitHandler
    public void processtopicA(String hello) {
        System.out.println("Receiver Exchanges topic.A  ===================: " + hello);
    }

    @RabbitListener(queues = "topic.B")
    @RabbitHandler
    public void processtopicB(String hello) {
        System.out.println("Receiver Exchanges topic.B  ===================: " + hello);
    }
}

生產者:

@RequestMapping("/topic/send")
@ResponseBody
public String sendTopicExchange() {
    String context = "Exchange==topic-->a====== " + new Date();
   // 往topicExchange交換機中發送消息,使用topic.b.key作為路由規則。
   this.rabbitTemplate.convertAndSend(RabbitMQConfig.TOPIC_EXCHANGE, "topic.b.key", context);
    return "success";
}

這里發送消息時,往topicExchange這個交換機中發送,並且路由規則為topic.b.key。由於b隊列綁定了交換機和路由規則就是它,所以隊列b能收到消息。

但是由於A隊列的過濾規則為topic.#,就是說只要topic開頭的就的路由規則,交換機就會往這個隊列里面發送消息。所以a隊列也能收到消息,topic.b.key是topic開頭的。

對於a隊列來說,路由規則為topic.adsf,topic.b.key,topic.a等等,a隊列都將收到消息,因為它的路由規則就是topic開頭就可以。

 

六、關系轉換

訂閱模式,路由模式,主題模式,他們三種都非常類似。而且主題模式可以隨時變成兩外兩種模式。

在主題模式下:

  • 當路由規則不為正則表達式的時候,他就和路由模式一樣。
  • 當路由規則不為正則表達式,且路由規則一樣時,就變成了訂閱模式。

在路由模式下:

  • 當路由規則一樣時,就變成了訂閱模式。

簡單總結五種模式:

  • 簡單模式:生產者,一個消費者,一個隊列
  • 工作模式:生產者,多個消費者,一個隊列
  • 訂閱與發布模式(fanout):生產者,一個交換機(fanoutExchange),沒有路由規則,多個消費者,多個隊列
  • 路由模式(direct):生產者,一個交換機(directExchange),路由規則,多個消費者,多個隊列
  • 主題模式(topic):生產者,一個交換機(topicExchange),模糊匹配路由規則,多個消費者,多個隊列

 

七、總結

  • 一個隊列,一條消息只會被一個消費者消費(有多個消費者的情況也是一樣的)。
  • 訂閱模式,路由模式,主題模式,他們的相同點就是都使用了交換機,只不過在發送消息給隊列時,添加了不同的路由規則。訂閱模式沒有路由規則,路由模式為完全匹配規則,主題模式有正則表達式,完全匹配規則。
  • 在訂閱模式中可以看到一條消息被多個消費者消費了,不違背第一條總結,因為一條消息被發送到了多個隊列中去了。
  • 在交換機模式下:隊列和路由規則有很大關系
  • 在有交換機的模式下:3,4,5模式下,生產者只用關心交換機與路由規則即可,無需關心隊列
  • 消費者不管在什么模式下:永遠不用關心交換機和路由規則,消費者永遠只關心隊列,消費者直接和隊列交互

 

 

引用:http://www.baowenwei.com/post/fen-bu-shi/rabbitmqyu-spring-springbootji-cheng-ji-mqde-liu-chong-gong-zuo-mo-shi-san


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM