消息中間件——RabbitMQ(九)RabbitMQ整合Spring AMQP實戰!(全)


求關注

RabbitMQ整合Spring AMQP實戰!(全)

前言

1. AMQP 核心組件

  • RabbitAdmin
  • SpringAMQP聲明
  • RabbitTemplate
  • SimpleMessageListenerContainer
  • MessageListenerAdapter
  • MessageConverter

2. RabbitAdmin

RabbitAdmin類可以很好的才注意RabbitMQ,在Spring中直接進行諸如即可。

RabbitAdmin

注意:

  • autoStartUp必須要設置為true,否則Spring容器不會加載RabbitAdmin類
  • RabbitAdmin底層實現就是從Spring容器中獲取Exchange、Bingding、RoutingKey以及Queue的@Bean聲明
  • 使用RabbitTemplate的execute方法執行對應的什么、修改、刪除等一系列RabbitMQ基礎功能操作
  • 例如:添加一個交換機、刪除一個綁定、清空一個隊列里的消息等等

2.1 代碼演示

2.1.1 引入Pom文件


<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>

	<groupId>com.cp</groupId>
	<artifactId>rabbitmq-spring</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<packaging>jar</packaging>

	<name>rabbitmq-spring</name>
	<description>rabbitmq-spring</description>

	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>1.5.14.RELEASE</version>
		<relativePath/> <!-- lookup parent from repository -->
	</parent>

	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
		<java.version>1.8</java.version>
	</properties>

	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter</artifactId>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>
		<dependency>
			<groupId>com.rabbitmq</groupId>
			<artifactId>amqp-client</artifactId>
			<version>3.6.5</version>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-amqp</artifactId>
		</dependency>		
	</dependencies>

	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>


</project>


2.1.2 配置Bean


@Configuration
@ComponentScan({"com.cp.spring.*"})
public class RabbitMQConfig {

	//相當於<Bean id="connectionFactory"></Bean>
	@Bean
	public ConnectionFactory connectionFactory(){
		CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
		connectionFactory.setAddresses("127.0.0.1:5672");
		connectionFactory.setUsername("user_cp");
		connectionFactory.setPassword("123456");
		connectionFactory.setVirtualHost("/vhost_cp");
		return connectionFactory;
	}
	
	//形參名稱要與bean的方法名保持一致
	@Bean
	public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
		RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
		rabbitAdmin.setAutoStartup(true);
		return rabbitAdmin;
	}
}
	

2.1.3 測試類


@RunWith(SpringRunner.class)
@SpringBootTest
public class ApplicationTests {

	@Test
	public void contextLoads() {
	}
	
	@Autowired
	private RabbitAdmin rabbitAdmin;
	
	@Test
	public void testAdmin() throws Exception {
		//直連監聽
		rabbitAdmin.declareExchange(new DirectExchange("test.direct", false, false));
		
		rabbitAdmin.declareExchange(new TopicExchange("test.topic", false, false));
		
		rabbitAdmin.declareExchange(new FanoutExchange("test.fanout", false, false));
		
		rabbitAdmin.declareQueue(new Queue("test.direct.queue", false));
		
		rabbitAdmin.declareQueue(new Queue("test.topic.queue", false));
		
		rabbitAdmin.declareQueue(new Queue("test.fanout.queue", false));

		//第一個參數:具體的隊列 第二個參數:綁定的類型 第三個參數:交換機 第四個參數:路由key 第五個參數:arguments 參數
		rabbitAdmin.declareBinding(new Binding("test.direct.queue",
				Binding.DestinationType.QUEUE,
				"test.direct", "direct", new HashMap<>()));

		//BindingBuilder 鏈式編程
		rabbitAdmin.declareBinding(
				BindingBuilder
				.bind(new Queue("test.topic.queue", false))		//直接創建隊列
				.to(new TopicExchange("test.topic", false, false))	//直接創建交換機 建立關聯關系
				.with("user.#"));	//指定路由Key
		
		
		rabbitAdmin.declareBinding(
				BindingBuilder
				.bind(new Queue("test.fanout.queue", false))		
				.to(new FanoutExchange("test.fanout", false, false)));
		
		//清空隊列數據
		rabbitAdmin.purgeQueue("test.topic.queue", false);
	}
}

通過以上代碼,可以自行測試一下結果。

RabbitAdmin源碼

RabbitAdmin源碼UML圖

實現了InitializingBean接口,表明在Bean配置加載完后再加載RabbitAdmin配置。找到afterPropertiesSet()方法中最要的initialize()初始化方法。

afterPropertiesSet方法

initialize方法

initialize

this.applicationContext.getBeansOfType(Collection.class, false, false).values()

可以看到Exchange、Queue、Binding都是從Spring容器中獲取三種類型,加載到上方定義的contextExchanges、contextQueues、contextBindings三種容器中。
后續的源碼中,也可以看出通過篩選Spring容器中RabbitMQ的信息之后,再去建立RabbitMQ服務器的連接。主要通過Spring以@Bean的方式,將配置加載到Spring容器之后,再從容器中獲取相關信息,再去建立連接。

3. SpringAMQP聲明

  • 在Rabbit基礎API里面聲明一個Exchange、聲明一個綁定、一個隊列

channel

-使用SpringAMQP去聲明,就需要使用SpringAMQP的如下模式,即聲明@Bean方式

聲明AMQP

3.1 代碼演示


@Configuration
@ComponentScan({"com.cp.spring.*"})
public class RabbitMQConfig {

	//相當於<Bean id="connectionFactory"></Bean>
	@Bean
	public ConnectionFactory connectionFactory(){
		CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
		connectionFactory.setAddresses("127.0.0.1:5672");
		connectionFactory.setUsername("user_cp");
		connectionFactory.setPassword("123456");
		connectionFactory.setVirtualHost("/vhost_cp");
		return connectionFactory;
	}
	
	//形參名稱要與bean的方法名保持一致
	@Bean
	public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
		RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
		rabbitAdmin.setAutoStartup(true);
		return rabbitAdmin;
	}
	
    /**  
     * 針對消費者配置  
     * 1. 設置交換機類型  
     * 2. 將隊列綁定到交換機  
        FanoutExchange: 將消息分發到所有的綁定隊列,無routingkey的概念  
        HeadersExchange :通過添加屬性key-value匹配  
        DirectExchange:按照routingkey分發到指定隊列  
        TopicExchange:多關鍵字匹配  
     */  
    @Bean  
    public TopicExchange exchange001() {  
        return new TopicExchange("topic001", true, false);  
    }  

    @Bean  
    public Queue queue001() {  
        return new Queue("queue001", true); //隊列持久  
    }  
    
    @Bean  
    public Binding binding001() {  
        return BindingBuilder.bind(queue001()).to(exchange001()).with("spring.*");  
    }  
    
    @Bean  
    public TopicExchange exchange002() {  
        return new TopicExchange("topic002", true, false);  
    }  
    
    @Bean  
    public Queue queue002() {  
        return new Queue("queue002", true); //隊列持久  
    }
    
    @Bean  
    public Binding binding002() {  
        return BindingBuilder.bind(queue002()).to(exchange002()).with("rabbit.*");  
    } 
    
    @Bean  
    public Queue queue003() {  
        return new Queue("queue003", true); //隊列持久  
    }
    
    @Bean  
    public Binding binding003() {  
    	//同一個Exchange綁定了2個隊列
        return BindingBuilder.bind(queue003()).to(exchange001()).with("mq.*");  
    } 
}

再次運行ApplicationTests類中testAdmin()方法,可以在控制台中,查看到一個Exchange綁定兩個Queue。

運行結果

4. RabbitTemplate

RabbitTemplate,即消息模板

  • 我們在與SpringAMQP整合的時候進行發送消息的關鍵詞

  • 該類提供了豐富的發送消息方法,包括可靠性投遞消息方法、回調監聽消息接口ConfirmCallback、返回值確認接口ReturnCallback等等。同樣我們需要進行注入到Spring容器中,然后直接使用

  • 在與SPring整合時需要實例化,但是在與SpringBoot整合時,在配置文件里添加配置即可

4.1 代碼演示

4.1.1 RabbitMQConfig類

在RabbitMQConfig類中寫RabbitTemplate配置


@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
	RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
	return rabbitTemplate;
}

4.1.2 ApplicationTests類

在ApplicationTests測試類中添加測試方法,進行測試。


@Autowired
	private RabbitTemplate rabbitTemplate;


	@Test
	public void testSendMessage() throws Exception {
		//1 創建消息
		MessageProperties messageProperties = new MessageProperties();
		messageProperties.getHeaders().put("desc", "信息描述..");
		messageProperties.getHeaders().put("type", "自定義消息類型..");
		//消息體,與參數
		Message message = new Message("Hello RabbitMQ".getBytes(), messageProperties);
		//轉換並發送
		//MessagePostProcessor 在消息發送完畢后再做一次轉換進行再加工,匿名接口,需要重寫方法
		rabbitTemplate.convertAndSend("topic001", "spring.amqp", message, new MessagePostProcessor() {
			@Override
			public Message postProcessMessage(Message message) throws AmqpException {
				System.err.println("------添加額外的設置---------");
				message.getMessageProperties().getHeaders().put("desc", "額外修改的信息描述");
				message.getMessageProperties().getHeaders().put("attr", "額外新加的屬性");
				return message;
			}
		});
	}

4.1.3 查看管控台

運行前,可以看到queue001中是沒有消息的。

執行前Queue001

運行testSendMessage()方法。並獲取消息。

執行后Queue001

獲取消息

4.1.4 簡單寫法


@Test
public void testSendMessage2() throws Exception {
	//1 創建消息
	MessageProperties messageProperties = new MessageProperties();
	messageProperties.setContentType("text/plain");
	Message message = new Message("mq 消息1234".getBytes(), messageProperties);

	rabbitTemplate.send("topic001", "spring.abc", message);

	rabbitTemplate.convertAndSend("topic001", "spring.amqp", "hello object message send!");
	rabbitTemplate.convertAndSend("topic002", "rabbit.abc", "hello object message send!");
}

我們往topic001中發送了兩條消息,topic002中發送了一條消息。運行testSendMessage2() 接下來再查看下管控台

查看管控台

獲取消息1

獲取消息2

獲取消息3

可以看到topic001中已經有了三條消息,剛才發送的消息也還在。GetMessage並不是消費消息,而只是獲取消息。

5. SimpleMessageListenerContainer

簡單消息監聽容器

  • 這個類非常的強大,我們可以對它進行很多設置,對於消費者的配置項,這個類都可以滿足
  • 監聽隊列(多個隊列)、自動啟動、自動聲明功能
  • 設置事務特性、事務管理器、事務屬性、事務容器(並發)、是否開啟事務、回滾消息等
  • 設置消費者數量、最小最大數量、批量消費
  • 設置消息確認和自動確認模式、是否重回隊列、異常捕捉handler函數
  • 設置消費者標簽生成策略、是否獨占模式、消費者屬性等
  • 設置具體的監聽器、消息轉換器等等。

注意:

  • SimpleMessageListenerContainer可以進行動態設置,比如在運行中的應用可以動態的修改其消費者數量的大小、接收消息的模式等
  • 很多機遇RabbitMQ的自制定話后端管控台在進行動態設置的時候,也是根據這一特性去實現的。所以可以看出SpringAMQP非常的強大

思考

SimpleMessageListenerContainer為什么可以動態感知配置變更?

5.1 代碼演示

5.1.1 RabbitMQConfig類

配置中添加如下代碼:


@Bean
public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {
	
	SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
	//添加多個隊列進行監聽
	container.setQueues(queue001(), queue002(), queue003(), queue_image(), queue_pdf());
	//當前消費者數量
	container.setConcurrentConsumers(1);
	//最大消費者數量
	container.setMaxConcurrentConsumers(5);
	//設置重回隊列,一般設置false
	container.setDefaultRequeueRejected(false);
	//設置自動簽收機制
	container.setAcknowledgeMode(AcknowledgeMode.AUTO);
	//設置listener外露
    container.setExposeListenerChannel(true);
	//消費端標簽生成策略
	container.setConsumerTagStrategy(new ConsumerTagStrategy() {
		@Override
		public String createConsumerTag(String queue) {
			//每個消費端都有自己獨立的標簽
			return queue + "_" + UUID.randomUUID().toString();
		}
	});

	//消息監聽
	container.setMessageListener(new ChannelAwareMessageListener() {
		@Override
		public void onMessage(Message message, Channel channel) throws Exception {
			String msg = new String(message.getBody());
			System.err.println("----------消費者: " + msg);
		}
	});
	return container;
}


運行之前寫的testSendMessage2()方法,查看管控台中的相關信息以及控制台打印信息

查看Channel1

查看Channel2

打印信息

6. MessageListenerAdapter

MessageListenerAdapter 即消息監聽適配器

6.1 代碼演示

6.1.1 適配器使用方式1

我們把之前的消息監聽代碼注釋,可以不用直接加消息監聽,而是采用MessageListenerAdapter的方式,通過適配器方式1,我們來學習下如何使用默認的handleMessage,自定義方法名,自定義轉換器。

使用默認handleMessage


//消息監聽
/*container.setMessageListener(new ChannelAwareMessageListener() {
	@Override
	public void onMessage(Message message, Channel channel) throws Exception {
		String msg = new String(message.getBody());
		System.err.println("----------消費者: " + msg);
	}
});*/


MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
container.setMessageListener(adapter);


MessageListenerAdapter 適配器類,熟悉適配器模式的朋友肯定了解適配器模式的話,可以通過適配器,適配自己的實現,這里我們適配自定義的MessageDelegate類。我們就可以不采用監聽的方式,采用適配的方式。

自定義MessageDelegate

public class MessageDelegate {

	public void handleMessage(byte[] messageBody) {
		System.err.println("默認方法, 消息內容:" + new String(messageBody));
	}
}

MessageDelegate類中,方法名與參數handleMessage(byte[] messageBody)是固定的。為什么呢?

MessageListenerAdapter源碼分析

我們來看下MessageListenerAdapter底層代碼

MessageListenerAdapter

MessageListenerAdapter類中


public static final String ORIGINAL_DEFAULT_LISTENER_METHOD = "handleMessage";

默認方法名就是叫handleMessage。當然也可以自己去指定設置。通過messageListenerAdapter的代碼我們可以看出如下核心屬性

  • defaultListenerMethod默認監聽方法名稱:用於設置監聽方法名稱
  • Delegate 委托對象:實際真實的委托對象,用於處理消息
  • queueOrTagToMethodName 隊列標識與方法名稱組成集合
  • 可以一一進行隊列與方法名稱的匹配
  • 隊列和方法名稱綁定,即指定隊列里的消息會被綁定的方法所接受處理

測試一下默認使用的handleMessage方法。啟動ApplicationTests類,運行testSendMessage()測試方法。

打印消息

自定義方法名

MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
adapter.setDefaultListenerMethod("consumeMessage");
container.setMessageListener(adapter);

修改MessageDelegate()類

public class MessageDelegate {
	
	public void consumeMessage(byte[] messageBody) {
		System.err.println("字節數組方法, 消息內容:" + new String(messageBody));
	}
}

自定義TextMessageConverter轉換器


public class TextMessageConverter implements MessageConverter {

	@Override
	public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
		return new Message(object.toString().getBytes(), messageProperties);
	}

	@Override
	public Object fromMessage(Message message) throws MessageConversionException {
		String contentType = message.getMessageProperties().getContentType();
		if(null != contentType && contentType.contains("text")) {
			return new String(message.getBody());
		}
		return message.getBody();
	}

}


修改RabbitMQConfig類


MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
adapter.setDefaultListenerMethod("consumeMessage");
adapter.setMessageConverter(new TextMessageConverter());
container.setMessageListener(adapter);

修改MessageDelegate類


public class MessageDelegate {

	public void consumeMessage(String messageBody) {
		System.err.println("字符串方法, 消息內容:" + messageBody);
	}
}

運行testSendMessage4Text()測試方法


@Test
public void testSendMessage2() throws Exception {
	//1 創建消息
	MessageProperties messageProperties = new MessageProperties();
	messageProperties.setContentType("text/plain");
	Message message = new Message("mq 消息1234".getBytes(), messageProperties);
	rabbitTemplate.send("topic001", "spring.abc", message);
	rabbitTemplate.convertAndSend("topic001", "spring.amqp", "hello object message send!");
	rabbitTemplate.convertAndSend("topic002", "rabbit.abc", "hello object message send!");
}

注意:在發消息的時候,必須符合自己的轉換器。

打印結果
打印結果

6.1.2 適配器使用方式2

自定義隊列名稱和方法名稱。

/**
* 2 適配器方式: 我們的隊列名稱 和 方法名稱 也可以進行一一的匹配
* /
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
adapter.setMessageConverter(new TextMessageConverter());
Map<String, String> queueOrTagToMethodName = new HashMap<>();
queueOrTagToMethodName.put("queue001", "method1");
queueOrTagToMethodName.put("queue002", "method2");
adapter.setQueueOrTagToMethodName(queueOrTagToMethodName);
container.setMessageListener(adapter);    

public class MessageDelegate {
	public void method1(String messageBody) {
		System.err.println("method1 收到消息內容:" + new String(messageBody));
	}
	
	public void method2(String messageBody) {
		System.err.println("method2 收到消息內容:" + new String(messageBody));
	}
	
}

運行 測試方法

@Test
public void testSendMessage4Text() throws Exception {
	//1 創建消息
	MessageProperties messageProperties = new MessageProperties();
	messageProperties.setContentType("text/plain");
	Message message = new Message("mq 消息1234".getBytes(), messageProperties);
	rabbitTemplate.send("topic001", "spring.abc", message);
	rabbitTemplate.send("topic002", "rabbit.abc", message);
}

運行結果:

打印結果

7. MessageConverter消息轉換器

我們在進行發送消息的時候,正常情況下消息體為二進制的數據方式進行傳輸,如果希望內部幫我們進行轉換,或者指定自定義的轉換器,就需要用到MessageConverter

  • 自定義常用轉換器:MessageConverter,一般來講都需要實現這個接口
  • 重寫下面兩個方法:
    toMessage:java對象轉換為Message
    fromMessage:Message對象轉換為java對象
  • Json轉換器:Jackson2JsonMessageConverter:可以進行Java對象的轉換功能
  • DefaultJackson2JavaTypeMapper映射器:可以進行java對象的映射關系
  • 自定義二進制轉換器:比如圖片類型、PDF、PPT、流媒體

7.1 代碼演示

其實我們在介紹MessageListenerAdapter的時候,中間就介紹到了TextMessageConverter轉換器,將二進制數據轉換成字符串數據。

7.1.1 添加json格式的轉換器

修改RabbitMQConfig類


// 1.1 支持json格式的轉換器
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
adapter.setDefaultListenerMethod("consumeMessage");
//重點,加入json格式的轉換器 json對應Map對象
Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
adapter.setMessageConverter(jackson2JsonMessageConverter);

container.setMessageListener(adapter);

修改MessageDelegate

public class MessageDelegate {
	//json對應Map對象
	public void consumeMessage(Map messageBody) {
		System.err.println("map方法, 消息內容:" + messageBody);
	}
}

定義一個Order對象


public class Order {
	private String id;
	private String name;
	private String content;
	...省略get/set等方法
}

定義測試方法

@Test
public void testSendJsonMessage() throws Exception {

	Order order = new Order();
	order.setId("001");
	order.setName("消息訂單");
	order.setContent("描述信息");
	ObjectMapper mapper = new ObjectMapper();
	String json = mapper.writeValueAsString(order);
	System.err.println("order 4 json: " + json);

	MessageProperties messageProperties = new MessageProperties();
	//這里注意一定要修改contentType為 application/json
	messageProperties.setContentType("application/json");
	Message message = new Message(json.getBytes(), messageProperties);

	rabbitTemplate.send("topic001", "spring.order", message);
}
  

打印結果:
打印結果

7.1.2 添加支持Java對象轉換

修改RabbitMQConfig類


MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
adapter.setDefaultListenerMethod("consumeMessage");

Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();

//需要將javaTypeMapper放入到Jackson2JsonMessageConverter對象中
DefaultJackson2JavaTypeMapper javaTypeMapper = new DefaultJackson2JavaTypeMapper();
jackson2JsonMessageConverter.setJavaTypeMapper(javaTypeMapper);

adapter.setMessageConverter(jackson2JsonMessageConverter);
container.setMessageListener(adapter);

修改MessageDelegate

public class MessageDelegate {
	public void consumeMessage(Order order) {
		System.err.println("order對象, 消息內容, id: " + order.getId() + 
				", name: " + order.getName() + 
				", content: "+ order.getContent());
	}
}

定義測試方法

@Test
public void testSendJavaMessage() throws Exception {

	Order order = new Order();
	order.setId("001");
	order.setName("訂單消息");
	order.setContent("訂單描述信息");
	ObjectMapper mapper = new ObjectMapper();
	String json = mapper.writeValueAsString(order);
	System.err.println("order 4 json: " + json);

	MessageProperties messageProperties = new MessageProperties();
	//這里注意一定要修改contentType為 application/json
	messageProperties.setContentType("application/json");
	//添加typeid 與類的全路徑
	messageProperties.getHeaders().put("__TypeId__", "com.cp.spring.entity.Order");
	Message message = new Message(json.getBytes(), messageProperties);

	rabbitTemplate.send("topic001", "spring.order", message);
}

打印結果:

打印結果

7.1.3 添加支持java對象多映射轉換

修改RabbitMQConfig類


//1.3 DefaultJackson2JavaTypeMapper & Jackson2JsonMessageConverter 支持java對象多映射轉換
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
adapter.setDefaultListenerMethod("consumeMessage");
Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
DefaultJackson2JavaTypeMapper javaTypeMapper = new DefaultJackson2JavaTypeMapper();

//key表示標簽 對應一個類的具體全路徑。類和標簽綁定之后,標簽是order,意思就是轉換成order類
Map<String, Class<?>> idClassMapping = new HashMap<String, Class<?>>();
idClassMapping.put("order", com.cp.spring.entity.Order.class);
idClassMapping.put("packaged", com.cp.spring.entity.Packaged.class);

javaTypeMapper.setIdClassMapping(idClassMapping);
//一層套一層
jackson2JsonMessageConverter.setJavaTypeMapper(javaTypeMapper);
adapter.setMessageConverter(jackson2JsonMessageConverter);
container.setMessageListener(adapter);

修改MessageDelegate

public class MessageDelegate {
	//json對應Map對象
	public void consumeMessage(Order order) {
		System.err.println("order對象, 消息內容, id: " + order.getId() + 
				", name: " + order.getName() + 
				", content: "+ order.getContent());
	}
	 
	public void consumeMessage(Packaged pack) {
		System.err.println("package對象, 消息內容, id: " + pack.getId() + 
				", name: " + pack.getName() + 
				", content: "+ pack.getDescription());
	}
}

定義一個Packaged對象


public class Packaged {
	private String id;
	private String name;
	private String description;
	...省略get/set等方法
}

定義測試方法


@Test
public void testSendMappingMessage() throws Exception {

	ObjectMapper mapper = new ObjectMapper();

	Order order = new Order();
	order.setId("001");
	order.setName("訂單消息");
	order.setContent("訂單描述信息");

	String json1 = mapper.writeValueAsString(order);
	System.err.println("order 4 json: " + json1);

	MessageProperties messageProperties1 = new MessageProperties();
	//這里注意一定要修改contentType為 application/json
	messageProperties1.setContentType("application/json");
	//設置的是標簽,而不是全路徑
	messageProperties1.getHeaders().put("__TypeId__", "order");
	Message message1 = new Message(json1.getBytes(), messageProperties1);
	rabbitTemplate.send("topic001", "spring.order", message1);

	Packaged pack = new Packaged();
	pack.setId("002");
	pack.setName("包裹消息");
	pack.setDescription("包裹描述信息");

	String json2 = mapper.writeValueAsString(pack);
	System.err.println("pack 4 json: " + json2);

	MessageProperties messageProperties2 = new MessageProperties();
	//這里注意一定要修改contentType為 application/json
	messageProperties2.setContentType("application/json");
	//設置的是標簽,而不是全路徑
	messageProperties2.getHeaders().put("__TypeId__", "packaged");
	Message message2 = new Message(json2.getBytes(), messageProperties2);
	rabbitTemplate.send("topic001", "spring.pack", message2);
}


打印結果:

打印結果

在通過單元測試運行testSendMappingMessage()方法時會存在一個問題:委派對象MessageDelegate可能會收不到對象。
因為單元測試spring容器在運行完畢之后就停止,不會等到消費者消費完消息之后再停止,所以需要通過正常啟動springboot項目,可以看到正常消費消息。

7.1.4 添加全局轉換器

修改RabbitMQConfig類


@Bean  
public Queue queue_image() {  
	return new Queue("image_queue", true); //隊列持久  
}

@Bean  
public Queue queue_pdf() {  
	return new Queue("pdf_queue", true); //隊列持久  
}

//1.4 ext convert
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
adapter.setDefaultListenerMethod("consumeMessage");

//全局的轉換器:所有小的Converter都可以放到這個大的Converter中
ContentTypeDelegatingMessageConverter convert = new ContentTypeDelegatingMessageConverter();

TextMessageConverter textConvert = new TextMessageConverter();
//text走文本轉換器
convert.addDelegate("text", textConvert);
convert.addDelegate("html/text", textConvert);
convert.addDelegate("xml/text", textConvert);
convert.addDelegate("text/plain", textConvert);
//json走json轉換器
Jackson2JsonMessageConverter jsonConvert = new Jackson2JsonMessageConverter();
convert.addDelegate("json", jsonConvert);
convert.addDelegate("application/json", jsonConvert);
//圖片走圖片轉換器
ImageMessageConverter imageConverter = new ImageMessageConverter();
convert.addDelegate("image/png", imageConverter);
convert.addDelegate("image", imageConverter);
//pdf走pdf轉換器
PDFMessageConverter pdfConverter = new PDFMessageConverter();
convert.addDelegate("application/pdf", pdfConverter);


adapter.setMessageConverter(convert);
container.setMessageListener(adapter);

修改MessageDelegate

public class MessageDelegate {

	public void handleMessage(byte[] messageBody) {
		System.err.println("默認方法, 消息內容:" + new String(messageBody));
	}
	
	public void consumeMessage(byte[] messageBody) {
		System.err.println("字節數組方法, 消息內容:" + new String(messageBody));
	}
	
	public void consumeMessage(String messageBody) {
		System.err.println("字符串方法, 消息內容:" + messageBody);
	}
	
	public void method1(String messageBody) {
		System.err.println("method1 收到消息內容:" + new String(messageBody));
	}
	
	public void method2(String messageBody) {
		System.err.println("method2 收到消息內容:" + new String(messageBody));
	}
	
	//json對應Map對象
	public void consumeMessage(Map messageBody) {
		System.err.println("map方法, 消息內容:" + messageBody);
	}
	public void consumeMessage(Order order) {
		System.err.println("order對象, 消息內容, id: " + order.getId() + 
				", name: " + order.getName() + 
				", content: "+ order.getContent());
	}
	public void consumeMessage(Packaged pack) {
		System.err.println("package對象, 消息內容, id: " + pack.getId() + 
				", name: " + pack.getName() + 
				", content: "+ pack.getDescription());
	}
	public void consumeMessage(File file) {
		System.err.println("文件對象 方法, 消息內容:" + file.getName());
	}
}

添加PDFMessageConverter


public class PDFMessageConverter implements MessageConverter {

	@Override
	public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
		throw new MessageConversionException(" convert error ! ");
	}

	@Override
	public Object fromMessage(Message message) throws MessageConversionException {
		System.err.println("-----------PDF MessageConverter----------");
		
		byte[] body = message.getBody();
		String fileName = UUID.randomUUID().toString();
		String path = "d:/010_test/" + fileName + ".pdf";
		File f = new File(path);
		try {
			Files.copy(new ByteArrayInputStream(body), f.toPath());
		} catch (IOException e) {
			e.printStackTrace();
		}
		return f;
	}

}


添加ImageMessageConverter


public class ImageMessageConverter implements MessageConverter {

	@Override
	public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
		throw new MessageConversionException(" convert error ! ");
	}

	@Override
	public Object fromMessage(Message message) throws MessageConversionException {
		System.err.println("-----------Image MessageConverter----------");
		
		Object _extName = message.getMessageProperties().getHeaders().get("extName");
		String extName = _extName == null ? "png" : _extName.toString();
		
		byte[] body = message.getBody();
		String fileName = UUID.randomUUID().toString();
		//將接受到的圖片放到該位置
		String path = "d:/010_test/" + fileName + "." + extName;
		File f = new File(path);
		try {
			Files.copy(new ByteArrayInputStream(body), f.toPath());
		} catch (IOException e) {
			e.printStackTrace();
		}
		return f;
	}
}

定義測試方法


@Test
public void testSendExtConverterMessage() throws Exception {
//		byte[] body = Files.readAllBytes(Paths.get("d:/002_books", "picture.png"));
//		MessageProperties messageProperties = new MessageProperties();
//		messageProperties.setContentType("image/png");
//		messageProperties.getHeaders().put("extName", "png");
//		Message message = new Message(body, messageProperties);
//		rabbitTemplate.send("", "image_queue", message);

		byte[] body = Files.readAllBytes(Paths.get("d:/002_books", "mysql.pdf"));
		MessageProperties messageProperties = new MessageProperties();
		messageProperties.setContentType("application/pdf");
		Message message = new Message(body, messageProperties);
		rabbitTemplate.send("", "pdf_queue", message);
}

可以自己測試下圖片和pdf的保存。

源碼地址:https://gitee.com/573059382/rabbitmq-demos

文末

歡迎關注個人微信公眾號:Coder編程
獲取最新原創技術文章和免費學習資料,更有大量精品思維導圖、面試資料、PMP備考資料等你來領,方便你隨時隨地學習技術知識!
新建了一個qq群:315211365,歡迎大家進群交流一起學習。謝謝了!也可以介紹給身邊有需要的朋友。

文章收錄至
Github: https://github.com/CoderMerlin/coder-programming
Gitee: https://gitee.com/573059382/coder-programming
歡迎關注並star~
微信公眾號

參考文章:

《RabbitMQ消息中間件精講》

推薦文章:

消息中間件——RabbitMQ(六)理解Exchange交換機核心概念!

消息中間件——RabbitMQ(七)高級特性全在這里!(上)

消息中間件——RabbitMQ(八)高級特性全在這里!(下)


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM