rabbitmq 学习笔记


1。消息会处于阻塞状态,可以通过(man rabbitmqctl 可以获得更多使用方法,常用的有list_queues,list_consumers.list_connections,close_connection,add_vhost,...)

list_channels pid connection ; close_connection [connection] "" ,查看和解决阻塞 

rabbitmqctl list_queues -p [vhost] name messages_ready messages_unacknowledged

解决阻塞的办法,可以在subscribe消息队列是设置autoAck=true,这样会避免消息队列中消息阻塞,这种情况是worker接到消息后,就会把消息从消息队列删除,不管消息是否被正确处理,另一种是设置autoAck=false,这样worker在接受消息后,必须给予服务端一个ack响应,该消息才会从消息队列中删除,这样会防止消息的意外丢失,但要注意的是,消息队列如果没有接收到ack响应,该消息对了的消息就会一直阻塞,对于rabbitmq-server来说,他是没有超时存在的,即除非重启rabbitmq,否则该消息队列会一直阻塞,直到收到响应,但如果与该消息队列的subscirbe断开的话,则表明过期,即该消息队列中消息会尝试重新发消息给一个订阅者进行处理。

2。关于健壮的消息处理
当rabbitmq server重启,或意外当掉的话,所用消息的订阅都会跟着坏掉(当然也可以设置持久化的消息队列设置),解决办法是捕获ShutdownSignalException异常(对rabbitmq)出现该异常说明消息服务无法连到,故可以进行相应的处理,另每次消息发送,消息订阅之前都要进行一次消息队列,exchange,绑定的重定义,防止消息对了重启后改消息队列/exchang已消失。

组合命令

list_consumers -p [vhost]
list_channels -p [vhost] pid connection
list_connections 
close_connection

3。关于消息处理的管理
每个worker都应该有一个可标识的tag,尽量不使用系统生成的,这样便于以后的debug

Connection conn = null;
Channel channel = null;
QueueingConsumer consumer = null;
try {
    conn = qFactory.newConnection();
    channel = conn.createChannel();
    channel.queueDeclare(queueName, false, false, false, null);

    /**
    * This tells RabbitMQ not to give more than one message to a
    * worker at a time. Or, in other words, don't dispatch a new
     * message to a worker until it has processed and acknowledged
    * the previous one. Instead, it will dispatch it to the next
    * worker that is not still busy
    */
        channel.basicQos(1);

    consumer = new QueueingConsumer(channel);
    //Use hostname as consume tagname , So that We can monitor who consume this Queue
                 channel.basicConsume(this.queueName, false, hostName, consumer);
    } catch (IOException e) {
        e.printStackTrace();
    }

    while (true) {
        try {
            //Get next message
            delivery = consumer.nextDelivery();
        } catch (ShutdownSignalException e) {
        //If rabbitmq-server has closed , out of loop
            e.printStackTrace();
            isSignalBroken = true ;
            break;
        } catch (ConsumerCancelledException e) {
            e.printStackTrace();
            log.warn("The consumer has cancelled , Try to re-consume");
            //If the channel and conn have closed .
            try{
            //Sleep 1s and reconnect to rabbitmq-server
                Thread.sleep(1000);
                conn = qFactory.newConnection();
                channel = conn.createChannel();
                            channel.queueDeclare(queueName, false, false, false, null);
                            
                channel.basicQos(1);
                            
                consumer = new  QueueingConsumer(channel);
                        channel.basicConsume(this.queueName, false, consumer);
                continue;
            }catch (IOException e1) {
                e1.printStackTrace();
            } catch (InterruptedException e2) {
                e2.printStackTrace();
            }
                        
        } catch (InterruptedException e) {
            e.printStackTrace();
        }  
                    
                    
        try{
                        
            //process message

        }catch (Exception e) {
            //If throw exception when process message , close channel and conn , make sure this message not block . then re-work
            e.printStackTrace();
            try {
                            channel.basicCancel(hostName);
                channel.close();
                conn.close();
            } catch (IOException e1) {
                e1.printStackTrace();
            }
            continue;
        }
                    
    /**
    * If a consumer dies without sending an ack, RabbitMQ will
    * understand that a message wasn't processed fully and will
    * redeliver it to another consumer 
    * There aren't any message timeouts; 
    * RabbitMQ will redeliver the message only when
    * the worker connection dies. It's fine even if processing
    * a message takes a very, very long time
    */
        try {
                         channel.basicAck(delivery.getEnvelope()
                    .getDeliveryTag(), false);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    //If because of the rabbitmq-server stop ,We will re-try connect to rabbtimq-server after 60s
    if(isSignalBroken){
        log.warn("The rabbitmq Server have broken , We Try to re-connect again After 60 seconds");
        try {
            Thread.sleep(1000*60);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        this.run();
    }
}


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM