RabbitMQ(二)核心組件介紹


 

前言

本文主要介紹AMQP核心組件:

  • RabbitAdmin
  • SpringAMQP 聲明
  • RabbitTemplate
  • SimpleMessageListenerContainer
  • MessageListenerAdapte
  • MessageConverter

 

RabbitAdmin

RabbitAdmin類是針對RabbitMQ管理端進行封裝操作,比如:Exchange操作、Queue操作,Binding綁定操作等,操作起來簡單便捷!

  • 用於聲明RabbitMQ相關配置、操作RabbitMQ
  • autoStartup設為true:表示Spring容器啟動時自動初始化RabbitAdmin
  • 底層實現:從Spring容器中獲取Exchange、Binding、RoutingKey以及Queue的@Bean聲明
  • rabbitTemplate的execute方法執行對應的聲明等操作
@Configuration
@ComponentScan({"com.orcas.spring"})
public class RabbitMQConfig {

   /**
     * 創建 RabbitMQ 連接工廠
     */
    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setAddresses("192.168.58.129:5672");
        connectionFactory.setUsername("orcas");
        connectionFactory.setPassword("1224");
        connectionFactory.setVirtualHost("/");
        return connectionFactory;
    }

   /**
     * 創建 RabbitAdmin 類,這個類封裝了對 RabbitMQ 管理端的操作! 比如:Exchange 操作,Queue 操作,Binding 綁定 等
     */
    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        // 只有設置為 true,spring 才會加載 RabbitAdmin 這個類
        rabbitAdmin.setAutoStartup(true);
        return rabbitAdmin;
    }
} 
@RunWith(SpringRunner.class)
@SpringBootTest
public class ApplicationTests {

    @Autowired
    private RabbitAdmin rabbitAdmin;

    /**
     * 交換機操作
     */
    @Test
    public void testAdminExchange() {
        // 創建交換機, 類型為 direct,durable 參數表示是否持久化
        rabbitAdmin.declareExchange(new DirectExchange("test.direct", false, false));

        // 創建交換機,類型為 topic,durable 參數表示是否持久化
        rabbitAdmin.declareExchange(new TopicExchange("test.topic", false, false));

        // 創建交換機,類型為 fanout,durable 參數表示是否持久化
        rabbitAdmin.declareExchange(new FanoutExchange("test.fanout", false, false));
    }

    /**
     * 隊列操作
     */
    @Test
    public void testAdminQueue() {
        // 創建隊列
        // durable 參數表示是否持久化
        rabbitAdmin.declareQueue(new Queue("test.direct.queue", false));
        // 創建隊列
        // durable 參數表示是否持久化
        rabbitAdmin.declareQueue(new Queue("test.topic.queue", false));
        // 創建隊列
        // durable 參數表示是否持久化
        rabbitAdmin.declareQueue(new Queue("test.fanout.queue", false));
    }

    /**
     * 綁定操作
     */
    @Test
    public void testAdminBinding() {
        /**
         * 兩種寫法都可以,都選擇綁定 隊列 或者 交換機
         */

        /**
         * destination 需要綁定隊列的名字
         * DestinationType 綁定類型,
         *          Binding.DestinationType.QUEUE 表示是隊列綁定
         *          Binding.DestinationType.EXCHANGE 表示交換機綁定
         *
         * exchange 交換機名稱
         * routingKey 路由key
         * arguments 額外參數(比如綁定隊列,可以設置 死信隊列的參數)
         */
        rabbitAdmin.declareBinding(new Binding("test.direct.queue", Binding.DestinationType.QUEUE, "test.direct", "routing_direct", new HashMap<>()));

        /**
         * 鏈式寫法
         */
        rabbitAdmin.declareBinding(
                BindingBuilder.bind(new Queue("test.topic.queue", false)) // 直接創建隊列
                        .to(new TopicExchange("test.topic", false, false)) // 直接創建交換機,並建立關聯關系
                        .with("routing_topic") // 指定路由 key
        );

        /**
         * 鏈式寫法
         *
         * FanoutExchange 交換機,和路由key沒有綁定關系,因為他是給交換機內所有的 queue 都發送消息!
         */
        rabbitAdmin.declareBinding(
                BindingBuilder.bind(new Queue("test.fanout.queue", false)) // 直接創建隊列
                        .to(new FanoutExchange("test.fanout", false, false)) // 直接創建交換機,並建立關聯關系
        );
    }

    /**
     * 其他操作-清空隊列
     */
    @Test
    public void testAdminOther() {// noWait 參數是否需要等待: true 表示需要,false 表示不需要
        //      也就是需要清空的時候,我需要等待一下,在清空(會自動等待幾秒鍾)
        rabbitAdmin.purgeQueue("test.fanout.queue", false);
    }
} 

 

Spring AMQP 聲明

  • exchange
    • TopicExchange
    • FanoutExchange
    • DirectExchange
  • queue
  • binding
    • BindingBuilder

spring使用@bean聲明exchange queue binding 例子如下:

 @Bean  
 public TopicExchange exchange001() { return new TopicExchange("topic001", true, false); } @Bean public Queue queue001() { return new Queue("queue001", true); //隊列持久  } @Bean public Binding binding001() { return BindingBuilder.bind(queue001()).to(exchange001()).with("spring.*"); } 

 

RabbitTemplate

消息模板

  • 與SpringAMQP整合的時候進行發送消息的關鍵類
  • 該類提供了豐富的發送消息方法,包括可靠性投遞方法,回調監聽消息接口 ConfirmCallback,返回值確認接口 ReturnCallback等等。同樣我們需要進行注入到spring容器中。然后進行使用。
  • 與Spring整合時需要實例化,但是與Springboot整合時不需要,在配置文件添加配置即可
  • rabbitTemplate.convertAndSend方法是主要的發送消息的方法
  • MessageProperties 用於構造消息體
  • MessagePostProcessor:消息發送之后對消息進行的設置

以下是RabbitTemplate實例化的例子:

 @Bean
 public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
     RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
     rabbitTemplate.setConfirmCallback(//);
     rabbitTemplate.setReturnCallback(//);
     return rabbitTemplate;
 }

以下是發送消息例子:

    @Test
    public void testSendMessage() throws Exception { //1 創建消息 MessageProperties messageProperties = new MessageProperties(); messageProperties.getHeaders().put("desc", "信息描述.."); messageProperties.getHeaders().put("type", "自定義消息類型.."); Message message = new Message("Hello RabbitMQ".getBytes(), messageProperties); rabbitTemplate.convertAndSend("topic001", "spring.amqp", message, new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { System.err.println("------添加額外的設置---------"); message.getMessageProperties().getHeaders().put("desc", "額外修改的信息描述"); message.getMessageProperties().getHeaders().put("attr", "額外新加的屬性"); return message; } }); }

 

SimpleMessageListenerContainer

簡單消息監聽容器 功能:

  • 監聽多個隊列、自動啟動、自動聲明
  • 設置事務特性、事務管理器、事務屬性、事務容器、是否開啟事務、回滾消息
  • 設置消費者數量、最小最大數量、批量消費
  • 設置消息確認和自動確認模式、是否重回隊列、異常捕獲函數
  • 設置消費者標簽生成策略、是否獨占模式、消費者屬性等
  • 設置具體的監聽器、消息轉換器 message convert
  • SimpleMessageListenerContainer可以進行動態設置,比如在運行中的應用可以動態的修改其消費者的大小、接收消息的模式等,可以基於此開發rabbitmq自定義后台管控平台
屬性:
  • queues: 消費隊列
  • concurrentConsumers:當前消費者數
  • maxConcurrentConsumers:最大消費者並發
  • defaultRequeueRejected: 是否重回隊列,默認: false
  • acknowledgeMode:消息確認模型,默認:AUTO
  • exposeListenerChannel:
  • messageListener: 消息監聽
  • consumerTagStrategy:consumerTag生成策略
    /**
     * 簡單消息監聽容器
     * 配置完成后。可以在管控台看到消息者信息。 以及消費者標簽信息
     *
     * @param connectionFactory 鏈接工廠
     * @return SimpleMessageListenerContainer
     */
    @Bean
    public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);
        //設置要監聽的隊列
        simpleMessageListenerContainer.setQueues(queue001(), queue002(), queue003(), queue_image(), queue_pdf());
        //初始化消費者數量
        simpleMessageListenerContainer.setConcurrentConsumers(1);
        //最大消費者數量
        simpleMessageListenerContainer.setMaxConcurrentConsumers(5);
        //設置是否重回隊列[一般為false]
        simpleMessageListenerContainer.setDefaultRequeueRejected(false);
        //設置自動ack
        simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.AUTO);
        //設置channel 是否外露
         simpleMessageListenerContainer.setExposeListenerChannel(true);
        //設置消費端標簽的策略
        simpleMessageListenerContainer.setConsumerTagStrategy(new ConsumerTagStrategy() {
            @Override
            public String createConsumerTag(String queueName) {
                return queueName + "_" + UUID.randomUUID().toString();
            }
        });
        //設置消息監聽 ChannelAwareMessageListener
        simpleMessageListenerContainer.setMessageListener(new ChannelAwareMessageListener() {
            @Override
            public void onMessage(Message message, Channel channel) throws Exception {
                String msg = new String(message.getBody());
                System.out.println("----------消費者: " + msg);
            }
        });

        return simpleMessageListenerContainer;
    }

 

MessageListenerAdaper

消息監聽適配器

  • extends AbstractAdaptableMessageListener 消息listener
  • queueOrTagToMethodName 隊列標識與方法名稱組成的集合
  • defaultListenerMethod 默認監聽方法名稱
  • Delegate 委托對象:實際真實的委托對象
  • 可以一一進行隊列和方法名稱的匹配
  • 隊列和方法名稱綁定,即指定隊列里的消息會被綁定的方法所接受處理
/**
 * 通過`simpleMessageListenerContainer` 配置消息監聽適配器。 指向這個類
 */
public class MessageDelegate {
    /**
     * MessageListenerAdapter 默認指定接收消息的方法的名字就是 handleMessage .當然也可以手動設置
     *
     * @param messageBody message信息
     */
    public void handleMessage(byte[] messageBody) {
        System.err.println("默認方法,消息內容: " + new String(messageBody));
    }

    public void consumeMessage(byte[] messageBody) {
        System.err.println("字節數組方法, 消息內容:" + new String(messageBody));
    }

    public void consumeMessage(String messageBody) {
        System.err.println("字符串方法, 消息內容:" + messageBody);
    }
}



/**
 * spring amqp 消息轉換器
 *
 * @author yangHX
 * createTime  2019/4/6 12:28
 */
public class TextMessageConverter implements MessageConverter {

    /**
     * 將數據轉化為 message 類
     *
     * @param o                 要發送的數據
     * @param messageProperties 消息頭
     * @return Message
     * @throws MessageConversionException ex
     */
    @Override
    public Message toMessage(Object o, MessageProperties messageProperties) throws MessageConversionException {
        return new Message(o.toString().getBytes(), messageProperties);
    }

    /**
     * 將message轉換為想要的數據類型
     *
     * @param message message
     * @return Object
     * @throws MessageConversionException ex
     */
    @Override
    public Object fromMessage(Message message) throws MessageConversionException {

        String contentType = message.getMessageProperties().getContentType();
        if (null != contentType && contentType.contains("text")) {
            return new String(message.getBody());
        }
        return message.getBody();
    }
}

//消息監聽適配器 只截取了一小段 /* * 適配器方式。 默認是有自己的方法名字。 handleMessage * 可以自己指定一個方法的名稱。 consumerMessage * 也可以添加一個轉換器: 從字節數組轉換為String */ MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate); //設定queue使用哪個adapter方法處理 Map<String,String> queueOrTagToMethodName = new HashMap<>(); queueOrTagToMethodName.put("queue001","method1"); queueOrTagToMethodName.put("queue002","method2"); adapter.setQueueOrTagToMethodName(queueOrTagToMethodName); //設置默認處理方法,默認處理方法是handleMessage adapter.setDefaultListenerMethod("handleMessage"); //設置消息轉換方式 adapter.setMessageConverter(new TextMessageConvert()); //消息監聽 container.setMessageListener(adapter); /** * 發送消息。測試轉換器和適配器 * <p> * 轉換器判斷contentType 將字節數組轉化為字符串 * 適配器將數據交給 MessageDelegate 的 consumeMessage 方法進行處理 */ @Test public void testMessage4Text() { MessageProperties messageProperties = new MessageProperties(); messageProperties.setContentType("text/plan"); Message message = new Message("mq 消息1234".getBytes(), messageProperties); rabbitTemplate.send("topic001", "spring.abc", message); rabbitTemplate.send("topic002", "rabbit.abc", message); }

 

MessageConverter

  • 我們在消息傳輸的時候,正常情況下消息體為二進制的數據方式進行傳輸。如果希望內部幫我們進行轉換,或者指定自定義的轉換器,就需要用到MessageConverter

  • 自定義常用轉換器 MessageConverter 一般來講都需要實現這個接口

  • 重寫下面兩個方法

    • toMessage : java 對象轉換為Message
    • fromMessage : Message對象轉換為java對象
  • 轉換器類型
    • json轉換器: jackson2JsonMessageConverter: 可以進行java對象的轉換功能
    • DefaultJackson2JavaTypeMapper映射器: 可以進行java對象的映射關系
    • 自定義二進制轉換器: 比如圖片類型、PDF,PPT, 流媒體

1、Json轉換器:Jackson2JsonMessageConverter

MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
container.setMessageListener(jackson2JsonMessageConverter);

2、支持Java對象的轉換:DefaultJackson2JavaTypeMapper&Jackson2JsonMessageConverter

MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
adapter.setDefaultListenerMethod("consumeMessage");
        
Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
DefaultJackson2JavaTypeMapper defaultJackson2JavaTypeMapper = new DefaultJackson2JavaTypeMapper();
jackson2JsonMessageConverter.setJavaTypeMapper(defaultJackson2JavaTypeMapper);
        adapter.setMessageConverter(jackson2JsonMessageConverter);

3、支持java對象多映射轉換:DefaultJackson2JavaTypeMapper & Jackson2JsonMessageConverter

MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
adapter.setDefaultListenerMethod("consumeMessage");
Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
DefaultJackson2JavaTypeMapper javaTypeMapper = new DefaultJackson2JavaTypeMapper();
        
Map<String, Class<?>> idClassMapping = new HashMap<String, Class<?>>();
// (標簽, 類的全路徑)將標簽和類進行綁定
idClassMapping.put("order", com.orcas.spring.entity.Order.class);
idClassMapping.put("packaged", com.orcas.spring.entity.Packaged.class);
        
javaTypeMapper.setIdClassMapping(idClassMapping);
        
jackson2JsonMessageConverter.setJavaTypeMapper(javaTypeMapper);
adapter.setMessageConverter(jackson2JsonMessageConverter);

4、全局的轉換器:ContentTypeDelegatingMessageConverter

MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
adapter.setDefaultListenerMethod("consumeMessage");
        
//全局的轉換器:
ContentTypeDelegatingMessageConverter convert = new ContentTypeDelegatingMessageConverter();
        
TextMessageConverter textConvert = new TextMessageConverter();
convert.addDelegate("text", textConvert);
convert.addDelegate("html/text", textConvert);
convert.addDelegate("xml/text", textConvert);
convert.addDelegate("text/plain", textConvert);
        
Jackson2JsonMessageConverter jsonConvert = new Jackson2JsonMessageConverter();
convert.addDelegate("json", jsonConvert);
convert.addDelegate("application/json", jsonConvert);
        
ImageMessageConverter imageConverter = new ImageMessageConverter();
convert.addDelegate("image/png", imageConverter);
convert.addDelegate("image", imageConverter);
        
PDFMessageConverter pdfConverter = new PDFMessageConverter();
convert.addDelegate("application/pdf", pdfConverter);
                
adapter.setMessageConverter(convert);
container.setMessageListener(adapter);

5、圖片轉換器:

public class ImageMessageConverter implements MessageConverter {

    @Override
    public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
        throw new MessageConversionException(" convert error ! ");
    }

    @Override
    public Object fromMessage(Message message) throws MessageConversionException {
        System.err.println("-----------Image MessageConverter----------");
        // 獲取消息擴展屬性中的"extName"
        Object _extName = message.getMessageProperties().getHeaders().get("extName");
        // 若為空默認為png, 否則就獲取該擴展名
        String extName = _extName == null ? "png" : _extName.toString();
        
        byte[] body = message.getBody();
        String fileName = UUID.randomUUID().toString();
        // 該圖的路徑+圖片名
        String path = "d:/010_test/" + fileName + "." + extName;
        File f = new File(path);
        try {
            Files.copy(new ByteArrayInputStream(body), f.toPath());
        } catch (IOException e) {
            e.printStackTrace();
        }
        return f;
    }

}

六、PDF轉換器:

public class PDFMessageConverter implements MessageConverter {

    @Override
    public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
        throw new MessageConversionException(" convert error ! ");
    }

    @Override
    public Object fromMessage(Message message) throws MessageConversionException {
        System.err.println("-----------PDF MessageConverter----------");
        
        byte[] body = message.getBody();
        String fileName = UUID.randomUUID().toString();
        String path = "d:/010_test/" + fileName + ".pdf";
        File f = new File(path);
        try {
            Files.copy(new ByteArrayInputStream(body), f.toPath());
        } catch (IOException e) {
            e.printStackTrace();
        }
        return f;
    }

}

七、測試代碼:

    @Test
    public void testSendExtConverterMessage() throws Exception {
//            byte[] body = Files.readAllBytes(Paths.get("d:/002_books", "picture.png"));
//            MessageProperties messageProperties = new MessageProperties();
//            messageProperties.setContentType("image/png");
//            messageProperties.getHeaders().put("extName", "png");
//            Message message = new Message(body, messageProperties);
//            rabbitTemplate.send("", "image_queue", message);
        
            byte[] body = Files.readAllBytes(Paths.get("d:/002_books", "mysql.pdf"));
            MessageProperties messageProperties = new MessageProperties();
            messageProperties.setContentType("application/pdf");
            Message message = new Message(body, messageProperties);
            rabbitTemplate.send("", "pdf_queue", message);
    }

 

 

 

引用:

https://juejin.im/post/5d0309596fb9a07ebf4b6ad2

https://www.javatt.com/p/11158


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM