3、RabbitMQ交換機作用_分類_特點_使用案例_五種形式隊列


1、RabbitMQ交換機的作用:
生產者發送消息不會像傳統方式直接將消息投遞到隊列中,而是先將消息投遞到交換機中,在由交換機轉發到具體的隊列,隊列再將消息以推送或者拉取方式給消費者進行消費。交換機的作用根據具體的路由策略分發到不同的隊列中。
2、RabbitMQ的Exchange(交換器)分為四種類型:
direct(默認)、headers、fanout、topic。其中headers交換器允許你匹配AMQP消息的header而非路由鍵,除此之外headers交換器和direct交換器完全一致,但性能卻很差,幾乎用不到,所以本文也不做講解。fanout、topic交換器是沒有歷史數據的,也就是說對於中途創建的隊列,獲取不到之前的消息。
2.1 Direct Exchange(直連交換器):
1)也是默認的交換機,是根據消息攜帶的路由鍵(routing key)將消息投遞給對應隊列的
2)直連交換機的特性:
a.公平調度:當接收端訂閱者有多個的時候,direct會輪詢公平的分發給每個訂閱者(訂閱者消息確認正常);
b.消息的發后既忘特性:發后既忘模式是指接受者不知道消息的來源,如果想要指定消息的發送者,需要包含在發送內容里面,這點就像我們在信件里面注明自己的姓名一樣,只有這樣才能知道發送者是誰;
c.消息確認:消息接收到之后必須使用channel.basicAck()方法手動確認(非自動確認刪除模式下);
c.1如果應用程序接收了消息,缺忘記確認接收的話(可能因為bug),消息在隊列的狀態會從“Ready”變為“Unacked”;
c.2如果消息收到卻未確認,Rabbit將不會再給這個應用程序發送更多的消息了,這是因為Rabbit認為你沒有准備好接收下一條消息。此條消息會一直保持Unacked的狀態,直到你確認了消息,或者斷開與Rabbit的連接,Rabbit會自動把消息改回Ready狀態,分發給其他訂閱者。當然你可以利用這一點,讓你的程序延遲確認該消息,直到你的程序處理完相應的業務邏輯,這樣可以有效的防治Rabbit給你過多的消息,導致程序崩潰。
d.消息拒絕:消息在確認之前,可以有兩個選擇:
d.1 斷開與Rabbit的連接,這樣Rabbit會重新把消息分派給另一個消費者;
d.2 拒絕Rabbit發送的消息使用channel.basicReject(long deliveryTag, boolean requeue)
參數說明:【參數1:消息的id;
參數2:處理消息的方式;
如果是true,Rabbib會重新分配這個消息給其他訂閱者,如果設置成false的話,Rabbit會把消息發送到一個特殊的“死信”隊列,用來存放被拒絕而不重新放入隊列的消息;

2.2 Fanout Exchange(廣播交換器):Fanout 中文意思為 扇出
1)fanout有別於direct交換器,fanout是一種發布/訂閱模式的交換器,當你發送一條消息的時候,交換器會把消息廣播到所有附加到這個交換器的隊列上。
比如用戶上傳了自己的頭像,這個時候圖片需要清除緩存,同時用戶應該得到積分獎勵,你可以把這兩個隊列綁定到圖片上傳的交換器上,這樣當有第三個、第四個上傳完圖片需要處理的需求的時候,原來的代碼可以不變,只需要添加一個訂閱消息即可,這樣發送方和消費者的代碼完全解耦,並可以輕而易舉的添加新功能了。
2)廣播交換機的特點(也可以說是與直連交換器的區別)
a.在發送消息時需新增channel.exchangeDeclare(ExchangeName, "fanout"),這行代碼聲明fanout交換器;
b.在接受消息時需要聲明fanout路由器,並且fanout需要綁定隊列到對應的交換器用於訂閱消息;
channel.queueDeclare().getQueue()為隨機隊列,Rabbit會隨機生成隊列名稱,一旦消費者斷開連接,該隊列會自動刪除。
注意:對於fanout交換器來說routingKey(路由鍵)是無效的,這個參數是被忽略的。
2.3 Topic Exchange(主題交換器):是一種匹配訂閱模式
1)topic交換器運行和fanout類似,但是可以更靈活的匹配自己想要訂閱的信息,這個時候routingKey路由鍵就排上用場了,使用路由鍵進行消息(規則)匹配。
2)假設我們現在有一個日志系統,會把所有日志級別的日志發送到交換器,warning、log、error、fatal,但我們只想處理error以上的日志,要怎么處理?這就需要使用topic路由器了。topic路由器的關鍵在於定義路由鍵,定義routingKey名稱不能超過255字節。主題交換器使用“.”作為分隔符;"*"匹配一個分段(用“.”分割)的內容; "#"匹配0和多個字符;
示列:例如發布了一個“com.mq.rabbit.error”的消息:
能匹配上的路由鍵:
cn.mq.rabbit.*
cn.mq.rabbit.#
#.error
cn.mq.#
#
不能匹配上的路由鍵:
cn.mq.*
*.error
*
所以如果想要訂閱所有消息,可以使用“#”匹配。
注意:fanout、topic交換器是沒有歷史數據的,也就是說對於中途創建的隊列,獲取不到之前的消息。
3、RabbitMQ 直連交換機、廣播交換機、主題交換機使用案例:
3.1 依賴引入:
<!--rabbitmq依賴引用-->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.6.5</version>
</dependency>
mq.properties文件配置:
rabbitmq.address=127.0.0.1
rabbitmq.port=5672
rabbitmq.username=guest
rabbitmq.password=guest
3.2 代碼示列:
1)生產和消費請求:RabbitMqController
package com.zj.weblearn.controller.rabbitmq;

import com.zj.weblearn.serviceImpl.rabbitmq.DirectExchangeServiceImpl;
import com.zj.weblearn.serviceImpl.rabbitmq.FanoutExchangeServiceImpl;
import com.zj.weblearn.serviceImpl.rabbitmq.TopicExchangeServiceImpl;
import com.zj.weblearn.utils.ResponseDTO;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;

import javax.servlet.http.HttpServletRequest;
import java.util.Arrays;
import java.util.List;

/*
 * @Copyright (C), 2002-2020,
 * @ClassName: MqController
 * @Author:
 * @Date: 2020/9/23 9:49
 * @Description:
 * @History:
 * @Version:1.0
 */
@Controller
@RequestMapping("/mq/")
public class RabbitMqController {

    @Autowired
    DirectExchangeServiceImpl directExchangeServiceImpl;

    @Autowired
    FanoutExchangeServiceImpl fanoutExchangeServiceImpl;

    @Autowired
    TopicExchangeServiceImpl topicExchangeServiceImpl;

    private final static String CANCLED_ORDER_QUEUE = "cancled_order_queue";

    private final static String CANCLED_ORDER_FAIR_QUEUE = "cancled_order_queue";

    /**
     * @Method:
     * @Author:
     * @Description: 將退貨的訂單通過直連交換器(Direct Exchange)讓入消費隊里中
     * 訪問路徑:http://localhost:8080/mq/sendCancleOrderToRabbitMqQueue.do?orderNos=1111111,2222222
     * param:
     * @Return:
     * @Exception:
     * @Date: 2020/12/8 14:18
     */
    @RequestMapping("/sendCancleOrderToRabbitMqQueue")
    @ResponseBody
    public Object saveOrderInfoToQueue(HttpServletRequest request) {
        String orderNos = request.getParameter("orderNos");
        List orderList = null;
        if (StringUtils.isNotEmpty(orderNos)) {
            orderList = Arrays.asList(orderNos.split(","));
        }
        return directExchangeServiceImpl.productMessageByDirectExchange(orderList, CANCLED_ORDER_QUEUE);
    }


    /**
     * @Method:
     * @Author:
     * @Description: 通過直連交換器(Direct Exchange)消費 隊列 cancle_order_queue 消費消息
     * http://localhost:8080/mq/consumerQueueInfo.do
     * param:
     * @Return:
     * @Exception:
     * @Date: 2020/12/8 15:07
     */
    @RequestMapping("/consumerQueueOnNoFairWay")
    @ResponseBody
    public Object toConsumeQueueInfo() {
        return directExchangeServiceImpl.consumeMessageByDirectExchange(CANCLED_ORDER_QUEUE);
    }

    //http://localhost:8080/mq/saveOrderToQueueFairForward.do

    /**
     * @Method:
     * @Author:
     * @Description:  通過直連交換器(Direct Exchange)創建消費隊列,但是采用 RabbitMQ的公平轉發進行消費
     *
     * param:
     * @Return:
     * @Exception:
     * @Date: 2020/12/8 15:32
     */
    @RequestMapping("/sendCancleOrderToRabbitMqFairQueue")
    @ResponseBody
    public Object saveOrderToQueueFairForward(HttpServletRequest request) {
        String orderNos = request.getParameter("orderNos");
        List orderList = null;
        if (StringUtils.isNotEmpty(orderNos)) {
            orderList = Arrays.asList(orderNos.split(","));
        }
        return directExchangeServiceImpl.productMessQueueOnFairForward(CANCLED_ORDER_FAIR_QUEUE,orderList);
    }


    @RequestMapping("/consumerQueueOnFairWay")
    @ResponseBody
    public Object consumerQueueOnFairWay() {
        ResponseDTO responseDTO= directExchangeServiceImpl.consumerMessageOnFariWay(CANCLED_ORDER_FAIR_QUEUE);
        ResponseDTO responseDTO2=  directExchangeServiceImpl.consumerMessageOnFariWay(CANCLED_ORDER_FAIR_QUEUE);
        if(!responseDTO.isSuccess()){
            return responseDTO;
        }else if(!responseDTO2.isSuccess()){
            return responseDTO2;
        }
        return responseDTO2;
    }


}
View Code
        2)創建連接工具方法:RabbitMqConnectionUtils
package com.zj.weblearn.utils;

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/*
 * @Copyright (C), 2002-2020,
 * @ClassName: RabbitMqConnectionUtils
 * @Author:
 * @Date: 2020/9/23 9:42
 * @Description:
 * @History:
 * @Version:1.0
 */
public class RabbitMqConnectionUtils {

    public static Connection getConnection() {
        //創建連接工廠
        ConnectionFactory factory = new ConnectionFactory();
        //設置服務器地址
        factory.setHost(ReadPropertiesUtils1.getValue("rabbitmq.address"));
        //設置端口號
        factory.setPort(Integer.parseInt(ReadPropertiesUtils1.getValue("rabbitmq.port")));
        //設置用戶名
        factory.setUsername(ReadPropertiesUtils1.getValue("rabbitmq.username"));
        //設置密碼
        factory.setPassword(ReadPropertiesUtils1.getValue("rabbitmq.password"));
        //設置vhost
        factory.setVirtualHost("/");
        try {
            //創建連接
            return factory.newConnection();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
        return null;
    }
}
View Code
        3)通過直連交換器生產和消費消息:DirectExchangeServiceImpl
package com.zj.weblearn.serviceImpl.rabbitmq;

import com.rabbitmq.client.*;
import com.zj.weblearn.enums.ErrorCodeEnum;
import com.zj.weblearn.utils.RabbitMqConnectionUtils;
import com.zj.weblearn.utils.ResponseDTO;
import org.springframework.stereotype.Service;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.TimeoutException;

/*
 * @Copyright (C), 2002-2020,
 * @ClassName: DirectExchangeServiceImpl
 * @Author:
 * @Date: 2020/9/23 20:39
 * @Description:
 * @History:
 * @Version:1.0
 */
@Service
public class DirectExchangeServiceImpl {

    /**
     * @Method:
     * @Author:
     * @Description: 通過直連交換器(Direct Exchange)向 queueName 中生產消息
     * param:
     * @Return:
     * @Exception:
     * @Date: 2020/9/23 20:43
     */
    public ResponseDTO productMessageByDirectExchange(List<String> orderNos, String queueName) {
        ResponseDTO responseDTO = new ResponseDTO(ErrorCodeEnum.OK);
        //1、獲取連接
        Connection connection = RabbitMqConnectionUtils.getConnection();
        Channel channel = null;
        try {
            //2、創建與Exchange的通道,每個連接可以創建多個通道,每個通道代表一個會話任務
            channel = connection.createChannel();
            //3、聲明隊列 如果Rabbit中沒有此隊列將自動創建。
            //【方法入參:
            // 參數1:隊列的名稱;
            // 參數2:是否持久化,代表隊列在服務器重啟后是否還存在;
            // 參數3:是否獨占此鏈接,是否是排他性隊列。排他性隊列只能在聲明它的 Connection中使用(可以在同一個 Connection 的不同的 channel 中使用),連接斷開時自動刪除;
            // 參數4:隊列不再使用時是否自動刪除;
            // 參數5:隊列的其他屬性 Map<String, Object> arguments
            //
            channel.queueDeclare(queueName, false, false, false, null);
            //4、發送消息:
            // 【方法入參:
            // 參數1: Exchange的名稱,如果沒有指定,則使用Default Exchange;
            // 參數2:routingKey,消息的路由Key,是用於Exchange(交換機)將消息轉發到指定的消息隊列;
            // 參數3:消息包含的屬性;
            // 參數4:消息體
            ////這里沒有指定交換機,消息將發送給默認交換機,每個隊列也會綁定哪個默認的交換機,但是不能顯示綁定或解除綁定認的交換機,routingKey(路由鍵)等於隊列名稱
            for (String orderNo : orderNos) {
                channel.basicPublish("", queueName, null, orderNo.getBytes());
            }
            System.out.println("message send body orderNos:" + orderNos);
            channel.close();
            connection.close();
        } catch (IOException e) {
            responseDTO = new ResponseDTO(ErrorCodeEnum.FAIL_501);
            responseDTO.setErrorMsg(e.toString());
        } catch (TimeoutException e) {
            responseDTO = new ResponseDTO(ErrorCodeEnum.FAIL_501);
            responseDTO.setErrorMsg(e.toString());
        }
        return responseDTO;
    }

    /**
     * @Method:
     * @Author:
     * @Description: 通過直連交換器(Direct Exchange)消費 隊列queueName 消費消息
     * param:
     * @Return:
     * @Exception:
     * @Date: 2020/9/23 20:43
     */
    public ResponseDTO consumeMessageByDirectExchange(String queueName) {
        ResponseDTO responseDTO = new ResponseDTO(ErrorCodeEnum.OK);
        //1、得到連接
        Connection connection = RabbitMqConnectionUtils.getConnection();
        Channel channel = null;
        try {
            //2、創建一個通道
            channel = connection.createChannel();
            //3、定義消費方法
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                        throws IOException {
                    //得到交換機
                    String exchange = envelope.getExchange();
                    //路由key
                    String routingKey = envelope.getRoutingKey();
                    //消息id
                    long deliveryTag = envelope.getDeliveryTag();
                    //消息內容
                    String message = new String(body, "utf-8");
                    System.out.println("Consumer consumption news>>" + message);
                }
            };
            //4、監聽隊列
            // 【方法入參:
            // 參數1: 隊列名稱;
            // 參數2: 是否自動確認收到
            //      設置為true表示消息接收到自動向mq回復接收到了,mq接收到回復會刪除消息,
            //      設置為false則需要手動回復;
            // 參數3: 消費消息的方法,消費者接收到消息后調用此方法
            //
            channel.basicConsume(queueName, true, consumer);
        } catch (IOException e) {
            responseDTO = new ResponseDTO(ErrorCodeEnum.FAIL_501);
            responseDTO.setData(e.toString());
        }
        return responseDTO;
    }


    /*
    * 目前消息轉發機制是平均分配,這樣就會出現倆個消費者,奇數的任務很耗時,偶數的任何工作量很小,造成的原因就是近當消息到達隊列進行轉發消息。並不在乎有多少任務消費者並未傳遞一個應答給RabbitMQ。僅僅盲目轉發所有的奇數給一個消費者,偶數給另一個消費者。
    為了解決這樣的問題,我們可以使用basicQos方法,傳遞參數為prefetchCount= 1。這樣告訴RabbitMQ不要在同一時間給一個消費者超過一條消息。
   換句話說,只有在消費者空閑的時候會發送下一條信息。調度分發消息的方式,也就是告訴RabbitMQ每次只給消費者處理一條消息,也就是等待消費者處理完畢並自己對剛剛處理的消息進行確認之后,才發送下一條消息,防止消費者太過於忙碌,也防止它太過去清閑。通過 設置channel.basicQos(1);
    * */
    public ResponseDTO productMessQueueOnFairForward(String queueName, List<String> orderNos) {
        ResponseDTO responseDTO = new ResponseDTO(ErrorCodeEnum.OK);
        //1、得到連接
        Connection connection = RabbitMqConnectionUtils.getConnection();
        //2、創建與Exchange的通道,每個連接可以創建多個通道,每個通道代表一個會話任務
        Channel channel = null;
        try {
            channel = connection.createChannel();
            //3、聲明隊列 如果Rabbit中沒有此隊列將自動創建
            channel.queueDeclare(queueName, false, false, false, null);

            channel.basicQos(1);// 保證一次只分發一次 限制發送給同一個消費者 不得超過一條消息
            for (String orderNo : orderNos) {
                //4、發送消息【參數說明:參數一:交換機名稱如果沒有指定,則使用Default Exchange;參數二:隊列名稱,參數三:消息的其他屬性-路由的headers信息;參數四:消息主體】
                channel.basicPublish("", queueName, null, orderNo.getBytes());
            }
            System.out.println("message send over :" + orderNos);
            channel.close();
            connection.close();
        } catch (IOException e) {
            responseDTO = new ResponseDTO(ErrorCodeEnum.FAIL_501);
            responseDTO.setData(e.toString());
        } catch (TimeoutException e) {
            responseDTO = new ResponseDTO(ErrorCodeEnum.FAIL_501);
            responseDTO.setData(e.toString());
        }
        return responseDTO;
    }


    /**
     * @Method:
     * @Author:
     * @Description: param:
     * @Return:
     * @Exception:
     * @Date: 2020/12/8 14:26
     */
    public ResponseDTO consumerMessageOnFariWay(String queueName) {
        ResponseDTO responseDTO = new ResponseDTO(ErrorCodeEnum.OK);
        //1、得到連接
        Connection connection = RabbitMqConnectionUtils.getConnection();
        //2、創建一個通道
        Channel channel = null;
        try {
            channel = connection.createChannel();
            //3、聲明隊列
            channel.basicQos(1);// 保證一次只分發一次 限制發送給同一個消費者 不得超過一條消息
            //4、定義消費方法
            Channel finalChannel = channel;
            DefaultConsumer consumer = new DefaultConsumer(finalChannel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                        throws IOException {
                    //得到交換機
                    String exchange = envelope.getExchange();
                    //路由key
                    String routingKey = envelope.getRoutingKey();
                    //消息id
                    long deliveryTag = envelope.getDeliveryTag();
                    //消息內容
                    String message = new String(body, "utf-8");
                    System.out.println("消費者消費:" + message);
                    try {
                        //睡眠1s
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        // 手動回執消息
                        finalChannel.basicAck(envelope.getDeliveryTag(), false);
                    }
                }
            };
            //5、監聽隊列(入參依次為:隊列名稱;設置為true表示消息接收到自動向mq回復接收到了,設置為false則需要手動回復;消費消息的方法,消費者接收到消息后調用此方法;)
            channel.basicConsume(queueName, false, consumer);
        } catch (IOException e) {
            responseDTO = new ResponseDTO(ErrorCodeEnum.FAIL_501);
            responseDTO.setData(e.toString());
        }
        return responseDTO;
    }


}
View Code
        4)通過廣播交換器生產和消費消息:FanoutExchangeServiceImpl
package com.zj.weblearn.serviceImpl.rabbitmq;

import com.rabbitmq.client.*;
import com.zj.weblearn.utils.RabbitMqConnectionUtils;
import org.springframework.stereotype.Service;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/*
 * @Copyright (C), 2002-2020,
 * @ClassName: FanoutExchangeServiceImpl
 * @Author:
 * @Date: 2020/9/23 20:26
 * @Description:
 * @History:
 * @Version:1.0
 */
@Service
public class FanoutExchangeServiceImpl {

    final String fanoutExchangeName = "fanoutExchange"; //交換器名稱(廣播交換器)

    /**
     * @Method:
     * @Author:
     * @Description: 通過廣播交換器(Fanout Exchange)生產消息
     * param:
     * @Return:
     * @Exception:
     * @Date: 2020/9/23 20:27
     */
    public boolean byFanoutExchangeProductMessage(String orderMsg, String queueName) {
        boolean productResult = false;
        //1、得到連接
        Connection connection = RabbitMqConnectionUtils.getConnection();
        Channel channel = null;
        try {
            //2、創建與Exchange的通道,每個連接可以創建多個通道,每個通道代表一個會話任務
            channel = connection.createChannel();
            //3、聲明交換器為廣播交換器(是一種發布訂閱交換器)
            channel.exchangeDeclare(fanoutExchangeName, "fanout");
            //4、聲明隊列 如果Rabbit中沒有此隊列將自動創建
            channel.queueDeclare(queueName, false, false, false, null);
            //5、發送消息
            channel.basicPublish(fanoutExchangeName, queueName, null, orderMsg.getBytes());
            System.out.println("message send body:" + orderMsg);
            try {
                channel.close();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }
            connection.close();
            productResult = true;
        } catch (IOException e) {
            e.printStackTrace();
        }
        return productResult;
    }

    /**
     * @Method:
     * @Author:
     * @Description: 通過廣播交換器(Fanout Exchange)消費消息
     * param:
     * @Return:
     * @Exception:
     * @Date: 2020/9/23 20:30
     */
    public void byFanoutExchangeConsumeMessage() {
        //1、得到連接
        Connection connection = RabbitMqConnectionUtils.getConnection();
        Channel channel = null;
        try {
            //2、創建一個通道
            channel = connection.createChannel();
            //3、聲明fanout交換器
            channel.exchangeDeclare(fanoutExchangeName, "fanout");
            //4、聲明隊列
            String queueName = channel.queueDeclare().getQueue();
            channel.queueBind(queueName, fanoutExchangeName, "");
            //5、定義消費方法
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException {
                    String message = new String(body, "UTF-8");
                }
            };
            //6、監聽隊列(入參依次為:隊列名稱;設置為true表示消息接收到自動向mq回復接收到了,設置為false則需要手動回復;消費消息的方法,消費者接收到消息后調用此方法;)
            channel.basicConsume(queueName, true, consumer);//
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

}
View Code
        5)通過主題交換器生產和消費消息:TopicExchangeServiceImpl
package com.zj.weblearn.serviceImpl.rabbitmq;

import com.rabbitmq.client.*;
import com.zj.weblearn.utils.RabbitMqConnectionUtils;
import org.springframework.stereotype.Service;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/*
 * @Copyright (C), 2002-2020,
 * @ClassName: TopicExchangeServiceImpl
 * @Author:
 * @Date: 2020/9/23 20:33
 * @Description:
 * @History:
 * @Version:1.0
 */
@Service
public class TopicExchangeServiceImpl {

    final String topicExchangeName = "topicExchange"; //主題交換器名稱

    /**
     * @Method:
     * @Author:
     * @Description: 通過主題交換器(Topic Exchange)生產消息
     * param:
     * @Return:
     * @Exception:
     * @Date: 2020/9/23 20:27
     */
    public boolean byTopicExchangeProductMessage(String orderMsg, String queueName) {
        boolean productResult = false;
        //1、得到連接
        Connection connection = RabbitMqConnectionUtils.getConnection();
        Channel channel = null;
        try {
            //2、創建與Exchange的通道,每個連接可以創建多個通道,每個通道代表一個會話任務
            channel = connection.createChannel();
            //3、聲明交換器為主題交換器(是一種匹配訂閱模式)
            channel.exchangeDeclare(topicExchangeName, "topic");
            //4、聲明隊列 如果Rabbit中沒有此隊列將自動創建
            channel.queueDeclare(queueName, false, false, false, null);
            //5、發送消息
            channel.basicPublish(topicExchangeName, queueName, null, orderMsg.getBytes());
            System.out.println("message send body:" + orderMsg);
            try {
                channel.close();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }
            connection.close();
            productResult = true;
        } catch (IOException e) {
            e.printStackTrace();
        }
        return productResult;
    }


    /**
     * @Method:
     * @Author:
     * @Description: 通過主題交換器(Topic Exchange)消費消息
     * param:
     * @Return:
     * @Exception:
     * @Date: 2020/9/23 20:27
     */
    public void byTopicExchangeConsumeMessage(){
        //1、得到連接
        Connection connection = RabbitMqConnectionUtils.getConnection();
        Channel channel = null;
        try {
            //2、創建一個通道
            channel = connection.createChannel();
            //3、聲明topic交換器
            channel.exchangeDeclare(topicExchangeName, "topic");
            //4、聲明隊列
            String queueName = channel.queueDeclare().getQueue();
            String routingKey = "#.error";
            channel.queueBind(queueName, topicExchangeName, routingKey);
            //5、定義消費方法
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                                           byte[] body) throws IOException {
                    String message = new String(body, "UTF-8");
                    System.out.println(routingKey + "|接收消息 => " + message);
                }
            };
            //6、監聽隊列(入參依次為:隊列名稱;設置為true表示消息接收到自動向mq回復接收到了,設置為false則需要手動回復;消費消息的方法,消費者接收到消息后調用此方法;)
            channel.basicConsume(queueName, true, consumer);//
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

}
View Code
        6)相關的實體類:
import com.zj.weblearn.enums.BaseEnum;
import com.zj.weblearn.enums.ErrorCodeEnum;

import java.io.Serializable;

public class ResponseDTO<T> implements Serializable {

    private boolean success;

    private String errorCode;

    /**
     * 原因
     */
    private String errorMsg;

    /**
     * 返回數據值
     */
    private T data;


    public ResponseDTO() {
    }

    public ResponseDTO(ErrorCodeEnum errorCode) {
        this.errorCode = errorCode.getErrorCode();
        this.errorMsg = errorCode.getErrorMsg();
        if(!"0".equals(errorCode.getErrorCode())){
            success=false;
        }else{
            success=true;
        }
    }

    public ResponseDTO(String errorCode, String errorMsg) {
        this.errorCode = errorCode;
        this.errorMsg = errorMsg;
        if(!"0".equals(errorCode)){
            success=false;
        }else{
            success=true;
        }
    }


    public void setErrorCodeEnum(BaseEnum errorCode) {
        this.errorCode = errorCode.getErrorCode();
        this.errorMsg = errorCode.getErrorMsg();
        if(!"0".equals(errorCode.getErrorCode())){
            success=false;
        }else{
            success=true;
        }
    }

    public String getErrorCode() {
        return errorCode;
    }

    public void setErrorCode(String errorCode) {
        this.errorCode = errorCode;
    }

    public String getErrorMsg() {
        return errorMsg;
    }

    public void setErrorMsg(String errorMsg) {
        this.errorMsg = errorMsg;
    }

    public T getData() {
        return data;
    }

    public void setData(T data) {
        this.data = data;
    }

    public boolean isSuccess() {
        return success;
    }

    public void setSuccess(boolean success) {
        this.success = success;
    }

    @Override
    public String toString() {
        return "ResponseDTO{" +
                "errorCode='" + errorCode + '\'' +
                ", errorMsg='" + errorMsg + '\'' +
                ", data=" + data +
                '}';
    }

}

public enum ErrorCodeEnum implements BaseEnum{

    OK("0", "成功"),
    FAIL_500("500", "系統開小差了,請稍后再試!"),
    FAIL_501("501", "服務異常,請聯系管理員處理!"),
    PARAM_ERROR("502", "入參異常,請檢查后重試!");

    private String errorCode;
    private String errorMsg;

    ErrorCodeEnum(String errorCode, String errorMsg) {
        this.errorCode = errorCode;
        this.errorMsg = errorMsg;
    }

    ErrorCodeEnum(BaseEnum errorCodeEnum) {
        this.errorCode = errorCodeEnum.getErrorCode();
        this.errorMsg = errorCodeEnum.getErrorMsg();
    }


    public String getErrorCode() {
        return errorCode;
    }

    public String getErrorMsg() {
        return errorMsg;
    }
}
View Code
4、RabbitMQ的五種形式隊列
1)簡單隊列(點對點):一個生產者P發送消息到隊列Q,一個消費者C接收。
點對點模式(一對一模式):一個生產者投遞消息給隊列,只能允許有一個消費者進行消費。如果是消費集群的話,會進行均攤消費。
推(由隊列指向消費者):消費者已經啟動了,建立長連接,一旦生產者向隊列投遞消息會立馬推送給消費者;
取(由消費者指向隊列):生產者先投遞消息到隊列進行緩存,這時候消費者在啟動的時候就向隊列中獲取消息
隊列:以先進先出原則存放消息集合;
2)工作(公平性)隊列模式
work queues與簡單隊列相比,多了一個消費端,兩個消費端共同消費同一個隊列中的消息。
應用場景:對於 任務過重或任務較多情況使用工作隊列可以提高任務處理的速度。
公平隊列原理:隊列服務器向消費者發送消息的時候,消費者采用手動應答模式,隊列服務器必須要收到消費者發送ack結果通知,才會發送下一個消息。
默認消費者集群為均攤消費。假設生產者向隊列發送10個消息,消息1和2都各自消費5個,保證消費唯一。
思考:均攤消費弊端-如果每個消費者處理消息的業務時間情況不相同,可能對消息處理比較慢的消費者不公平。應該能這多勞,誰消費快,就讓其多消費消息。
3)發布/訂閱模式(Publish/Subscribe):這個可能是消息隊列中最重要的隊列了,其他的都是在它的基礎上進行了擴展。發布訂閱模式說明:
(1)一個生產者,多個消費者
(2)每一個消費者都有自己的一個隊列,並對其進行監聽;
(3)生產者沒有直接發消息到隊列中,而是發送到交換機;
(4)每個消費者的隊列都綁定到交換機上;
(5)消息通過交換機到達每個消費者的隊列 該模式就是Fanout Exchange(廣播交換機)將消息路由給綁定到它身上的所有隊列 以用戶發郵件案例講解
注意:交換機沒有存儲消息功能,如果消息發送到沒有綁定消費隊列的交換機,消息則丟失。生產者將消息發給broker(消息隊列服務),由交換機將消息轉發到綁定此交換機的每個隊列,每個綁定交換機的隊列都將接收到消息;
4)路由模式Routing:
(1)每個消費者監聽自己的隊列,並且設置routingkey。
(2)生產者發送消息到交換機並指定一個路由key,消費者隊列綁定到交換機時要指定路由key(key匹配就能接受消息,key不匹配就不能接受消息)
5)通配符模式Topics
(1)每個消費者監聽自己的隊列,並且設置帶統配符的routingkey。
(2)生產者P發送消息到交換機X,type=topic,交換機根據綁定隊列的routing key的值進行通配符匹配,由交換機根據routingkey來轉發消息到指定的隊列。
符號#:匹配一個或者多個詞error.# 可以匹配error.order或者error.order.cancle
符號*:只能匹配一個詞error.* 可以匹配error.order或者error.ceshi

參看文章:https://www.cnblogs.com/vipstone/p/9295625.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM