JVM優先級線程池做任務隊列


前言

我們都知道 web 服務的工作大多是接受 http 請求,並返回處理后的結果。服務器接受的每一個請求又可以看是一個任務。一般而言這些請求任務會根據請求的先后有序處理,如果請求任務的處理比較耗時,往往就需要排隊了。而同時不同的任務直接可能會存在一些優先級的變化,這時候就需要引入任務隊列並進行管理了。可以做任務隊列的東西有很多,Java 自帶的線程池,以及其他的消息中間件都可以。

同步與異步

這個問題在之前已經提過很多次了,有些任務是需要請求后立即返回結果的,而有的則不需要。設想一下你下單購物的場景,付完錢后,系統只需要返回一個支付成功即可,后續的積分增加、優惠券發放、安排發貨等等業務都不需要實時返回給用戶的,這些就是異步的任務。大量的異步任務到達我們部署的服務上,由於處理效率的瓶頸,無法達到實時處理,因此與需要用隊列將他們暫時保存起來,排隊處理。

線程池

在 Java 中提到隊列,我們除了想到基本的數據結構之外,應該還有線程池。線程池自帶一套機制可以實現任務的排隊和執行,可以滿足單點環境下絕大多數異步化的場景。下面是典型的一個處理流程:

// 注入合適類型的線程池
@Autowired
private final ThreadPoolExecutor asyncPool;
@RequestMapping(value = "/async/someOperate", method = RequestMethod.POST)
public RestResult someOperate(HttpServletRequest request, String params,String callbackUrl {
    // 接受請求后 submit 到線程池排隊處理
    asyncPool.submit(new Task(params,callbackUrl);
    return new RestResult(ResultCode.SUCCESS.getCode(), null) {{
        setMsg("successful!" + prop.getShowMsg());
    }};
}

// 異步任務處理
@Slf4j
public class Task extends Callable<RestResult> {
    private String params;
    private String callbackUrl;
    private final IAlgorithmService algorithmService = SpringUtil.getBean(IAlgorithmServiceImpl.class);
    private final ServiceUtils serviceUtils = SpringUtil.getBean(ServiceUtils.class);
    public ImageTask(String params,String callbackUrl) {
        this.params = params;
        this.callbackUrl = callbackUrl;
    }

    @Override
    public RestResult call() {
        try {
            // 業務處理
            CarDamageResult result = algorithmService.someOperate(this.params);
            // 回調
            return serviceUtils.callback(this.callbackUrl, this.caseNum, ResultCode.SUCCESS.getCode(), result, this.isAsync);
        } catch (ServiceException e) {
            return serviceUtils.callback(this.callbackUrl, this.caseNum, e.getCode(), null, this.isAsync);
        }
    }
}

對於線程池這里就不具體展開講了,僅僅簡單理了下具體的流程:

  1. 收到請求后,參數校驗后傳入線程池排隊。
  2. 返回結果:“請求成功,正在處理”。
  3. 任務排到后由相應的線程處理,處理完后進行接口回調。

上面的例子描述了一個生產速度遠遠大於消費速度的模型,普通面向數據庫開發的企業級應用,由於數據庫的連接池開發的連接數較大,一般不需要這樣通過線程池來處理,而一些 GPU 密集型的應用場景,由於顯存的瓶頸導致消費速度慢時,就需要隊列來作出調整了。

帶優先級的線程池

更復雜的,例如考慮到任務的優先級,還需要對線程池進行重寫,通過 PriorityBlockingQueue 來替換默認的阻塞隊列。直接上代碼。

import lombok.Data;

import java.util.concurrent.Callable;

/**
 * @author Fururur
 * @create 2020-01-14-10:37
 */
@Data
public abstract class PriorityCallable<T> implements Callable<T> {
    private int priority;
}
import lombok.Getter;
import lombok.Setter;

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;

/**
 * 優先級線程池的實現
 *
 * @author Fururur
 * @create 2019-07-23-10:19
 */
public class PriorityThreadPoolExecutor extends ThreadPoolExecutor {

    private ThreadLocal<Integer> local = ThreadLocal.withInitial(() -> 0);

    public PriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, getWorkQueue());
    }

    public PriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
                                      long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, getWorkQueue(), threadFactory);
    }

    public PriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
                                      long keepAliveTime, TimeUnit unit, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, getWorkQueue(), handler);
    }

    public PriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
                                      long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory,
                                      RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, getWorkQueue(), threadFactory, handler);
    }

    private static PriorityBlockingQueue getWorkQueue() {
        return new PriorityBlockingQueue();
    }

    @Override
    public void execute(Runnable command) {
        int priority = local.get();
        try {
            this.execute(command, priority);
        } finally {
            local.set(0);
        }
    }

    public void execute(Runnable command, int priority) {
        super.execute(new PriorityRunnable(command, priority));
    }

    public <T> Future<T> submit(PriorityCallable<T> task) {
        local.set(task.getPriority());
        return super.submit(task);
    }

    public <T> Future<T> submit(Runnable task, T result, int priority) {
        local.set(priority);
        return super.submit(task, result);
    }

    public Future<?> submit(Runnable task, int priority) {
        local.set(priority);
        return super.submit(task);
    }

    @Getter
    @Setter
    protected static class PriorityRunnable implements Runnable, Comparable<PriorityRunnable> {
        private final static AtomicLong seq = new AtomicLong();
        private final long seqNum;
        private Runnable run;

        private int priority;

        PriorityRunnable(Runnable run, int priority) {
            seqNum = seq.getAndIncrement();
            this.run = run;
            this.priority = priority;
        }

        @Override
        public void run() {
            this.run.run();
        }

        @Override
        public int compareTo(PriorityRunnable other) {
            int res = 0;
            if (this.priority == other.priority) {
                if (other.run != this.run) {
                    // ASC
                    res = (seqNum < other.seqNum ? -1 : 1);
                }
            } else {
                // DESC
                res = this.priority > other.priority ? -1 : 1;
            }
            return res;
        }
    }
}

要點如下:

  1. 替換線程池默認的阻塞隊列為 PriorityBlockingQueue,響應的傳入的線程類需要實現 Comparable<T> 才能進行比較。
  2. PriorityBlockingQueue 的數據結構決定了,優先級相同的任務無法保證 FIFO,需要自己控制順序。
  3. 需要重寫線程池的 execute() 方法。看過線程池源碼的會發現,執行 submit(task) 方法后,都會轉化成 RunnableFuture<T> 再進一步執行,由於傳入的 task 雖然實現了 Comparable<T> 到,但是內部轉換成的 RunnableFuture<T> 並未實現,因此直接 submit 會拋出 Caused by: java.lang.ClassCastException: java.util.concurrent.FutureTask cannot be cast to java.lang.Comparable 這樣一個異常,所以需要重寫 execute() 方法,構造一個 PriorityRunnable 作為中轉。

總結

JVM 線程池是實現異步任務隊列最簡單最原生的一種方式,本文介紹了基本的使用流程和帶有優先隊列需求的用法。這種方法可有滿足到一些簡單的業務場景,但也存在一定的局限性:

  • JVM 線程池是單機的,橫向擴展多個服務下做負載均衡時,就會存在多個線程池了他們是分開工作的,無法很好的統一和管理,不太適合分布式場景。
  • JVM 線程池是基於內存的,一旦服務掛了,會出現任務丟失的情況,可靠性低。
  • 缺少作為任務隊列的 ack 機制,一旦任務失敗不會重新執行,且無法很好地對線程池隊列進行監控。

顯然簡單的 JVM 線程池是無法 handle 到負載的業務場景的,這就需要引入其他中間件了,在接下來的文章中我們會繼續探討。

參考文獻


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM