1。消息會處於阻塞狀態,可以通過(man rabbitmqctl 可以獲得更多使用方法,常用的有list_queues,list_consumers.list_connections,close_connection,add_vhost,...)
list_channels pid connection ; close_connection [connection] "" ,查看和解決阻塞
rabbitmqctl list_queues -p [vhost] name messages_ready messages_unacknowledged
解決阻塞的辦法,可以在subscribe消息隊列是設置autoAck=true,這樣會避免消息隊列中消息阻塞,這種情況是worker接到消息后,就會把消息從消息隊列刪除,不管消息是否被正確處理,另一種是設置autoAck=false,這樣worker在接受消息后,必須給予服務端一個ack響應,該消息才會從消息隊列中刪除,這樣會防止消息的意外丟失,但要注意的是,消息隊列如果沒有接收到ack響應,該消息對了的消息就會一直阻塞,對於rabbitmq-server來說,他是沒有超時存在的,即除非重啟rabbitmq,否則該消息隊列會一直阻塞,直到收到響應,但如果與該消息隊列的subscirbe斷開的話,則表明過期,即該消息隊列中消息會嘗試重新發消息給一個訂閱者進行處理。
2。關於健壯的消息處理
當rabbitmq server重啟,或意外當掉的話,所用消息的訂閱都會跟着壞掉(當然也可以設置持久化的消息隊列設置),解決辦法是捕獲ShutdownSignalException異常(對rabbitmq)出現該異常說明消息服務無法連到,故可以進行相應的處理,另每次消息發送,消息訂閱之前都要進行一次消息隊列,exchange,綁定的重定義,防止消息對了重啟后改消息隊列/exchang已消失。
組合命令
list_consumers -p [vhost]
list_channels -p [vhost] pid connection
list_connections
close_connection
3。關於消息處理的管理
每個worker都應該有一個可標識的tag,盡量不使用系統生成的,這樣便於以后的debug
Connection conn = null; Channel channel = null; QueueingConsumer consumer = null; try { conn = qFactory.newConnection(); channel = conn.createChannel(); channel.queueDeclare(queueName, false, false, false, null); /** * This tells RabbitMQ not to give more than one message to a * worker at a time. Or, in other words, don't dispatch a new * message to a worker until it has processed and acknowledged * the previous one. Instead, it will dispatch it to the next * worker that is not still busy */ channel.basicQos(1); consumer = new QueueingConsumer(channel); //Use hostname as consume tagname , So that We can monitor who consume this Queue channel.basicConsume(this.queueName, false, hostName, consumer); } catch (IOException e) { e.printStackTrace(); } while (true) { try { //Get next message delivery = consumer.nextDelivery(); } catch (ShutdownSignalException e) { //If rabbitmq-server has closed , out of loop e.printStackTrace(); isSignalBroken = true ; break; } catch (ConsumerCancelledException e) { e.printStackTrace(); log.warn("The consumer has cancelled , Try to re-consume"); //If the channel and conn have closed . try{ //Sleep 1s and reconnect to rabbitmq-server Thread.sleep(1000); conn = qFactory.newConnection(); channel = conn.createChannel(); channel.queueDeclare(queueName, false, false, false, null); channel.basicQos(1); consumer = new QueueingConsumer(channel); channel.basicConsume(this.queueName, false, consumer); continue; }catch (IOException e1) { e1.printStackTrace(); } catch (InterruptedException e2) { e2.printStackTrace(); } } catch (InterruptedException e) { e.printStackTrace(); } try{ //process message }catch (Exception e) { //If throw exception when process message , close channel and conn , make sure this message not block . then re-work e.printStackTrace(); try { channel.basicCancel(hostName); channel.close(); conn.close(); } catch (IOException e1) { e1.printStackTrace(); } continue; } /** * If a consumer dies without sending an ack, RabbitMQ will * understand that a message wasn't processed fully and will * redeliver it to another consumer * There aren't any message timeouts; * RabbitMQ will redeliver the message only when * the worker connection dies. It's fine even if processing * a message takes a very, very long time */ try { channel.basicAck(delivery.getEnvelope() .getDeliveryTag(), false); } catch (IOException e) { e.printStackTrace(); } } //If because of the rabbitmq-server stop ,We will re-try connect to rabbtimq-server after 60s if(isSignalBroken){ log.warn("The rabbitmq Server have broken , We Try to re-connect again After 60 seconds"); try { Thread.sleep(1000*60); } catch (InterruptedException e) { e.printStackTrace(); } this.run(); } }