rabbitmq 學習筆記


1。消息會處於阻塞狀態,可以通過(man rabbitmqctl 可以獲得更多使用方法,常用的有list_queues,list_consumers.list_connections,close_connection,add_vhost,...)

list_channels pid connection ; close_connection [connection] "" ,查看和解決阻塞 

rabbitmqctl list_queues -p [vhost] name messages_ready messages_unacknowledged

解決阻塞的辦法,可以在subscribe消息隊列是設置autoAck=true,這樣會避免消息隊列中消息阻塞,這種情況是worker接到消息后,就會把消息從消息隊列刪除,不管消息是否被正確處理,另一種是設置autoAck=false,這樣worker在接受消息后,必須給予服務端一個ack響應,該消息才會從消息隊列中刪除,這樣會防止消息的意外丟失,但要注意的是,消息隊列如果沒有接收到ack響應,該消息對了的消息就會一直阻塞,對於rabbitmq-server來說,他是沒有超時存在的,即除非重啟rabbitmq,否則該消息隊列會一直阻塞,直到收到響應,但如果與該消息隊列的subscirbe斷開的話,則表明過期,即該消息隊列中消息會嘗試重新發消息給一個訂閱者進行處理。

2。關於健壯的消息處理
當rabbitmq server重啟,或意外當掉的話,所用消息的訂閱都會跟着壞掉(當然也可以設置持久化的消息隊列設置),解決辦法是捕獲ShutdownSignalException異常(對rabbitmq)出現該異常說明消息服務無法連到,故可以進行相應的處理,另每次消息發送,消息訂閱之前都要進行一次消息隊列,exchange,綁定的重定義,防止消息對了重啟后改消息隊列/exchang已消失。

組合命令

list_consumers -p [vhost]
list_channels -p [vhost] pid connection
list_connections 
close_connection

3。關於消息處理的管理
每個worker都應該有一個可標識的tag,盡量不使用系統生成的,這樣便於以后的debug

Connection conn = null;
Channel channel = null;
QueueingConsumer consumer = null;
try {
    conn = qFactory.newConnection();
    channel = conn.createChannel();
    channel.queueDeclare(queueName, false, false, false, null);

    /**
    * This tells RabbitMQ not to give more than one message to a
    * worker at a time. Or, in other words, don't dispatch a new
     * message to a worker until it has processed and acknowledged
    * the previous one. Instead, it will dispatch it to the next
    * worker that is not still busy
    */
        channel.basicQos(1);

    consumer = new QueueingConsumer(channel);
    //Use hostname as consume tagname , So that We can monitor who consume this Queue
                 channel.basicConsume(this.queueName, false, hostName, consumer);
    } catch (IOException e) {
        e.printStackTrace();
    }

    while (true) {
        try {
            //Get next message
            delivery = consumer.nextDelivery();
        } catch (ShutdownSignalException e) {
        //If rabbitmq-server has closed , out of loop
            e.printStackTrace();
            isSignalBroken = true ;
            break;
        } catch (ConsumerCancelledException e) {
            e.printStackTrace();
            log.warn("The consumer has cancelled , Try to re-consume");
            //If the channel and conn have closed .
            try{
            //Sleep 1s and reconnect to rabbitmq-server
                Thread.sleep(1000);
                conn = qFactory.newConnection();
                channel = conn.createChannel();
                            channel.queueDeclare(queueName, false, false, false, null);
                            
                channel.basicQos(1);
                            
                consumer = new  QueueingConsumer(channel);
                        channel.basicConsume(this.queueName, false, consumer);
                continue;
            }catch (IOException e1) {
                e1.printStackTrace();
            } catch (InterruptedException e2) {
                e2.printStackTrace();
            }
                        
        } catch (InterruptedException e) {
            e.printStackTrace();
        }  
                    
                    
        try{
                        
            //process message

        }catch (Exception e) {
            //If throw exception when process message , close channel and conn , make sure this message not block . then re-work
            e.printStackTrace();
            try {
                            channel.basicCancel(hostName);
                channel.close();
                conn.close();
            } catch (IOException e1) {
                e1.printStackTrace();
            }
            continue;
        }
                    
    /**
    * If a consumer dies without sending an ack, RabbitMQ will
    * understand that a message wasn't processed fully and will
    * redeliver it to another consumer 
    * There aren't any message timeouts; 
    * RabbitMQ will redeliver the message only when
    * the worker connection dies. It's fine even if processing
    * a message takes a very, very long time
    */
        try {
                         channel.basicAck(delivery.getEnvelope()
                    .getDeliveryTag(), false);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    //If because of the rabbitmq-server stop ,We will re-try connect to rabbtimq-server after 60s
    if(isSignalBroken){
        log.warn("The rabbitmq Server have broken , We Try to re-connect again After 60 seconds");
        try {
            Thread.sleep(1000*60);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        this.run();
    }
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM