添加rabbitmq的依賴
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
在配置文件中添加必要的配置信息
spring.rabbitmq.host=192.168.0.86 spring.rabbitmq.port=5672 spring.rabbitmq.username=admin spring.rabbitmq.password=123456
好了,基本的配置就已經配置完畢了
rabbitmq有六種模式

我們逐個來看springboot是怎么實現的呢
1.hello world

P代表生產者,C代表消費者,紅色代碼消息隊列。P將消息發送到消息隊列,C對消息進行處理。
我們先創建一個隊列
@Bean
public Queue Queue() {
return new Queue("hello");
}
然后我再創建一個生產者
@Controller
public class HelloSender {
@Autowired
private AmqpTemplate rabbitTemplate;
public void send() {
String context = "hello " + new Date();
System.out.println("Sender : " + context);
this.rabbitTemplate.convertAndSend("hello", context);
}
}
再創建一個消費者
@Component
@RabbitListener(queues = "hello")
public class HelloReceiver {
@RabbitHandler
public void process(String hello) {
System.out.println("Receiver : " + hello);
}
}
再寫一個測試用例看看
@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitmqApplicationTests {
@Autowired
private HelloSender helloSender;
@Test
public void hello() throws Exception {
helloSender.send();
}
}

成功!
2.工作模式(競爭)

一個消息產生者,多個消息的消費者。競爭搶消息
我們先創建一個隊列
@Bean
public Queue Queue2() {
return new Queue("neo");
}
再創建一個消息生產者
@Controller
public class NeoSender {
@Autowired
private AmqpTemplate rabbitTemplate;
public void send(int i) {
String context = "spirng boot neo queue"+" ****** "+i;
System.out.println("Sender1 : " + context);
this.rabbitTemplate.convertAndSend("neo", context);
}
}
再創建兩個消息的消費者
1 @Component
2 @RabbitListener(queues = "neo")
3 public class NeoReceiver1 {
4 @RabbitHandler
5 public void process(String neo) {
6 System.out.println("Receiver 1: " + neo);
7 }
8 }
9
10
11
12 @Component
13 @RabbitListener(queues = "neo")
14 public class NeoReceiver2 {
15 @RabbitHandler
16 public void process(String neo) {
17 System.out.println("Receiver 2: " + neo);
18 }
19
20 }
我們寫一個測試用例
@Test
public void oneToMany() throws Exception {
for (int i=0;i<100;i++){
// Thread.sleep(10);
neoSender.send(i);
}
}
運行


可以看到消息均勻的被兩個消費者消費了。
通過這個例子我們可以看做高並發情況下的消息產生和消費,這會產生一個消息丟失的問題。萬一客戶端在處理消息的時候掛了,那這條消息就相當於被浪費了,針對這種情況,rabbitmq推出了消息ack機制,熟悉tcp三次握手的一定不會陌生。
我們看看springboot是實現ack的
很簡單,在我們的配置類中,配置一個新的消費者,將原先的消費者先都去掉:
@Bean
public SimpleMessageListenerContainer messageContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());
container.setQueues(Queue());
container.setExposeListenerChannel(true);
container.setMaxConcurrentConsumers(1);
container.setConcurrentConsumers(1);
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);//消息確認后才能刪除
container.setPrefetchCount(5);//每次處理5條消息
container.setMessageListener(new ChannelAwareMessageListener() {
public void onMessage(Message message, com.rabbitmq.client.Channel channel) throws Exception {
byte[] body = message.getBody();
System.out.println("消費端接收到消息 : " + new String(body));
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
});
return container;
}
但這里會有個問題,test模式下消息發送完畢系統就會直接shutdown,所以只能消費部分消息,不過等真正啟動項目,這個問題就不存在了。
3.發布訂閱模式

生產者將消息不是直接發送到隊列,而是發送到X交換機,然后由交換機發送給兩個隊列,兩個消費者各自監聽一個隊列,來消費消息。
這種方式實現同一個消息被多個消費者消費。工作模式是同一個消息只能有一個消費者。
我們新建三個隊列
@Bean
public Queue AMessage() {
return new Queue("fanout.A");
}
@Bean
public Queue BMessage() {
return new Queue("fanout.B");
}
@Bean
public Queue CMessage() {
return new Queue("fanout.C");
}
再新建一個交換機
@Bean
FanoutExchange fanoutExchange() {
return new FanoutExchange("fanoutExchange");
}
再把這些隊列綁定到交換機上去
@Bean
Binding bindingExchangeA(Queue AMessage, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(AMessage).to(fanoutExchange);
}
@Bean
Binding bindingExchangeB(Queue BMessage, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(BMessage).to(fanoutExchange);
}
@Bean
Binding bindingExchangeC(Queue CMessage, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(CMessage).to(fanoutExchange);
}
基本的配置完成后,再新建一個消息生產者
@Component
public class FanoutSender {
@Autowired
private AmqpTemplate rabbitTemplate;
public void send() {
String context = "hi, fanout msg ";
System.out.println("Sender : " + context);
this.rabbitTemplate.convertAndSend("fanoutExchange","", context);
}
}
同樣的,我們再新建三個消息消費者
1 @Component
2 @RabbitListener(queues = "fanout.A")
3 public class FanoutReceiveA {
4
5 @RabbitHandler
6 public void process(String message) {
7 System.out.println("fanout Receiver A : " + message);
8 }
9 }
10
11 @Component
12 @RabbitListener(queues = "fanout.B")
13 public class FanoutReceiverB {
14 @RabbitHandler
15 public void process(String message) {
16 System.out.println("fanout Receiver B: " + message);
17 }
18 }
19
20 @Component
21 @RabbitListener(queues = "fanout.C")
22 public class FanoutReceiverC {
23 @RabbitHandler
24 public void process(String message) {
25 System.out.println("fanout Receiver C: " + message);
26 }
27 }
三個消費者分別監聽3個隊列的內容
新建一個測試用例:
@RunWith(SpringRunner.class)
@SpringBootTest
public class FanoutTest {
@Autowired
private FanoutSender fanoutSender;
@Test
public void setFanoutSender(){
fanoutSender.send();
}
}

三個隊列都接受到了消息
4:路由模式

需要將一個隊列綁定到交換機上,要求該消息與一個特定的路由鍵完全匹配,這是一個完整的匹配。
5.主題模式
發送端不只按固定的routing key發送消息,而是按字符串匹配發送,接收端同樣如此
符號#匹配一個或多個詞,符號*匹配不多不少一個詞。

4/5兩者模式很相似,我們放在一起演示
新建兩個隊列
final static String message = "topic.A";
final static String messages = "topic.B";
@Bean
public Queue queueMessage() {
return new Queue(TopicRabbitConfig.message);
}
@Bean
public Queue queueMessages() {
return new Queue(TopicRabbitConfig.messages);
}
新建一個交換機
@Bean
TopicExchange exchange() {
return new TopicExchange("topicExchange");
}
綁定隊列到交換機上,路由模式,需要完整匹配topic.message,才能接受
@Bean
Binding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange) {
return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message");
}
topic模式,前綴匹配到topic.即可接受
@Bean
Binding bindingExchangeMessages(Queue queueMessages, TopicExchange exchange) {
return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#");
}
我們新建三個消息生產者
@Component
public class TopicSend {
@Autowired
private AmqpTemplate rabbitTemplate;
public void send() {
String context = "hi, i am message all";
System.out.println("Sender : " + context);
this.rabbitTemplate.convertAndSend("topicExchange", "topic.1", context);
}
public void send1() {
String context = "hi, i am message 1";
System.out.println("Sender : " + context);
this.rabbitTemplate.convertAndSend("topicExchange", "topic.message", context);
}
public void send2() {
String context = "hi, i am messages 2";
System.out.println("Sender : " + context);
this.rabbitTemplate.convertAndSend("topicExchange", "topic.messages", context);
}
}
send的key是topic.1 send1的key是topic.message,send2的key是topic.messages
所以理論上send會被兩個隊列消費,1.2都應該只有一個隊列消費
我們再新建兩個消費者
@Component
@RabbitListener(queues = "topic.A")
public class TopicReceiver {
@RabbitHandler
public void process(String message) {
System.out.println("Topic Receiver1 : " + message);
}
}
@Component
@RabbitListener(queues = "topic.B")
public class TopicReceiver2 {
@RabbitHandler
public void process(String message) {
System.out.println("Topic Receiver2 : " + message);
}
}
寫三個測試用例
@RunWith(SpringRunner.class)
@SpringBootTest
public class TopicTest {
@Autowired
private TopicSend sender;
@Test
public void topic() throws Exception {
sender.send();
}
@Test
public void topic1() throws Exception {
sender.send1();
}
@Test
public void topic2() throws Exception {
sender.send2();
}
}
send的運行結果

send1的運行結果

send2的運行結果

結果符合預期。
轉自:https://www.cnblogs.com/xmzJava/p/8036591.html

