概述
fanout扇出也稱之為廣播
在廣播模式下,消息發送的流程是這樣的,如下所示:
- 可以有多個消費者。
- 每個消費者有自己的 queue(隊列)
- 每個隊列都要綁定到 Exchange(交換機)
- 生產者發送的消息,只能發送到交換機,交換機來決定要發給哪個隊列,生產者無法決定。
- 交換機把消息發送給綁定過的所有隊列。
- 隊列的消費者都能拿到消息。實現一條消息被多個消費者消費。
創建生產者
/**
* @author: BNTang
*/
public class Producer {
@Test
public void sendMessage() throws Exception {
Connection connection = RabbitMQUtil.getConnection();
// 創建通道
Channel channel = connection.createChannel();
// 設置交換機
channel.exchangeDeclare("logs", BuiltinExchangeType.FANOUT);
// 向交換機發消息
channel.basicPublish("logs", "", null, ("我是個 fanout 類型的消息").getBytes());
RabbitMQUtil.closeChannelAndConnection(channel, connection);
System.out.println("消息發送成功");
}
}
創建消費者 1
/**
* @author BNTang
*/
public class Consumer1 {
@Test
public void receiveMessage() throws Exception {
Connection connection = RabbitMQUtil.getConnection();
// 得到通道
Channel channel = connection.createChannel();
// 綁定交換機
channel.exchangeDeclare("logs", BuiltinExchangeType.FANOUT);
// 從通道里面得到一個臨時的隊列
String queue = channel.queueDeclare().getQueue();
// 把臨時隊列和交換機進行綁定
channel.queueBind(queue, "logs", "");
// 接收消息
channel.basicConsume(queue, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消費者【1】接收到消息" + new String(body));
}
});
System.out.println("消費者【1】啟動成功");
System.in.read();
}
}
創建消費者 2
/**
* @author BNTang
*/
public class Consumer2 {
@Test
public void receiveMessage() throws Exception {
Connection connection = RabbitMQUtil.getConnection();
// 得到通道
Channel channel = connection.createChannel();
// 綁定交換機
channel.exchangeDeclare("logs", BuiltinExchangeType.FANOUT);
// 從通道里面得到一個臨時隊列
String queue = channel.queueDeclare().getQueue();
// 把臨時的隊列和交換機進行綁定
channel.queueBind(queue, "logs", "");
// 接收消息
channel.basicConsume(queue, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消費者【2】接收到消息" + new String(body));
}
});
System.out.println("消費者【2】啟動成功");
System.in.read();
}
}
測試方式,先啟動消費者1,和消費者2,在啟動消息生產者即可。