1。消息会处于阻塞状态,可以通过(man rabbitmqctl 可以获得更多使用方法,常用的有list_queues,list_consumers.list_connections,close_connection,add_vhost,...)
list_channels pid connection ; close_connection [connection] "" ,查看和解决阻塞
rabbitmqctl list_queues -p [vhost] name messages_ready messages_unacknowledged
解决阻塞的办法,可以在subscribe消息队列是设置autoAck=true,这样会避免消息队列中消息阻塞,这种情况是worker接到消息后,就会把消息从消息队列删除,不管消息是否被正确处理,另一种是设置autoAck=false,这样worker在接受消息后,必须给予服务端一个ack响应,该消息才会从消息队列中删除,这样会防止消息的意外丢失,但要注意的是,消息队列如果没有接收到ack响应,该消息对了的消息就会一直阻塞,对于rabbitmq-server来说,他是没有超时存在的,即除非重启rabbitmq,否则该消息队列会一直阻塞,直到收到响应,但如果与该消息队列的subscirbe断开的话,则表明过期,即该消息队列中消息会尝试重新发消息给一个订阅者进行处理。
2。关于健壮的消息处理
当rabbitmq server重启,或意外当掉的话,所用消息的订阅都会跟着坏掉(当然也可以设置持久化的消息队列设置),解决办法是捕获ShutdownSignalException异常(对rabbitmq)出现该异常说明消息服务无法连到,故可以进行相应的处理,另每次消息发送,消息订阅之前都要进行一次消息队列,exchange,绑定的重定义,防止消息对了重启后改消息队列/exchang已消失。
组合命令
list_consumers -p [vhost]
list_channels -p [vhost] pid connection
list_connections
close_connection
3。关于消息处理的管理
每个worker都应该有一个可标识的tag,尽量不使用系统生成的,这样便于以后的debug
Connection conn = null; Channel channel = null; QueueingConsumer consumer = null; try { conn = qFactory.newConnection(); channel = conn.createChannel(); channel.queueDeclare(queueName, false, false, false, null); /** * This tells RabbitMQ not to give more than one message to a * worker at a time. Or, in other words, don't dispatch a new * message to a worker until it has processed and acknowledged * the previous one. Instead, it will dispatch it to the next * worker that is not still busy */ channel.basicQos(1); consumer = new QueueingConsumer(channel); //Use hostname as consume tagname , So that We can monitor who consume this Queue channel.basicConsume(this.queueName, false, hostName, consumer); } catch (IOException e) { e.printStackTrace(); } while (true) { try { //Get next message delivery = consumer.nextDelivery(); } catch (ShutdownSignalException e) { //If rabbitmq-server has closed , out of loop e.printStackTrace(); isSignalBroken = true ; break; } catch (ConsumerCancelledException e) { e.printStackTrace(); log.warn("The consumer has cancelled , Try to re-consume"); //If the channel and conn have closed . try{ //Sleep 1s and reconnect to rabbitmq-server Thread.sleep(1000); conn = qFactory.newConnection(); channel = conn.createChannel(); channel.queueDeclare(queueName, false, false, false, null); channel.basicQos(1); consumer = new QueueingConsumer(channel); channel.basicConsume(this.queueName, false, consumer); continue; }catch (IOException e1) { e1.printStackTrace(); } catch (InterruptedException e2) { e2.printStackTrace(); } } catch (InterruptedException e) { e.printStackTrace(); } try{ //process message }catch (Exception e) { //If throw exception when process message , close channel and conn , make sure this message not block . then re-work e.printStackTrace(); try { channel.basicCancel(hostName); channel.close(); conn.close(); } catch (IOException e1) { e1.printStackTrace(); } continue; } /** * If a consumer dies without sending an ack, RabbitMQ will * understand that a message wasn't processed fully and will * redeliver it to another consumer * There aren't any message timeouts; * RabbitMQ will redeliver the message only when * the worker connection dies. It's fine even if processing * a message takes a very, very long time */ try { channel.basicAck(delivery.getEnvelope() .getDeliveryTag(), false); } catch (IOException e) { e.printStackTrace(); } } //If because of the rabbitmq-server stop ,We will re-try connect to rabbtimq-server after 60s if(isSignalBroken){ log.warn("The rabbitmq Server have broken , We Try to re-connect again After 60 seconds"); try { Thread.sleep(1000*60); } catch (InterruptedException e) { e.printStackTrace(); } this.run(); } }