一,詳解並發包使用及其原理之線程池篇


線程池

一 , java.util.concurrent

 

1,首先,為什么要用線程池包?

 

1,用線程池包和數據庫連接池一樣,為了節省線程的創建和關閉時間

2,擴充了返回類型,實現runable只能通過共享數據和主線程通訊,通過callable 可以接受返回類型,並可以拋出異常在主線程捕獲

3,擴充了些工具類

4,atomic支持計數 

 

 

線程池最常用代碼應用方式,

1,實現Callable

2.  創建線程池

3.  執行並接收future參數

4.  關閉線程池,停止接收新的線程task

代碼如下

 

 

package org.benson.concurrent;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/**
 * 
 * @author qq277803242
 *
 */
public class Test4ConcurrentPool implements Callable<String> {
	public static int result = 0;
	public final static int LOOP_COUNT = 500000;

	@Override
	public String call() throws Exception {
		System.out.println("run..");
		for (int i = 0; i < LOOP_COUNT; i++)
			result++;
		if(result>4500000)
		throw new Exception("my exception");
		return "result is " + result;
	}

	public static void main(String[] args) throws ExecutionException {
		ExecutorService execu = Executors.newFixedThreadPool(10);
		Future<String> future = null;
		for (int i = 0; i < 10; i++) {
			future = execu.submit(new Test4ConcurrentPool());
			try {
				System.out.println(future.get());
			} catch (Exception e) {
				System.out.println("here can catch Exception,the exception is "+e.getMessage());
				execu.shutdownNow();
				
			}
		}
		execu.shutdown();//reduce accept new task
//		execu.shutdownNow();//reduce accept new task and try to stop exit task
	}
}

 

  

看代碼這段可看到是調用了線程池創建了線程

 

//        ExecutorService execu = Executors.newFixedThreadPool(10);
        ExecutorService execu = new ThreadPoolExecutor(10, 10,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());

 

 

線程池的原理,工作流程如下

1,檢查基本線程是否已滿,如果未滿創建新的線程

2,檢查線程隊列是否已滿,如果未滿創建新的等待線程填入隊列

3,檢查最大線程數是否已滿,如果滿了執行飽和線程策略

 

如此再返回看線程池的參數,很明確

 /**
     * Creates a new {@code ThreadPoolExecutor} with the given initial
     * parameters.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @param maximumPoolSize the maximum number of threads to allow in the
     *        pool
     * @param keepAliveTime when the number of threads is greater than
     *        the core, this is the maximum time that excess idle threads
     *        will wait for new tasks before terminating.
     * @param unit the time unit for the {@code keepAliveTime} argument
     * @param workQueue the queue to use for holding tasks before they are
     *        executed.  This queue will hold only the {@code Runnable}
     *        tasks submitted by the {@code execute} method.
     * @param threadFactory the factory to use when the executor
     *        creates a new thread
     * @param handler the handler to use when execution is blocked
     *        because the thread bounds and queue capacities are reached
     * @throws IllegalArgumentException if one of the following holds:<br>
     *         {@code corePoolSize < 0}<br>
     *         {@code keepAliveTime < 0}<br>
     *         {@code maximumPoolSize <= 0}<br>
     *         {@code maximumPoolSize < corePoolSize}
     * @throws NullPointerException if {@code workQueue}
     *         or {@code threadFactory} or {@code handler} is null
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)

按參數順序解釋如下:

corePoolSize 在線程池中的基本大小數
maximumPoolSize 線程池所允許的最大線程數目(包括隊列中的線程數和基本線程中的)
keepAliveTime 線程空閑后,線程在線程池的保存時間。 線程池的作用就是為了減少創建和銷毀線程池的時間,所以當運行線程很短,但很多,間隔時間長時,此個值可以設置大點

workQueue 線程池隊列,runnableTaskQueue,阻塞線程,可選如下
runnableTaskQueue(任務隊列):用於保存等待執行的任務的阻塞隊列。 可以選擇以下幾個阻塞隊列。
  • ArrayBlockingQueue:是一個基於數組結構的有界阻塞隊列,此隊列按 FIFO(先進先出)原則對元素進行排序。
  • LinkedBlockingQueue:一個基於鏈表結構的阻塞隊列,此隊列按FIFO (先進先出) 排序元素,吞吐量通常要高於ArrayBlockingQueue。靜態工廠方法Executors.newFixedThreadPool()使用了這個隊列。
  • SynchronousQueue:一個不存儲元素的阻塞隊列。每個插入操作必須等到另一個線程調用移除操作,否則插入操作一直處於阻塞狀態,吞吐量通常要高於LinkedBlockingQueue,靜態工廠方法Executors.newCachedThreadPool使用了這個隊列。
  • PriorityBlockingQueue:一個具有優先級的無限阻塞隊列。
ThreadFactory,用於設置創建線程的工廠,可以通過線程工廠給每個創建出來的線程設置更有意義的名字。

RejectedExecutionHandler,飽和處理策略,用於處理線程飽和后的機制,有如下類型
  • AbortPolicy:直接拋出異常。
  • CallerRunsPolicy:只用調用者所在線程來運行任務。
  • DiscardOldestPolicy:丟棄隊列里最近的一個任務,並執行當前任務。
  • DiscardPolicy:不處理,丟棄掉。


關於shutdownNow會循環線程池所有的線程,然后調用中斷方法,中斷方法拋出異常中斷當前線程。




2,並發包提供的一些工具類,使用方法及其原理

 

java.util.concurrent.Semaphore信號燈,固定執行線程數量,每次執行5個線程,應用例子如下

package org.benson.concurrent;

import java.util.concurrent.Semaphore;

/**
 * 
 * @author qq277803242
 *
 */
public class Test4Semaphore implements Runnable {
	Semaphore semaphore=new Semaphore(5);
	@Override
	public void run() {
		try {
			semaphore.acquire();
		} catch (InterruptedException e) {
			System.out.println(e.getMessage());
		}
		System.out.println(Thread.currentThread().getName()
				+ " start runing 。。。");
		try {
			Thread.sleep(1000);
		} catch (InterruptedException e) {
			System.out.println(e.getMessage());
		}
		semaphore.release();
		System.out.println(Thread.currentThread().getName() + " finished");
	}
	public static void main(String[] args) {
		Runnable runable=new Test4Semaphore();
		for(int i=0;i<100;i++){
			new Thread(runable).start();
		}
	}
}

  

 

最近把工作給辭了, 有時間了,想想還是分開篇幅寫,總結下, 呵呵,到時也整理份目錄出來,當作紀念

 

下篇打算講講基於Future 構建緩存

 

二 , java.util.concurrent.atomic

三 , java.util.concurrent.locks


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM