代碼整合
- maven依賴
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.56</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
- yml配置
server:
  port: 8021
spring:
  #配置rabbitMq 服務器
  rabbitmq:
    host: 192.168.100.120
    port: 5672
    username: test
    password: test
    #Vhost
    virtual-host: testRabbit
- 配置DirectConfig
/**
 * Declares the topology used by the direct-exchange example: one direct
 * exchange, two durable queues, and one binding per queue.
 */
@Configuration
public class DirectConfig {

    /** Name of the direct exchange. */
    public static final String DESTINATION_NAME = "rabbitMq_direct";

    /** Queue that receives SMS messages. */
    public static final String SMS_QUEUE = "Sms_msg";

    /** Queue that receives e-mail messages. */
    public static final String EMAIL_QUEUE = "Email_msg";

    /** Routing key bound to {@link #SMS_QUEUE}. */
    public static final String SMS_ROUTINGKEY = "sms";

    /** Routing key bound to {@link #EMAIL_QUEUE}. */
    public static final String EMAIL_ROUTINGKEY = "email";

    /**
     * @return the durable SMS queue
     */
    @Bean
    public Queue smsDirectQueue() {
        final boolean durable = true;
        return new Queue(SMS_QUEUE, durable);
    }

    /**
     * @return the durable e-mail queue
     */
    @Bean
    public Queue emailDirectQueue() {
        final boolean durable = true;
        return new Queue(EMAIL_QUEUE, durable);
    }

    /**
     * @return the direct exchange shared by both queues
     */
    @Bean
    public DirectExchange directExchange() {
        return new DirectExchange(DESTINATION_NAME);
    }

    /**
     * @return binding of the SMS queue to the exchange under {@link #SMS_ROUTINGKEY}
     */
    @Bean
    Binding smsBindingDirect() {
        Queue queue = smsDirectQueue();
        DirectExchange exchange = directExchange();
        return BindingBuilder.bind(queue).to(exchange).with(SMS_ROUTINGKEY);
    }

    /**
     * @return binding of the e-mail queue to the exchange under {@link #EMAIL_ROUTINGKEY}
     */
    @Bean
    Binding emailBindingDirect() {
        Queue queue = emailDirectQueue();
        DirectExchange exchange = directExchange();
        return BindingBuilder.bind(queue).to(exchange).with(EMAIL_ROUTINGKEY);
    }
}
- 發送方配置
/**
 * REST endpoints that publish demo messages to the direct exchange declared
 * in {@link DirectConfig}.
 */
@RestController
@Slf4j
public class DirectSendController {

    @Autowired
    private RabbitTemplate template;

    /**
     * Publishes an SMS demo message. The payload sent to the broker is the
     * JSON string form of the {@code User}.
     *
     * @param message text taken from the {@code msg} request parameter
     * @return a short confirmation string echoing {@code message}
     */
    @GetMapping("/sendSms")
    public String sendSms(@RequestParam("msg") String message) {
        User user = buildUser("sendSms", message);
        String userJson = JSON.toJSONString(user);
        log.info("sendSms:{}", userJson);
        // A send names the exchange and the routing key; the exchange's
        // bindings decide which queue receives the message.
        template.convertAndSend(DirectConfig.DESTINATION_NAME, DirectConfig.SMS_ROUTINGKEY, userJson);
        return "OK,sendSms:" + message;
    }

    /**
     * Publishes an e-mail demo message. Unlike {@link #sendSms(String)}, the
     * payload handed to the template is the {@code User} object itself; the
     * JSON form is produced for logging only.
     *
     * @param message text taken from the {@code msg} request parameter
     * @return a short confirmation string echoing {@code message}
     */
    @GetMapping("/sendEmail")
    public String sendEmail(@RequestParam("msg") String message) {
        User user = buildUser("sendEmail", message);
        String userJson = JSON.toJSONString(user);
        log.info("sendEmail:{}", userJson);
        // NOTE(review): sending the object relies on the template's message
        // converter being able to serialize User - confirm User supports that.
        template.convertAndSend(DirectConfig.DESTINATION_NAME, DirectConfig.EMAIL_ROUTINGKEY, user);
        return "OK,sendEmail:" + message;
    }

    /** Builds a demo user with a random dash-less UUID id, using {@code tag} as both username and password. */
    private static User buildUser(String tag, String message) {
        User user = new User();
        user.setId(UUID.randomUUID().toString().replace("-", ""));
        user.setPassword(tag);
        user.setUsername(tag);
        user.setMsg(message);
        return user;
    }
}
- 消費者
/**
 * Listeners for the two queues declared in {@link DirectConfig}; each one
 * prints the received payload to standard output.
 */
@Component
@Slf4j
public class DirectConsumer {

    /**
     * Receives a raw message from the SMS queue and prints its body decoded as UTF-8.
     *
     * @param message the raw AMQP message
     * @throws IOException if the charset name is not supported
     */
    @RabbitListener(queues = DirectConfig.SMS_QUEUE)
    public void sms_msg(Message message) throws IOException {
        byte[] rawBody = message.getBody();
        String text = new String(rawBody, "UTF-8");
        System.out.println("sms_msg消費者收到消息 : " + text);
    }

    /**
     * Receives a converted {@code User} from the e-mail queue and prints it as JSON.
     *
     * @param user the payload converted to a {@code User}
     */
    @RabbitListener(queues = DirectConfig.EMAIL_QUEUE)
    public void email_msg(User user) {
        String userJson = JSON.toJSONString(user);
        System.out.println("email_msg消費者收到消息 : " + userJson);
    }
}
其他的幾種方式都類似。
消息的手動簽收、消息退回、消息的回調
- yml增加
spring:
  rabbitmq:
    # 是否開啟消息確認機制
    publisher-confirms: true
    # 開啟消息發送到隊列失敗返回
    publisher-returns: true
- 增加RabbitMq配置
@Configuration
public class RabbitMqConfig {
/**
* 使用SimpleMessageListenerContainer容器設置消費隊列監聽
* 不使用@RabbitListener注解
*/
// @Bean
// public SimpleMessageListenerContainer simpleMessageListenerContainer() {
// SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer();
// simpleMessageListenerContainer.setConnectionFactory(connectionFactory());
// simpleMessageListenerContainer.setMessageListener(new MessageListener() {
// @Override
// public void onMessage(Message message) {
//
// }
// });
// //simpleMessageListenerContainer.setQueueNames("","");
// //impleMessageListenerContainer.addQueueNames();
// //手動確認
// simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);
// return simpleMessageListenerContainer;
// }
/**
* @return
* @RabbitListener注解指定目標方法來作為消費消息的方法,通過注解參數指定所監聽的隊列或者Binding。使用@RabbitListener可以設置一個自己明確默認值的RabbitListenerContainerFactory對象。
*/
@Bean
public SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory =
new SimpleRabbitListenerContainerFactory();
//這個connectionFactory就是我們自己配置的連接工廠直接注入進來
simpleRabbitListenerContainerFactory.setConnectionFactory(connectionFactory);
//這邊設置消息確認方式由自動確認變為手動確認
simpleRabbitListenerContainerFactory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
//設置消息預取數量
// simpleRabbitListenerContainerFactory.setPrefetchCount(1);
return simpleRabbitListenerContainerFactory;
}
//如果是單例的
/**
* 每個rabbitTemplate方法只可以有一個回調,不然會報錯 only one ConfirmCallback is supported by each RabbitTemplate,解決辦法是配成多利的
*
* @param connectionFactory
* @return
*/
@Bean
// @Scope("prototype")
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
//成功回調
template.setConfirmCallback(new Callback());
// 開啟mandatory模式(開啟失敗回調)
template.setMandatory(true);
//失敗回調
template.setReturnCallback(new Callback());
return template;
}
- 增加回調類
/**
 * Publisher-side callback that prints the outcome of broker confirms and the
 * details of returned messages to standard output.
 */
public class Callback implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {

    /**
     * Prints the confirm result for a published message.
     *
     * @param correlationData the correlation data supplied at send time; printed as-is (may print "null")
     * @param ack             {@code true} if the confirm was positive
     * @param cause           the reason text accompanying the confirm
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        System.out.println("ConfirmCallback: " + "相關數據:" + correlationData);
        System.out.println("ConfirmCallback: " + "確認情況:" + ack);
        System.out.println("ConfirmCallback: " + "原因:" + cause);
        if (ack) {
            System.out.println("消息發送確認成功");
        } else {
            System.out.println("消息發送確認失敗");
        }
    }

    /**
     * Prints the details of a message that was returned.
     *
     * @param message    the returned message
     * @param replyCode  the reply code
     * @param replyText  the reply text
     * @param exchange   the exchange the message was published to
     * @param routingKey the routing key the message was published with
     */
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        // Decoding with a Charset constant cannot throw, so no checked-exception handling is needed.
        String body = new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8);
        System.out.println("ReturnCallback: " + "消息:" + body);
        System.out.println("ReturnCallback: " + "回應碼:" + replyCode);
        System.out.println("ReturnCallback: " + "回應信息:" + replyText);
        System.out.println("ReturnCallback: " + "交換機:" + exchange);
        System.out.println("ReturnCallback: " + "路由鍵:" + routingKey);
    }
}
- 發送方增加一個CorrelationData
每個發送的消息都需要配備一個 CorrelationData 相關數據對象,CorrelationData 對象內部只有一個 id 屬性,用來表示當前消息唯一性。
// Create the correlation data for this message and pass it along with the send.
// NOTE(review): `id` is not defined in this fragment - presumably a unique message id; confirm against the caller.
CorrelationData correlationData = new CorrelationData(id);
// Same exchange and routing key as the plain SMS send, with the correlation data as the extra argument.
template.convertAndSend(DirectConfig.DESTINATION_NAME, DirectConfig.SMS_ROUTINGKEY, userJson,correlationData);
- 消費方
/**
 * Consumes a message from the SMS queue using manual acknowledgement.
 *
 * <p>The body is printed; on success the single delivery is acked, and if
 * processing throws, the delivery is nacked with requeue so it returns to
 * the queue.
 *
 * @param message the raw AMQP message
 * @param channel the channel the message was delivered on, used to ack/nack
 * @param headers the message headers (kept for interface compatibility; not read)
 * @throws IOException if the ack or nack cannot be sent
 */
@RabbitListener(queues = DirectConfig.SMS_QUEUE, containerFactory = "simpleRabbitListenerContainerFactory")
public void sms_msg(Message message, Channel channel, @Headers Map<String, Object> headers) throws IOException {
    // Read the tag from the message properties instead of unboxing a header
    // map entry, which would throw a NullPointerException if the header were absent.
    long deliveryTag = message.getMessageProperties().getDeliveryTag();
    try {
        System.out.println("sms_msg消費者收到消息 : " + new String(message.getBody(), "UTF-8"));
    } catch (Exception e) {
        System.out.println("sms_msg消費失敗,消息重回隊列 : " + e);
        // Arguments: multiple=false (only this delivery), requeue=true (back to the queue).
        // With requeue=false the message is dropped, or dead-lettered if a
        // dead-letter exchange is configured.
        // NOTE(review): a message that always fails will be redelivered indefinitely.
        // Single-message alternative: channel.basicReject(deliveryTag, requeue).
        channel.basicNack(deliveryTag, false, true);
        return;
    }
    // Ack outside the try block so that an ack failure is not followed by a nack of the same tag.
    // multiple=false: acknowledge only this delivery, not every delivery up to this tag.
    channel.basicAck(deliveryTag, false);
}