RabbitMQ 默認采用輪詢的方式分發消息,當一個消息需要有多個消費者都消費時,需要創建多個隊列實現,示例如下:
@Component
public class SimpleConsume {
@RabbitListener(
bindings =
@QueueBinding(
value = @Queue(durable = "true"), // 不指定隊列名稱,系統會自動生成隊列名
exchange = @Exchange(value = "simpleExchange",
type = "topic"),
key = "simple.*"
)
)
public void onMessage(Message message, Channel channel) throws Exception {
System.err.println("--------------------------------------");
System.err.println("消費端Payload: " + message.getPayload());
Long deliveryTag = (Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
//手工ACK
channel.basicAck(deliveryTag, false);
}
}
@SpringBootTest
public class Producer {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void sendSimpleMessage() {
String exchange = "simpleExchange";
String routingKey = "simple.message";
for(int i=0;i<5;i++) {
rabbitTemplate.convertAndSend(exchange, routingKey, "hello simpleExchange ===="+ i);
}
}
}
啟動兩個端口不同的項目:執行 sendSimpleMessage 方法,查看控制台輸出:
端口 8080 控制台:
--------------------------------------
消費端Payload: hello simpleExchange ====0
--------------------------------------
消費端Payload: hello simpleExchange ====1
--------------------------------------
消費端Payload: hello simpleExchange ====2
--------------------------------------
消費端Payload: hello simpleExchange ====3
--------------------------------------
消費端Payload: hello simpleExchange ====4
端口 8081 控制台:
--------------------------------------
消費端Payload: hello simpleExchange ====0
--------------------------------------
消費端Payload: hello simpleExchange ====1
--------------------------------------
消費端Payload: hello simpleExchange ====2
--------------------------------------
消費端Payload: hello simpleExchange ====3
--------------------------------------
消費端Payload: hello simpleExchange ====4
可以看到兩個消費者都可以消費到所有消息了。