原創:ThreadPoolExecutor線程池深入解讀(一)----原理+應用


本文檔,適合於對多線程有一定基礎的開發人員。對多線程的一些基礎性的解讀,請參考《java並發編程》的前5章。
對於源代碼的解讀,本人認為可讀可不讀。如果你想成為一位頂級的程序員,那就培養自己底層的邏輯能力,自己寫算法,然后讓別人學習你的源代碼。研究源代碼這件事,更多的是針對於初學者。貢獻源碼的人,也是程序員,只不過是級別不同,或者在理論上,更加高屋建瓴。在現實中,能夠兼顧理論和編程的程序員不多,如果誰想成為一流程序員的話,建議從理論上入手,代碼量不能代表全部。對於多線程,本人仍然認為,理論很重要。

多線程編程,在軟件開發中占有十分重要的地位。本人對線程同步的本質的理解是:把對一個或者多個的共享狀態的復合操作轉變為原子性的操作,同時保證共享狀態在內存中的可見性。抽象起來就是原子性和可見性。

1.多線程並發時,會存在競態條件。常見的競態條件包括先檢查后執行機制的競爭和原子性操作競爭,比如同時對一個整數++操作,這個操作可以分割為三個步驟:讀取、加法操作與寫入(生效)。解決先檢查后執行機制的競態條件的有效手段是采用雙檢索。對方法加鎖,會大大滴降低吞吐量和性能,因此,不建議直接對方法加鎖,常見的做法是,對多個線程同時競爭的變量加鎖,或者采用ReentrantLock底層的CAS算法(free-lock).如果想深入理解ReentrantLock的原理,請查看java.util.concurrent包下的源代碼。
2.任務執行策略與中斷策略和飽和策略:在多線程環境中,當定義好了公共資源類,與執行任務時(比如生產者與消費者任務),接下來就要考慮任務執行策略與中斷策略和飽和策略,以提升系統的吞吐量和性能,同時在運行時,要考慮吞吐量與CPU占有率的折中。在多線程中,最重要的就是以上三種策略的定制。采用默認的,不一定能滿足要求。線程池底層,調用的是ThreadPoolExecutor這個類,我們可以擴展他,實現自己的需求。在這里,先講一下,默認的任務執行策略。(任務執行策略包括:是否為每一個任務開啟一個線程,還是所有任務在一個線程中執行,任務執行的順序,比如FIFO,還是按照優先級等等),所以, 這里涉及到兩個比較重要的東西:一是數量問題,包括線程池的基本容量,最大容量以及BlockingQueue<Runnable> 是采用有界的還是無界的,二是BlockingQueue的數據結構,如果執行順序是FIFO,就采用非優先級的Queue,如果是按優先級,那就使用PriorityLinkedQueue。下面,結合一下ThreadPoolExecutor源代碼講解一下:
在使用時,我們一般會這樣:
ExecutorService executor = Executors.newCachedThreadPool();
executor.execute(Runnable);
先從execute方法開始,一層一層剖析:
ThreadPoolExecutor中的幾個重要變量:
 
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
 
    // runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;
 
The workerCount is the number of workers that have been permitted to start and not permitted to stop. 
ctl是一個重要的變量,主要包裝兩個重要的概念:一是workerCount:effective number of threads,二是runState:   indicating whether running, shutting down etc   
英文解釋:
The main pool control state, ctl, is an atomic integer packing
two conceptual fields
workerCount, indicating the effective number of threads
runState,    indicating whether running, shutting down etc
在以上狀態變量中,RUNNING可以接受新的task,並且可以處理queue中的task,SHUTDOWN不可以接受新的task,但是可以處理queue中的task,其他的全都不可以。還是英文解釋比較好,研究源代碼,最好是看英文原版的,不要看漢語版的:
RUNNING:  Accept new tasks and process queued tasks
     *   SHUTDOWN: Don't accept new tasks, but process queued tasks
     *   STOP:     Don't accept new tasks, don't process queued tasks,
     *             and interrupt in-progress tasks
     *   TIDYING:  All tasks have terminated, workerCount is zero,
     *             the thread transitioning to state TIDYING
     *             will run the terminated() hook method
     *   TERMINATED: terminated() has completed
public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) { ---------------------①
            if (addWorker(command, true)) //如果添加失敗,返回false,可能是由於創建線程時遇到意外,比如terminated,重新調用ctl.get()計算wc
                return;
            c = ctl.get();
        } //如果當前執行的線程數量小於corePoolSize,但是添加任務時,遇到了意外,或者,當前執行的線程數量大於corePoolSize,這兩種情況,都會進入②處代碼
        if (isRunning(c) && workQueue.offer(command)) { ------------------②//如果當前線程池中的線程正處於RUNNING狀態,並且阻塞隊列的容量沒有達到上限,重新檢查ctl.get()返回的狀態
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command); //如果此處狀態不是RUNNING,也不是SHUTDOWN,那么,拒絕任務
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false); //由於任務放到了BlockingQueue中,此處,在Worker中,不添加task,而是運行任務時,從queue取出task
        }
        else if (!addWorker(command, false)) -------------------------③//除了以上情況以外,比如BlockingQueue飽和了,線程池容量也飽和了,執行飽和策略,默認為AbortPolicy,拒絕任務
            reject(command);
    }
 
private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
         for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
 
            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;
 
            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))// 此處判斷非常重要
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }
 
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            final ReentrantLock mainLock = this.mainLock;
            w = new Worker(firstTask); ------------------------------①//把firstTask加到Worker中,並創建一個線程
            final Thread t = w.thread;
            if (t != null) {
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int c = ctl.get();
                    int rs = runStateOf(c);
 
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);------------------------------②//把worker加到Set<Worker>中
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;------------------------------③//添加成功
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();------------------------------④執行任務
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);------------------------------⑤//添加失敗,從Set<Worker>中移除Worker
        }
        return workerStarted;
    }
接下來,看看Woker:
private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
 
        Worker(Runnable firstTask) {
            setState(-1);  // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }
 
 
        protected boolean isHeldExclusively() {
            return getState() != 0;
        }
 
        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }
 
         public void run() { ----------------①
            runWorker(this);----------------②
         }
 
        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0); //重新置為0
            return true;
        }
 
        public void lock()        { acquire(1); }
        public boolean tryLock()  { return tryAcquire(1); }
        public void unlock()      { release(1); }
        public boolean isLocked() { return isHeldExclusively(); }
 
        void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    }
Worker的本質是Runnable,因此在addWorker()中的t.start()中,實際是調用worker的run()方法,看②處的runWorker()方法:
final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) { ---------------------①
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task); -------------------------②
                    Throwable thrown = null;
                    try {
                        task.run();-------------------------③
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);-------------------------④
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }
最重要的地方,已經做了標識。對於①處,(task = getTask()) != null,這是在execute方法中,當workerCountof(recheck)== 0時,把task放到BlockingQueue中,所以用getTask()取出task。在execute之前和之后,可以做一些事情,自定義擴展,比如實現統計和計時功能。
以上為ThreadPoolExecutor源代碼的關鍵地方的比較粗淺的解讀,下面,來進入應用階段:
Executors.newFixedThreadPool(x)中,默認的,BlockingQueue為無界的LinkedBlockingQueue,使用無界的queue,會因為queue的無限制擴展,而導致資源被耗盡,Executors.newCachedThreadPool()中,線程池的大小沒有限制,隊列采用的是SynchronousQueue,SynchronousQueue本質上並不是一個隊列,而是基於線程間傳遞機制的一種運行策略。當向SynchronousQueue中添加task時,必須保證線程在等待接收task,可以與運行的線程直接交互。如果需要實現線程池的容量和queue的容量都有限制,並且需要自定義執行策略和飽和策略時,可以擴展ThreadPoolExecutor。ThreadPoolExecutor的構造器中結束如下參數:
 public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }
其中有:colrePoolSize,線程池的基本大小, maximumPoolSize,線程池中能夠同時運行的線程數量的上限,keepAliveTime,超過此時間,空閑線程將被回收,阻塞隊列Blockin共Queue,還有RejectedExecutionHandler,任務拒絕處理類。
下面, 自定義線程池,實現計時和統計功能,並且自定義有界隊列以及飽和策略
package httpClient;
 
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Logger;
 
/**
 * 自定義線程池,實現計時和統計功能,並且自定義有界隊列以及飽和策略
 * @author TongXueQiang
 * @date 2016/05/19
 */
public class MyThreadPoolExecutor extends ThreadPoolExecutor {
    private final ThreadLocal<Long> startTime = new ThreadLocal<Long>();
    private final Logger log = Logger.getLogger("MyThreadPoolExecutor");
    private final AtomicLong numTasks = new AtomicLong(1);
    private final AtomicLong totalTime = new AtomicLong();
 
    public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
            BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
 
    }
    /**
     * 任務執行前
     */
    protected void beforeExecute(Thread t,Runnable r){
        super.beforeExecute(t, r);
        log.fine(String.format("Thread %s: start %s",t,r));
        startTime.set((long) (System.nanoTime()/Math.pow(10, 9)));
    }
    /**
     * 任務執行后
     * @param r 任務
     * @param t 執行任務的線程
     */
    protected void afterExecutor(Runnable r,Throwable t){
        try {
            Long endTime = (long) (System.nanoTime() / Math.pow(10,9));
            Long taskTime = endTime - startTime.get();
            numTasks.incrementAndGet();
            totalTime.addAndGet(taskTime);
            log.fine(String.format("Thread %s: end%s,time=%ds", taskTime));
        } finally {
            super.afterExecute(r, t);
        }
    }
 
    protected void terminated () {
        try {
            log.info(String.format("Terminated: avg time=%ds", totalTime.get() / numTasks.get()));
        } finally {
            super.terminated();
        }        
    }
}
 
//自定義簡易爬蟲
package httpClient;
 
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
 * 網頁抓取
 * @author TongXueQiang
 * @date 2016/05/16
 */
public class UrlHanding {
    private final int THREADS = 10;
    private final ExecutorService producerExecutor = Executors.newSingleThreadExecutor();
    BlockingQueue<Runnable> q = new ArrayBlockingQueue<Runnable>(10);
    private final ExecutorService consumerExecutor = new MyThreadPoolExecutor(10, 10, 1000,TimeUnit.MILLISECONDS, q, new ThreadPoolExecutor.CallerRunsPolicy());//調用者執行的飽和策略
    private final CountDownLatch startLatch = new CountDownLatch(1);
    private final CountDownLatch endLatch = new CountDownLatch(THREADS);
    private static UrlQueue queue;
 
    public void urlHanding(String[] seeds) throws InterruptedException {        
        queue = getUrlQueue();
        System.out.println("處理器數量:"+Runtime.getRuntime().availableProcessors());
        long start = (long) (System.nanoTime() / Math.pow(10, 9));
        producerExecutor.execute(new GetSeedUrlTask(queue,seeds,startLatch));        
        producerExecutor.awaitTermination(100, TimeUnit.MILLISECONDS);
        producerExecutor.shutdown();
        startLatch.await();
 
        UrlDataHandingTask []url_handings = new UrlDataHandingTask[THREADS];
        for (int i = 0;i < THREADS;i++) {
            url_handings[i] = new UrlDataHandingTask(startLatch,endLatch,queue);
            consumerExecutor.execute(url_handings[i]);            
        }
        consumerExecutor.shutdown();
        startLatch.countDown();
        doSomething();
        endLatch.await();
 
        long end = (long) (System.nanoTime() / Math.pow(10,9) - start);
        System.out.println("耗時: " + end + "秒");
    }
 
    private void doSomething() {
 
 
    }
 
    private UrlQueue getUrlQueue() {
        if (queue == null) {
            synchronized(UrlQueue.class){
                if (queue == null) {
                    queue = new UrlQueue();
                    return queue;
                }
            }
        }
        return queue;
    }
}
上面,是典型的生產者和消費者線程模式,把ArrayBlockingQueue當做公共資源,這里,要處理好消費者線程無限期阻塞的問題,通過在queue的最后加入“毒丸”對象,當每個線程從queue中取出的對象為“毒丸”對象時,停止迭代。
以下為消費者線程:
package httpClient;
 
import java.util.concurrent.CountDownLatch;
 
public class UrlDataHandingTask implements Runnable {
    private CountDownLatch startLatch;
    private CountDownLatch endLatch;
    private UrlQueue queue;
 
    public UrlDataHandingTask(CountDownLatch latch, CountDownLatch endLatch, UrlQueue queue) {
        this.startLatch = latch;
        this.endLatch = endLatch;
        this.queue = queue;        
    }
 
    /**
     * 下載對應的頁面並抽取出鏈接,放入待處理隊列中
     * 
     * @param url
     * @throws InterruptedException
     */
    public void dataHanding(String url) throws InterruptedException {
        getHrefOfContent(DownPage.getContentFromUrl(url));
        for (String url0 : VisitedUrlQueue.visitedUrlQueue) {
            System.out.println(url0);
        }
    }
 
    @Override
    public void run() {
        try {
            startLatch.await();
        } catch (InterruptedException e1) {
            Thread.currentThread().interrupt();
        }
 
        while (!queue.isEmpty()) {
            try {
                String url = queue.outElem();
                if ("".equals(url.trim())) {//“毒丸”對象為空
                    queue.addElem(url);
                    break;
                }
                dataHanding(url);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        endLatch.countDown();
 
    }
 
    /**
     * 獲取頁面源代碼中的超鏈接
     * 
     * @param content
     * @throws InterruptedException
     */
    public void getHrefOfContent(String content) throws InterruptedException {
        System.out.println("開始");
        String[] contents = content.split("<a href=\"");
        for (int i = 1; i < contents.length; i++) {
            int endHref = contents[i].indexOf("\"");
            String aHref = FunctionUtils.getHrefOfInOut(contents[i].substring(0, endHref));
            if (aHref != null) {
                String href = FunctionUtils.getHrefOfInOut(aHref);
                if (queue.isContains(href) && !VisitedUrlQueue.isContains(href)
                        && href.indexOf("/code/explore") != -1) {
                    // 放入待抓取隊列中
                    queue.addElem(href);
                }
            }
        }
        System.out.println(queue.size() + "--抓取到的連接數");
        System.out.println(VisitedUrlQueue.size() + "--已處理的頁面數");
    }
 
}
生產者線程:
package httpClient;
 
import java.util.concurrent.CountDownLatch;
 
public class GetSeedUrlTask implements Runnable {
    private UrlQueue queue;
    private String[] seeds;
    private CountDownLatch startLatch;
 
    public GetSeedUrlTask(UrlQueue queue, String[] seeds,CountDownLatch startLatch) {
        this.queue = queue;
        this.seeds = seeds;
        this.startLatch = startLatch;
    }
 
    public void addUrl() {
        try {
            for (String url : seeds) {
                queue.addElem(url);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
 
    @Override
    public void run() {
        addUrl();        
        try {
            queue.addElem("");//加入“毒丸”對象
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        startLatch.countDown();
    }
}
未完待續……
 
 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM