在上一篇博客《RabbitMQ入門:Hello RabbitMQ 代碼實例》中,我們通過指定的隊列發送和接收消息,代碼還算是比較簡單的。
假設有這一些比較耗時的任務,按照上一次的那種方式,我們要一直等前面的耗時任務完成了之后才能接着處理后面耗時的任務,那要等多久才能處理完?別擔心,我們今天的主角--工作隊列就可以解決該問題。我們將圍繞下面這個索引展開:
- 什么是工作隊列
- 代碼准備
- 循環分發
- 消息確認
- 公平分發
- 消息持久化
廢話少說,直接展開。
一、什么是工作隊列
工作隊列--用來將耗時的任務分發給多個消費者(工作者),主要解決這樣的問題:處理資源密集型任務,並且還要等他完成。有了工作隊列,我們就可以將具體的工作放到后面去做,將工作封裝為一個消息,發送到隊列中,一個工作進程就可以取出消息並完成工作。如果啟動了多個工作進程,那么工作就可以在多個進程間共享。
二、代碼准備
- 生產者類:NewTask.java
public class NewTask { //隊列名稱 public static final String QUEUE_NAME = "TASK_QUEUE"; //隊列是否需要持久化 public static final boolean DURABLE = false; //需要發送的消息列表 public static final String[] msgs = {"task 1", "task 2", "task 3", "task 4", "task 5", "task 6"}; public static void main(String[] args) { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = null; Channel channel = null; try { // 1.connection & channel connection = factory.newConnection(); channel = connection.createChannel(); // 2.queue channel.queueDeclare(QUEUE_NAME, DURABLE, false, false, null); // 3.publish msg for (int i = 0; i < msgs.length; i++) { channel.basicPublish("", QUEUE_NAME, null, msgs[i].getBytes()); System.out.println("** new task ****:" + msgs[i]); } } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } finally { if (channel != null) { try { channel.close(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } if (connection != null) { try { connection.close(); } catch (IOException e) { e.printStackTrace(); } } } } }
- 消費者類:Work.java
public class Work { public static void main(String[] args) { System.out.println("*** Work ***"); ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); try { //1.connection & channel final Channel channel = factory.newConnection().createChannel(); //2.queue channel.queueDeclare(NewTask.QUEUE_NAME, NewTask.DURABLE, false, false, null); //3. consumer instance Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, "UTF-8"); //deal task doWork(msg); } }; //4.do consumer boolean autoAck = true; channel.basicConsume(NewTask.QUEUE_NAME, autoAck, consumer); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } private static void doWork(String msg) { try { System.out.println("**** deal task begin :" + msg); //假裝task比較耗時,通過sleep()來模擬需要消耗的時間 if ("sleep".equals(msg)) { Thread.sleep(1000 * 60); } else { Thread.sleep(1000); } System.out.println("**** deal task finish :" + msg); } catch (InterruptedException e) { e.printStackTrace(); } } }
- 再來一個消費者類:Work2.java,代碼同Work.java一模一樣。
三、循環分發
我們先啟動Work和Work2,然后啟動NewTask,運行結果如下:
NewTask運行結果:
Work運行結果:
Work2運行結果:
我們發現,消息生產者發送了6條消息,消費者work和work2分別分到了3個消息,而且是循環輪流分發到的,這種分發的方式就是循環分發。
四、消息確認
假如我們在發送的消息里面添加“sleep"
//需要發送的消息列表 public static final String[] msgs = {"sleep", "task 1", "task 2", "task 3", "task 4", "task 5", "task 6"};
根據代碼中的實現,這個sleep要耗時1分鍾,萬一在這1分鍾之內,工作進程崩潰了或者被kill了,會發生什么情況呢?根據上面的代碼:
//4.do consumer boolean autoAck = true; channel.basicConsume(NewTask.QUEUE_NAME, autoAck, consumer);
自動確認為true,每次RabbitMQ向消費者發送消息之后,會自動發確認消息(我工作你放心,不會有問題),這個時候消息會立即從內存中刪除。如果工作者掛了,那將會丟失它正在處理和未處理的所有工作,而且這些工作還不能再交由其他工作者處理,這種丟失屬於客戶端丟失。
我們來驗證下,和剛才的步驟一樣執行程序:
1.NewTask的控制台打印結果: ** new task ****:sleep ** new task ****:task 1 ** new task ****:task 2 ** new task ****:task 3 ** new task ****:task 4 ** new task ****:task 5 ** new task ****:task 6 2.Work的控制台打印結果: **** deal task begin :sleep 3.Work2的控制台打印結果: **** deal task begin :task 1 **** deal task finish :task 1 **** deal task begin :task 3 **** deal task finish :task 3 **** deal task begin :task 5 **** deal task finish :task 5
根據上面的內容,消息生產者發送了7條消息, work2消費了1、3、5 三條,那剩下的sleep、2、4、6 這四條消息肯定是work來處理,只是sleep耗時一分鍾 ,時間差后面的還沒來得及處理,這個時候我們kill掉work,去看下RabbitMQ 管理頁面,沒有未處理的消息,消息隨着work被kill也跟着丟失了。
是不是很可怕?
為了應對這種情況,RabbitMQ支持消息確認。消費者處理完消息之后,會發送一個確認消息告訴RabbitMQ,消息處理完了,你可以刪掉它了。
代碼修改(Work.java和Work2.java同步修改):1.將自動確認改為false,2.消息處理之后再通過channel.basicAck進行消息確認
修改完后,執行程序:
1.NewTask的控制台打印結果: ** new task ****:sleep ** new task ****:task 1 ** new task ****:task 2 ** new task ****:task 3 ** new task ****:task 4 ** new task ****:task 5 ** new task ****:task 6 2.Work的控制台打印結果: **** deal task begin :sleep 3.Work2的控制台打印結果: **** deal task begin :task 1 **** deal task finish :task 1 **** deal task begin :task 3 **** deal task finish :task 3 **** deal task begin :task 5 **** deal task finish :task 5
然后kill掉work,去看RabbitMQ管理頁面,會發現有4條未確認:
再去看下work2的控制台,work2將work未處理完和未來得及處理的消息都給處理了:
等work2處理完后,你再去看RabbitMQ管理頁面,會發現頁面的消息數值也都變成0 了。
五、公平分發
按照上面那種循環分發的方式,每個消費者會分到相同數量的任務,這樣會有一個問題:假如有一些task非常耗時,之前的任務還沒有完成,后面又來了那么多任務,來不及處理,那咋辦? 有的消費者忙的不可開交,有的消費者卻很快處理完事情然后無所事事浪費資源,那咋整?答案就是:公平分發。 怎么實現呢?
發生上述問題的原因就是RabbitMQ收到消息后就立即分發出去,而沒有確認各個工作者未返回確認的消息數量。因此我們可以使用basicQos
方法,並將參數prefetchCount
設為1,告訴RabbitMQ 我每次值處理一條消息,你要等我處理完了再分給我下一個。這樣RabbitMQ就不會輪流分發了,而是尋找空閑的工作者進行分發。
代碼修改(work和Work2同步修改):
執行代碼:
1.NewTask的控制台打印結果: ** new task ****:sleep ** new task ****:task 1 ** new task ****:task 2 ** new task ****:task 3 ** new task ****:task 4 ** new task ****:task 5 ** new task ****:task 6 2.Work的控制台打印結果: **** deal task begin :sleep **** deal task finish :sleep 3.Work2的控制台打印結果: **** deal task begin :task 1 **** deal task finish :task 1 **** deal task begin :task 2 **** deal task finish :task 2 **** deal task begin :task 3 **** deal task finish :task 3 **** deal task begin :task 4 **** deal task finish :task 4 **** deal task begin :task 5 **** deal task finish :task 5 **** deal task begin :task 6 **** deal task finish :task 6
Work只處理了sleep,Work2處理了1、2、3、4、5、6 這個六條消息。
六、消息持久化
上面說到消息確認的時候,提到了工作者被kill的情況。那如果RabbitMQ被stop掉了呢?我們來看下:
這次只啟動Work和NewTask,不啟動Work2,所有消息都交給Work來處理,控制台打印信息:
1.NewTask的控制台打印結果: ** new task ****:sleep ** new task ****:task 1 ** new task ****:task 2 ** new task ****:task 3 ** new task ****:task 4 ** new task ****:task 5 ** new task ****:task 6 2.Work的控制台打印結果: **** deal task begin :sleep
在work處理sleep的過程中,我們停掉RabbitMQ服務
然后重新start服務並執行rabbitmq-plugins enable rabbitmq_management命令,然后查看管理頁面:
你會發現,所有消息都將被清空了。這種丟失屬於服務端丟失。
因此需要將消息進行持久化來應對這種情況。
持久化需要做兩件事情:
- 隊列持久化,在聲明隊列的時候,將第二個參數設為true
另外,由於RabbitMQ不允許重新定義已經存在的隊列,否則就會報錯(上一篇博客中已經提到過了),因此我們將這次的隊列名改下:
- 消息持久化,在發送消息的時候,將第三個參數設為2
然后運行代碼,在work處理sleep的時候將服務停掉,並重新啟動且執行rabbitmq-plugins enable rabbitmq_management命令,然后查看管理頁面:
一共7條消息,未確認的1條(sleep)和ready的6條(1、2、3、4、5、6)。消息被保存了下來。
重新啟動Work,所有消息被消費: