結合生活案例實現rabbitmq消息通信


ps : 這篇文章比較長,讀者還是需要耐心的閱讀的。干貨多多。

在分布式項目中為了提高性能,也為了實現項目規范,我們都會在處理消息隊列的時候引入消息中間件。中間件的作用一個是為了解耦,還有一個是性能提升。消息中間件我們每個人每天都在接觸,相信大家都用過美團或者是聽過美團。從程序員的角度看美團外賣涉及三方角色。【商家】【騎手】【顧客】。這三者的關系簡單理解如下

三角色結構

下面案例會通過代碼說明,如下是項目結構
項目結構
rabbit-demo

下訂單

今天我們着重介紹下訂單的流程,那為什么選擇下訂單流程而不選擇其他兩個流程呢。因為派送流程不是消息隊列。如果非要規划為消息塊我只能認為是消息消費。而發訂單呢流程和下訂單是一樣的。考慮讀者應該都是平時點餐的那位。對下訂單應該更加的了解。

線下買單

我們想想在美團外賣推出之前我們點餐的過程。是顧客到商家店鋪里進行買單消費的。這種模式存在什么弊端。這就必須商家的店鋪夠大,人手夠多才能夠讓我們消費者不擁堵起來。但是這樣就增加了商家的成本。所以在這種局限的條件下我們的線上交易誕生了。
線下交易

線上交易

有了美團我們就只需要在手機點餐,商家就會進行選擇行接受,然后出貨。這就是下單的一個流程。后面的就是騎手商家流程了。

我們的顧客相當於是在向消息隊列中添加消息,消息體就是我們的訂單。在美團外賣的系統中同一時刻可能多人點單這就導致商家一時間處理不過來,這是我們的mq就作為一中緩存機制先將顧客的訂單本地化,mq按先進先出的順序將消息有條不紊的派發到只能的商家手中。這就解決我們人多導致訂單繁瑣的問題了。
好了以上是我們用mq的好處。下面我們通過代碼看看mq具體的發送消息的集中方式。
線上交易

mq結構圖

結構

環境准備

rabbitmq安裝

由於RabbitMQ是用Erlang語言編寫的,因此需要先安裝Erlang。

  • 通過http://www.erlang.org/downloads獲取對應安裝文件進行安裝
  • 增加環境變量ERLANG_HOME=D:\Program Files\erl9.3(這里的目錄是我的安裝目錄,你要換成自己的目錄)
  • 修改環境變量Path,在原來的值后面加上“;%ERLANG_HOME%\bin”

安裝完Erlang之后,我們就可以安裝RabbitMQ了。

  • 到http://www.rabbitmq.com/install-windows-manual.html下載安裝包進行安裝
  • 增加環境變量RABBITMQ_HOEM=D:\Program Files\RabbitMQ Server\rabbitmq_server-3.7.5(這里的目錄是我的安裝目錄,你要換成自己的目錄)
  • 修改環境變量Path,在原來的值后面加上“;%RABBITMQ_HOME%\sbin”

安裝好之后,RabbitMQ就作為一個服務按照默認方式進行啟動了

運行命令rabbitmq-plugins enable rabbitmq_management 開啟Web管理插件
啟動
通過瀏覽器訪問http://localhost:15672,並通過默認用戶guest進行登錄,密碼也是guest,登錄后的頁面:
瀏覽器界面

springboot項目搭建

關於springboot的搭建這里就不能細說了。我們直接在zxhtom框架中擴展了。只要在里面添加如下依賴就行了


<!-- rabbit mq -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

然后我們只需要創建一個RabbitConfig類。這個類的作用就是讓mq在項目啟動初期檢查並完成各個組件的創建。既然交給spring完成自然需要在類上加上@Configuration注解。

只需要完成以上操作,我們的mq服務就算是配置完成了。剩下的就應該是生產消費了。
這里比較注意的是RabbitConfig這個類,這個類里到底是什么東西,我們在看代碼前我們先來了解下mq的原理。

rabbitmq組件拆裝

Queue : 消息隊列 , 生產者的直接接觸對象。
Exchange : 交換機 , 正常情況消費者是直接接觸交換機的。
Routing Key : 生產者和消費者接觸的兩個不同對象。這就需要routing key 將兩者對象進行關聯起來
Binding : 通過routing key進行綁定到一起。

下面是消費者、生產者、隊列、交換機、rk、binding之間的關系

關系

四種交換機

上面我們了解了mq的內部結構,其實還是很簡單的。但是我們mq有的時候有廣播模式,有的點對點模式。這些是這么來的呢。對了正是我們本節介紹的內容--交換機。

Direct Exchange

顧名思義就是【直連交換機】。要求發送的消息與一個特定的路由鍵完全的匹配。如下代碼中我們queue1的隊列和directExchange交換機通過DIRECTION綁定到了一起。那么我們發送消息是必須將消息標記為DIRECTION


@Bean
public Binding binding1(){
    return BindingBuilder.bind(queue1()).to(directExchange()).with(DIRECTIONKEY);
}


public void directionSend() {
    rabbitTemplate.convertAndSend(RabbitConfig.DIRECTEXCHANGE, "directionKey", msg);
}

Fanout Exchange

fanout中文翻譯為【輸出】,在這里我們理解成廣播。【廣播交換機】
廣播的意思就是所有人都能接受到,在這里需要重新定義為只要綁定在次交換機上的所有隊列都能接收到。不想其他交換機需要key來進行暗號匹配。Fanout Exchange就是我們常用的廣播模式,只要訂閱就會收到消息。像一些房產中介就是這種模式,只要你留下手機號碼樓盤動態就會實時的推送在你的手機上。


@Bean
public Binding fanoutBinding1(){
    return BindingBuilder.bind(queue1()).to(fanoutExchange());
}
@Bean
public Binding fanoutBinding2(){
    return BindingBuilder.bind(queue2()).to(fanoutExchange());
}
@Bean
public Binding fanoutBinding3(){
    return BindingBuilder.bind(queue3()).to(fanoutExchange());
}
@Bean
public Binding fanoutBinding4(){
    return BindingBuilder.bind(queue4()).to(fanoutExchange());
}

下面代碼就是只要有消息發送到Fanout Exchange上所有的隊列就會收到消息

public void fanoutSend1() {
    rabbitTemplate.convertAndSend(RabbitConfig.FANOUTEXCHANGE, RabbitConfig.QUEUETHIRD, msg);
}

Topic Exchange

將路由鍵和某種模式進行匹配,這里可以理解成Direction Exchange里的key可以通過某種正則匹配
# : 表示一個或多個詞,
* : 表示一個詞
zxh.# : 能夠匹配 zxh ; zxh. ; zxh.abc ; zxh.ab.sd.s.a
zxh.* : 能夠匹配 zxh.test ; zxh.demo

Headers Exchange

不處理路由鍵。而是根據發送的消息內容中的headers屬性進行匹配。在綁定Queue與Exchange時指定一組鍵值對;當消息發送到RabbitMQ時會取到該消息的headers與Exchange綁定時指定的鍵值對進行匹配;如果完全匹配則消息會路由到該隊列,否則不會路由到該隊列。headers屬性是一個鍵值對,可以是Hashtable,鍵值對的值可以是任何類型。而fanout,direct,topic 的路由鍵都需要要字符串形式的。

匹配規則x-match有下列兩種類型:

x-match = all :表示所有的鍵值對都匹配才能接受到消息

x-match = any :表示只要有鍵值對匹配就能接受到消息

//請求數據中必須符合headerValues中任意一個參數
@Bean
public Binding headBinding1(){
    Map<String,Object> headerValues = new HashMap<>();
    headerValues.put("type", "cash");
    headerValues.put("aging", "fast");
    return BindingBuilder.bind(queue1()).to(headersExchange()).whereAll(headerValues).match();
}

死信隊列

上面四中交換機介紹完了。我們這里還有一種叫做死信隊列
還是美團外賣為列, 我們平時美團點單完之后,但是不確定是不是現在就要商家制作。這個時候我們就不會選擇下單。美團就會提示我們30分鍾之內完成付款,負責失效。想想也對美團不可能一直在哪里等着你下單。你這樣會占用人家內存的。


@Bean
public Queue deadLetterQueue(){
    Map<String, Object> arguments = new HashMap<String, Object>();
    //設置此死信隊列消息過期后,消息轉發到那個隊列上
    arguments.put("x-dead-letter-exchange", DIRECTEXCHANGE);
    arguments.put("x-dead-letter-routing-key", DIRECTIONKEY);
    return new Queue(QUEUEDEAD, true, false, false,arguments);
}

@Bean
public Binding deadBinding(){
    return BindingBuilder.bind(deadLetterQueue()).to(directExchange()).with(QUEUEDEAD);
}
public void deadSend() {
    MessagePostProcessor processor = new MessagePostProcessor() {
        @Override
        public Message postProcessMessage(Message message) throws AmqpException {
            message.getMessageProperties().setExpiration("60000");
            return message;
        }
    };
    log.info("延時消息開始發送:");
    rabbitTemplate.convertAndSend(RabbitConfig.DIRECTEXCHANGE, RabbitConfig.QUEUEDEAD, msg,processor);
}

通過上述代碼我們就能實現發送一個消息,會出現延時的效果。這就是我們平時30分鍾支付的效果代碼。

總結

這篇文章比較長,讀者還是需要耐心的閱讀的。干貨多多。

加入戰隊

# 加入戰隊

微信公眾號

微信公眾號


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM