Custom MessageConverter (message converters)


When we send a message, the body normally travels as binary data. If we want the framework to convert it for us, or want to plug in our own conversion, we need a MessageConverter.

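  • Custom converters: implement the MessageConverter interface
  • Override its two methods: toMessage converts a Java object into a Message; fromMessage converts a Message into a Java object
  • JSON converter: Jackson2JsonMessageConverter converts between Java objects and JSON
  • Type mapper: DefaultJackson2JavaTypeMapper maps a type id to a Java class
  • Custom binary converters: for example images, PDF, PPT, streaming media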

Add the following dependencies to pom.xml:

    <!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-core -->
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-core</artifactId>
        <version>2.10.0</version>
    </dependency>
    
    <!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-databind -->
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
        <version>2.10.0</version>
    </dependency>

1. Converting a byte array to a String

The adapter from the previous section stays as it is; we add a TextMessageConverter to it:

        // 1. Adapter approach: the default listener method name is handleMessage
        MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
        // Override the default method name with our own
        adapter.setDefaultListenerMethod("consumeMessage");
        // Optionally add a converter: byte array to String
        adapter.setMessageConverter(new TextMessageConverter());
        container.setMessageListener(adapter);

TextMessageConverter code:

package com.dwz.spring.converter;

import java.nio.charset.StandardCharsets;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.MessageConversionException;
import org.springframework.amqp.support.converter.MessageConverter;

public class TextMessageConverter implements MessageConverter {
    // Convert any object into a Message, using its toString() form encoded as UTF-8
    @Override
    public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
        return new Message(object.toString().getBytes(StandardCharsets.UTF_8), messageProperties);
    }
    
    // Convert a Message into a String when its content type is textual; otherwise return the raw bytes
    @Override
    public Object fromMessage(Message message) throws MessageConversionException {
        String contentType = message.getMessageProperties().getContentType();
        if (null != contentType && contentType.contains("text")) {
            System.err.println("contentType:--String--" + contentType);
            return new String(message.getBody(), StandardCharsets.UTF_8);
        }
        return message.getBody();
    }
}
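
Both directions of a text converter depend on a character set, and the round trip is only lossless when the sender and the receiver use the same one. The stdlib-only sketch below illustrates this; TextCodec is a hypothetical helper introduced for the example and is not part of Spring AMQP.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper for illustration only; not part of Spring AMQP.
final class TextCodec {
    private TextCodec() {}

    // Encode with an explicit charset instead of relying on the platform default.
    static byte[] encode(String text) {
        return text.getBytes(StandardCharsets.UTF_8);
    }

    // Decode with the same charset that was used for encoding.
    static String decode(byte[] body) {
        return new String(body, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        String original = "caf\u00e9"; // contains one non-ASCII character
        byte[] body = encode(original);
        // Same charset on both sides: the text survives.
        System.out.println(decode(body).equals(original)); // true
        // Different charset on the receiving side: the text is garbled.
        System.out.println(new String(body, StandardCharsets.ISO_8859_1).equals(original)); // false
    }
}
```

Calling getBytes() or new String(byte[]) without a charset uses the JVM's default, which can differ between the producer's and the consumer's machine.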

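The consumeMessage() method of our delegate object, MessageDelegate, must accept a parameter of the same type that fromMessage() returns. Here that is String for textual content types; a body with any other content type comes back as byte[] and would need its own overload.

The MessageDelegate class: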

public class MessageDelegate {
    public void consumeMessage(String messageBody) {
        System.err.println("consumeMessage default method, message body: String--" + messageBody);
    }
}

Test code:

    @Test
    public void testMessage02() {
        // Create the message
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setContentType("text/plain");
        Message message = new Message("spring consumeMessage message".getBytes(), messageProperties);
        rabbitTemplate.send("topic001", "spring.abc", message);
        
        rabbitTemplate.convertAndSend("topic001", "spring.amqp", "hello object message send!");
        
        rabbitTemplate.convertAndSend("topic002", "rabbit.amqp", "hello object message send!");
    }

2. A converter that supports JSON

Add two classes, Order and Packaged.

The Order class:

package com.dwz.spring.entity;

import java.io.Serializable;
public class Order implements Serializable{
    private static final long serialVersionUID = 1L;

    private String id;
    
    private String name;
    
    private String content;

    public Order() {
        super();
        // TODO Auto-generated constructor stub
    }

    public Order(String id, String name, String content) {
        super();
        this.id = id;
        this.name = name;
        this.content = content;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getContent() {
        return content;
    }

    public void setContent(String content) {
        this.content = content;
    }
    
    
}

The Packaged class:

package com.dwz.spring.entity;

import java.io.Serializable;
public class Packaged implements Serializable{
    /**
     * 
     */
    private static final long serialVersionUID = 1L;

    private String id;
    
    private String name;
    
    private String description;

    public Packaged() {
        super();
        // TODO Auto-generated constructor stub
    }

    public Packaged(String id, String name, String description) {
        super();
        this.id = id;
        this.name = name;
        this.description = description;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getDescription() {
        return description;
    }

    public void setDescription(String description) {
        this.description = description;
    }
    
}

Pass the converter to the adapter with setMessageConverter(jackson2JsonMessageConverter):

        MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
        adapter.setDefaultListenerMethod("consumeMessage");
        Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
        adapter.setMessageConverter(jackson2JsonMessageConverter);
        container.setMessageListener(adapter);

Change the parameter of consumeMessage() in MessageDelegate to Map<String, Object>:

    public void consumeMessage(Map<String, Object> messageBody) {
        System.err.println("consumeMessage map method, message body:" + messageBody);
    }

Test code:

    /**
     * A converter that supports JSON
     * @throws JsonProcessingException
     */
    @Test
    public void testSendJsonMessage() throws JsonProcessingException {
        Order order = new Order();
        order.setId("001");
        order.setName("order message");
        order.setContent("description");
        
        ObjectMapper mapper = new ObjectMapper();
        String json = mapper.writeValueAsString(order);
        System.err.println("order 4 json:" + json);
        
        MessageProperties messageProperties = new MessageProperties();
        // The contentType must be set to application/json here
        messageProperties.setContentType("application/json");
        
        Message message = new Message(json.getBytes(), messageProperties);
        rabbitTemplate.send("topic001", "spring.order", message);
    }

3. Converting to Java objects

Change the adapter code so that the Jackson2JsonMessageConverter uses a DefaultJackson2JavaTypeMapper

        // 1.2 DefaultJackson2JavaTypeMapper and Jackson2JsonMessageConverter: converting to Java objects
        MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
        adapter.setDefaultListenerMethod("consumeMessage");
        Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
        DefaultJackson2JavaTypeMapper javaTypeMapper = new DefaultJackson2JavaTypeMapper();
        // With the default conversion the trusted-package issue described here never comes up. Once you customise the conversion
        // and use DefaultClassMapper for the mapping, it will, so remember to set the trusted packages.
        javaTypeMapper.addTrustedPackages("com.dwz.spring.entity");
        jackson2JsonMessageConverter.setJavaTypeMapper(javaTypeMapper);
        adapter.setMessageConverter(jackson2JsonMessageConverter);
        container.setMessageListener(adapter);
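
The reason for a trusted-package list is that the __TypeId__ header comes from the message itself, so the consumer should not instantiate whatever class name a sender happens to supply. The stdlib-only sketch below shows the idea of such a check. TrustedTypeResolver and its matching rule (exact package name, or "*" for everything) are assumptions made for this illustration and are not Spring AMQP's actual implementation.

```java
import java.util.Set;

// Hypothetical simplification of a trusted-package check; not Spring AMQP code.
final class TrustedTypeResolver {
    private final Set<String> trustedPackages;

    TrustedTypeResolver(Set<String> trustedPackages) {
        this.trustedPackages = trustedPackages;
    }

    // True when the class name's package is listed, or when "*" is listed.
    boolean isTrusted(String className) {
        if (trustedPackages.contains("*")) {
            return true;
        }
        int lastDot = className.lastIndexOf('.');
        String packageName = lastDot < 0 ? "" : className.substring(0, lastDot);
        return trustedPackages.contains(packageName);
    }

    // Resolve a type id into a Class only after the package check has passed.
    Class<?> resolve(String typeId) {
        if (!isTrusted(typeId)) {
            throw new IllegalArgumentException("Class '" + typeId + "' is not in a trusted package");
        }
        try {
            return Class.forName(typeId);
        } catch (ClassNotFoundException e) {
            throw new IllegalArgumentException("Unknown class '" + typeId + "'", e);
        }
    }
}
```

With a resolver that trusts only com.dwz.spring.entity, a header naming com.dwz.spring.entity.Order would be accepted, while a header naming a class from any other package would be rejected before the class is loaded.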

Change the parameter of consumeMessage() in MessageDelegate to an Order object:

    public void consumeMessage(Order order) {
        System.err.println("Order object, message body, id:" + order.getId()
                          +", name:" + order.getName()
                          +", content:" + order.getContent());
    }

Test code:

    /**
     * Conversion between JSON and Java objects
     * @throws JsonProcessingException
     */
    @Test
    public void testSendJavaMessage() throws JsonProcessingException {
        Order order = new Order();
        order.setId("001");
        order.setName("order message");
        order.setContent("description");
        
        ObjectMapper mapper = new ObjectMapper();
        String json = mapper.writeValueAsString(order);
        System.err.println("order 4 json:" + json);
        
        MessageProperties messageProperties = new MessageProperties();
        // The contentType must be set to application/json here
        messageProperties.setContentType("application/json");
        messageProperties.getHeaders().put("__TypeId__", "com.dwz.spring.entity.Order");
        
        Message message = new Message(json.getBytes(), messageProperties);
        rabbitTemplate.send("topic001", "spring.order", message);
    }

4. Mapping several Java types

Change the adapter code to register several type ids and the classes they map to

        // 1.3 DefaultJackson2JavaTypeMapper and Jackson2JsonMessageConverter: mapping several Java types
        MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
        adapter.setDefaultListenerMethod("consumeMessage");
        Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
        DefaultJackson2JavaTypeMapper javaTypeMapper = new DefaultJackson2JavaTypeMapper();
        Map<String, Class<?>> idClassMapping = new HashMap<String, Class<?>>();    
        idClassMapping.put("order", Order.class);
        idClassMapping.put("packaged", Packaged.class);
        javaTypeMapper.setIdClassMapping(idClassMapping);
        jackson2JsonMessageConverter.setJavaTypeMapper(javaTypeMapper);
        adapter.setMessageConverter(jackson2JsonMessageConverter);
        container.setMessageListener(adapter);

Add two overloads of consumeMessage() to MessageDelegate, one taking an Order and one taking a Packaged:

    public void consumeMessage(Order order) {
        System.err.println("Order object, message body, id:" + order.getId()
                          +", name:" + order.getName()
                          +", content:" + order.getContent());
    }
    
    public void consumeMessage(Packaged pack) {
        System.err.println("Packaged object, message body, id:" + pack.getId()
                          +", name:" + pack.getName()
                          +", description:" + pack.getDescription());
    }
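
Since both overloads share one method name, the choice between them has to be made at runtime from the type of the converted object. The stdlib-only sketch below shows one way such reflective dispatch can work. OverloadDispatcher and DemoDelegate are hypothetical names for this illustration; this is not the code of MessageListenerAdapter.

```java
import java.lang.reflect.Method;

// Hypothetical simplification of adapter-style dispatch; not Spring's implementation.
final class OverloadDispatcher {
    private OverloadDispatcher() {}

    // Invoke the public one-argument method `methodName` whose parameter accepts `payload`.
    // If several overloads accept the payload, the first one found is used.
    static Object dispatch(Object delegate, String methodName, Object payload) {
        for (Method m : delegate.getClass().getMethods()) {
            if (m.getName().equals(methodName)
                    && m.getParameterCount() == 1
                    && m.getParameterTypes()[0].isInstance(payload)) {
                try {
                    return m.invoke(delegate, payload);
                } catch (ReflectiveOperationException e) {
                    throw new IllegalStateException("Listener method failed: " + m, e);
                }
            }
        }
        throw new IllegalArgumentException(
                "No method " + methodName + "(" + payload.getClass().getName() + ")");
    }
}

// Stand-in delegate with two overloads, shaped like the MessageDelegate above.
class DemoDelegate {
    public String consumeMessage(String text) { return "text:" + text; }
    public String consumeMessage(Integer number) { return "number:" + number; }
}
```

For example, OverloadDispatcher.dispatch(new DemoDelegate(), "consumeMessage", "hi") reaches the String overload, while passing an Integer reaches the other one. This is why the converter must produce exactly the type that one of the overloads declares.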

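Test code:

Each message needs a __TypeId__ header whose value is one of the keys registered in idClassMapping, for example:

    messageProperties2.getHeaders().put("__TypeId__", "packaged");

    @Test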
    public void testSendMappingMessage() throws JsonProcessingException {
        Order order = new Order();
        order.setId("001");
        order.setName("order message");
        order.setContent("order description");
        
        ObjectMapper mapper = new ObjectMapper();
        String json1 = mapper.writeValueAsString(order);
        System.err.println("order 4 json:" + json1);
        
        MessageProperties messageProperties = new MessageProperties();
        // The contentType must be set to application/json here
        messageProperties.setContentType("application/json");
        messageProperties.getHeaders().put("__TypeId__", "order");
        
        Message message = new Message(json1.getBytes(), messageProperties);
        rabbitTemplate.send("topic001", "spring.order", message);
        
        Packaged packaged = new Packaged();
        packaged.setId("002");
        packaged.setName("package message");
        packaged.setDescription("package description");
        
        String json2 = mapper.writeValueAsString(packaged);
        System.err.println("packaged 4 json:" + json2);
        
        MessageProperties messageProperties2 = new MessageProperties();
        // The contentType must be set to application/json here
        messageProperties2.setContentType("application/json");
        messageProperties2.getHeaders().put("__TypeId__", "packaged");
        
        Message message2 = new Message(json2.getBytes(), messageProperties2);
        rabbitTemplate.send("topic001", "spring.packaged", message2);
    }

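5. A global converter: convert

Change the adapter code to plug in a global converter

        // 1.4 Global converter: convert
        MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
        adapter.setDefaultListenerMethod("consumeMessage");
        
        // Global converter: delegates to another converter based on the content type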
        ContentTypeDelegatingMessageConverter convert = new ContentTypeDelegatingMessageConverter();
        
        TextMessageConverter textConvert = new TextMessageConverter();
        convert.addDelegate("text", textConvert);
        convert.addDelegate("text/html", textConvert);
        convert.addDelegate("text/xml", textConvert);
        convert.addDelegate("text/plain", textConvert);
        
        Jackson2JsonMessageConverter jsonConverter = new Jackson2JsonMessageConverter();
        convert.addDelegate("json", jsonConverter);
        convert.addDelegate("application/json", jsonConverter);
        
        ImageMessageConverter imageConverter = new ImageMessageConverter();
        convert.addDelegate("image/png", imageConverter);
        convert.addDelegate("image", imageConverter);
        
        PDFMessageConverter pdfConverter = new PDFMessageConverter();
        convert.addDelegate("application/pdf", pdfConverter);
        
        adapter.setMessageConverter(convert);
        container.setMessageListener(adapter);
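
The idea behind the global converter is a lookup table from content type to converter, with a fallback for anything unregistered. The stdlib-only sketch below mimics that idea. ContentTypeRouter is a hypothetical stand-in and not Spring AMQP code, and the exact-match lookup is my understanding of how ContentTypeDelegatingMessageConverter selects a delegate, so treat it as an assumption.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-in for content-type based delegation; not Spring AMQP code.
final class ContentTypeRouter {
    private final Map<String, Function<byte[], Object>> delegates = new HashMap<>();
    private final Function<byte[], Object> fallback;

    ContentTypeRouter(Function<byte[], Object> fallback) {
        this.fallback = fallback;
    }

    void addDelegate(String contentType, Function<byte[], Object> delegate) {
        delegates.put(contentType, delegate);
    }

    // Exact-match lookup on the content type; unknown or missing types use the fallback.
    Object convert(String contentType, byte[] body) {
        Function<byte[], Object> delegate =
                contentType == null ? null : delegates.get(contentType);
        return (delegate != null ? delegate : fallback).apply(body);
    }
}
```

Under that assumption, a key such as "text" or "json" only matches a message whose content type is literally that string, so the full names ("text/plain", "application/json") are the entries that do the work.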

Add a consumeMessage() overload to MessageDelegate that accepts a File:

    public void consumeMessage(File file) {
        System.err.println("File object, message body: " + file.getName());
    }

Add the custom converters ImageMessageConverter and PDFMessageConverter:

package com.dwz.spring.converter;

import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.util.UUID;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.MessageConversionException;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.util.FileCopyUtils;

public class ImageMessageConverter implements MessageConverter {

    @Override
    public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
        throw new MessageConversionException(" convert error! ");
    }

    @Override
    public Object fromMessage(Message message) throws MessageConversionException {
        System.err.println("---------------Image MessageConverter-------------------");
        Object _extName = message.getMessageProperties().getHeaders().get("extName");
        String extName = _extName == null ? "png" : _extName.toString();
        
        byte[] body = message.getBody();
        String fileName = UUID.randomUUID().toString();
        String path = "D:\\dwz_temp\\" + fileName + "." + extName;
        File f = new File(path);
        System.out.println(path);
        
        try {
            Files.copy(new ByteArrayInputStream(body), Paths.get(path), StandardCopyOption.REPLACE_EXISTING);
        } catch (IOException e) {
            // Surface the failure instead of returning a File that was never written
            throw new MessageConversionException("Failed to write image to " + path, e);
        }
        return f;
    }
    
}

package com.dwz.spring.converter;

import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.UUID;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.MessageConversionException;
import org.springframework.amqp.support.converter.MessageConverter;

public class PDFMessageConverter implements MessageConverter {

    @Override
    public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
        throw new MessageConversionException(" convert error! ");
    }

    @Override
    public Object fromMessage(Message message) throws MessageConversionException {
        System.err.println("---------------PDF MessageConverter-------------------");
        
        byte[] body = message.getBody();
        String fileName = UUID.randomUUID().toString();
        String path = "D:/dwz_temp/" + fileName + ".pdf";
        File f = new File(path);
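        try {
            Files.copy(new ByteArrayInputStream(body), f.toPath());
        } catch (IOException e) {
            // Surface the failure instead of returning a File that was never written
            throw new MessageConversionException("Failed to write PDF to " + path, e);
        }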
        return f;
    }
}
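
Both converters write to a hard-coded D: directory and, in the image case, build the file name from a header value. As a possible alternative, the stdlib-only sketch below writes the body into the system temp directory and strips anything but letters and digits from the extension. BodyFiles and the "amqp-body-" prefix are hypothetical names chosen for this example.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical helper for illustration only; not part of Spring AMQP.
final class BodyFiles {
    private BodyFiles() {}

    // Write the message body into a fresh file under the system temp directory.
    static Path writeToTempFile(byte[] body, String extension) {
        // Keep only letters and digits so a header value cannot inject path separators.
        String safeExtension = extension.replaceAll("[^A-Za-z0-9]", "");
        try {
            Path file = Files.createTempFile("amqp-body-", "." + safeExtension);
            return Files.write(file, body);
        } catch (IOException e) {
            throw new UncheckedIOException("Could not store message body", e);
        }
    }
}
```

A fromMessage() implementation could then return BodyFiles.writeToTempFile(message.getBody(), extName).toFile(), which keeps the File-based consumeMessage(File) overload unchanged.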

Test code:

Image conversion test

    @Test
    public void testSendImgMessage1() throws IOException {
        byte[] body = Files.readAllBytes(Paths.get("C:/Users/Administrator/Pictures/Saved Pictures/img02/dwz.png"));
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setContentType("image/png");
        messageProperties.getHeaders().put("extName", "png");
        
        Message message = new Message(body, messageProperties);
        rabbitTemplate.send("", "image_queue", message);
    }

PDF conversion test

    @Test
    public void testSendPDFMessage2() throws IOException {
        byte[] body = Files.readAllBytes(Paths.get("F:\\dwz\\my\\2019全新Java學習路線圖.pdf"));
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setContentType("application/pdf");
        
        Message message = new Message(body, messageProperties);
        rabbitTemplate.send("", "pdf_queue", message);
    }

 

