三.RabbitMQ之異步消息隊列(Work Queue)


  上一篇文章簡要介紹了RabbitMQ的基本知識點,並且寫了一個簡單的發送和接收消息的demo.這一篇文章繼續介紹關於Work Queue(工作隊列)方面的知識點,用於實現多個工作進程的分發式任務。

  一.Work Queues:我們可以把它翻譯成工作隊列,他有什么用呢?它的主要作用就是規避了實時的執行資源密集型任務( resource-intensive task),因為這會造成響應時間過長,影響用戶體驗。Work Queues通過把一些實時性不強的任務存儲到消息隊列中,然后后台的工作者(worker)會在特定的情況下完成這些任務。

  舉個例子來說,用戶注冊是一個資源密集型的任務,因為它需要經過存儲用戶基本信息(用戶名,郵箱,密碼),發送郵箱驗證碼、或者更有甚者,存入注冊日志(操作日志)等步驟。傳統的串行做法如下所示。

  

 

  可以看到,在用戶填寫完注冊信息並點擊提交以后,需要經歷3個步驟,其中第一個步驟,判斷注冊信息是否合法,合法則存入數據庫,這是注冊的核心步驟,而后面兩個步驟並不是十分迫切,無需在這個請求中馬上完成。而傳統的串行模式一般都是在一個請求中塞滿邏輯處理,無論是否迫切的邏輯請求。這樣會大大加重一個請求的負擔,無論是用戶等待時間,程序的壓力上,都不是一種好的做法。

  尤其是對於web應用,我們知道一個web請求是一個短連接,在一個短連接中做過於復雜的邏輯運算操作,顯然是不合適的。所以消息分布隊列在web應用中尤為有用。

 

  我們將上述串行的的方式改為用消息隊列的形式來實現,可以看到此時我把一個請求里面做的事情分解到三個請求來實現,這樣每個請求的時間都降低了,特別對於用戶而言,他的等待時間大大減少,而這樣也可以充分利用了cpu的性能。

以上便是工作隊列主要的原理及優點。

二.一個work queues的demo

  延續上一個demo的軌跡,並結合我們舉的注冊的例子,模擬用戶注冊業務。

  1.首先,我們編寫一個生產者,它除了執行將注冊數據存儲進數據庫的方法外,還向RabbitMQ隊列里發送了兩條消息,分別用於存儲有關郵箱驗證和日志存儲的內容。代碼如下。

  

package com.xdx.learn;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import net.sf.json.JSONObject;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class NewTask {
    private final static String QUEUE_NAME="register";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory=new ConnectionFactory();
        factory.setHost("192.168.1.195");//服務器ip
        factory.setPort(5672);//端口
        factory.setUsername("xdx");//登錄名
        factory.setPassword("xxxxx");//密碼
        Connection connection=factory.newConnection();//建立連接
        Channel channel=connection.createChannel();//建立頻道
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);//建立一個隊列
        System.out.println("首先,保存用戶注冊數據到數據庫");
        JSONObject jsonObjet1=new JSONObject();
        jsonObjet1.put("msgType", "email");//該消息是針對發送驗證郵件的。
        jsonObjet1.put("content", "執行發送驗證郵件到郵箱操作");
        String message1=jsonObjet1.toString();
        channel.basicPublish("", QUEUE_NAME, null, message1.getBytes());//發布第一個異步消息
        System.out.println(channel+" Sent '"+message1+"'");
        JSONObject jsonObject2=new JSONObject();
        jsonObject2.put("msgType", "log");//該消息針對存儲操作日志
        jsonObject2.put("content", "執行存儲操作日志的操作");
        String message2=jsonObject2.toString();
        channel.basicPublish("", QUEUE_NAME, null, message2.getBytes());//發布第二個異步消息
        System.out.println(channel+" Sent '"+message2+"'");
        channel.close();
        connection.close();
    }
}

  由上面的代碼我們知道我們可以傳輸較為復雜的消息,我們用一個json類型對象來封裝消息,並將該消息存儲到消息隊列中。執行上述代碼,得到結果如下。

  首先,保存用戶注冊數據到數據庫
  AMQChannel(amqp://xdx@192.168.1.195:5672/,1) Sent '{"msgType":"email","content":"執行發送驗證郵件到郵箱操作"}'
  AMQChannel(amqp://xdx@192.168.1.195:5672/,1) Sent '{"msgType":"log","content":"執行存儲操作日志的操作"}'

  然后我們再到RabbitMQ的后台看看現在的queue的情況,發現多了一個名叫register的queue,並且在該queue中有兩個消息,如下圖所示。

  2.接下來我們編寫一個消費者worker1,在worker1中,根據接收到的消息類型,調用不同的處理方法來處理消息中的任務。如下所示。

  

package com.xdx.learn;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import net.sf.json.JSONObject;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Envelope;

public class Worker1 {
    private final static String QUEUE_NAME="register";

    public static void main(String[] args) throws IOException, TimeoutException {
        //下面的配置與生產者相對應
        ConnectionFactory factory=new ConnectionFactory();
        factory.setHost("192.168.1.195");//服務器ip
        factory.setPort(5672);//端口
        factory.setUsername("xdx");//登錄名
        factory.setPassword("xxxxx");//密碼
        Connection connection=factory.newConnection();
        final Channel channel=connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println(" worker1 Waiting for messages. To exit press CTRL+C");
        //每次從隊列獲取的數量
        channel.basicQos(1);
        //defaultConsumer實現了Consumer,我們將使用它來緩存生產者發送過來儲存在隊列中的消息。當我們可以接收消息的時候,從中獲取。
        Consumer consumer=new DefaultConsumer(channel){
             @Override
              public void handleDelivery(String consumerTag, Envelope envelope,
                                         AMQP.BasicProperties properties, byte[] body)
                  throws IOException {
                String message = new String(body, "UTF-8");
                try {
                     JSONObject jsonObject=JSONObject.fromObject(message);
                        String msgType=jsonObject.get("msgType").toString();
                        System.out.println(" wokrer1 Received message,msgType is " + msgType);
                        if(msgType.equals("email")){
                            //調用郵箱驗證代碼
                            System.out.println("worker1 do "+jsonObject.get("content"));
                        }else{
                            //調用日志保存代碼
                            System.out.println("worker1 do "+jsonObject.get("content"));
                        }
                } catch (Exception e) {
                     channel.abort();
                }finally{
                    System.out.println("Worker1 Done");
                    //注意這句為必須,否則會造成RabbitMQ因為重復的重新發送已處理的消息而內存溢出
                    channel.basicAck(envelope.getDeliveryTag(),false);
                }
               
              }
        };
        //接收到消息以后,推送給RabbitMQ,確認收到了消息。第二個參數為false,表示手動確認消息處理完畢
        channel.basicConsume(QUEUE_NAME, false, consumer);
    }

}

  執行上述的代碼,可以得到如下結果

   worker1 Waiting for messages. To exit press CTRL+C
   wokrer1 Received message,msgType is email
  worker1 do 執行發送驗證郵件到郵箱操作
  Worker1 Done
   wokrer1 Received message,msgType is log
  worker1 do 執行存儲操作日志的操作
  Worker1 Done

  可以看到我們能夠解析到消息里面的內容,並且根據不同的消息類別調用不同的處理邏輯,上述代碼需要注意的知識點均有注釋。執行完畢后,再到RabbitMQ后台查看,發現待處理消息已經為0.

  3.並發處理,我們稍微改動一下NewTask方法,讓它一次性發送多條消息到隊列中。

  

package com.xdx.learn;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import net.sf.json.JSONObject;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class NewTask {
    private final static String QUEUE_NAME="register";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory=new ConnectionFactory();
        factory.setHost("192.168.1.195");//服務器ip
        factory.setPort(5672);//端口
        factory.setUsername("xdx");//登錄名
        factory.setPassword("xxxxx");//密碼
        Connection connection=factory.newConnection();//建立連接
        Channel channel=connection.createChannel();//建立頻道
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);//建立一個隊列
        System.out.println("向消息隊列中插入10條郵箱驗證消息和10條日志存儲消息");
        for(int i=0;i<10;i++){
            JSONObject jsonObjet1=new JSONObject();
            jsonObjet1.put("msgType", "email");//該消息是針對發送驗證郵件的。
            jsonObjet1.put("content", "執行發送驗證郵件到郵箱操作"+i);
            String message1=jsonObjet1.toString();
            channel.basicPublish("", QUEUE_NAME, null, message1.getBytes());//發布第一個異步消息
            System.out.println(channel+" Sent '"+message1+"'");
            JSONObject jsonObject2=new JSONObject();
            jsonObject2.put("msgType", "log");//該消息針對存儲操作日志
            jsonObject2.put("content", "執行存儲操作日志的操作"+i);
            String message2=jsonObject2.toString();
            channel.basicPublish("", QUEUE_NAME, null, message2.getBytes());
            System.out.println(channel+" Sent '"+message2+"'");
        }
        
        channel.close();
        connection.close();
    }
}

  執行結果如下:

向消息隊列中插入10條郵箱驗證消息和10條日志存儲消息
AMQChannel(amqp://xdx@192.168.1.195:5672/,1) Sent '{"msgType":"email","content":"執行發送驗證郵件到郵箱操作0"}'
AMQChannel(amqp://xdx@192.168.1.195:5672/,1) Sent '{"msgType":"log","content":"執行存儲操作日志的操作0"}'
AMQChannel(amqp://xdx@192.168.1.195:5672/,1) Sent '{"msgType":"email","content":"執行發送驗證郵件到郵箱操作1"}'
AMQChannel(amqp://xdx@192.168.1.195:5672/,1) Sent '{"msgType":"log","content":"執行存儲操作日志的操作1"}'
AMQChannel(amqp://xdx@192.168.1.195:5672/,1) Sent '{"msgType":"email","content":"執行發送驗證郵件到郵箱操作2"}'
AMQChannel(amqp://xdx@192.168.1.195:5672/,1) Sent '{"msgType":"log","content":"執行存儲操作日志的操作2"}'
AMQChannel(amqp://xdx@192.168.1.195:5672/,1) Sent '{"msgType":"email","content":"執行發送驗證郵件到郵箱操作3"}'
AMQChannel(amqp://xdx@192.168.1.195:5672/,1) Sent '{"msgType":"log","content":"執行存儲操作日志的操作3"}'
AMQChannel(amqp://xdx@192.168.1.195:5672/,1) Sent '{"msgType":"email","content":"執行發送驗證郵件到郵箱操作4"}'
AMQChannel(amqp://xdx@192.168.1.195:5672/,1) Sent '{"msgType":"log","content":"執行存儲操作日志的操作4"}'
AMQChannel(amqp://xdx@192.168.1.195:5672/,1) Sent '{"msgType":"email","content":"執行發送驗證郵件到郵箱操作5"}'
AMQChannel(amqp://xdx@192.168.1.195:5672/,1) Sent '{"msgType":"log","content":"執行存儲操作日志的操作5"}'
AMQChannel(amqp://xdx@192.168.1.195:5672/,1) Sent '{"msgType":"email","content":"執行發送驗證郵件到郵箱操作6"}'
AMQChannel(amqp://xdx@192.168.1.195:5672/,1) Sent '{"msgType":"log","content":"執行存儲操作日志的操作6"}'
AMQChannel(amqp://xdx@192.168.1.195:5672/,1) Sent '{"msgType":"email","content":"執行發送驗證郵件到郵箱操作7"}'
AMQChannel(amqp://xdx@192.168.1.195:5672/,1) Sent '{"msgType":"log","content":"執行存儲操作日志的操作7"}'
AMQChannel(amqp://xdx@192.168.1.195:5672/,1) Sent '{"msgType":"email","content":"執行發送驗證郵件到郵箱操作8"}'
AMQChannel(amqp://xdx@192.168.1.195:5672/,1) Sent '{"msgType":"log","content":"執行存儲操作日志的操作8"}'
AMQChannel(amqp://xdx@192.168.1.195:5672/,1) Sent '{"msgType":"email","content":"執行發送驗證郵件到郵箱操作9"}'
AMQChannel(amqp://xdx@192.168.1.195:5672/,1) Sent '{"msgType":"log","content":"執行存儲操作日志的操作9"}'

  這樣以后,現在隊列中已經有了20條的數據,如下所示。

  

  可以看到生產者已經生產成功,接下來我再編寫一個消費者Worker2,用於分擔Worker1的負擔,它的代碼與worker2基本類似,我們修改了worker1和worker2的代碼,加入睡眠機制,每一個worker執行完消息的任務以后,如下。

package com.xdx.learn;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import net.sf.json.JSONObject;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Envelope;

public class Worker2 {
    private final static String QUEUE_NAME="register";

    public static void main(String[] args) throws IOException, TimeoutException {
        //下面的配置與生產者相對應
        ConnectionFactory factory=new ConnectionFactory();
        factory.setHost("192.168.1.195");//服務器ip
        factory.setPort(5672);//端口
        factory.setUsername("xdx");//登錄名
        factory.setPassword("xxxx");//密碼
        Connection connection=factory.newConnection();
        final Channel channel=connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        //每次從隊列獲取的數量
        channel.basicQos(1);
        //defaultConsumer實現了Consumer,我們將使用它來緩存生產者發送過來儲存在隊列中的消息。當我們可以接收消息的時候,從中獲取。
        Consumer consumer=new DefaultConsumer(channel){
             @Override
              public void handleDelivery(String consumerTag, Envelope envelope,
                                         AMQP.BasicProperties properties, byte[] body)
                  throws IOException {
                String message = new String(body, "UTF-8");
                try {
                     JSONObject jsonObject=JSONObject.fromObject(message);
                        String msgType=jsonObject.get("msgType").toString();
                        if(msgType.equals("email")){
                            //調用郵箱驗證代碼
                            System.out.println("worker2 do "+jsonObject.get("content"));
                        }else{
                            //調用日志保存代碼
                            System.out.println("worker2 do "+jsonObject.get("content"));
                        }
                } catch (Exception e) {
                     channel.abort();
                }finally{
                    channel.basicAck(envelope.getDeliveryTag(),false);
                  //執行以后睡一會,好讓其他的worker有機會執行任務
                    try {
                        Thread.sleep(3000);
                    } catch (InterruptedException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                }
               
              }
        };
        //接收到消息以后,推送給RabbitMQ,告訴他確認收到了消息。第二個參數為false,表示手動確認消息處理完畢
        channel.basicConsume(QUEUE_NAME, false, consumer);
    }

}

 

  現在我同時執行worker1和worker2的代碼。

這樣以后,在RabbitMQ的控制台,已經沒有未處理的消息了。

   可以看到worker1和worker2確實分工合作,共同處理了這些消息隊列中的任務。

  三.擴展

  1.message acknowledgment(消息確認):如果消費者在沒有處理完一個消息就掛掉了,則這個消息就會遺失,所以必須在消費者代碼中通知給RabbitMQ。默認是手動通知的,這樣可以確保消息不會遺失。如果沒有接收到確認,RabbitMQ會指派另外一個消費者處理任務。 channel.basicAck(envelope.getDeliveryTag(),false);和channel.basicConsume(QUEUE_NAME, false, consumer);都是必須的,否則會造成RabbitMQ無法釋放已經處理過的消息和導致內存溢出。

  2.Message durability(消息持久化):可修改channel.queueDeclare("task_queue", durable, false, false, null);及channel.basicPublish("", "task_queue", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());來使消息能持久化。需要注意的是,如果一個queue已經定義為非持久化,則不能再改為持久化,會出錯,此時必須定義一個新的queue(換個名字)

  3.Fair dispatch(分配策略):默認情況下,RabbitMQ會平均的分配消息給消費者,它不會管這個消費者目前手上有多少未完成的任務,這可能會造成有的消費者很忙,有的消費者很閑。通過channel.basicQos(1);可以指定消費者每次只接收一條消息,只有當這條消息已經處理完畢,並且確認以后,才接收下一條的消息。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM