對於消息發布者而言它只負責把消息發布出去,甚至它也不知道消息是發到哪個queue,消息通過exchange到達queue,exchange的職責非常簡單,就是一邊接收發布者的消息一邊把這些消息推到queue中。
而exchange是怎么知道消息應該推到哪個queue呢,這就要通過綁定queue與exchange時的routingkey了,通過代碼進行綁定並且指定routingkey,下面有一張關系圖,p(發布者) —> x(exchange) bindding(綁定關系也就是我們的routingkey) 紅色代表着queue

在消息的生產者端:
@Component
public class RabbitOrderSender {
//自動注入RabbitTemplate模板類
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private BrokerMessageLogMapper brokerMessageLogMapper;
//回調函數: confirm確認
final RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.err.println("correlationData: " + correlationData);
String messageId = correlationData.getId();
if(ack){
//如果confirm返回成功 則進行更新
brokerMessageLogMapper.changeBrokerMessageLogStatus(messageId, Constants.ORDER_SEND_SUCCESS, new Date());
} else {
//失敗則進行具體的后續操作:重試 或者補償等手段
System.err.println("異常處理...");
}
}
};
//發送消息方法調用: 構建自定義對象消息
public void sendOrder(Order order) throws Exception {
// 通過實現 ConfirmCallback 接口,消息發送到 Broker 后觸發回調,確認消息是否到達 Broker 服務器,也就是只確認是否正確到達 Exchange 中
rabbitTemplate.setConfirmCallback(confirmCallback);
//消息唯一ID
CorrelationData correlationData = new CorrelationData(order.getMessageId());
rabbitTemplate.convertAndSend("order-exchange", "order.ABC", order, correlationData);
}
}
利用rabbitTemplate(import org.springframework.amqp.rabbit.core.RabbitTemplate;需要在pom.xml中導入amqp的依賴)的convertAndSend方法就可以發送,這里order-exchange為交換機exchange,order.ABC為routingKey,並沒有指定對應消息需要發往哪個隊列,還有指定消息回調。
在消息的消費者端:
@Component
public class OrderReceiver {
//配置監聽的哪一個隊列,同時在沒有queue和exchange的情況下會去創建並建立綁定關系
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "order-queue",durable = "true"),
exchange = @Exchange(name="order-exchange",durable = "true",type = "topic"),
key = "order.*"
)
)
@RabbitHandler//如果有消息過來,在消費的時候調用這個方法
public void onOrderMessage(@Payload Order order, @Headers Map<String,Object> headers, Channel channel) throws IOException {
//消費者操作
System.out.println("---------收到消息,開始消費---------");
System.out.println("訂單ID:"+order.getId());
/**
* Delivery Tag 用來標識信道中投遞的消息。RabbitMQ 推送消息給 Consumer 時,會附帶一個 Delivery Tag,
* 以便 Consumer 可以在消息確認時告訴 RabbitMQ 到底是哪條消息被確認了。
* RabbitMQ 保證在每個信道中,每條消息的 Delivery Tag 從 1 開始遞增。
*/
Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
/**
* multiple 取值為 false 時,表示通知 RabbitMQ 當前消息被確認
* 如果為 true,則額外將比第一個參數指定的 delivery tag 小的消息一並確認
*/
boolean multiple = false;
//ACK,確認一條消息已經被消費。不然的話,在rabbitmq首頁會有Unacked顯示為未處理數1.
channel.basicAck(deliveryTag,multiple);
}
}
消費者需要指定監聽的隊列,routingkey,和exchage,如果在localhost:15672的rabbitmq的首頁沒有手動創建,@RabbitListener會自動幫我們創建的並綁定關系。rabbitmq的routingkey還可以用來過濾從隊列中取的的信息。
