RabbitMQ模式,RabbitMQ和springboot整合,RabbitMQ全鏈路消息不丟失解決


一,RabbitMQ簡介

  RabbitMQ是實現了高級消息隊列協議(AMQP)的開源消息代理軟件(亦稱面向消息的中間件)。RabbitMQ服務器是用Erlang語言編寫的,而集群和故障轉移是構建在開放電信平台框架上的。所有主要的編程語言均有與代理接口通訊的客戶端庫。

  優點:跨語言,高並發。

二,實際項目中作用

(一)任務異步處理

場景:用戶下單后,根據訂單信息進行減庫存以及增加積分。

傳統:必須要整個流程走完。

如今:下完單,用戶操作就可以結束了,不必等積分增加完。

(二) 應用程序解耦

場景:下單減庫存。

傳統模式的缺點:當前是基於同步調用方式,訂單服務與庫存服務處於耦合狀態,假設庫存服務狀態異常,則訂單服務在調用庫存服務時,也會出現失敗。

如今:訂單服務:將用戶訂單信息寫入訂單數據庫,並向消息隊列發送訂單消息,成功發送之后,向用戶返回下單成功。

庫存服務:監聽消息隊列,發現消息隊列中存在訂單消息,則取回,根據訂單信息進行庫存操作

此時兩個服務之前不存在耦合關系,兩個服務之間基於消息隊列完成消息傳遞。

(三)流量削峰

場景:秒殺或搶購活動,

傳統:很有可能因為出現瞬時流量暴增,導致后端服務無法承受這些流量,最終導致服務宕機。

如今:架設消息隊列,對流量進行限流計數。可以控制前端請求數量。

三,幾種常見模式案例

1.工作對列模式:一個producer生產者

work queues兩個消費端,兩個消費端共同消費同一個隊列中的消息。

應用場景:對於 任務過重或任務較多情況使用工作隊列可以提高任務處理的速度。

測試:

1、使用入門程序,啟動多個消費者。

2、生產者發送多個消息。

生產者:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class QuickProducer {
    public static final String QUEUE="helloworld";
    public static  void main(String[] args){
        Connection connection =null;
        Channel channel = null;

        try{

            //聲明連接工廠
            ConnectionFactory connectionFactory = new ConnectionFactory();
            //設置ip
            connectionFactory.setHost("127.0.0.1");
            //設置端口
            connectionFactory.setPort(5672);
            //設置用戶名
            connectionFactory.setUsername("guest");
            //設置密碼
            connectionFactory.setPassword("guest");
            //設置虛擬主機
            //虛擬主機:當安裝好了一個rabbitMQ服務之后,內部可以去設置多個虛擬的rabbitmq,對於虛擬主機的設置是通過名稱進行划分的。且相互獨立
            connectionFactory.setVirtualHost("/");

            //聲明連接對象與通道對象
            connection = connectionFactory.newConnection();
            channel = connection.createChannel();

            //聲明隊列
            /**
             * String queue, :隊列名稱
             * boolean durable, :隊列是否持久化。當為true,隊列信息會被保存到磁盤
             * boolean exclusive, :是否獨占此連接,當為true,則此連接中只有這一個隊列,當為false,可以存在多個
             * boolean autoDelete,:是否自動刪除,當為true,此隊列一旦沒有消費者監聽,則自動從rabbitmq中刪除,當為false,則不會
             * Map<String, Object> arguments:隊列的屬性設置
             */
            channel.queueDeclare(QUEUE,true,false,false,null);

            //發送消息
            /**
             * String exchange, :交換機
             * String routingKey, :路由key
             * BasicProperties props,:消息的屬性設置
             * byte[] body:被發送的消息
             */
            String message ="第三條消息";
            channel.basicPublish("",QUEUE,null,message.getBytes());

        }catch (Exception e){
            e.printStackTrace();
        }finally {
            try {
                channel.close();
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }
            try {
                connection.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

 

消費者1:

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class QuickConsumer {
    public static final String QUEUE="helloworld";

    public static void main(String[] args) throws IOException, TimeoutException {

        Connection connection = null;
        Channel channel = null;

        //聲明連接工廠
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        connectionFactory.setVirtualHost("/");

        //聲明連接對象與通道對象
        connection = connectionFactory.newConnection();
        channel = connection.createChannel();



        Consumer consumer = new DefaultConsumer(channel){
            @Override
            /**
             * String consumerTag, : 消費者標識
             * Envelope envelope,:信封
             * AMQP.BasicProperties properties,:默認屬性值
             * byte[] body:被消費的數據
             */
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String value = new String(body);
                System.out.println("消費者一號"+value);
            }
        };

        //設置監聽
        /**
         * String queue,:被監聽的隊列名稱
         * boolean autoAck,:是否開啟自動應答。當為true, 。。當為false,需要開發人員手動的編碼去通知rabbitmq
         * Consumer callback:獲取消息的回調方法
         */
        channel.basicConsume(QUEUE,true,consumer);
    }
}

 

消費者2:

public class QuickConsumer2 {
    public static final String QUEUE="helloworld";

    public static void main(String[] args) throws IOException, TimeoutException {

        Connection connection = null;
        Channel channel = null;

        //聲明連接工廠
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        connectionFactory.setVirtualHost("/");

        //聲明連接對象與通道對象
        connection = connectionFactory.newConnection();
        channel = connection.createChannel();



        Consumer consumer = new DefaultConsumer(channel){
            @Override
            /**
             * String consumerTag, : 消費者標識
             * Envelope envelope,:信封
             * AMQP.BasicProperties properties,:默認屬性值
             * byte[] body:被消費的數據
             */
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String value = new String(body);
                System.out.println("消費者二號"+value);
            }
        };

        //設置監聽
        /**
         * String queue,:被監聽的隊列名稱
         * boolean autoAck,:是否開啟自動應答。當為true, 。。當為false,需要開發人員手動的編碼去通知rabbitmq
         * Consumer callback:獲取消息的回調方法
         */
        channel.basicConsume(QUEUE,true,consumer);
    }
}

結果:

默認輪詢策略

消費者一號第一條消息

消費者二號第二條消息

消費者一號第三條消息

2.發布、訂閱模式

Publish/subscribe

生產:

 1 import com.rabbitmq.client.BuiltinExchangeType;
 2 import com.rabbitmq.client.Channel;
 3 import com.rabbitmq.client.Connection;
 4 import com.rabbitmq.client.ConnectionFactory;
 5 
 6 import java.io.IOException;
 7 import java.util.concurrent.TimeoutException;
 8 
 9 public class PublishProducer {
10 
11     //隊列名稱
12     public static final String QUEUE_INFORM_EMAIL ="queue_inform_email";
13     public static final String QUEUE_INFORM_SMS ="queue_inform_sms";
14 
15     //交換機名稱
16     private static final String EXCHANGE_FANOUT_INFORM="exchange_fanout_inform";
17 
18     public static void main(String[] args) {
19 
20         Connection connection = null;
21         Channel channel = null;
22 
23         try{
24 
25             //聲明連接工廠
26             ConnectionFactory connectionFactory = new ConnectionFactory();
27             connectionFactory.setHost("127.0.0.1");
28             connectionFactory.setPort(5672);
29             connectionFactory.setUsername("guest");
30             connectionFactory.setPassword("guest");
31             connectionFactory.setVirtualHost("/");
32 
33             //聲明連接對象與通道對象
34             connection = connectionFactory.newConnection();
35             channel = connection.createChannel();
36 
37             //聲明隊列
38             channel.queueDeclare(QUEUE_INFORM_EMAIL,true,false,false,null);
39             channel.queueDeclare(QUEUE_INFORM_SMS,true,false,false,null);
40 
41             //聲明交換機
42             /**
43              * name:交換機名稱
44              * type: 交換機的類型  當工作模式為發布/訂閱模式,交換機的類型:fanount
45              */
46             channel.exchangeDeclare(EXCHANGE_FANOUT_INFORM, BuiltinExchangeType.FANOUT);
47 
48             //隊列綁定交換機
49             channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_FANOUT_INFORM,"");
50             channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_FANOUT_INFORM,"");
51 
52             //生產者會將消息發送到交換機,交換機會按照一定的規則將消息轉發給一個或多個隊列
53 
54             //發送消息
55             channel.basicPublish(EXCHANGE_FANOUT_INFORM,"",null,"publish message".getBytes());
56 
57         }catch (Exception e){
58             e.printStackTrace();
59         }finally {
60             try {
61                 channel.close();
62             } catch (IOException e) {
63                 e.printStackTrace();
64             } catch (TimeoutException e) {
65                 e.printStackTrace();
66             }
67             try {
68                 connection.close();
69             } catch (IOException e) {
70                 e.printStackTrace();
71             }
72         }
73     }
74 }

 


消費:

 1 import com.rabbitmq.client.*;
 2 
 3 import java.io.IOException;
 4 import java.util.concurrent.TimeoutException;
 5 
 6 public class PublishConsumer {
 7 
 8     //隊列名稱
 9     public static final String QUEUE_INFORM_EMAIL ="queue_inform_email";
10     public static final String QUEUE_INFORM_SMS ="queue_inform_sms";
11 
12     public static void main(String[] args) throws IOException, TimeoutException {
13 
14         Connection connection = null;
15         Channel channel = null;
16 
17         //聲明連接工廠
18         ConnectionFactory connectionFactory = new ConnectionFactory();
19         connectionFactory.setHost("127.0.0.1");
20         connectionFactory.setPort(5672);
21         connectionFactory.setUsername("guest");
22         connectionFactory.setPassword("guest");
23         connectionFactory.setVirtualHost("/");
24 
25         //聲明連接對象與通道對象
26         connection = connectionFactory.newConnection();
27         channel = connection.createChannel();
28 
29 
30         //設置監聽
31         Consumer consumer = new DefaultConsumer(channel){
32             @Override
33             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
34                 String value = new String(body);
35                 System.out.println(value);
36             }
37         };
38         channel.basicConsume( QUEUE_INFORM_EMAIL,true,consumer);
39 
40     }
41 }

 

結果:

channel.basicConsume( QUEUE_INFORM_EMAIL,true,consumer);

 

channel.basicConsume( QUEUE_INFORM_SMS,true,consumer);

 

 

總結

發布訂閱模式:

1、每個消費者監聽自己的隊列。

2、生產者將消息發給broker,由交換機將消息轉發到綁定此交換機的每個隊列,每個綁定交換機的隊列都將接收到消息

 
        

3. Routing路由模式

路由模式:

1、每個消費者監聽自己的隊列,並且設置routingkey。

2、生產者將消息發給交換機,由交換機根據routingkey來轉發消息到指定的隊列。

生產者:

 1 package com.itheima.producer;
 2 
 3 import com.rabbitmq.client.BuiltinExchangeType;
 4 import com.rabbitmq.client.Channel;
 5 import com.rabbitmq.client.Connection;
 6 import com.rabbitmq.client.ConnectionFactory;
 7 
 8 import java.io.IOException;
 9 import java.util.concurrent.TimeoutException;
10 
11 public class RoutingProducer {
12 
13     //隊列名稱
14     private static final String QUEUE_ROUTING_EMAIL = "queue_routing_email";
15     private static final String QUEUE_ROUTING_SMS = "queue_routing_sms";
16 
17     //交換機名稱
18     private static final String EXCHANGE_ROUTING_INFORM="exchange_routing_inform";
19 
20     public static void main(String[] args) {
21 
22         Connection connection = null;
23         Channel channel = null;
24 
25         try{
26 
27             //聲明連接工廠
28             //聲明連接工廠
29             ConnectionFactory connectionFactory = new ConnectionFactory();
30             connectionFactory.setHost("127.0.0.1");
31             connectionFactory.setPort(5672);
32             connectionFactory.setUsername("guest");
33             connectionFactory.setPassword("guest");
34             connectionFactory.setVirtualHost("/");
35 
36             //聲明連接對象與通道對象
37             connection = connectionFactory.newConnection();
38             channel = connection.createChannel();
39 
40             //聲明隊列
41             channel.queueDeclare(QUEUE_ROUTING_EMAIL,true,false,false,null);
42             channel.queueDeclare(QUEUE_ROUTING_SMS,true,false,false,null);
43 
44             //聲明交換機
45             //當使用路由模式,交換機類型必須為direct
46             channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT);
47 
48             //隊列綁定交換機
49             channel.queueBind(QUEUE_ROUTING_EMAIL,EXCHANGE_ROUTING_INFORM,"info");
50             channel.queueBind(QUEUE_ROUTING_SMS,EXCHANGE_ROUTING_INFORM,"info");
51 
52             //發送消息
53             channel.basicPublish(EXCHANGE_ROUTING_INFORM,"info",null,"info message".getBytes());
54 
55         }catch (Exception e){
56             e.printStackTrace();
57         }finally {
58             try {
59                 channel.close();
60             } catch (IOException e) {
61                 e.printStackTrace();
62             } catch (TimeoutException e) {
63                 e.printStackTrace();
64             }
65             try {
66                 connection.close();
67             } catch (IOException e) {
68                 e.printStackTrace();
69             }
70         }
71     }
72 }

 

消費者:

 1 import com.rabbitmq.client.*;
 2 
 3 import java.io.IOException;
 4 import java.util.concurrent.TimeoutException;
 5 
 6 public class RoutingConsumer {
 7 
 8     //隊列名稱
 9     private static final String QUEUE_ROUTING_EMAIL = "queue_routing_email";
10     private static final String QUEUE_ROUTING_SMS = "queue_routing_sms";
11 
12     public static void main(String[] args) throws IOException, TimeoutException {
13 
14         Connection connection = null;
15         Channel channel = null;
16 
17         //聲明連接工廠
18         ConnectionFactory connectionFactory = new ConnectionFactory();
19         connectionFactory.setHost("127.0.0.1");
20         connectionFactory.setPort(5672);
21         connectionFactory.setUsername("guest");
22         connectionFactory.setPassword("guest");
23         connectionFactory.setVirtualHost("/");
24 
25         //聲明連接對象與通道對象
26         connection = connectionFactory.newConnection();
27         channel = connection.createChannel();
28 
29 
30         //設置監聽
31         Consumer consumer = new DefaultConsumer(channel){
32             @Override
33             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
34                 String value = new String(body);
35                 System.out.println(value);
36             }
37         };
38         channel.basicConsume(QUEUE_ROUTING_EMAIL,true,consumer);
39 
40     }
41 }

 

結果:

info message
info message

 

4.通配符模式

特點:

1、每個消費者監聽自己的隊列,並且設置帶統配符的routingkey。

2、生產者將消息發給broker,由交換機根據routingkey來轉發消息到指定的隊列

生產者:

 1 import com.rabbitmq.client.BuiltinExchangeType;
 2 import com.rabbitmq.client.Channel;
 3 import com.rabbitmq.client.Connection;
 4 import com.rabbitmq.client.ConnectionFactory;
 5 
 6 import java.io.IOException;
 7 import java.util.concurrent.TimeoutException;
 8 
 9 public class TopicProducer {
10 
11     //隊列名稱
12     public static final String QUEUE_EMAIL="queue_topic_email";
13     public static final String QUEUE_SMS="queue_topic_sms";
14 
15     //交換機名稱
16     public static final String EXCHANGE_TOPIC_INFORM="exchange_topic_inform";
17 
18     //路由key
19     public static final String ROUTING_EMAIL="inform.#.email.#"; // inform.email
20     public static final String ROUTING_SMS="inform.#.sms.#";//
21 
22     public static void main(String[] args) {
23 
24         Connection connection = null;
25 
26         Channel channel = null;
27 
28         try{
29 
30             //聲明連接工廠
31             //聲明連接工廠
32             ConnectionFactory connectionFactory = new ConnectionFactory();
33             connectionFactory.setHost("127.0.0.1");
34             connectionFactory.setPort(5672);
35             connectionFactory.setUsername("guest");
36             connectionFactory.setPassword("guest");
37             connectionFactory.setVirtualHost("/");
38 
39             //聲明連接對象與通道對象
40             connection = connectionFactory.newConnection();
41             channel = connection.createChannel();
42 
43             //聲明隊列
44             channel.queueDeclare(QUEUE_EMAIL,true,false,false,null);
45             channel.queueDeclare(QUEUE_SMS,true,false,false,null);
46 
47             //聲明交換機
48             //當使用通配符模式,交換機類型必須為topic
49             channel.exchangeDeclare(EXCHANGE_TOPIC_INFORM, BuiltinExchangeType.TOPIC);
50 
51             //隊列綁定交換機
52             channel.queueBind(QUEUE_EMAIL,EXCHANGE_TOPIC_INFORM,ROUTING_EMAIL);
53             channel.queueBind(QUEUE_SMS,EXCHANGE_TOPIC_INFORM,ROUTING_SMS);
54 
55             //發送消息
56             //發送消息到郵件隊列
57             //channel.basicPublish(EXCHANGE_TOPIC_INFORM,"inform.email",null,"email message".getBytes());
58 
59             //發送消息到短信隊列
60             //channel.basicPublish(EXCHANGE_TOPIC_INFORM,"inform.sms",null,"sms message".getBytes());
61 
62             //發送消息到郵件與短信隊列
63             channel.basicPublish(EXCHANGE_TOPIC_INFORM,"inform.sms.email",null,"sms email message".getBytes());
64         }catch(Exception e)
65 
66         {
67             e.printStackTrace();
68         }finally{
69             try {
70                 channel.close();
71             } catch (IOException e) {
72                 e.printStackTrace();
73             } catch (TimeoutException e) {
74                 e.printStackTrace();
75             }
76             try {
77                 connection.close();
78             } catch (IOException e) {
79                 e.printStackTrace();
80             }
81         }
82     }
83 }

 


消費者:

 1 import com.rabbitmq.client.*;
 2 
 3 import java.io.IOException;
 4 import java.util.concurrent.TimeoutException;
 5 
 6 public class TopicConsumer {
 7 
 8     //隊列名稱
 9     public static final String QUEUE_EMAIL="queue_topic_email";
10     public static final String QUEUE_SMS="queue_topic_sms";
11 
12     public static void main(String[] args) throws IOException, TimeoutException {
13 
14         Connection connection = null;
15         Channel channel = null;
16 
17         //聲明連接工廠
18         ConnectionFactory connectionFactory = new ConnectionFactory();
19         connectionFactory.setHost("127.0.0.1");
20         connectionFactory.setPort(5672);
21         connectionFactory.setUsername("guest");
22         connectionFactory.setPassword("guest");
23         connectionFactory.setVirtualHost("/");
24 
25         //聲明連接對象與通道對象
26         connection = connectionFactory.newConnection();
27         channel = connection.createChannel();
28 
29 
30         //設置監聽
31         Consumer consumer = new DefaultConsumer(channel){
32             @Override
33             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
34                 String value = new String(body);
35                 System.out.println(value);
36             }
37         };
38         channel.basicConsume(QUEUE_SMS,true,consumer);
39 
40     }
41 }

 


結果:

sms email message

總結幾種模式:

四,springboot整合RabbitMQ

1.消費者config

 1 import org.springframework.amqp.core.*;
 2 import org.springframework.beans.factory.annotation.Qualifier;
 3 import org.springframework.context.annotation.Bean;
 4 import org.springframework.context.annotation.Configuration;
 5 
 6 @Configuration
 7 public class RabbitMQConfig {
 8 
 9     public static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
10     public static final String QUEUE_INFORM_SMS = "queue_inform_sms";
11 
12     public static final String EXCHANGE_TOPICS_INFORM="exchange_topics_inform";
13 
14 
15     //聲明隊列
16     @Bean(QUEUE_INFORM_EMAIL)
17     public Queue QUEUE_INFORM_EMAIL(){
18         Queue queue = new Queue(QUEUE_INFORM_EMAIL);
19         return queue;
20     }
21 
22     @Bean(QUEUE_INFORM_SMS)
23     public Queue QUEUE_INFORM_SMS(){
24         Queue queue = new Queue(QUEUE_INFORM_SMS);
25         return queue;
26     }
27 
28     //聲明交換機
29     @Bean(EXCHANGE_TOPICS_INFORM)
30     public Exchange EXCHANGE_TOPICS_INFORM(){
31         return ExchangeBuilder.topicExchange(EXCHANGE_TOPICS_INFORM).durable(true).build();
32     }
33 
34     //隊列綁定交換機
35     @Bean
36     public Binding BIND_QUEUE_INFORM_EMAIL(@Qualifier(QUEUE_INFORM_EMAIL)Queue queue,@Qualifier(EXCHANGE_TOPICS_INFORM)Exchange exchange){
37         return BindingBuilder.bind(queue).to(exchange).with(QUEUE_INFORM_EMAIL).noargs();
38     }
39 
40     @Bean
41     public Binding BIND_QUEUE_INFORM_SMS(@Qualifier(QUEUE_INFORM_SMS)Queue queue,@Qualifier(EXCHANGE_TOPICS_INFORM)Exchange exchange){
42         return BindingBuilder.bind(queue).to(exchange).with(QUEUE_INFORM_SMS).noargs();
43     }
44 
45 }

消費者config

 1 import org.springframework.amqp.core.Queue;
 2 import org.springframework.context.annotation.Bean;
 3 import org.springframework.context.annotation.Configuration;
 4 
 5 @Configuration
 6 public class RabbitMQConfig {
 7 
 8     public static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
 9     public static final String QUEUE_INFORM_SMS = "queue_inform_sms";
10 
11 
12     //聲明隊列
13     @Bean(QUEUE_INFORM_EMAIL)
14     public Queue QUEUE_INFORM_EMAIL(){
15         Queue queue = new Queue(QUEUE_INFORM_EMAIL);
16         return queue;
17     }
18 
19     @Bean(QUEUE_INFORM_SMS)
20     public Queue QUEUE_INFORM_SMS(){
21         Queue queue = new Queue(QUEUE_INFORM_SMS);
22         return queue;
23     }
24 
25 }

 

監聽消費者:

import com.itheima.springboot.consumer.config.RabbitMQConfig;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class MyMessageConsumer {

    //設置監聽方法 , 每一個監聽方法可以去設置監聽一個或多個隊列
    @RabbitListener(queues = {RabbitMQConfig.QUEUE_INFORM_EMAIL})
    public void receivceMessage(String message){
        System.out.println(message);
    }
}

 

五,全鏈路消息不丟失

TPS,QPS是指標。

1.生產者宕機:

情形一:Q:當訂單服務發送的消息成功到達了rabbitmq並保存在內存中,但是此消息在沒有被消費者拿走之前,rabbitmq宕機了此時應該怎么辦?

A:此時可以考慮進行持久化操作,此處需要將==隊列與消息==都進行持久化操作,將這兩部分信息寫入磁盤。

情形二; Q:當rabbitmq未將消息寫入到磁盤時,消息隊列宕機了,怎么辦?

A:此時需要保證生產者發送的消息,rabbitmq會一定成功的進行隊列與消息持久化,否則進行重發。

 

​ RabbitMQ針對生產者投遞數據丟失,已經提供了兩種解決機制, 分別是 ==重量級的事務機制====輕量級的confirm機制==.

選擇confirm機制:(事務同步效率低):

confirm模式需要基於channel進行設置, 一旦某條消息被投遞到隊列之后,消息隊列就會發送一個確認信息給生產者。

情形三:高並發下生產者

  1.阿里sql,對比mysql性能提高15%,對比nosql數據可以保存在磁盤不用考慮在內存空間的大小。生產者實例收到一個消息ack之后,就從kv存儲中刪除這條臨時消息;收到一個消息nack之后,就從kv存儲提取這條消息然后重新投遞一次即可;也可以自己對kv存儲里的消息做監控,如果超過一定時長沒收到ack,就主動重發消息。(生產者有個記錄表,每個一段時間掃描此表)。

  2.預抓取總數100-300,根據壓力測試臨界值,閾值向下減100左右來設定,不可過大不可過小(堆積,效率低)。

2.消費者宕機

情形一     

消費者成功接收到消息之后,可以自動的向rabbitMQ返回一個應答信息,通知rabbitMQ此條消息已經被成功的接收,當rabbitMQ接收到這條消息之后,則會將此條消息刪除。這叫做自動ACK(自動應答)。

可以將自動應答改為手動應答,另外提高效率還可以批量ACK,集中返回此時當消費者接收到消息,不會馬上自動向消息隊列發送應答消息,而是需要開發人員手動編碼發送應答消息, 從而保證消息隊列不會自動刪除這條消息,而是等到消費者返回ACK確認消息才會進行刪除。(開發人員要做的就是當且只有庫存服務執行完畢並調用成功物流系統之后才會向消息隊列對返回一條確認消息,當消息隊列接收到這條消息之后才刪除此消息),現某個庫存服務(消費者)宕機了. 那么就會將這個訂單消息發送給其他的庫存服務實例(消費者)去使用這條消息.

情形二;

A消費者宕機后,另一台消費者B機器發貨,A重啟,又發之前沒發的貨,發了兩遍。

解決方案:避免重復發貨,發貨時判斷狀態碼(冪等性)。

六。冪等性

1冪等性:

​ 對於相同的一次請求操作,不管操作多少次,得到的結果都是相同的

2場景:

​ 在開啟消費者手動ack的情況下, 庫存服務的消費者已經接收到了消息,並調用物流系統並成功更改狀態進行發送, 但是在ack返回消息的時候, 這個消費者服務可能突然宕機, 因為消費者ack未返回, 所以會導致這個消息一直在MQ中, 當消費者重新啟動就會又接收這個消息進行發貨, 這樣的話,就會導致相同的貨品發貨了兩次!!!

3解決方案:

​ 1)狀態判斷:在消費者接收消息執行之前, 先根據消息的標識信息查詢DB,判斷狀態, 如果為已發貨/待發貨狀態的話,則直接ack返回MQ,刪除這條消息

​ 2)樂觀鎖

七,有序性

對於上述流程因為有多個消費者共同監聽了同一個隊列,每一個隊列都會獲取到不同消息,但是消費者的執行時間不盡相同,因此會導致數據的無序性。

​ 將上述流程改造為只有一個消費者監聽此隊列,或這些消息只會推送給一個消費者即可。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM