Rabbitmq基本使用


基本概念

channel:消息通道
exchage:消息路由規則,四種模式(topic、direct、fanout、header)

direct:默認,根據routingKey完全匹配,好處是先匹配再發送
topic:根據綁定關鍵字通配符規則匹配、比較靈活
fanout:不需要指定routingkey,相當於群發
header:不太常用,可以自定義匹配規則

queue:消息存儲
routerKey:消息路由關鍵字(發送的時候成為bindingkey,接收成為routingKey)


隊列的概念(生產者消費者啟動報錯大多數都是這幾個不匹配導致)
durable:持久化到硬盤
exclusive:唯一性
autoDelete:自動刪除

    /**
     * 第二個參數:queue的持久化是通過durable=true來實現的。
     * 第三個參數:exclusive:排他隊列,如果一個隊列被聲明為排他隊列,該隊列僅對首次申明它的連接可見,並在連接斷開時自動刪除。這里需要注意三點:
1. 排他隊列是基於連接可見的,同一連接的不同信道是可以同時訪問同一連接創建的排他隊列;
    2.“首次”,如果一個連接已經聲明了一個排他隊列,其他連接是不允許建立同名的排他隊列的,這個與普通隊列不同;
3.即使該隊列是持久化的,一旦連接關閉或者客戶端退出,該排他隊列都會被自動刪除的,這種隊列適用於一個客戶端發送讀取消息的應用場景。 * 第四個參數:自動刪除,如果該隊列沒有任何訂閱的消費者的話,該隊列會被自動刪除。這種隊列適用於臨時隊列。 *
@param * @return * @Author zxj */ @Bean public Queue queue() { Map<String, Object> arguments = new HashMap<>(); arguments.put("x-message-ttl", 25000);//25秒自動刪除 Queue queue = new Queue("topic.messages", true, false, true, arguments); return queue; }

 

@Configuration
public class RabbitConfig {

    // 發送消息的格式轉換器
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        template.setMessageConverter(new Jackson2JsonMessageConverter());
        return template;
    }

    // 接收消息的格式轉換器
    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        return factory;
    }

    // 信道配置,此地使用direct模式
    @Bean
    public DirectExchange defaultExchange() {
        return new DirectExchange(MQConstant.EXCHANGE, true, false);
    }

    // 配置隊列規則屬性 例如保活時間 持久化 是否自動刪除等
    @Bean
    public Queue queue() {
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("x-message-ttl", 25000);//25秒自動刪除
        Queue queue = new Queue(MQConstant.QUEUE_NAME, true, false, true, arguments);
        return queue;
    }

    // 綁定隊列和exchange
    @Bean
    public Binding binding() {
        return BindingBuilder.bind(queue()).to(defaultExchange()).with(MQConstant.QUEUE_NAME);
    }

}

如何發送
@Autowired
private RabbitTemplate template;
    //convertAndSend和send的區別是這個convert更方便使用,可以傳object進去
    template.convertAndSend(MQConstant.EXCHANGE, bindingKey, msg);

//如何接收,注意隊列名稱、exchange名稱、routingKey的指定。
//注意:隊列的消息只要被一個消費者匹配消費后就不存在了
@Component
@RabbitListener(containerFactory = "rabbitListenerContainerFactory", bindings = @QueueBinding(value = @Queue(value = "default_queue", durable = "true"), exchange = @Exchange(value = "default_exchange", type = ExchangeTypes.TOPIC), key = "meeting"))
@Log4j
public class RabbitMqListener {
    @RabbitHandler
    public void processMessage(MqMsg message) {
        log.error( message);
    }
}

 

交換機持久化設置: exchange 持久化,在聲明時指定 durable為true

隊列持久化設置:queue 持久化,在聲明時指定 durable 為true

 

 

Rabbitmq常用配置

  spring.rabbitmq.addresses指定client連接到的server的地址,多個以逗號分隔.
  spring.rabbitmq.dynamic是否創建AmqpAdmin bean. 默認為: true)
  spring.rabbitmq.host指定RabbitMQ host.默認為: localhost)
  spring.rabbitmq.listener.acknowledge-mode指定Acknowledge的模式.
  spring.rabbitmq.listener.auto-startup是否在啟動時就啟動mq,默認: true)
  spring.rabbitmq.listener.concurrency指定最小的消費者數量.
  spring.rabbitmq.listener.max-concurrency指定最大的消費者數量.
  spring.rabbitmq.listener.prefetch指定一個請求能處理多少個消息,如果有事務的話,必須大於等於transaction數量.
  spring.rabbitmq.listener.transaction-size指定一個事務處理的消息數量,最好是小於等於prefetch的數量.
  spring.rabbitmq.password指定broker的密碼.
  spring.rabbitmq.port指定RabbitMQ 的端口,默認: 5672)
  spring.rabbitmq.requested-heartbeat指定心跳超時,0為不指定.
  spring.rabbitmq.ssl.enabled是否開始SSL,默認: false)
  spring.rabbitmq.ssl.key-store指定持有SSL certificate的key store的路徑
  spring.rabbitmq.ssl.key-store-password指定訪問key store的密碼.
  spring.rabbitmq.ssl.trust-store指定持有SSL certificates的Trust store.
  spring.rabbitmq.ssl.trust-store-password指定訪問trust store的密碼.
  spring.rabbitmq.username指定登陸broker的用戶名.
  spring.rabbitmq.virtual-host指定連接到broker的Virtual host.
  spring.rabbitmq.publisher-confirms=true 開啟發送確認
  spring.rabbitmq.publisher-returns=true 開啟發送失敗退回

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM