Spring AMQP使用整理


RabbitAdmin功能

RabbitAdmin類用來管理RabbitMQ;

 

創建方法:

ConnectionFactory connectionFactory = new CachingConnectionFactory();

RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);

  

  • declareExchange:創建交換機
  • deleteExchange:刪除交換機
  • declareQueue:創建隊列
  • deleteQueue:刪除隊列
  • purgeQueue:清空隊列
  • declareBinding:新建綁定關系
  • removeBinding:刪除綁定關系
  • getQueueProperties:查詢隊列屬性

 

CachingConnectionFactory是Spring AMQP下一個連接工廠,適合SpringBoot的深度整合的連接工廠;

從構造方法出發

org.springframework.amqp.rabbit.connection.CachingConnectionFactory#CachingConnectionFactory()

 

最終會調用這個有參構造方法

org.springframework.amqp.rabbit.connection.CachingConnectionFactory#CachingConnectionFactory(java.lang.String, int)

 

CachingConnectionFactory最終會調用RabbitMQ的原生API,對RabbitMQ Client包下的ConnectionFactory的包裝;

 

對於下面的這個寫法,這個ConnectionFactory是Spring AMQP包下的,它繼承了AbstractConnectionFactory,而AbstractConnectionFactory實現了Spring AMQP包下的ConnectionFactory;如下圖;

ConnectionFactory connectionFactory = new CachingConnectionFactory();

 

org.springframework.amqp.core.Exchange實現類,每種實現對應一種交換機類型,如下圖;

 

org.springframework.amqp.core.Queue#Queue(java.lang.String) 對應的參數,如下圖;

 

org.springframework.amqp.core.Binding#Binding 對應的參數,如下圖;

 

org.springframework.amqp.rabbit.core.RabbitAdmin#declareExchange RabbitAdmin使用該方法將交換機進行綁定,使用了RabbitTemplate對RabbitMQ Client包進行了封裝;

 

最終在org.springframework.amqp.rabbit.core.RabbitAdmin#declareExchanges會有調用RabbitMQ Client的原生API,如下圖;

 

測試代碼

@Slf4j
@Configuration
public class RabbitConfig {

	@PostConstruct
	public void initRabbit() {
		CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
		connectionFactory.setHost("192.168.211.135");
		connectionFactory.setPort(5672);
		connectionFactory.setVirtualHost("/dev");
		connectionFactory.setUsername("admin");
		connectionFactory.setPassword("password");

		RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);

		Exchange exchange = new TopicExchange("exchange.order");
		// 創建交換機
		rabbitAdmin.declareExchange(exchange);

		Queue queue = new Queue("queue.order");
		// 創建隊列
		rabbitAdmin.declareQueue(queue);

		Binding binding = new Binding(
				"queue.order",
				Binding.DestinationType.QUEUE,
				"exchange.order",
				"key.order",
				null
		);
		// 新建綁定關系
		rabbitAdmin.declareBinding(binding);
	}
}

  

服務重啟后,在RabbitMQ的管控台會出現新建的隊列,交換機;

 

RabbitAdmin聲明式配置

  • 將Exchange,Queue,Binding聲明為Bean;
  • 再將RabbitAdmin聲明為Bean;
  • Exchange,Queue,Binding即可自動創建;

代碼如下:

@Slf4j
@Configuration
public class RabbitConfig {

	@Bean
	public ConnectionFactory connectionFactory() {
		CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
		connectionFactory.setHost("192.168.211.135");
		connectionFactory.setPort(5672);
		connectionFactory.setVirtualHost("/dev");
		connectionFactory.setUsername("admin");
		connectionFactory.setPassword("password");
//		connectionFactory.createConnection();
		return connectionFactory;
	}

	@Bean
	public Exchange exchange() {
		Exchange exchange = new TopicExchange("exchange.order");
		return exchange;
	}

	@Bean
	public Queue queue() {
		Queue queue = new Queue("queue.order");
		return queue;
	}

	@Bean
	public Binding binding() {
		Binding binding = new Binding(
				"queue.order",
				Binding.DestinationType.QUEUE,
				"exchange.order",
				"key.order",
				null
		);
		return binding;
	}

	@Bean
	public RabbitAdmin rabbitAdmin(@Autowired ConnectionFactory connectionFactory) {
		RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
		// 自動創建打開
		rabbitAdmin.setAutoStartup(true);
		return rabbitAdmin;
	}

}

  不過@Bean這種注入方式有點懶加載的意思,就是當你注入的Bean沒有被使用時,它是不會被加載的;上面配置的Bean如果沒有在其他地方被使用,這些Bean是不會被加載;

 

org.springframework.amqp.rabbit.core.RabbitAdmin類中實現了ApplicationContextAware,InitializingBean接口;之前講過ApplicationContextAware是可以獲取Spring容器的上下文,從而可以注入別的組件注入到該對象;InitializingBean是Bean創建后進行初始化的操作,此時是Bean對象已經創建;

 

在RabbitAdmin實現的InitializingBean接口的afterPropertiesSet方法,有如下一個操作;

 

org.springframework.amqp.rabbit.connection.CachingConnectionFactory#addConnectionListener

在RabbitMQ進行連接的時候會回調添加的listener;

 

在RabbitAdmin實現的afterPropertiesSet方法,最終會調用一個initialize方法;

 

org.springframework.amqp.rabbit.core.RabbitAdmin#initialize

 通過applicationContext獲取到所有的Exchange,Queue,Binding類型的Bean;

 

最終會創建相應的 Exchange,Queue,Binding;

 

當ConnectionFactory執行createConnection,相應的 Exchange,Queue,Binding通過Spring容器被創建;

connectionFactory.createConnection();

 

 

RabbitTemplate

  • RabbitTemplate與RestTemplate類似,使用了模板方法設計模式;
  • RabbitTemplate提供了豐富的功能,方便消息收發;
  • RabbitTemplate可以顯式傳入配置也可以隱式聲明配置;

 

顯式傳入配置,使用無參構造方法是不能拿到連接工廠的;

 

顯式傳入配置時,應選用帶有參數的構造方法;

 

顯式配置如下:

@Bean
public ConnectionFactory connectionFactory() {
	CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
	connectionFactory.setHost("192.168.211.135");
	connectionFactory.setPort(5672);
	connectionFactory.setVirtualHost("/dev");
	connectionFactory.setUsername("admin");
	connectionFactory.setPassword("password");
	connectionFactory.createConnection();
	return connectionFactory;
}

@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
	RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
	return rabbitTemplate;
}

  

聲明Exchange,Queue,Binding;

@Bean
public Exchange exchange() {
	Exchange exchange = new TopicExchange("exchange.order");
	return exchange;
}

@Bean
public Queue queue() {
	Queue queue = new Queue("queue.order");
	return queue;
}

@Bean
public Binding binding() {
	Binding binding = new Binding(
			"queue.order",
			Binding.DestinationType.QUEUE,
			"exchange.order",
			"key.order",
			null
	);
	return binding;
}

  

RabbitTemplate中發送消息的方法有send和convertAndSend;

關於send方法的使用如下:

send方法使用需要將要發送的消息轉換為字節流;

Message message = new Message("test".getBytes(), null);
rabbitTemplate.send("exchange.order", "key.order", message);

 

上面代碼運行后會報錯:

 

根據異常追蹤到拋NPE的位置:

 

將上面的Message創建修改如下,消息能正常發出;

Message message = new Message("test".getBytes(), new MessageProperties());

 

關於convertAndSend方法使用如下:

rabbitTemplate.convertAndSend("exchange.order", "key.order", "test2");

 

org.springframework.amqp.rabbit.core.RabbitTemplate#send(java.lang.String, java.lang.String, org.springframework.amqp.core.Message, org.springframework.amqp.rabbit.connection.CorrelationData)

 

 execute方法里面傳入了一個lambda對象,里面的channel指的是RabbitMQ Client的Channel對象;

 

 

org.springframework.amqp.rabbit.core.RabbitTemplate#doSend最終會調用sendToRabbit方法;

org.springframework.amqp.rabbit.core.RabbitTemplate#sendToRabbit

 

而在convertAndSend方法中不需要對消息內容進行字節流,因為在convertAndSend方法最終會調send方法,在send方法傳入的消息內容的參數做了轉換;

 

配置發送端確認和消息返回

confirmCallback為生產者投遞消息后,如果Broker收到消息后,會給生產者一個ACK;生產者通過ACK,可以確認這條消息是否正常發送到Broker,即發送端確認機制;

returnCallback為交換機到隊列不成功,返回給消息生產者,觸發returnCallback,即消息返回機制;

@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
	RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
	// 換機處理消息到路由失敗,則會返回給生產者
	rabbitTemplate.setMandatory(true);

	// 交換機到隊列不成功,返回給消息生產者,觸發returnCallback
	rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
		@Override
		public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
			log.info("message:{} replyCode:{}, replyText:{}, exchange:{}, routingKey:{}",
					message, replyCode, replyText, exchange, routingKey);
		}
	});

	// 生產者投遞消息后,如果Broker收到消息后,會給生產者一個ACK。生產者通過ACK,可以確認這條消息是否正常發送到Broker
	rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
		@Override
		public void confirm(CorrelationData correlationData, boolean ack, String cause) {
			log.info("correlationData:{}, ack:{}, cause:{}", correlationData, ack, cause);
		}
	});
	return rabbitTemplate;
}

配置完后,服務重啟,發送消息,confirmCallback並沒有回調;

主要原因是這里用的ConnectionFactory是自定義配置注入的,不是Spring Boot配置注入的,如果需要使用Spring Boot配置注入,需要application.properties配置文件配置RabbitMQ的連接信息;

 

RabbitTemplate類有一個publisherConfirms的成員屬性;

org.springframework.amqp.rabbit.core.RabbitTemplate#publisherConfirms,通過connectionFactory.isPublisherConfirms()控制publisherConfirms;

 

在ConnectionFactory配置添加setPublisherConfirms,不過該方法從Spring Boot2.2已棄用,需要使用setPublisherConfirmType方法;

setPublisherConfirms對應application.properteise中確認消息發送成功,通過實現ConfirmCallBack接口,消息發送到交換器Exchange后觸發回調的配置是:

spring.rabbitmq.publisher-confirms=true

 

 

SIMPLE值發布消息成功到交換器后會觸發回調方法,但是無法得知是被確認的是哪條消息;

NONE值是禁用發布確認模式,是默認值;

CORRELATED值也是發布消息成功到交換器后會觸發回調方法,發送的消息使用CorrelationData進行相關聯;

 

配置成SIMPLE

connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.SIMPLE);
connectionFactory.setPublisherReturns(true);

效果如下:

 

配置成CORRELATED

connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
connectionFactory.setPublisherReturns(true);

發送方修改如下:

CorrelationData correlationData = new CorrelationData();
correlationData.setId("test2_Id");
rabbitTemplate.convertAndSend("exchange.order", "key.order", "test2", correlationData);

執行如下:

 

SimpleMessageListenerContainer簡單消息監聽容器

  • 設置同時監聽多個隊列,自動啟動,自動配置RabbitMQ
  • 設置消費者數量
  • 設置消息確認模式,是否重回隊列,異常捕獲
  • 設置是否獨占,其他線程消費屬性等
  • 設置具體的監聽器,消息轉換器等
  • 支持動態設置,運行中修改監聽器配置

 

使用如下:

@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer(ConnectionFactory connectionFactory) {
	SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);

	// 設置監聽的隊列
	container.setQueueNames("queue.order");
	// 設置並發的消費者
	container.setConcurrentConsumers(3);
	// 設置最大並發的消費者
	container.setMaxConcurrentConsumers(3);
	// 設置確認方式
	container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
	// 設置消息監聽回調
	container.setMessageListener(new ChannelAwareMessageListener() {
		@Override
		public void onMessage(Message message, Channel channel) throws Exception {
			log.info("message:{}", message);
			channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
		}
	});

	// 現在每次只允許一條消息消費
	container.setPrefetchCount(1);

	return container;
}

  

帶有Channel的MessageListener擴展接口,用於手動確認;

 

執行如下:

message:(Body:'test2' MessageProperties [headers={spring_listener_return_correlation=b6ba6114-27c4-4c9b-ac4d-e3053ecf8762, spring_returned_message_correlation=test2_Id}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=exchange.order, receivedRoutingKey=key.order, deliveryTag=2, consumerTag=amq.ctag-HIhcGJg7Q6OdAV-q26zbnQ, consumerQueue=queue.order])

 

之前在代碼中設置SimpleMessageListenerContainer最大並發的消費者為3個,每次只允許消費一條消息,可以從管控台看出,配置已生效;

 

SimpleMessageListenerContainer類的繼承樹如下

 

org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer實現了MessageListenerContainer;

 

MessageListenerContainer接口的繼承樹如下:

 

MessageListenerContainer最終繼承了LifeCycle的接口,LifeCycle接口是Spring框架的接口,用於定義注入Spring中的Bean啟動/停止生命周期控制方法的通用接口,當Bean啟動交給Spring容器管理的時候,該Bean中的start方法會被回調;

 

org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer#start

AbstractMessageListenerContainer#start會調到實現類的doStart方法;

 

org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#doStart

 

調用initializeConsumers方法執行初始化消費者;initializeConsumers方法里面會調用createBlockingQueueConsumer方法創建BlockingQueueConsumer對象;

 

關於org.springframework.amqp.rabbit.listener.BlockingQueueConsumer對象,如下:

BlockingQueueConsumer這個類Spring AMQP包下RabbitMQ專門的消費者,封裝了消息broker的連接,並有自己的生命周期;

 

之后回到org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#doStart方法;

 

將與RabbitMQChannel通信的BlockingQueueConsumer對象的集合封裝到AsyncMessageProcessingConsumer對象中,而AsyncMessageProcessingConsumer對象又實現了Runnable接口,getTaskExecutor方法獲取到的是Spring封裝的SimpleAsyncTaskExecutor類型的線程池,該線程池是來一個任務創建一個線程,比較適合短期多任務,通過getTaskExecutor方法獲取的線程執行AsyncMessageProcessingConsumer對象的run方法;正常情況下run方法會執行initialize方法;

 

org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.AsyncMessageProcessingConsumer#initialize

最終調用BlockingQueueConsumer#start方法;start方法里面會調用setQosAndreateConsumers方法,該方法用於創建消費者和設置消費者最大能消費的消息數量;

 

org.springframework.amqp.rabbit.listener.BlockingQueueConsumer#setQosAndreateConsumers

最終會調用consumeFromQueue方法;

 

org.springframework.amqp.rabbit.listener.BlockingQueueConsumer#consumeFromQueue

該方法啟動一個消費者,但是channel調用basicConsume方法傳入用於處理接收RabbitMQ消息時的回調Consumer是內部的InternalConsumer對象; 

當接收到消息時, InternalConsumer會回調handleDelivery方法,將消息放入到隊列中;

 

 

之后會循環執行AsyncMessageProcessingConsumer#mainLoop;

 

receiveAndExecute方法會調用SimpleMessageListenerContainer#receiveAndExecuteSimpleMessageListenerContainer#receiveAndExecute會調用SimpleMessageListenerContainer#doReceiveAndExecute方法,doReceiveAndExecute方法最終會調用BlockingQueueConsumer#nextMessage(long)方法,消息會從隊列里取出,消息接收通過basicConsumer接收,消費的消息是從mainLoop方法中取,當接收到消息時會將消息存到本地隊列,等待消息消費;

 

取出的消息是如何觸發onMessage方法?

SimpleMessageListenerContainer#doReceiveAndExecute如果沒有打開consumerBatchEnabled,則執行下面的executeListener方法,executeListener調用doExecuteListener方法,doExecuteListener會調用invokeListener方法;

 

而invokerListener是函數式接口,不是方法體;

 

invokerListenr方法的實現在proxy對象;

 

org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer#actualInvokeListener

該方法是先判斷listener的類型,之后執行doInvokerListener方法,在doInvokerListener方法中會回調MessageListeneronMessage方法;

 

SimpleMessageListenerContainer調用鏈大致如下:

 

MessageListenerAdapter 消息監聽適配器

使用:

  • 實現handleMessage方法
  • 自定義隊列名->方法名的映射關系

 

使用如下:

手動注入SimpleMessageListenerContainer

@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer(ConnectionFactory connectionFactory) {
	SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);

	// 設置監聽的隊列
	container.setQueueNames("queue.order");
	// 設置並發的消費者
	container.setConcurrentConsumers(3);
	// 設置最大並發的消費者
	container.setMaxConcurrentConsumers(3);
	// 設置確認方式
	container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
	// 設置消息監聽回調
//		container.setMessageListener(new ChannelAwareMessageListener() {
//			@Override
//			public void onMessage(Message message, Channel channel) throws Exception {
//				log.info("message:{}", message);
//				channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
//			}
//		});
	MessageListenerAdapter listenerAdapter = new MessageListenerAdapter();

	// 設置消息監聽回調調用orderService
	listenerAdapter.setDelegate(orderService);
	container.setMessageListener(listenerAdapter);

	// 現在每次只允許一條消息消費
	container.setPrefetchCount(1);

	return container;
}

  

注入OrderService

public interface OrderService {
	void handleMessage(String messageBody) throws IOException;
}

  

@Slf4j
@Service
public class OrderServiceImpl implements OrderService {
	@Override
	public void handleMessage(String messageBody) throws IOException {
		log.info("orderService handleMessage:{}", messageBody);
	}
}

 

當有監聽到消息到達時,OrderServiceImpl#handleMessage方法會被調用;

執行如下:

 

為何handleMessage方法會被調用?

MessageListenerAdapter的繼承樹如下:

 

MessageListenerAdapter實現了MessageListener接口,當有消息到達時,MessageListenerAdapter#onMessage方法會被觸發;

org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter#onMessage

首先先拿到delegateListener對象,判斷delegateListener是否為ChannelAwareMessageListenerMessageListener類型,如果delegateListener是那些類型,delegateListener就調用onMessage方法;

 

delegate對象在調用doSetDelegate方法時賦值;

 

 

MessageListenerAdapter#onMessage獲取到消息處理的方法名,之后調用該方法;

 

org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter#getListenerMethodName

 

 默認情況下,queueOrTagToMethodName是沒有設置的,因此獲取的是默認的值;

 

不同的隊列如何設置調用不同的消息處理?

將上面的代碼修改如下:

MessageListenerAdapter listenerAdapter = new MessageListenerAdapter(orderService);
HashMap<String, String> methodMap = new HashMap<>(8);
methodMap.put("queue.order1", "handleMessage1");
methodMap.put("queue.order2", "handleMessage2");
listenerAdapter.setQueueOrTagToMethodName(methodMap);
container.setMessageListener(listenerAdapter);

 

MessageConverter 消息轉換器

RabbitMQ Client原生的消息收發是通過byte[]作為消息體的,MessageConverter用來在收發消息時自動轉換消息

 

Jackson2JsonMessageConverter

Jackson轉換Json消息格式,最常用的MessageConverter,用來轉換Json格式消息;

配合ClassMapper可以直接轉換為POJO對象;

 

 

自定義MessageConverter

 

實現MessageConverter接口;

 

重寫toMessage,fromMessage方法;

 

 

使用如下:

實體類OrderDTO

@Data
public class OrderDTO implements Serializable {
	private String orderId;
	private Date createDate;
	private BigDecimal totalAmount;
	private BigDecimal payAmount;
	private String payType;
}

  

發送消息:

ObjectMapper objectMapper = new ObjectMapper();
String jsonStr = objectMapper.writeValueAsString(orderDTO);
Message message = new Message(jsonStr.getBytes(), new MessageProperties());
CorrelationData correlationData = new CorrelationData();
correlationData.setId(uuid);
rabbitTemplate.convertAndSend("exchange.order", "key.order", message, correlationData);

 

SimpleMessageListenerContainer配置修改:

設置轉換消息的類型

MessageListenerAdapter listenerAdapter = new MessageListenerAdapter(orderService);
HashMap<String, String> methodMap = new HashMap<>(8);
methodMap.put("queue.order", "handleMessage");
listenerAdapter.setQueueOrTagToMethodName(methodMap);

Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
converter.setClassMapper(new ClassMapper() {
	@Override
	public void fromClass(Class<?> clazz, MessageProperties properties) {

	}

	/**
	 * 設置轉換消息的類型
	 * @param properties
	 * @return
	 */
	@Override
	public Class<?> toClass(MessageProperties properties) {
		return OrderDTO.class;
	}
});
listenerAdapter.setMessageConverter(converter);

 

執行如下:

 

分析:

org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter#onMessage

這里獲取轉換后的消息;

 

org.springframework.amqp.rabbit.listener.adapter.AbstractAdaptableMessageListener#extractMessage

獲取往MessageListenerAdapter設置的消息轉換器,之后調用fromMessage方法轉換消息;

 

上面測試代碼設置的轉換器是Jackson2JsonMessageConverter,最終調用AbstractJackson2MessageConverter#fromMessage(org.springframework.amqp.core.Message, java.lang.Object);

 

 

轉換器沒有設置alwaysConvertToInferredType和javaTypeMapper,並且classMapper不為空,則執行下面的邏輯;

 

toClass調用的是上面測試代碼的里面的;

convertBytesToObject方法最終是調用Jackson進行Json轉換;

 

@RabbitListener注解

  @RabbitListener是一個組合注解,可以嵌套以下注解:

  @Exchange:自動聲明Exchange

  @Queue:自動聲明隊列

  @QueueBinding:自動聲明綁定關系 

 

  要使用@RabbitListener注解,需要注入RabbitListenerContainerFactory;

@Bean
public RabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
	SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
	factory.setConnectionFactory(connectionFactory);
	factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
	return factory;
}

  

  消息監聽處理類,如下;

  使用 @Payload@Headers注解可以消息中的body(字符串類型)headers 信息;

  @RabbitHandler用於接收消息處理,下面的寫法是同一個隊列下所有類型的消息都會執行該方法;

@Slf4j
@Component
@RabbitListener(containerFactory = "rabbitListenerContainerFactory", queues = {
		"queue.order" })
public class OrderMessageHandler1 {

	@RabbitHandler(isDefault = true)
	public void handleOrderQueue(Message message, @Payload String body, Channel channel) throws IOException {
		long msgTag = message.getMessageProperties().getDeliveryTag();
		log.info("body:{}, message:{}, msgTag:{}", body, message, msgTag);

		// 成功確認,使用此回執方法后,消息會被 rabbitmq broker 刪除
		channel.basicAck(msgTag, false);
	}
}

  @RabbitListener注解中containerFactory為注入的RabbitListenerContainerFactory類型的bean名,queues為要監聽的隊列的名;

 

  執行如下: 

body:{"orderId":"dcfac502-2dc2-4f15-b7e0-352148c1c854","createDate":1624895624124,"totalAmount":null,"payAmount":null,"payType":null}, message:(Body:'[B@4644fcc8(byte[128])' MessageProperties [headers={spring_listener_return_correlation=2089040d-82e1-4f89-877c-7eb532a7c4fa, spring_returned_message_correlation=dcfac502-2dc2-4f15-b7e0-352148c1c854}, contentType=application/octet-stream, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=exchange.order, receivedRoutingKey=key.order, deliveryTag=1, consumerTag=amq.ctag-msLGsIwV9daq1O0avuXXyQ, consumerQueue=queue.order]), msgTag:1

  

  RabbitListenerContainerFactory的繼承樹如下: 

  

  AbstractRabbitListenerContainerFactory#setMessageConverter 設置消息轉換器;

  • 消息處理方法參數是由 MessageConverter 轉化,消息的body的類型默認是byte[];如果要使用自定義 MessageConverter 則需要在 RabbitListenerContainerFactory 實例中去設置,參考上面的消息處理器(默認的實現是 SimpleRabbitListenerContainerFactory);

  • 消息的 content_type 屬性表示消息 body 數據以什么數據格式存儲,接收消息除了使用 Message 對象接收消息(包含消息屬性等信息)之外,還可直接使用對應類型接收消息 body 內容,但若方法參數類型不正確會拋異常:

    • application/octet-stream:二進制字節數組存儲,使用 byte[];

    • application/x-java-serialized-object:java 對象序列化格式存儲,使用 Object、相應類型(反序列化時類型應該同包同名,否者會拋出找不到類異常);

    • text/plain:文本數據類型存儲,使用 String;

    • application/json:JSON 格式,使用 Object、相應類型;

 

  @RabbitListener也可以修飾消息處理的方法,如下:

@RabbitListener(containerFactory = "rabbitListenerContainerFactory", queues = {
		"queue.order" })
public void handleOrderQueue(Message message, @Payload String body, Channel channel)
		throws IOException {
	long msgTag = message.getMessageProperties().getDeliveryTag();
	log.info("body:{}, message:{}, msgTag:{}", body, message, msgTag);

	// 成功確認,使用此回執方法后,消息會被 rabbitmq broker 刪除
	channel.basicAck(msgTag, false);
}

  

  執行如下:

body:{"orderId":"1f4e343e-a26f-4410-9939-102e728e9df1","createDate":1624897186777,"totalAmount":null,"payAmount":null,"payType":null}, message:(Body:'[B@4d1af5df(byte[128])' MessageProperties [headers={spring_listener_return_correlation=a87fbdb8-12e1-4828-bdee-b92c76b910cf, spring_returned_message_correlation=1f4e343e-a26f-4410-9939-102e728e9df1}, contentType=application/octet-stream, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=exchange.order, receivedRoutingKey=key.order, deliveryTag=1, consumerTag=amq.ctag-2liRtkURbpuuRgV4EGyyZA, consumerQueue=queue.order]), msgTag:1

  

  通過@RabbitListener注解的屬性bindings聲明Binding,如果RabbitMQ中不存在該綁定所需要的Queue、Exchange、RouteKey 則自動創建,若存在則拋出異常,使用如下:

@RabbitListener(containerFactory = "rabbitListenerContainerFactory", admin = "rabbitAdmin", bindings = {
		@QueueBinding(value = @Queue(name = "queue.order"), exchange = @Exchange(name = "exchange.order", type = ExchangeTypes.TOPIC), key = "key.order") })
public void handleOrderQueue(Message message, @Payload String body, Channel channel)
		throws IOException {
	long msgTag = message.getMessageProperties().getDeliveryTag();
	log.info("body:{}, message:{}, msgTag:{}", body, message, msgTag);

	// 成功確認,使用此回執方法后,消息會被 rabbitmq broker 刪除
	channel.basicAck(msgTag, false);
}

  

  應用啟動后,查看管控台;

 

 

  執行如下:

body:{"orderId":"97d08e05-588f-4cb0-b551-f5fb32634e96","createDate":1624898825450,"totalAmount":null,"payAmount":null,"payType":null}, message:(Body:'[B@12fea6bb(byte[128])' MessageProperties [headers={spring_listener_return_correlation=0924912e-a6a9-4e08-8196-1355a04e2325, spring_returned_message_correlation=97d08e05-588f-4cb0-b551-f5fb32634e96}, contentType=application/octet-stream, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=exchange.order, receivedRoutingKey=key.order, deliveryTag=3, consumerTag=amq.ctag-1e3NzsIvhnYRFl295JKOvQ, consumerQueue=queue.order]), msgTag:1

  

Spring Boot整合RabbitMQ

使用application.properties配置,需要使用Spring Boot配置注入,在application.properties配置文件配置RabbitMQ的連接信息;

連接配置如下:

spring.rabbitmq.host=192.168.211.135
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=password
spring.rabbitmq.virtual-host=/dev

 

  • 生產者到交換機

開啟confirmCallback,生產者投遞消息后,如果Broker收到消息后,會給生產者一個ACK;生產者通過ACK,可以確認這條消息是否正常發送到Broker,這種方式是消息可靠性投遞的核心;

spring.rabbitmq.publisher-confirm-type: correlated

Spring Boot2.2之前使用如下配置:

spring.rabbitmq.publisher-confirms=true

 

  • 交換機到隊列

開啟returnCallback配置

spring.rabbitmq.publisher-returns=true

 

配置交換機投遞到隊列失敗的策略

交換機到隊列不成功,則丟棄消息(默認);交換機到隊列不成功,返回給消息生產者,觸發returnCallback;

#交換機處理消息到路由失敗,則會返回給生產者
spring.rabbitmq.template.mandatory=true

對應的RabbitTemplate配置

template.setMandatory(true);

 

  • RabbitMQ的ACK
    • 消費者從RabbitMQ收到消息並處理完成后,反饋給RabbitMQ,RabbitMQ收到反饋后才將此消息從隊列中刪除;
    • 消費者在處理消息出現了網絡不穩定、服務器異常等現象,那么就不會有ACK反饋,RabbitMQ會認為這個消息沒有正常消費,會將消息重新放入隊列中;
    • 只有當消費者正確發送ACK應答,RabbitMQ確認收到后,消息才會從RabbitMQ服務器的數據中刪除;
    • 消息的ACK確認機制默認是自動確認的,消息如果未被進行ACK的消息確認應答,這條消息被鎖定unacked;
    • 確認方式

      • 自動確認(默認)
      • 手動確認 manual

      org.springframework.amqp.core.AcknowledgeMode 消息確認枚舉

      

 

application.properties配置

#開啟手動確認消息,如果消息重新入隊,進行重試
spring.rabbitmq.listener.simple.acknowledge-mode=manual

  

  使用了Spring Boot配置會自動生成RabbitAdmin,需要將上面的queue.order隊列消息消費的測試代碼中@RabbitListener的admin屬性去掉;

  containerFactory的創建在org.springframework.boot.autoconfigure.amqp.RabbitAnnotationDrivenConfiguration#simpleRabbitListenerContainerFactory

  

 

  RabbitListenerAnnotationBeanPostProcessor實現了BeanPostProcessor接口,BeanPostProcessor為bean的后置處理器,在bean初始化之前調用進行攔截,在bean初始化前后進行一些處理工作 ,說明此時的bean已經進行了實例化;

  

   @RabbitListener上屬性的Bean則在postProcessAfterInitialization方法里創建處理;

 

  • 死信隊列

  死信轉移過程:

   

 

   死信隊列配置類如下:

@Configuration
public class DLXQueueConfig {

	@Bean
	public Queue productDLXQueue() {
		Queue queue = new Queue("queue.product.dlx");
		queue.addArgument("x-message-ttl", 10000);
		queue.addArgument("x-dead-letter-exchange", "exchange.product.dlx");
		queue.addArgument("x-dead-letter-routing-key", "key.product.dlx");
		return queue;
	}

	@Bean
	public Exchange productExchange() {
		TopicExchange exchange = new TopicExchange("exchange.product");
		return exchange;
	}

	@Bean
	public Binding productDlxQueueBinding() {
		Binding binding = new Binding("queue.product.dlx",
				Binding.DestinationType.QUEUE,
				"exchange.product",
				"key.product",
				null);
		return binding;
	}

	@Bean
	public Queue productQueue() {
		Queue queue = new Queue("queue.product");
		return queue;
	}

	@Bean
	public Exchange productDLXExchange() {
		TopicExchange exchange = new TopicExchange("exchange.product.dlx");
		return exchange;
	}

	@Bean
	public Binding productQueueBinding() {
		Binding binding = new Binding("queue.product",
				Binding.DestinationType.QUEUE,
				"exchange.product.dlx",
				"key.product.dlx",
				null);
		return binding;
	}


}  

  程序啟動后創建對應的Queue,Exchange,Binding; 

 

 

 

 

  消息處理類

@Slf4j
@Component
public class DLXMessageHandler {

	@RabbitListener(queues = {"queue.product"})
	public void handleTTLMessage(Message message, @Payload String body, Channel channel) throws IOException {
		long msgTag = message.getMessageProperties().getDeliveryTag();
		log.info("body:{}, message:{}, msgTag:{}", body, message, msgTag);

		// 成功確認,使用此回執方法后,消息會被 rabbitmq broker 刪除
		channel.basicAck(msgTag, false);

	}
}

  

  當往exchange.product交換機投遞消息,如果該消息10秒內都不消費,10秒后該隊列會變成死信隊列,將消息投遞到另一個交換機,交換機再將消息通過路由鍵路由到對應的隊列;

  執行如下:

body:{"orderId":"c7f57429-41b1-483e-bf4d-edf24783e043","createDate":1624974305090,"totalAmount":null,"payAmount":null,"payType":null}, message:(Body:'[B@110ea79f(byte[128])' MessageProperties [headers={spring_listener_return_correlation=ab1b027e-aad0-4a1d-8551-6c27eac0772d, spring_returned_message_correlation=c7f57429-41b1-483e-bf4d-edf24783e043, x-first-death-exchange=exchange.product, x-death=[{reason=expired, count=1, exchange=exchange.product, time=Tue Jun 29 21:45:13 CST 2021, routing-keys=[key.product], queue=queue.product.dlx}], x-first-death-reason=expired, x-first-death-queue=queue.product.dlx}, contentType=application/octet-stream, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=exchange.product.dlx, receivedRoutingKey=key.product.dlx, deliveryTag=2, consumerTag=amq.ctag-wGj9Z_aaSndUaUTKHIFC-g, consumerQueue=queue.product]), msgTag:2

  

  當消息發送的是一個實體,不是字符串,如下:

OrderDTO orderDTO = new OrderDTO();
String uuid = UUID.randomUUID().toString();
orderDTO.setOrderId(uuid);
orderDTO.setCreateDate(new Date());

CorrelationData correlationData = new CorrelationData();
correlationData.setId(uuid);
rabbitTemplate.convertAndSend("exchange.product", "key.product", orderDTO, correlationData);

  

  

 

  消息處理修改如下:

@Slf4j
@Component
@RabbitListener(queues = {"queue.product"})
public class DLXMessageHandler {

	@RabbitHandler
	public void handleTTLMessage(Message message, OrderDTO orderDTO, Channel channel) throws IOException {
		long msgTag = message.getMessageProperties().getDeliveryTag();
		log.info("body:{}, message:{}, msgTag:{}", orderDTO, message, msgTag);

		// 成功確認,使用此回執方法后,消息會被 rabbitmq broker 刪除
		channel.basicAck(msgTag, false);

	}
}

  執行如下:

body:OrderDTO(orderId=d36438b9-9cf7-48a8-a835-064b4a90201f, createDate=Tue Jun 29 22:27:19 CST 2021, totalAmount=null, payAmount=null, payType=null), message:(Body:'[B@7cae8af(byte[271])' MessageProperties [headers={spring_listener_return_correlation=77d4d8fc-7d6e-4ab9-a7bb-3d3c6ac30f02, spring_returned_message_correlation=d36438b9-9cf7-48a8-a835-064b4a90201f, x-first-death-exchange=exchange.product, x-death=[{reason=expired, count=1, exchange=exchange.product, time=Tue Jun 29 22:27:27 CST 2021, routing-keys=[key.product], queue=queue.product.dlx}], x-first-death-reason=expired, x-first-death-queue=queue.product.dlx}, contentType=application/x-java-serialized-object, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=exchange.product.dlx, receivedRoutingKey=key.product.dlx, deliveryTag=3, consumerTag=amq.ctag-yukRV_EsHHtH6-pVxx6yZQ, consumerQueue=queue.product]), msgTag:3

  

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM