java中延遲任務的處理方式


1、利用延遲隊列

延時隊列,第一他是個隊列,所以具有對列功能第二就是延時,這就是延時對列,功能也就是將任務放在該延時對列中,只有到了延時時刻才能從該延時對列中獲取任務否則獲取不到……

應用場景比較多,比如延時1分鍾發短信,延時1分鍾再次執行等,下面先看看延時隊列demo之后再看延時隊列在項目中的使用:

簡單的延時隊列要有三部分:第一實現了Delayed接口的消息體、第二消費消息的消費者、第三存放消息的延時隊列,那下面就來看看延時隊列demo。

一、消息體

package com.delqueue;  
  
import java.util.concurrent.Delayed;  
import java.util.concurrent.TimeUnit;  
  
/** 
 * 消息體定義 實現Delayed接口就是實現兩個方法即compareTo 和 getDelay最重要的就是getDelay方法,這個方法用來判斷是否到期…… */  
public class Message implements Delayed {  
    private int id;  
    private String body; // 消息內容  
    private long excuteTime;// 延遲時長,這個是必須的屬性因為要按照這個判斷延時時長。  
  
    public int getId() {  
        return id;  
    }  
  
    public String getBody() {  
        return body;  
    }  
  
    public long getExcuteTime() {  
        return excuteTime;  
    }  
  
    public Message(int id, String body, long delayTime) {  
        this.id = id;  
        this.body = body;  
        this.excuteTime = TimeUnit.NANOSECONDS.convert(delayTime, TimeUnit.MILLISECONDS) + System.nanoTime();  
    }  
  
    // 自定義實現比較方法返回 1 0 -1三個參數  
    @Override  
    public int compareTo(Delayed delayed) {  
        Message msg = (Message) delayed;  
        return Integer.valueOf(this.id) > Integer.valueOf(msg.id) ? 1  
                : (Integer.valueOf(this.id) < Integer.valueOf(msg.id) ? -1 : 0);  
    }  
  
    // 延遲任務是否到時就是按照這個方法判斷如果返回的是負數則說明到期否則還沒到期  
    @Override  
    public long getDelay(TimeUnit unit) {  
        return unit.convert(this.excuteTime - System.nanoTime(), TimeUnit.NANOSECONDS);  
    }  
}  

 

二、消息消費者

package com.delqueue;  
  
import java.util.concurrent.DelayQueue;  
  
public class Consumer implements Runnable {  
    // 延時隊列 ,消費者從其中獲取消息進行消費  
    private DelayQueue<Message> queue;  
  
    public Consumer(DelayQueue<Message> queue) {  
        this.queue = queue;  
    }  
  
    @Override  
    public void run() {  
        while (true) {  
            try {  
                Message take = queue.take();  
                System.out.println("消費消息id:" + take.getId() + " 消息體:" + take.getBody());  
            } catch (InterruptedException e) {  
                e.printStackTrace();  
            }  
        }  
    }  
}  

三、延時隊列

package com.delqueue;  
  
import java.util.concurrent.DelayQueue;  
import java.util.concurrent.ExecutorService;  
import java.util.concurrent.Executors;  
  
public class DelayQueueTest {  
     public static void main(String[] args) {    
            // 創建延時隊列    
            DelayQueue<Message> queue = new DelayQueue<Message>();    
            // 添加延時消息,m1 延時3s    
            Message m1 = new Message(1, "world", 3000);    
            // 添加延時消息,m2 延時10s    
            Message m2 = new Message(2, "hello", 10000);    
            //將延時消息放到延時隊列中  
            queue.offer(m2);    
            queue.offer(m1);    
            // 啟動消費線程 消費添加到延時隊列中的消息,前提是任務到了延期時間   
            ExecutorService exec = Executors.newFixedThreadPool(1);  
            exec.execute(new Consumer(queue));  
            exec.shutdown();  
        }    
}  

將消息體放入延遲隊列中,在啟動消費者線程去消費延遲隊列中的消息,如果延遲隊列中的消息到了延遲時間則可以從中取出消息否則無法取出消息也就無法消費。

這就是延遲隊列demo,下面我們來說說在真實環境下的使用。

使用場景描述:

在打車軟件中對訂單進行派單的流程,當有訂單的時候給該訂單篩選司機,然后給當訂單綁定司機,但是有時運氣沒那么好,訂單進來后第一次沒有篩選到合適的司機,但我們也不能就此結束派單,而是將該訂單的信息放到延時隊列中過個2秒鍾在進行一次,其實這個2秒鍾就是一個延遲,所以這里我們就可以使用延時隊列來實現……

下面看看簡單的流程圖:

 

下面來看看具體代碼實現:

在項目中有如下幾個類:第一 、任務類   第二、按照任務類組裝的消息體類  第三、延遲隊列管理類

任務類即執行篩選司機、綁單、push消息的任務類

package com.test.delayqueue;  
/** 
 * 具體執行相關業務的業務類 
 * @author whd 
 * @date 2017年9月25日 上午12:49:32 
 */  
public class DelayOrderWorker  implements Runnable {  
  
    @Override  
    public void run() {  
        // TODO Auto-generated method stub  
        //相關業務邏輯處理  
        System.out.println(Thread.currentThread().getName()+" do something ……");  
    }  
}  

消息體類,在延時隊列中這個實現了Delayed接口的消息類是比不可少的,實現接口時有一個getDelay(TimeUnit unit)方法,這個方法就是判斷是否到期的

這里定義的是一個泛型類,所以可以將我們上面的任務類作為其中的task,這樣就將任務類分裝成了一個消息體

package com.test.delayqueue;  
  
import java.util.concurrent.Delayed;  
import java.util.concurrent.TimeUnit;  
  
/** 
 * 延時隊列中的消息體將任務封裝為消息體 
 *  
 * @author whd 
 * @date 2017年9月25日 上午12:48:30 
 * @param <T> 
 */  
public class DelayOrderTask<T extends Runnable> implements Delayed {  
    private final long time;  
    private final T task; // 任務類,也就是之前定義的任務類  
  
    /** 
     * @param timeout 
     *            超時時間(秒) 
     * @param task 
     *            任務 
     */  
    public DelayOrderTask(long timeout, T task) {  
        this.time = System.nanoTime() + timeout;  
        this.task = task;  
    }  
  
    @Override  
    public int compareTo(Delayed o) {  
        // TODO Auto-generated method stub  
        DelayOrderTask other = (DelayOrderTask) o;  
        long diff = time - other.time;  
        if (diff > 0) {  
            return 1;  
        } else if (diff < 0) {  
            return -1;  
        } else {  
            return 0;  
        }  
    }  
  
    @Override  
    public long getDelay(TimeUnit unit) {  
        // TODO Auto-generated method stub  
        return unit.convert(this.time - System.nanoTime(), TimeUnit.NANOSECONDS);  
    }  
  
    @Override  
    public int hashCode() {  
        return task.hashCode();  
    }  
  
    public T getTask() {  
        return task;  
    }  
}  

延時隊列管理類,這個類主要就是將任務類封裝成消息並並添加到延時隊列中,以及輪詢延時隊列從中取出到時的消息體,在獲取任務類放到線程池中執行任務

package com.test.delayqueue;  
  
import java.util.Map;  
import java.util.concurrent.DelayQueue;  
import java.util.concurrent.ExecutorService;  
import java.util.concurrent.Executors;  
import java.util.concurrent.TimeUnit;  
import java.util.concurrent.atomic.AtomicLong;  
  
/** 
 * 延時隊列管理類,用來添加任務、執行任務 
 *  
 * @author whd 
 * @date 2017年9月25日 上午12:44:59 
 */  
public class DelayOrderQueueManager {  
    private final static int DEFAULT_THREAD_NUM = 5;  
    private static int thread_num = DEFAULT_THREAD_NUM;  
    // 固定大小線程池  
    private ExecutorService executor;  
    // 守護線程  
    private Thread daemonThread;  
    // 延時隊列  
    private DelayQueue<DelayOrderTask<?>> delayQueue;  
    private static final AtomicLong atomic = new AtomicLong(0);  
    private final long n = 1;  
    private static DelayOrderQueueManager instance = new DelayOrderQueueManager();  
  
    private DelayOrderQueueManager() {  
        executor = Executors.newFixedThreadPool(thread_num);  
        delayQueue = new DelayQueue<>();  
        init();  
    }  
  
    public static DelayOrderQueueManager getInstance() {  
        return instance;  
    }  
  
    /** 
     * 初始化 
     */  
    public void init() {  
        daemonThread = new Thread(() -> {  
            execute();  
        });  
        daemonThread.setName("DelayQueueMonitor");  
        daemonThread.start();  
    }  
  
    private void execute() {  
        while (true) {  
            Map<Thread, StackTraceElement[]> map = Thread.getAllStackTraces();  
            System.out.println("當前存活線程數量:" + map.size());  
            int taskNum = delayQueue.size();  
            System.out.println("當前延時任務數量:" + taskNum);  
            try {  
                // 從延時隊列中獲取任務  
                DelayOrderTask<?> delayOrderTask = delayQueue.take();  
                if (delayOrderTask != null) {  
                    Runnable task = delayOrderTask.getTask();  
                    if (null == task) {  
                        continue;  
                    }  
                    // 提交到線程池執行task  
                    executor.execute(task);  
                }  
            } catch (Exception e) {  
                e.printStackTrace();  
            }  
        }  
    }  
  
    /** 
     * 添加任務 
     *  
     * @param task 
     * @param time 
     *            延時時間 
     * @param unit 
     *            時間單位 
     */  
    public void put(Runnable task, long time, TimeUnit unit) {  
        // 獲取延時時間  
        long timeout = TimeUnit.NANOSECONDS.convert(time, unit);  
        // 將任務封裝成實現Delayed接口的消息體  
        DelayOrderTask<?> delayOrder = new DelayOrderTask<>(timeout, task);  
        // 將消息體放到延時隊列中  
        delayQueue.put(delayOrder);  
    }  
  
    /** 
     * 刪除任務 
     *  
     * @param task 
     * @return 
     */  
    public boolean removeTask(DelayOrderTask task) {  
  
        return delayQueue.remove(task);  
    }  
}  

測試類

package com.delqueue;  
  
import java.util.concurrent.TimeUnit;  
  
import com.test.delayqueue.DelayOrderQueueManager;  
import com.test.delayqueue.DelayOrderWorker;  
  
public class Test {  
    public static void main(String[] args) {  
        DelayOrderWorker work1 = new DelayOrderWorker();// 任務1  
        DelayOrderWorker work2 = new DelayOrderWorker();// 任務2  
        DelayOrderWorker work3 = new DelayOrderWorker();// 任務3  
        // 延遲隊列管理類,將任務轉化消息體並將消息體放入延遲對列中等待執行  
        DelayOrderQueueManager manager = DelayOrderQueueManager.getInstance();  
        manager.put(work1, 3000, TimeUnit.MILLISECONDS);  
        manager.put(work2, 6000, TimeUnit.MILLISECONDS);  
        manager.put(work3, 9000, TimeUnit.MILLISECONDS);  
    }  
  
}  

OK 這就是項目中的具體使用情況,當然具體內容被忽略,整體框架就是這樣,還有這里使用java的延時隊列但是這種方式是有問題的如果如果down機則會出現任務丟失,所以也可以考慮使用mq、redis來實現

 

 

2、mq實現延遲消息

在rabbitmq 3.5.7及以上的版本提供了一個插件(rabbitmq-delayed-message-exchange)來實現延遲隊列功能。同時插件依賴Erlang/OPT 18.0及以上。
插件源碼地址:
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange

插件下載地址:
https://bintray.com/rabbitmq/community-plugins/rabbitmq_delayed_message_exchange

安裝:

進入插件安裝目錄
{rabbitmq-server}/plugins/(可以查看一下當前已存在的插件)
下載插件

rabbitmq_delayed_message_exchange

wget https://bintray.com/rabbitmq/community-plugins/download_file?file_path=rabbitmq_delayed_message_exchange-0.0.1.ez

(如果下載的文件名稱不規則就手動重命名一下如:

rabbitmq_delayed_message_exchange-0.0.1.ez)

啟用插件

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

關閉插件

rabbitmq-plugins disable rabbitmq_delayed_message_exchange

插件使用

通過聲明一個x-delayed-message類型的exchange來使用delayed-messaging特性
x-delayed-message是插件提供的類型,並不是rabbitmq本身的,發送消息的時候通過在header添加”x-delay”參數來控制消息的延時時間

 

直接在maven工程的pom.xml文件中加入

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

Spring Boot的版本我使用的是 2.0.1.RELEASE .

接下來在 application.properties 文件中加入redis配置:

spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

定義ConnectionFactory和RabbitTemplate

也很簡單,代碼如下:

package com.mq.rabbitmq;
 
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
 
@Configuration
@ConfigurationProperties(prefix = "spring.rabbitmq")
public class RabbitMqConfig {
  private String host;
  private int port;
  private String userName;
  private String password;
 
  @Bean
  public ConnectionFactory connectionFactory() {
    CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(host,port);
    cachingConnectionFactory.setUsername(userName);
    cachingConnectionFactory.setPassword(password);
    cachingConnectionFactory.setVirtualHost("/");
    cachingConnectionFactory.setPublisherConfirms(true);
    return cachingConnectionFactory;
  }
 
  @Bean
  public RabbitTemplate rabbitTemplate() {
    RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
    return rabbitTemplate;
  }
 
  public String getHost() {
    return host;
  }
 
  public void setHost(String host) {
    this.host = host;
  }
 
  public int getPort() {
    return port;
  }
 
  public void setPort(int port) {
    this.port = port;
  }
 
  public String getUserName() {
    return userName;
  }
 
  public void setUserName(String userName) {
    this.userName = userName;
  }
 
  public String getPassword() {
    return password;
  }
 
  public void setPassword(String password) {
    this.password = password;
  }
}

Exchange和Queue配置

package com.mq.rabbitmq;
 
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
 
import java.util.HashMap;
import java.util.Map;
 
@Configuration
public class QueueConfig {
 
  @Bean
  public CustomExchange delayExchange() {
    Map<String, Object> args = new HashMap<>();
    args.put("x-delayed-type", "direct");
    return new CustomExchange("test_exchange", "x-delayed-message",true, false,args);
  }
 
  @Bean
  public Queue queue() {
    Queue queue = new Queue("test_queue_1", true);
    return queue;
  }
 
  @Bean
  public Binding binding() {
    return BindingBuilder.bind(queue()).to(delayExchange()).with("test_queue_1").noargs();
  }
}

這里要特別注意的是,使用的是 CustomExchange ,不是 DirectExchange ,另外 CustomExchange 的類型必須是 x-delayed-message 。

實現消息發送

package com.mq.rabbitmq;
 
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
 
import java.text.SimpleDateFormat;
import java.util.Date;
 
@Service
public class MessageServiceImpl {
 
  @Autowired
  private RabbitTemplate rabbitTemplate;
 
  public void sendMsg(String queueName,String msg) {
    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    System.out.println("消息發送時間:"+sdf.format(new Date()));
    rabbitTemplate.convertAndSend("test_exchange", queueName, msg, new MessagePostProcessor() {
      @Override
      public Message postProcessMessage(Message message) throws AmqpException {
        message.getMessageProperties().setHeader("x-delay",3000);
        return message;
      }
    });
  }
}

注意在發送的時候,必須加上一個header

x-delay

在這里我設置的延遲時間是3秒。

消息消費者

package com.mq.rabbitmq;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
 
import java.text.SimpleDateFormat;
import java.util.Date;
@Component
public class MessageReceiver {
 
  @RabbitListener(queues = "test_queue_1")
  public void receive(String msg) {
    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    System.out.println("消息接收時間:"+sdf.format(new Date()));
    System.out.println("接收到的消息:"+msg);
  }
}

運行Spring Boot程序和發送消息

直接在main方法里運行Spring Boot程序,Spring Boot會自動解析 MessageReceiver 類的。

接下來只需要用Junit運行一下發送消息的接口即可。

package com.mq.rabbitmq;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitmqApplicationTests {
  @Autowired
  private MessageServiceImpl messageService;
  @Test
  public void send() {
    messageService.sendMsg("test_queue_1","hello i am delay msg");
  }
}

運行完后,可以看到如下信息:

消息發送時間:2018-05-03 12:44:53
3秒鍾后,Spring Boot控制台會輸出:
消息接收時間:2018-05-03 12:44:56
接收到的消息:hello i am delay msg

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM