消息中間件系列三:使用RabbitMq原生Java客戶端進行消息通信(消費者(接收方)自動確認模式、消費者(接收方)自行確認模式、生產者(發送方)確認模式)


准備工作:

1)安裝RabbitMQ,參考文章:消息中間件系列二:RabbitMQ入門(基本概念、RabbitMQ的安裝和運行)

2.)分別新建名為OriginalRabbitMQProducer和OriginalRabbitMQConsumer的maven工程

在pom.xml文件里面引入如下依賴:

    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>5.0.0</version>
    </dependency>

說明:5系列的版本最好使用JDK8及以上, 低於JDK8可以使用4.x(具體的版本號到Maven的中央倉庫查)的版本

一、消費者(接收方)自動確認模式

 前面有談到消費者收到的每一條消息都必須進行確認,消息的確認機制分為自動確認和消費者自行確認。下面我們來看一下自動確認的示例:

示例1:交換器是direct

1. 在工程OriginalRabbitMQProducer新建一個一個direct的生產者

package study.demo.normal;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 
 * @Description: 交換器是direct的生產者 路由鍵完全匹配時,消息才投放到對應隊列
 * @author leeSmall
 * @date 2018年9月15日
 *
 */
public class DirectProducer {

    //定義交換器的名字
    private final static String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] args) throws IOException, TimeoutException {

        //1.創建一個連接工廠
        ConnectionFactory factory = new ConnectionFactory();
        
        //設置要連接的RabbitMQ服務器的地址
        factory.setHost("127.0.0.1");
        
        //設置用戶名 這里使用缺省的
        //factory.setUsername(..);
        
        //設置連接斷開  這里使用缺省的
        //factory.setPort();
        
        //設置虛擬主機 這里使用缺省的
        //factory.setVirtualHost();


        //2.通過連接工廠創建一個連接
        Connection connection = factory.newConnection();

        //3.通過連接創建一個信道 信道是用來傳送數據的
        Channel channel = connection.createChannel();

        //4.通過信道聲明一個交換器 第一個參數時交換器的名字 第二個參數時交換器的種類
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

        //定義一組路由鍵
        String[]routingKeys = {"error","info","warning"};

        //發布消息的交換器上
        for(int i=0;i<3;i++){
            //路由鍵
            String routingKey = routingKeys[i];
            
            //要發送的消息
            String message = "Hello world_"+(i+1);

            /**
             * 發送消息到交換器上
             * 參數1:交換器的名字
             * 參數2:路由鍵
             * 參數3:BasicProperties
             * 參數4:要發送的消息
             */
            channel.basicPublish(EXCHANGE_NAME,routingKey,null,message.getBytes());
            System.out.println("Sent "+routingKey+":"+message);

        }

        channel.close();
        connection.close();

    }

}

2. 在工程OriginalRabbitMQConsumer新建一個direct的只消費error日志的消費者

package study.demo.normal;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 
 * @Description: 交換器是direct 只消費error日志的消費者
 * @author leeSmall
 * @date 2018年9月15日
 *
 */
public class ConsumerError {

    //定義交換器的名字
    private static final String EXCHANGE_NAME = "direct_logs";
    // private static final String EXCHANGE_NAME = "fanout_logs_1";

    public static void main(String[] argv) throws IOException, TimeoutException {
        //1.創建一個連接工廠
        ConnectionFactory factory = new ConnectionFactory();
        //設置要連接的RabbitMQ服務器的地址
        factory.setHost("127.0.0.1");
        
        //2.通過連接工廠創建一個連接
        Connection connection = factory.newConnection();
        
        //3.通過連接創建一個信道 信道是用來傳送數據的
        Channel channel = connection.createChannel();
        
        //4.通過信道聲明一個交換器 第一個參數時交換器的名字 第二個參數時交換器的種類
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        
        //5.聲明隨機隊列
        String queueName = channel.queueDeclare().getQueue();
        
        //6.聲明一個只消費錯誤日志的路由鍵error
        String routingKey = "error";
        
        //7.隊列通過路由鍵綁定到交換器上
        channel.queueBind(queueName,EXCHANGE_NAME,routingKey);
        
        System.out.println("Waiting message.......");

        //8.設置一個監聽器監聽消費消息
        Consumer consumerB = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String message = new String(body,"UTF-8");
                System.out.println("Accept:"+envelope.getRoutingKey()+":"+message);
            }
        };
     //9.自動確認:autoAck參數為true
        channel.basicConsume(queueName,true,consumerB);
    }

}

3. 啟動消費者ConsumerError,再啟動生產者DirectProducer,查看效果

啟動消費者ConsumerError:

啟動生產者DirectProducer:

 

查看消費者ConsumerError現在的狀況:

 

可以看到消費者只消費了error級別的消息,這是因為在direct模式下,消費者只定義了路由鍵routingKey = "error";

 4. 在工程OriginalRabbitMQConsumer新建一個direct的消費所有日志的消費者

package study.demo.normal;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 
 * @Description: 交換器是direct 消費所有日志的消費者
 * @author leeSmall
 * @date 2018年9月15日
 *
 */
public class ConsumerAll {
    //定義交換器的名字
    private static final String EXCHANGE_NAME = "direct_logs";
    // private static final String EXCHANGE_NAME = "fanout_logs_1";

    public static void main(String[] argv) throws IOException,
            InterruptedException, TimeoutException {

        //1.創建一個連接工廠
        ConnectionFactory factory = new ConnectionFactory();
        //設置要連接的RabbitMQ服務器的地址
        factory.setHost("127.0.0.1");
        
        //2.通過連接工廠創建一個連接
        Connection connection = factory.newConnection();
        
        //3.通過連接創建一個信道 信道是用來傳送數據的
        Channel channel = connection.createChannel();
        
        //4.通過信道聲明一個交換器 第一個參數時交換器的名字 第二個參數時交換器的種類
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        
        
        //5.聲明隨機隊列
        String queueName = channel.queueDeclare().getQueue();
        
        //6.定義一組路由鍵消費所有日志
        String[]routingKeys = {"error","info","warning"};
        
        //7.隊列通過路由鍵綁定到交換器上
        for(String routingKey:routingKeys){
            //隊列和交換器的綁定
            channel.queueBind(queueName,EXCHANGE_NAME,routingKey);
        }
        System.out.println("Waiting message.......");

        //8.設置一個監聽器監聽消費消息
        Consumer consumerA = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String message = new String(body,"UTF-8");
                System.out.println("Accept:"+envelope.getRoutingKey()+":"+message);
            }
        };
         //9.自動確認:autoAck參數為true
         channel.basicConsume(queueName,true,consumerA);

    }

  }

5. 啟動消費者ConsumerAll,再啟動生產者DirectProducer,查看效果

啟動消費者ConsumerAll:

啟動生產者DirectProducer:

 查看消費者ConsumerAll的狀態:

 可以看到消費者ConsumerAll消費了生產者DirectProducer產生的所有消息,這是因為在direct模式下,消費者定義了和生產者一樣個數的路由鍵String[]routingKeys = {"error","info","warning"};

示例2:交換器是fanout

1. 在工程OriginalRabbitMQProducer新建一個交換器是fanout的生產者

package study.demo.normal;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 
 * @Description: 交換器是fanout的生產者 可以理解為廣播,會把所有消息投放到綁定到這個交換器上的隊列上
 * @author leeSmall
 * @date 2018年9月15日
 *
 */
public class FanoutProducer {

    private final static String EXCHANGE_NAME = "fanout_logs_1";

    public static void main(String[] args) throws IOException, TimeoutException {

        //1.創建一個連接工廠
        ConnectionFactory factory = new ConnectionFactory();
        
        //設置要連接的RabbitMQ服務器的地址
        factory.setHost("127.0.0.1");
        
        //設置用戶名 這里使用缺省的
        //factory.setUsername(..);
        
        //設置連接斷開  這里使用缺省的
        //factory.setPort();
        
        //設置虛擬主機 這里使用缺省的
        //factory.setVirtualHost();


        //2.通過連接工廠創建一個連接
        Connection connection = factory.newConnection();

        //3.通過連接創建一個信道 信道是用來傳送數據的
        Channel channel = connection.createChannel();

        //4.通過信道聲明一個交換器 第一個參數時交換器的名字 第二個參數時交換器的種類
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);

        //定義一組路由鍵
        String[]routingKeys = {"error","info","warning"};

        //發布消息的交換器上
        for(int i=0;i<3;i++){
            //路由鍵
            String routingKey = routingKeys[i];
            
            //要發送的消息
            String message = "Hello world_"+(i+1);

            /**
             * 發送消息到交換器上
             * 參數1:交換器的名字
             * 參數2:路由鍵
             * 參數3:BasicProperties
             * 參數4:要發送的消息
             */
            channel.basicPublish(EXCHANGE_NAME,routingKey,null,message.getBytes());
            System.out.println("Sent "+routingKey+":"+message);

        }

        channel.close();
        connection.close();

    }

}

2. 在工程OriginalRabbitMQConsumer新建一個fanout的只消費error日志的消費者

package study.demo.normal;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 
 * @Description: 交換器是fanout,只消費error日志的消費者
 * @author leeSmall
 * @date 2018年9月15日
 *
 */
public class ConsumerError {

    //定義交換器的名字
    //private static final String EXCHANGE_NAME = "direct_logs";
     private static final String EXCHANGE_NAME = "fanout_logs_1";

    public static void main(String[] argv) throws IOException, TimeoutException {
        //1.創建一個連接工廠
        ConnectionFactory factory = new ConnectionFactory();
        //設置要連接的RabbitMQ服務器的地址
        factory.setHost("127.0.0.1");
        
        //2.通過連接工廠創建一個連接
        Connection connection = factory.newConnection();
        
        //3.通過連接創建一個信道 信道是用來傳送數據的
        Channel channel = connection.createChannel();
        
        //4.通過信道聲明一個交換器 第一個參數時交換器的名字 第二個參數時交換器的種類
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
        
        //5.聲明隨機隊列
        String queueName = channel.queueDeclare().getQueue();
        
        //6.聲明一個只消費錯誤日志的路由鍵error
        String routingKey = "error";
        
        //7.隊列通過路由鍵綁定到交換器上
        channel.queueBind(queueName,EXCHANGE_NAME,routingKey);
        
        System.out.println("Waiting message.......");

        //8.設置一個監聽器監聽消費消息
        Consumer consumerB = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String message = new String(body,"UTF-8");
                System.out.println("Accept:"+envelope.getRoutingKey()+":"+message);
            }
        };
      //9.自動確認:autoAck參數為true
     channel.basicConsume(queueName,true,consumerA);


    }

}

3. 啟動消費者ConsumerError,再啟動生產者DirectProducer,查看效果

啟動消費者ConsumerError:

啟動生產者FanoutProducer:

 

查看消費者ConsumerError現在的狀況:

 

可以看到消費者消費了所有級別的消息,這是因為在fanout模式下,雖然消費者只定義了路由鍵routingKey = "error",但是因為fanut是廣播模式,會把所有消息投放到綁定到這個交換器上的隊列上

 4. 在工程OriginalRabbitMQConsumer新建一個fanout的消費所有日志的消費者

package study.demo.normal;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 
 * @Description: 交換器是fanout 消費所有日志的消費者
 * @author leeSmall
 * @date 2018年9月15日
 *
 */
public class ConsumerAll {
    //定義交換器的名字
    //private static final String EXCHANGE_NAME = "direct_logs";
    private static final String EXCHANGE_NAME = "fanout_logs_1";

    public static void main(String[] argv) throws IOException,
            InterruptedException, TimeoutException {

        //1.創建一個連接工廠
        ConnectionFactory factory = new ConnectionFactory();
        //設置要連接的RabbitMQ服務器的地址
        factory.setHost("127.0.0.1");
        
        //2.通過連接工廠創建一個連接
        Connection connection = factory.newConnection();
        
        //3.通過連接創建一個信道 信道是用來傳送數據的
        Channel channel = connection.createChannel();
        
        //4.通過信道聲明一個交換器 第一個參數時交換器的名字 第二個參數時交換器的種類
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
        
        
        //5.聲明隨機隊列
        String queueName = channel.queueDeclare().getQueue();
        
        //6.定義一組路由鍵消費所有日志
        String[]routingKeys = {"error","info","warning"};
        
        //7.隊列通過路由鍵綁定到交換器上
        for(String routingKey:routingKeys){
            //隊列和交換器的綁定
            channel.queueBind(queueName,EXCHANGE_NAME,routingKey);
        }
        System.out.println("Waiting message.......");

        //8.設置一個監聽器監聽消費消息
        Consumer consumerA = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String message = new String(body,"UTF-8");
                System.out.println("Accept:"+envelope.getRoutingKey()+":"+message);
            }
        };
       //9.自動確認:autoAck參數為true
       channel.basicConsume(queueName,true,consumerA);



    }
}

5. 啟動消費者ConsumerAll,再啟動生產者DirectProducer,查看效果

啟動消費者ConsumerAll:

啟動生產者FanoutProducer:

 查看消費者ConsumerAll的狀態:

 可以看到消費者ConsumerAll消費了生產者DirectProducer產生的所有消息,這是因為在fanout模式下,消費者定義了和生產者一樣個數的路由鍵String[]routingKeys = {"error","info","warning"};

三、消費者(接收方)自行確認模式

1. 在工程OriginalRabbitMQProducer新建一個消費者自行確認生產者

package study.demo.consumerconfirm;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 
 * @Description: 消費者自行確認生產者
 * @author leeSmall
 * @date 2018年9月15日
 *
 */
public class ConsumerConfirmProducer {

    //交換器
    private final static String EXCHANGE_NAME = "direct_cc_confirm_1";
    
    //路由鍵
    private final static String ROUTE_KEY = "error";

    public static void main(String[] args) throws IOException, TimeoutException,
            InterruptedException {

        //1.創建一個連接工廠
        ConnectionFactory factory = new ConnectionFactory();
        
        //設置要連接的RabbitMQ服務器的地址
        factory.setHost("127.0.0.1");
        
        //設置用戶名 這里使用缺省的
        //factory.setUsername(..);
        
        //設置連接斷開  這里使用缺省的
        //factory.setPort();
        
        //設置虛擬主機 這里使用缺省的
        //factory.setVirtualHost();


        //2.通過連接工廠創建一個連接
        Connection connection = factory.newConnection();

        //3.通過連接創建一個信道 信道是用來傳送數據的
        Channel channel = connection.createChannel();

        //4.通過信道聲明一個交換器 第一個參數時交換器的名字 第二個參數時交換器的種類
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

        //發布消息的交換器上
        for(int i=0;i<10;i++){
            //要發送的消息
            String message = "Hello world_"+(i+1);

            /**
             * 發送消息到交換器上
             * 參數1:交換器的名字
             * 參數2:路由鍵
             * 參數3:BasicProperties
             * 參數4:要發送的消息
             */
            channel.basicPublish(EXCHANGE_NAME,ROUTE_KEY,null,message.getBytes());
            System.out.println("Sent "+ROUTE_KEY+":"+message);

        }

        channel.close();
        connection.close();

    }

}

2. 在工程OriginalRabbitMQConsumer新建一個消費者自行確認消費者

package study.demo.consumerconfirm;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 
 * @Description: 消費者自行確認消費者
 * @author leeSmall
 * @date 2018年9月15日
 *
 */
public class ClientConsumerAck {

    private static final String EXCHANGE_NAME = "direct_cc_confirm_1";

    public static void main(String[] argv) throws IOException, TimeoutException {
        //1.創建一個連接工廠
        ConnectionFactory factory = new ConnectionFactory();
        //設置要連接的RabbitMQ服務器的地址
        factory.setHost("127.0.0.1");
        
        //2.通過連接工廠創建一個連接
        Connection connection = factory.newConnection();
        
        //3.通過連接創建一個信道 信道是用來傳送數據的
        Channel channel = connection.createChannel();
        
        //4.通過信道聲明一個交換器 第一個參數時交換器的名字 第二個參數時交換器的種類
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

        //5.聲明隊列
        String queueName = "consumer_confirm";
        channel.queueDeclare(queueName,false,false,
                false,null);

        //6.聲明一個只消費錯誤日志的路由鍵error
        String routingKey = "error";
        
        //7.隊列通過路由鍵綁定到交換器上
        channel.queueBind(queueName,EXCHANGE_NAME,routingKey);
        System.out.println("Waiting message.......");

        //8.設置一個監聽器監聽消費消息
        Consumer consumerB = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String message = new String(body,"UTF-8");
                System.out.println("Accept:"+envelope.getRoutingKey()+":"+message);
                //消費者自行確認
                this.getChannel().basicAck(envelope.getDeliveryTag(),false);
            }
        };

        //9.消費者自行確認:autoAck參數為false
        channel.basicConsume(queueName,false,consumerB);
    }

} 

3. 在工程OriginalRabbitMQConsumer新建一個消費者自行確認休眠不回復ack的消費者

package study.demo.consumerconfirm;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 
 * @Description: 消費者自行確認休眠不回復ack的消費者
 * @author leeSmall
 * @date 2018年9月15日
 *
 */
public class ClientConsumerSlowAck {

    private static final String EXCHANGE_NAME = "direct_cc_confirm_1";

    public static void main(String[] argv) throws IOException, TimeoutException {
        //1.創建一個連接工廠
        ConnectionFactory factory = new ConnectionFactory();
        //設置要連接的RabbitMQ服務器的地址
        factory.setHost("127.0.0.1");
        
        //2.通過連接工廠創建一個連接
        Connection connection = factory.newConnection();
        
        //3.通過連接創建一個信道 信道是用來傳送數據的
        Channel channel = connection.createChannel();
        
        //4.通過信道聲明一個交換器 第一個參數時交換器的名字 第二個參數時交換器的種類
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

        //5.聲明隊列
        String queueName = "consumer_confirm";
        channel.queueDeclare(queueName,false,false,
                false,null);

        //6.聲明一個只消費錯誤日志的路由鍵error
        String routingKey = "error";
        
        //7.隊列通過路由鍵綁定到交換器上
        channel.queueBind(queueName,EXCHANGE_NAME,routingKey);
        System.out.println("Waiting message.......");

        //8.設置一個監聽器監聽消費消息
        Consumer consumerB = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                try {
                    //消費者自行確認時不回復ack,一直休眠
                    Thread.sleep(20000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                String message = new String(body,"UTF-8");
                System.out.println("Accept:"+envelope.getRoutingKey()+":"+message);
                //this.getChannel().basicAck(envelope.getDeliveryTag(),false);
            }
        };

        //9.消費者自行確認:autoAck參數為false
        channel.basicConsume(queueName,false,consumerB);
    }

} 

4. 分別啟動消費者自行確認消費者ClientConsumerAck和消費者自行確認休眠不回復ack的消費者ClientConsumerSlowAck,再啟動消費者自行確認生產者ConsumerConfirmProducer查看狀態

啟動消費者自行確認消費者ClientConsumerAck:

啟動消費者自行確認休眠不回復ack的消費者ClientConsumerSlowAck

 

啟動消費者自行確認生產者ConsumerConfirmProducer

 

查看消費者自行確認消費者ClientConsumerAck的狀態:

查看消費者自行確認休眠不回復ack的消費者ClientConsumerSlowAck的狀態:

查看RabbitMQ服務器上的隊列情況:

可以看到隊列consumer_confirm里面有5條消息未消費,這是因為消費者自行確認休眠不回復ack的消費者ClientConsumerSlowAck收到了這5條消息,但是沒有向RabbitMQ服務器發送確認消息,RabbitMQMQ認為這5條消息還沒有被消費就一直存在隊列里面

下面停掉ClientConsumerSlowAck,查看ClientConsumerAck和RabbitMQ服務器里面隊列consumer_confirm的狀態

 

可以看到停掉ClientConsumerSlowAck以后,之前的5條消息被ClientConsumerAck消費了

5. 在工程OriginalRabbitMQConsumer 新建一個消費者自行確認拒絕消息的消費者

package study.demo.consumerconfirm;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 
 * @Description: 消費者自行確認拒絕消息的消費者
 * @author leeSmall
 * @date 2018年9月15日
 *
 */
public class ClientConsumerReject {

    private static final String EXCHANGE_NAME = "direct_cc_confirm_1";

    public static void main(String[] argv) throws IOException, TimeoutException {
        //1.創建一個連接工廠
        ConnectionFactory factory = new ConnectionFactory();
        //設置要連接的RabbitMQ服務器的地址
        factory.setHost("127.0.0.1");
        
        //2.通過連接工廠創建一個連接
        Connection connection = factory.newConnection();
        
        //3.通過連接創建一個信道 信道是用來傳送數據的
        Channel channel = connection.createChannel();
        
        //4.通過信道聲明一個交換器 第一個參數時交換器的名字 第二個參數時交換器的種類
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

        //5.聲明隊列
        String queueName = "consumer_confirm";
        channel.queueDeclare(queueName,false,false,
                false,null);

        //6.聲明一個只消費錯誤日志的路由鍵error
        String routingKey = "error";
        
        //7.隊列通過路由鍵綁定到交換器上
        channel.queueBind(queueName,EXCHANGE_NAME,routingKey);
        System.out.println("Waiting message.......");

        //8.設置一個監聽器監聽消費消息
        Consumer consumerB = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                //消費者自行拒絕消息 參數requeue=true,讓RabbitMQ服務器重新分發消息,requeue=false讓RabbitMQ服務器移除消息
                this.getChannel().basicReject(envelope.getDeliveryTag(),true);
                System.out.println("Reject:"+envelope.getRoutingKey()
                        +":"+new String(body,"UTF-8"));
            }
        };

        //9.消費者自行確認:autoAck參數為false
        channel.basicConsume(queueName,false,consumerB);
    }

} 

 6. 分別啟動消費者自行確認消費者ClientConsumerAck和消費者自行確認拒絕消息的消費者ClientConsumerReject,再啟動消費者自行確認生產者ConsumerConfirmProducer查看狀態

 啟動消費者自行確認消費者ClientConsumerAck:

 

 啟動消費者自行確認拒絕消息的消費者ClientConsumerReject:

 

 

啟動消費者自行確認生產者ConsumerConfirmProducer

 

 

 查看消費者自行確認消費者ClientConsumerAck的狀態:

查看消費者自行確認拒絕消息的消費者ClientConsumerReject的狀態:

可以看到消息都被ClientConsumerAck消費了,這是因為消費者ClientConsumerReject拒絕了所有消息,這里要注意

this.getChannel().basicReject(envelope.getDeliveryTag(),true);

 

這段代碼的basicReject的第二個參數requeue,參數requeue=true,讓RabbitMQ服務器重新分發消息,requeue=false讓RabbitMQ服務器移除消息

 requeue=false時,RabbitMQ服務器會刪掉被ClientConsumerReject拒絕的消息,消費者ClientConsumerAck就不能消費所有消息了

四、生產者(發送方)確認模式

 為什么要有個發送方確認模式?

生產者不知道消息是否真正到達RabbitMq,也就是說發布操作不返回任何消息給生產者。
AMQP協議層面為我們提供的事務機制解決了這個問題,但是事務機制本身也會帶來問題:
1)嚴重的性能問題
2)使生產者應用程序產生同步
RabbitMQ團隊為我們拿出了更好的方案,即采用發送方確認模式,該模式比事務更輕量,性能影響幾乎可以忽略不計。

1. 在OriginalRabbitMQProducer工程新建一個生產者(發送方)確認同步模式的類

package study.demo.producerconfirm;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * 
 * @Description: 生產者(發送方)確認同步模式
 * @author leeSmall
 * @date 2018年9月16日
 *
 */
public class ProducerConfirm {

    private final static String EXCHANGE_NAME = "producer_confirm";
    private final static String ROUTE_KEY = "error";

    public static void main(String[] args) throws IOException, TimeoutException,
            InterruptedException {
        //1.創建一個連接工廠
        ConnectionFactory factory = new ConnectionFactory();
        
        //設置要連接的RabbitMQ服務器的地址
        factory.setHost("127.0.0.1");
        
        //設置用戶名 這里使用缺省的
        //factory.setUsername(..);
        
        //設置連接斷開  這里使用缺省的
        //factory.setPort();
        
        //設置虛擬主機 這里使用缺省的
        //factory.setVirtualHost();


        //2.通過連接工廠創建一個連接
        Connection connection = factory.newConnection();

        //3.通過連接創建一個信道 信道是用來傳送數據的
        Channel channel = connection.createChannel();
        //將信道設置為發送方確認
        channel.confirmSelect();

        //發布消息的交換器上
        for(int i=0;i<2;i++){
            String msg = "Hello "+(i+1);
            channel.basicPublish(EXCHANGE_NAME,ROUTE_KEY,null,msg.getBytes());
            //等待RabbitMQ返回消息確認消息已送達RabbitMQ服務器
            if (channel.waitForConfirms()){
                System.out.println("發送方同步確認: "+ROUTE_KEY+":"+msg);
            }
        }


        // 關閉頻道和連接
        channel.close();
        connection.close();
    }

}

2. 在OriginalRabbitMQProducer工程新建一個生產者(發送方)確認異步模式的類

package study.demo.producerconfirm;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;



/**
 * 
 * @Description: 生產者(發送方)確認異步模式
 * @author leeSmall
 * @date 2018年9月16日
 *
 */
public class ProducerConfirmAsync {

    private final static String EXCHANGE_NAME = "producer_confirm";

    public static void main(String[] args) throws IOException, TimeoutException,
            InterruptedException {
        //1.創建一個連接工廠
        ConnectionFactory factory = new ConnectionFactory();
        
        //設置要連接的RabbitMQ服務器的地址
        factory.setHost("127.0.0.1");
        
        //設置用戶名 這里使用缺省的
        //factory.setUsername(..);
        
        //設置連接斷開  這里使用缺省的
        //factory.setPort();
        
        //設置虛擬主機 這里使用缺省的
        //factory.setVirtualHost();


        //2.通過連接工廠創建一個連接
        Connection connection = factory.newConnection();

        //3.通過連接創建一個信道 信道是用來傳送數據的
        Channel channel = connection.createChannel();

        //4.通過信道聲明一個交換器 第一個參數時交換器的名字 第二個參數時交換器的種類
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

        //將信道設置為發送方確認
        channel.confirmSelect();

        //信道被關閉監聽器 用於RabbitMQ服務器斷線重連
        //channel.addShutdownListener();

        /**
         * 生產者異步確認監聽
         * 參數deliveryTag代表了當前channel唯一的投遞
         * 參數multiple:false
         * 
         */
        channel.addConfirmListener(new ConfirmListener() {
            //RabbitMQ服務器確認收到消息
            public void handleAck(long deliveryTag, boolean multiple)
                    throws IOException {
                System.out.println("RabbitMQ服務器確認收到消息Ack deliveryTag="+deliveryTag
                        +"multiple:"+multiple);
            }

            //RabbitMQ服務器由於自己內部出現故障沒有收到消息
            public void handleNack(long deliveryTag, boolean multiple)
                    throws IOException {
                System.out.println("RabbitMQ服務沒有收到消息Ack deliveryTag="+deliveryTag
                        +"multiple:"+multiple);
            }
        });

        //生產者異步返回監聽 這里和發布消息時的mandatory參數有關
        //參數mandatory:mandatory=true,投遞消息時無法找到一個合適的隊列,把消息返回給生產者,mandatory=false 丟棄消息(缺省)
        channel.addReturnListener(new ReturnListener() {
            public void handleReturn(int replyCode, String replyText,
                                     String exchange, String routingKey,
                                     AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String msg = new String(body);
                System.out.println("replyText:"+replyText);
                System.out.println("exchange:"+exchange);
                System.out.println("routingKey:"+routingKey);
                System.out.println("msg:"+msg);
            }
        });


        //聲明一組路由鍵
        String[] routingKeys={"error","info","warning"};
        //發送消息到交換器上
        for(int i=0;i<3;i++){
            String routingKey = routingKeys[i%3];
            // 發送的消息
            String message = "Hello World_"+(i+1)+("_"+System.currentTimeMillis());
            
            //通過路由鍵把消息發送到交換器上
            //參數mandatory: mandatory=true,投遞消息時無法找到一個合適的隊列,把消息返回給生產者,
            //              mandatory=false 丟棄消息(缺省)
            channel.basicPublish(EXCHANGE_NAME, routingKey, false,
                    null, message.getBytes());
            System.out.println("----------------------------------------------------");
            System.out.println(" Sent Message: [" + routingKey +"]:'"+ message + "'");
            //sleep一下讓程序不快速結束 可以看到RabbitMQ服務器的響應
            Thread.sleep(1000);
        }

        // 關閉信道和連接
        channel.close();
        connection.close();
    }


}

3. 在OriginalRabbitMQConsumer工程新建一個發送方確認消費者

package study.demo.producerconfirm;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 
 * @Description: 發送方確認消費者
 * @author leeSmall
 * @date 2018年9月16日
 *
 */
public class ProducerConfirmConsumer {

    private static final String EXCHANGE_NAME = "producer_confirm";

    public static void main(String[] argv) throws IOException, TimeoutException {
        //1.創建一個連接工廠
        ConnectionFactory factory = new ConnectionFactory();
        //設置要連接的RabbitMQ服務器的地址
        factory.setHost("127.0.0.1");
        
        //2.通過連接工廠創建一個連接
        Connection connection = factory.newConnection();
        
        //3.通過連接創建一個信道 信道是用來傳送數據的
        Channel channel = connection.createChannel();
        
        //4.通過信道聲明一個交換器 第一個參數時交換器的名字 第二個參數時交換器的種類
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

        //5.聲明隊列
        String queueName = "producer_confirm";
        channel.queueDeclare(queueName,false,false,
                false,null);

        //6.聲明一個只消費錯誤日志的路由鍵error
        String routingKey = "error";
        
        //7.隊列通過路由鍵綁定到交換器上
        channel.queueBind(queueName,EXCHANGE_NAME,routingKey);
        System.out.println("Waiting message.......");

        // 8.創建隊列消費者 設置一個監聽器監聽消費消息 
        final Consumer consumerB = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println( "Received ["+ envelope.getRoutingKey() + "] "+message);
            }
        };
        //9.消費者自動確認:autoAck參數為true
        channel.basicConsume(queueName, true, consumerB);
    }

}

4. 啟動發送方確認消費者ProducerConfirmConsumer,再分別啟動生產者(發送方)確認同步模式的類ProducerConfirm和生產者(發送方)確認異步模式ProducerConfirmAsync

啟動發送方確認消費者ProducerConfirmConsumer:

 啟動生產者(發送方)確認同步模式的類ProducerConfirm:

 

查看發送方確認消費者ProducerConfirmConsumer的狀態:

 

 啟動生產者(發送方)確認異步模式ProducerConfirmAsync:

 

查看發送方確認消費者ProducerConfirmConsumer的狀態:

 

 

示例代碼獲取地址


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM