springboot之rabbitmq


   一、RabbitMQ是實現了高級消息隊列協議(AMQP)的開源消息代理軟件(亦稱面向消息的中間件)。RabbitMQ服務器是用Erlang語言編寫的,而集群和故障轉移是構建在開放電信平台框架上的。所有主要的編程語言均有與代理接口通訊的客戶端

  二、目錄結構

  

  三、是使用springboot搭建rabbitmq我們需要基本的依賴包

  <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.0.RELEASE</version>
    </parent>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
    </dependencies>

  四、這里我們主要介紹6中模式的配置和使用

  1)默認的模式(這種方式不是沒有exchange,而是使用默認的exchange。默認為Direct)

  

           

  聲明方式:

/**
 * 第一種:使用默認的交換機(direct模式)
 */
@Configuration
public class QueueConfiguration {

    /**
     * 聲明隊列:隊列有五個參數(String name, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
     * name:隊列名稱
     * durable:持久性
     * exclusive:排他性(獨立性)
     * autoDelete:自動刪除
     * arguments:其他相關參數
     * @return
     */
    @Bean
    public Queue queue() {
        return new Queue("queue", false);
    }
}

  (1)簡單:只有一個listener在監聽queue,這樣消息只能傳到這個隊列

  (2)進階:如果存在多個listener監聽這個queue,rabbitmq會優雅的平均分配給listener

  (3)arguments(參數配置)

    x-message-ttl(Time-To-Live):消息存活時間,單位毫秒

    x-expires:隊列沒有訪問超時時,自動刪除(包含沒有消費的消息),單位毫秒。

    x-max-length:限制隊列最大長度(新增后擠出最早的),單位個數。

    x-max-length-bytes :限制隊列最大容量

    x-dead-letter-exchange:死信交換機,將刪除/過期的數據,放入指定交換機。

    x-dead-letter-routing-key:死信路由,將刪除/過期的數據,放入指定routingKey

    x-max-priority:隊列優先級。

    x-queue-mode:對列模式,默認lazy(將數據放入磁盤,消費時放入內存)。

    x-queue-master-locator:鏡像隊列

  2)主題模式/通配符模式(topicExchange)  

  

  聲明方式:

/**
 * 第二種:topic交換機模式(主題模式)
 */
@Configuration
public class TopicExchangeConfiguration {

    @Bean
    public Queue queue1() {
        return new Queue("queue1", false);
    }

    @Bean
    public Queue queue2() {
        return new Queue("queue2", false);
    }

    /**
     * 聲明交換機類型:存在4個參數(String name, boolean durable, boolean autoDelete, Map<String, Object> arguments)
     * 這里的參數基本和queue一樣的理解
     * @return
     */
    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange("topic", false, false);
    }

    /**
     * 綁定隊列到交換機上面
     * @return
     */
    @Bean
    public Binding binding1() {
        return BindingBuilder.bind(queue1()).to(topicExchange()).with("*.topic");
    }

    /**
     * 這里存在兩種匹配符
     * *:代表一個單位的字符(1.topic)
     * #:代表多個單位的字符(2.2.topic)
     * @return
     */
    @Bean
    public Binding binding2() {
        return BindingBuilder.bind(queue2()).to(topicExchange()).with("#.topic");
    }
}

  通配符:

    *:代表一個單位的字符(1.topic)

    #:代表多個單位的字符(2.2.topic)

  3)直連模式(directExchange)

  

  聲明方式:

/**
 * 第三種:Direct模式(直連模式,默認交換機也是這種類型)
 */
@Configuration
public class DirectExchangeConfiguration {

    @Bean
    public Queue queue3() {
        return new Queue("queue3", false);
    }

    @Bean
    public Queue queue4() {
        return new Queue("queue4", false);
    }

    /**
     * 參數和topic的交換機類型一樣
     * @return
     */
    @Bean
    public DirectExchange directExchange() {
        return new DirectExchange("direct", false, false);
    }

    @Bean
    public Binding binding3() {
        return BindingBuilder.bind(queue3()).to(directExchange()).with("direct.3");
    }

    @Bean
    public Binding binding4() {
        return BindingBuilder.bind(queue4()).to(directExchange()).with("direct.4");
    }
}

  4)發布/訂閱模式(fanout模式)

  

  聲明方式:

/**
 * 第四種:fanout模式(發布/訂閱模式)
 */
@Configuration
public class FanoutExchangeConfiguration {

    @Bean
    public Queue queue5() {
        return new Queue("queue5", false);
    }

    @Bean
    public Queue queue6() {
        return new Queue("queue6", false);
    }

    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange("fanout", false, false);
    }

    /**
     * 這里的綁定不需要routingKey
     * @return
     */
    @Bean
    public Binding binding5() {
        return BindingBuilder.bind(queue5()).to(fanoutExchange());
    }

    /**
     * 相比於topic,fanout只能全部發送,topic可以更具匹配規則進行
     * @return
     */
    @Bean
    public Binding binding6() {
        return BindingBuilder.bind(queue6()).to(fanoutExchange());
    }
}

  說明:fanout模式是不需要綁定routingKey,這種方式也是廣播形式的主要方式

  5)消息頭模式(headers模式)

/**
 * 第五種:headers模式(消息頭模式)
 */
@Configuration
public class HeadersExchangeConfiguration {

    @Bean
    public Queue queue7() {
        return new Queue("queue7", false);
    }

    @Bean
    public Queue queue8() {
        return new Queue("queue8", false);
    }

    @Bean
    public HeadersExchange headersExchange() {
        return new HeadersExchange("headers", false, false);
    }

    /**
     * 確認header是否存在
     * @return
     */
    @Bean
    public Binding binding7() {
        return BindingBuilder.bind(queue7()).to(headersExchange()).where("header").exists();
    }

    @Bean
    public Binding binding8() {
        return BindingBuilder.bind(queue8()).to(headersExchange()).where("header").exists();
    }
}

  說明:這種方式主要是限定headers,方便通過其他方式攜帶數據。

  6)rpc:

  

  聲明方式(大同小異):

@Configuration
public class RpcConfiguration {

    @Bean
    public Queue rpc() {
        return new Queue("rpc", false);
    }

    @Bean
    public DirectExchange rpcExchange() {
        return new DirectExchange("rpcExchange", false, false);
    }

    @Bean
    public Binding rpcBinding() {
        return BindingBuilder.bind(rpc()).to(rpcExchange()).with("rpcRoutingKey");
    }
}

  lisntener:

@Component
@RabbitListener(queues = "rpc")
public class RpcListener {

    @RabbitHandler
    public String rpcListener(String text, Channel channel, Message message) throws IOException {
        System.out.println("rpcServer:" + text);
        MessageProperties messageProperties = message.getMessageProperties();
        channel.basicAck(messageProperties.getDeliveryTag(), false);
        return "success";
    }
}

  注意這里是有返回數據的。

  客戶端(publish)

  這里推送存在兩種方式,同步和異步

  a、同步:主題這里默認超時是5秒,可以通過rabbitTemplate設置setReceiveTimeout超時時間。

     String message = (String) rabbitTemplate.convertSendAndReceive("rpcExchange", "rpcRoutingKey", time);
        System.out.println("rpcClient:" + message);

  b、異步:

     AsyncRabbitTemplate.RabbitConverterFuture<Object> future =
                asyncRabbitTemplate.convertSendAndReceive("rpcExchange", "rpcRoutingKey", time);
        System.out.println("rpcClient:" + future.get());

  注意:AsyncRabbitTemplate是需要手動去配置的。並且需要配置AbstractMessageListenerContainer

  如果沒有配置AbstractMessageListenerContainer,則需要配置amq.rabbitmq.reply-to(amq.*需要權限才可以配置

  這里是spring對rabbitmq在源碼部分對其進行的判斷,如果不理解可以自己跟convertSendAndReceive函數

    @Bean
    public AsyncRabbitTemplate asyncRabbitTemplate(DirectMessageListenerContainer container) {
        AsyncRabbitTemplate asyncRabbitTemplate = new AsyncRabbitTemplate(rabbitTemplate, container);
        return asyncRabbitTemplate;
    }

    @Bean
    public DirectMessageListenerContainer directMessageListenerContainer(ConnectionFactory connectionFactory) {
        DirectMessageListenerContainer container = new DirectMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames("rpc");
        //這里我改成手動了,但是沒有好的方式去獲取channel,然后ack.所以我這里使用的自動。
        container.setAcknowledgeMode(AcknowledgeMode.AUTO);
        //這里可以使用默認的執行器:SimpleAsyncTaskExecutor(但是,這里不是采用的線程池而是直接new Thread)
        container.setTaskExecutor(new ThreadPoolExecutor(5, 60, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(3000)));
        return container;
    }

   五、消息發送者

  1)yaml配置

server:
  port: 9001
spring:
  rabbitmq:
    host: 192.168.5.100
    port: 5672
    username: guest
    password: guest
    publisher-confirms: true
    publisher-returns: true
    template:
      #參數意義:true當沒有合適的queue直接返回到ReturnCallback
      #         false沒有合適的直接丟棄
      mandatory: true

  2)如果配置了publisher-confirms、publisher-returns為true.並且加入template.mandatory為true。可以配置如下

@Component
public class RabbitmqPublisherConfiguration {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    public RabbitTemplate rabbitTemplate() {
        //1、設置publisher-confirms為true
        //2、發布確認,只是在exchange范圍
        //3、如果沒有exchange,則false.如果過為true,則說明發送到exchange成功
        rabbitTemplate.setConfirmCallback((correlationData, ack, s) -> {
            if (ack) {
                System.out.println("send success");
            } else {
                System.out.println("send fail");
            }
        });
        //1、設置publisher-returns為true
        //2、如果沒有發布成功,則將消息返回。當然這只是在接受消息層,不是exchange。
        rabbitTemplate.setReturnCallback((message, id, reason, exchange, routingKey) -> {
            StringBuffer buffer = new StringBuffer();
            buffer.append("----------------------------------------\n");
            buffer.append("接受消息: {0},失敗!\n");
            buffer.append("消息ID: {1}\n");
            buffer.append("原因: {2}\n");
            buffer.append("exchange: {3}\n");
            buffer.append("routingKey: {4}\n");
            buffer.append("----------------------------------------");
            MessageFormat messageFormat = new MessageFormat(buffer.toString());
            String text = messageFormat.format(new Object[]{new String(message.getBody()), id, reason, exchange, routingKey});
            System.out.println(text);

        });
        return rabbitTemplate;
    }
}

  a、ConfirmCallback:只是針對exchange,如果消息可以通過exchange,則發送成功。反之則失敗

  b、ReturnCallback:這個只是針對於routingKey,是否通過。如果這個routingKey不存在,則將消息返回。反之則發送。

  3)消息發送

@Component
@EnableScheduling
public class RabbitmqPublisher {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Scheduled(cron = "0/15 * * * * ?")
    public void execute() {
        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
        String time = formatter.format(LocalDateTime.ofInstant(Instant.now(), ZoneId.systemDefault()));
        //默認
        rabbitTemplate.convertAndSend("queue", time);
        //主題模式
        rabbitTemplate.convertAndSend("topic", "1.topic", time);
        rabbitTemplate.convertAndSend("topic", "2.2.topic", time);
        //直連模式
        rabbitTemplate.convertAndSend("direct", "direct.3", time);
        rabbitTemplate.convertAndSend("direct", "direct.4", time);
        //廣播模式
        rabbitTemplate.convertAndSend("fanout", "", time);
        //headers模式
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setHeader("header", "header");
        messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);
        Message message = MessageBuilder.withBody(time.getBytes()).andProperties(messageProperties).build();
        rabbitTemplate.convertAndSend("headers", "", message);
    }
}

  六、消息監聽者

  1)yaml配置

server:
  port: 9002
spring:
  rabbitmq:
    host: 192.168.5.100
    port: 5672
    username: guest
    password: guest
    listener:
      direct:
        acknowledge-mode: manual
      simple:
        acknowledge-mode: manual

  說明:如果配置acknowledge-mode: manual(手動模式),則需要手動確認消息。如果沒有則不需要手動確認,否則會報錯。

  需要在每個listener下面加上

channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

  listener的對手動對消息的處理方式有3種:Ack、Nack、Reject

  Ack:確認收到消息

  Nack:不確認收到消息

  Reject:拒接消息

  2)listener

@Component
public class RabbitmqListener {

    //1.默認隊列
    @RabbitListener(queues = "queue")
    public void queueDouble1(String text, Channel channel, Message message) throws IOException {
        System.out.println("queueDouble1:" + text);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }

    @RabbitListener(queues = "queue")
    public void queueDouble2(String text, Channel channel, Message message) throws IOException {
        System.out.println("queueDouble2:" + text);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }

    //2.主題隊列
    @RabbitListener(queues = "queue1")
    public void queue1(String text, Channel channel, Message message) throws IOException {
        System.out.println("queue1:" + text);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }

    @RabbitListener(queues = "queue2")
    public void queue2(String text, Channel channel, Message message) throws IOException {
        System.out.println("queue2:" + text);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }

    //3.直連隊列
    @RabbitListener(queues = "queue3")
    public void queue3(String text, Channel channel, Message message) throws IOException {
        System.out.println("queue3:" + text);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }

    @RabbitListener(queues = "queue4")
    public void queue4(String text, Channel channel, Message message) throws IOException {
        System.out.println("queue4:" + text);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }

    //4.廣播隊列
    @RabbitListener(queues = "queue5")
    public void queue5(String text, Channel channel, Message message) throws IOException {
        System.out.println("queue5:" + text);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }

    @RabbitListener(queues = "queue6")
    public void queue6(String text, Channel channel, Message message) throws IOException {
        System.out.println("queue6:" + text);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }

    //5.消息頭隊列
    @RabbitListener(queues = "queue7")
    public void queue7(String text, Channel channel, Message message) throws IOException {
        System.out.println("queue7:" + text);
        System.out.println("header7:" + message.getMessageProperties().getHeaders().get("header"));
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }

    @RabbitListener(queues = "queue8")
    public void queue8(String text, Channel channel, Message message) throws IOException {
        System.out.println("queue8:" + text);
        System.out.println("header8:" + message.getMessageProperties().getHeaders().get("header"));
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}

  2)也可以寫成,另外一種方式

@Component
@RabbitListener(queues = "queue")
public class RabbitmqHandlerListener {

    @RabbitHandler
    public void messageHandler(String text, Channel channel, Message message) throws IOException {
        System.out.println("queueDouble3:" + text);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}

  七、測試

  

  

  1)默認:

  

  均勻的分配到每一個節點

  2)主題(topic):

  

  只要符合規則就接受

  3)直連(direct)

  

  和模式方式一樣,一對一。多個均勻分布

  4)廣播(fanout)

  

  5)消息頭(headers)

  

  八、當然例子也可以參考官網:https://www.rabbitmq.com/getstarted.html

    九、源碼:https://github.com/lilin409546297/springboot-rabbitmq


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM