MessageListenerAdapter


       消息監聽適配器:將具體的消息消費邏輯,適配成 MessageListenerAdapter 所定義的格式

  默認的方法簽名為:void handleMessage(byte[] messageBody)

/**
 * Custom consumer delegate whose methods are invoked by a {@code MessageListenerAdapter}.
 */
public class MessageDelegate {

    /**
     * Prints the received message body to standard error.
     *
     * <p>The method name {@code handleMessage} has to match the default value of
     * {@code MessageListenerAdapter.ORIGINAL_DEFAULT_LISTENER_METHOD} in the
     * {@code org.springframework.amqp.rabbit.listener.adapter} package, so that the
     * adapter can locate it without further configuration.
     *
     * @param messageBody raw payload bytes; decoded as UTF-8 text for display
     */
    public void handleMessage(byte[] messageBody) {
        // Decode with an explicit charset so the output does not depend on the
        // platform default encoding of the consuming JVM.
        String text = new String(messageBody, java.nio.charset.StandardCharsets.UTF_8);
        System.err.println("默認方法, 消息內容:" + text);
    }

}

  handleMessage 方法名要根據 org.springframework.amqp.rabbit.listener.adapter 包下 MessageListenerAdapter.ORIGINAL_DEFAULT_LISTENER_METHOD 的默認值來確定

    /**
     * Creates the listener container that consumes both topic queues and hands every
     * message to a {@link MessageDelegate} through a {@link MessageListenerAdapter}.
     *
     * <p>No listener method name is configured here, so the adapter uses its default.
     *
     * @param connectionFactory connection factory the container consumes through
     * @return the configured container
     */
    @Bean
    public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);

        // Queues and consumer scaling: start with one consumer, grow to at most five.
        container.setQueues(topicQueue(), topicQueueAgain());
        container.setConcurrentConsumers(1);
        container.setMaxConcurrentConsumers(5);
        container.setAcknowledgeMode(AcknowledgeMode.AUTO);

        // Consumer tag = "<queue>_<random UUID>".
        container.setConsumerTagStrategy(queueName -> queueName + "_" + UUID.randomUUID());

        container.setMessageListener(new MessageListenerAdapter(new MessageDelegate()));
        return container;
    }

  測試

    /**
     * Publishes a plain "hello message" payload carrying two custom headers to
     * {@code topic.exchange} with routing key {@code user.abc}.
     */
    @Test
    public void sendMessage(){
        MessageProperties props = new MessageProperties();
        props.setHeader("desc", "自定義描述");
        props.setHeader("type", "自定義類型");

        byte[] payload = "hello message".getBytes();
        Message outbound = new Message(payload, props);

        rabbitTemplate.convertAndSend("topic.exchange", "user.abc", outbound);
    }

  自定義消費方法

/**
 * Consumer delegate offering the adapter's default handler plus a custom-named one.
 */
@Slf4j
public class MessageDelegate {

    /**
     * Default handler; logs the payload as text.
     *
     * @param messageBody raw payload bytes, decoded as UTF-8
     */
    public void handleMessage(byte[] messageBody) {
        // Explicit charset: do not depend on the platform default encoding.
        log.info("默認方法 消息內容:" + new String(messageBody, java.nio.charset.StandardCharsets.UTF_8));
    }

    /**
     * Custom-named handler for raw byte payloads; logs the payload as text.
     *
     * @param messageBody raw payload bytes, decoded as UTF-8
     */
    public void consumerMessage(byte[] messageBody){
        // Explicit charset: do not depend on the platform default encoding.
        log.info("字節數據方法 消息的內容 " + new String(messageBody, java.nio.charset.StandardCharsets.UTF_8));
    }
}
    @Bean
    public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer messageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);
        messageListenerContainer.setQueues(topicQueue(), topicQueueAgain());
        messageListenerContainer.setConcurrentConsumers(1);
        messageListenerContainer.setMaxConcurrentConsumers(5);
        messageListenerContainer.setAcknowledgeMode(AcknowledgeMode.AUTO);
        messageListenerContainer.setConsumerTagStrategy(queue -> queue + "_" + UUID.randomUUID().toString());


        MessageListenerAdapter listenerAdapter = new MessageListenerAdapter(new MessageDelegate());
        listenerAdapter.setDefaultListenerMethod("consumerMessage");

        messageListenerContainer.setMessageListener(listenerAdapter);
        return messageListenerContainer;

byte[] 和 String 間的轉換器

/**
 * Message converter that turns textual payloads into {@link String}s and leaves every
 * other payload as a raw byte array.
 */
public class TextMessageConverter implements MessageConverter {

    /** Charset used for both directions so the result never depends on the platform default. */
    private static final java.nio.charset.Charset TEXT_CHARSET = java.nio.charset.StandardCharsets.UTF_8;

    /**
     * Wraps the string form of {@code obj} into a message.
     *
     * @param obj payload; its {@code toString()} value becomes the message body
     * @param messageProperties properties attached to the created message
     * @return the message carrying the UTF-8 encoded text
     * @throws MessageConversionException if {@code obj} is {@code null}
     */
    @Override
    public Message toMessage(Object obj, MessageProperties messageProperties) throws MessageConversionException {
        if (obj == null) {
            // Report a conversion problem instead of failing with a bare NullPointerException.
            throw new MessageConversionException("Cannot convert a null payload to a message");
        }
        return new Message(obj.toString().getBytes(TEXT_CHARSET), messageProperties);
    }

    /**
     * Extracts the payload of {@code message}.
     *
     * @param message incoming message
     * @return the body decoded as UTF-8 text when the content type contains {@code "text"},
     *         otherwise the untouched body bytes
     */
    @Override
    public Object fromMessage(Message message) throws MessageConversionException {
        String contentType = message.getMessageProperties().getContentType();
        boolean textual = contentType != null && contentType.contains("text");
        if (textual) {
            return new String(message.getBody(), TEXT_CHARSET);
        }
        return message.getBody();
    }
}

  添加 String 接收方法

    /**
     * Handler for payloads that the message converter has already turned into text.
     *
     * @param messageBody message content as a string
     */
    public void consumerMessage(String messageBody) {
        final String logLine = "字符串方法 消息內容:" + messageBody;
        log.info(logLine);
    }
    /**
     * Creates the listener container whose adapter converts payloads with a
     * {@link TextMessageConverter} and invokes {@code consumerMessage} on the delegate.
     *
     * @param connectionFactory connection factory the container consumes through
     * @return the configured container
     */
    @Bean
    public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) {
        // Adapter: custom handler name plus the byte[]/String converter.
        MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
        adapter.setDefaultListenerMethod("consumerMessage");
        adapter.setMessageConverter(new TextMessageConverter());

        // Container: both topic queues, 1..5 consumers, automatic acknowledgement.
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        container.setQueues(topicQueue(), topicQueueAgain());
        container.setConcurrentConsumers(1);
        container.setMaxConcurrentConsumers(5);
        container.setAcknowledgeMode(AcknowledgeMode.AUTO);
        container.setConsumerTagStrategy(queueName -> queueName + "_" + UUID.randomUUID());
        container.setMessageListener(adapter);
        return container;
    }

  測試

    /**
     * Publishes "hello message" with content type {@code text/plain} and two custom
     * headers to {@code topic.exchange} using routing key {@code user.abc}.
     */
    @Test
    public void sendMessage(){
        MessageProperties props = new MessageProperties();
        props.setContentType("text/plain");
        props.getHeaders().put("desc","自定義描述");
        props.getHeaders().put("type","自定義類型");

        byte[] payload = "hello message".getBytes();
        rabbitTemplate.convertAndSend("topic.exchange", "user.abc", new Message(payload, props));
    }

  不同隊列由不同方法消費

/**
 * Consumer delegate with two separately named handlers, so that a listener adapter
 * can map different queues to different methods.
 */
@Slf4j
public class MessageDelegate {

    /**
     * First handler; logs the received text.
     *
     * @param messageBody message content as a string
     */
    public void method1(String messageBody) {
        final String logLine = "method1收到的消息:" + messageBody;
        log.info(logLine);
    }

    /**
     * Second handler; logs the received text.
     *
     * @param messageBody message content as a string
     */
    public void method2(String messageBody) {
        final String logLine = "method2收到的消息:" + messageBody;
        log.info(logLine);
    }
}
    /**
     * Creates the listener container whose adapter picks the delegate method by queue:
     * {@code topic.queue} goes to {@code method1}, {@code topic.again} to {@code method2}.
     *
     * @param connectionFactory connection factory the container consumes through
     * @return the configured container
     */
    @Bean
    public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) {
        // Queue (or consumer tag) name -> delegate method name.
        Map<String,String> methodByQueue = new HashMap<>();
        methodByQueue.put("topic.queue", "method1");
        methodByQueue.put("topic.again", "method2");

        MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
        adapter.setMessageConverter(new TextMessageConverter());
        adapter.setQueueOrTagToMethodName(methodByQueue);

        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        container.setQueues(topicQueue(), topicQueueAgain());
        container.setConcurrentConsumers(1);
        container.setMaxConcurrentConsumers(5);
        container.setAcknowledgeMode(AcknowledgeMode.AUTO);
        container.setConsumerTagStrategy(queueName -> queueName + "_" + UUID.randomUUID());
        container.setMessageListener(adapter);
        return container;
    }

  發送 JSON,使用 Map 接收消息

/**
 * Consumer delegate that receives a JSON payload after it has been converted to a map.
 */
@Slf4j
public class MessageDelegate {

    /**
     * Logs the converted message content.
     *
     * <p>The parameter is declared as {@code Map<?, ?>} rather than a raw {@code Map};
     * the erased signature is unchanged, so any {@code Map} argument is still accepted.
     *
     * @param messageBody message content as a map
     */
    public void consumerMessage(Map<?, ?> messageBody) {
        log.info("消息的內容:" + messageBody);
    }
}

  添加消息轉換器

    /**
     * Creates the listener container whose adapter converts payloads with a
     * {@link Jackson2JsonMessageConverter} before calling {@code consumerMessage}.
     *
     * @param connectionFactory connection factory the container consumes through
     * @return the configured container
     */
    @Bean
    public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) {
        MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
        adapter.setDefaultListenerMethod("consumerMessage");
        adapter.setMessageConverter(new Jackson2JsonMessageConverter());

        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        container.setQueues(topicQueue(), topicQueueAgain());
        container.setConcurrentConsumers(1);
        container.setMaxConcurrentConsumers(5);
        container.setAcknowledgeMode(AcknowledgeMode.AUTO);
        // Consumer tag = "<queue>_<random UUID>".
        container.setConsumerTagStrategy(queueName -> queueName + "_" + UUID.randomUUID());
        container.setMessageListener(adapter);
        return container;
    }

  測試

    /**
     * Serialises an {@code Order} to JSON and publishes it with content type
     * {@code application/json} to {@code topic.exchange} using routing key {@code user.abc}.
     */
    @Test
    public void sendJSONMessage(){
        Order order = new Order();
        order.setId("0001");
        order.setName("放飛夢想");
        order.setContent("不服就干");
        String str = JSON.toJSONString(order);
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setContentType("application/json");
        // Encode explicitly as UTF-8: the payload contains non-ASCII text, and the
        // platform default charset of the sending JVM may differ from what the
        // JSON converter on the consumer side decodes with.
        Message message = new Message(str.getBytes(java.nio.charset.StandardCharsets.UTF_8), messageProperties);

        rabbitTemplate.convertAndSend("topic.exchange","user.abc",message);
    }

  JSON轉換成JAVA對象

  處理方法

    /**
     * Handler for payloads that have been converted into an {@code Order}; logs its
     * id, name and content.
     *
     * @param order the converted order
     */
    public void consumerMessage(Order order) {
        String summary = "order對象,消息的內容,id:" + order.getId()
                + "name:" + order.getName()
                + " content:" + order.getContent();
        log.info(summary);
    }

  配置DefaultJackson2JavaTypeMapper將JSON轉為JAVA對象

    /**
     * Creates the listener container whose adapter uses a Jackson JSON converter with a
     * {@link DefaultJackson2JavaTypeMapper} that trusts the {@code com.smart.domain} package.
     *
     * @param connectionFactory connection factory the container consumes through
     * @return the configured container
     */
    @Bean
    public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) {
        // Type mapper restricted to the domain package.
        DefaultJackson2JavaTypeMapper typeMapper = new DefaultJackson2JavaTypeMapper();
        typeMapper.setTrustedPackages("com.smart.domain");

        Jackson2JsonMessageConverter jsonConverter = new Jackson2JsonMessageConverter();
        jsonConverter.setJavaTypeMapper(typeMapper);

        MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
        adapter.setDefaultListenerMethod("consumerMessage");
        adapter.setMessageConverter(jsonConverter);

        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        container.setQueues(topicQueue(), topicQueueAgain());
        container.setConcurrentConsumers(1);
        container.setMaxConcurrentConsumers(5);
        container.setAcknowledgeMode(AcknowledgeMode.AUTO);
        container.setConsumerTagStrategy(queueName -> queueName + "_" + UUID.randomUUID());
        container.setMessageListener(adapter);
        return container;
    }

  pdf轉換器

/**
 * Inbound-only converter that stores a PDF message body as a file and hands the
 * resulting {@link File} to the listener.
 */
@Slf4j
public class PDFMessageConverter implements MessageConverter {

    /**
     * Outbound conversion is not supported.
     *
     * @throws MessageConversionException always
     */
    @Override
    public Message toMessage(Object o, MessageProperties messageProperties) throws MessageConversionException {
        throw new MessageConversionException(" convert error ! ");
    }

    /**
     * Writes the message body to {@code d:/test/<random UUID>.pdf}.
     *
     * @param message incoming message whose body holds the PDF bytes
     * @return the file that was written
     * @throws MessageConversionException if the directory cannot be created or the
     *         file cannot be written
     */
    @Override
    public Object fromMessage(Message message) throws MessageConversionException {
        log.info("-----------pdfMessageConverter---------------");
        byte[] body = message.getBody();
        String fileName = UUID.randomUUID().toString();
        String path = "d:/test/" + fileName + ".pdf";
        File file = new File(path);

        try {
            // Make sure the target directory exists before copying.
            Files.createDirectories(file.toPath().getParent());
            Files.copy(new ByteArrayInputStream(body), file.toPath());
        } catch (IOException e) {
            // Fail loudly rather than returning a File that was never written.
            // NOTE(review): the container's default error handler is expected to reject
            // (not requeue) on MessageConversionException; confirm for the version in use.
            throw new MessageConversionException("Failed to write PDF payload to " + path, e);
        }
        return file;
    }
}

  圖片轉換器

/**
 * Inbound-only converter that stores an image message body as a file and hands the
 * resulting {@link File} to the listener.
 */
@Slf4j
public class ImageMessageConverter implements MessageConverter {

    /** Directory that receives the stored images. */
    private static final String TARGET_DIRECTORY = "d:/test/";

    /** Extension used when the message has no {@code extName} header. */
    private static final String DEFAULT_EXTENSION = "png";

    /**
     * Outbound conversion is not supported.
     *
     * @throws MessageConversionException always
     */
    @Override
    public Message toMessage(Object o, MessageProperties messageProperties) throws MessageConversionException {
        throw new MessageConversionException(" convert error ! ");
    }

    /**
     * Writes the message body to {@code d:/test/<random UUID>.<extName>}, where
     * {@code extName} comes from the header of the same name (default {@code png}).
     *
     * @param message incoming message whose body holds the image bytes
     * @return the file that was written
     * @throws MessageConversionException if the {@code extName} header is not purely
     *         alphanumeric, or the file cannot be written
     */
    @Override
    public Object fromMessage(Message message) throws MessageConversionException {
        log.info("------------imageMessageConverter-------------------");

        Object extHeader = message.getMessageProperties().getHeaders().get("extName");
        String extName = extHeader == null ? DEFAULT_EXTENSION : extHeader.toString();
        // The header is supplied by the publisher and ends up in a file path:
        // accept plain extensions only so it cannot contain separators or "..".
        if (!extName.matches("[A-Za-z0-9]+")) {
            throw new MessageConversionException("Illegal extName header: " + extName);
        }

        byte[] messageBody = message.getBody();
        File file = new File(TARGET_DIRECTORY + UUID.randomUUID() + "." + extName);
        try {
            // Creates the directory when missing and reports failure as an IOException.
            Files.createDirectories(file.toPath().getParent());
            Files.copy(new ByteArrayInputStream(messageBody), file.toPath());
        } catch (IOException e) {
            // Fail loudly rather than returning a File that was never written.
            throw new MessageConversionException("Failed to write image payload to " + file, e);
        }
        return file;
    }
}

  整合多個轉換器

    /**
     * Creates the listener container whose adapter chooses a converter by content type:
     * {@code image/png} and {@code image} use the image converter, {@code application/pdf}
     * uses the PDF converter.
     *
     * @param connectionFactory connection factory the container consumes through
     * @return the configured container
     */
    @Bean
    public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer messageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);
        messageListenerContainer.setQueues(topicQueue(), topicQueueAgain());
        messageListenerContainer.setConcurrentConsumers(1);
        messageListenerContainer.setMaxConcurrentConsumers(5);
        messageListenerContainer.setAcknowledgeMode(AcknowledgeMode.AUTO);
        // Consumer tag = "<queue>_<random UUID>".
        messageListenerContainer.setConsumerTagStrategy(queue -> queue + "_" + UUID.randomUUID().toString());

        MessageListenerAdapter listenerAdapter = new MessageListenerAdapter(new MessageDelegate());
        // Must match the handler declared by the file-consuming delegate,
        // which is consumeMessage(File) rather than consumerMessage.
        listenerAdapter.setDefaultListenerMethod("consumeMessage");
        ContentTypeDelegatingMessageConverter messageConverter = new ContentTypeDelegatingMessageConverter();

        ImageMessageConverter imageMessageConverter = new ImageMessageConverter();
        messageConverter.addDelegate("image/png", imageMessageConverter);
        messageConverter.addDelegate("image", imageMessageConverter);

        PDFMessageConverter pdfMessageConverter = new PDFMessageConverter();
        messageConverter.addDelegate("application/pdf", pdfMessageConverter);

        listenerAdapter.setMessageConverter(messageConverter);
        messageListenerContainer.setMessageListener(listenerAdapter);
        return messageListenerContainer;
    }

}

  定義文件的消費方法

/**
 * Consumer delegate for payloads that a converter has already stored as a file.
 */
public class MessageDelegate {

    /**
     * Prints the name of the received file to standard error.
     *
     * <p>A listener adapter calling this handler has to be configured with exactly
     * this method name.
     *
     * @param file file produced from the message body
     */
    public void consumeMessage(File file) {
        final String receivedName = file.getName();
        System.err.println("文件對象 方法, 消息內容:" + receivedName);
    }

}

  測試

    /**
     * Reads {@code d:/cc.jpg} and publishes its bytes with content type {@code image/png}
     * and header {@code extName=png} to {@code topic.exchange} using routing key
     * {@code user.ddd}.
     *
     * @throws IOException if the source file cannot be read
     */
    @Test
    public void sendExtMessage() throws IOException {
        byte[] imageBytes = Files.readAllBytes(Paths.get("d:/", "cc.jpg"));

        MessageProperties props = new MessageProperties();
        props.setContentType("image/png");
        props.getHeaders().put("extName", "png");

        rabbitTemplate.send("topic.exchange", "user.ddd", new Message(imageBytes, props));
    }

 

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM