1.同步關鍵字
Synchronized
2.並發包中鎖
Lock
3.object對象等待通知
ObjectMonitor
wait
notify
4.鎖對應的條件變量
並發包中鎖的條件變量
condition
await
signal
5.並發包中的阻塞隊列
BlockingQueue
6.並發包中的原子操作
Atomic
7.volatile
8.final
9.Thread.join
10.CountDownLatch
相當於線程中的方法join方法,不過功能更多。CountDownLatch能夠使一個線程在等待另外一些線程完成各自工作之后,再繼續執行。使用一個計數器進行實現。
計數器初始值為線程的數量。當每一個線程完成自己任務后,計數器的值就會減一。當計數器的值為0時,表示所有的線程都已經完成了任務,然后在CountDownLatch上等待的線程就可以恢復執行任務。
11.CyclicBarrier
CyclicBarrier 的字面意思是可循環使用(Cyclic)的屏障(Barrier)。它要做的事情是,讓一組線程到達一個屏障(也可以叫同步點)時被阻塞,直到最后一個線程到達屏障時,屏障才會開門,
所有被屏障攔截的線程才會繼續干活。CyclicBarrier默認的構造方法是CyclicBarrier(int parties),其參數表示屏障攔截的線程數量,每個線程調用await方法告訴CyclicBarrier我已經到達了屏障,然后當前線程被阻塞。
12.Semaphore
控制並發線程數的semaphore, Semaphore翻譯成字面意思為 信號量,Semaphore可以控同時訪問的線程個數,通過 acquire() 獲取一個許可,如果沒有就等待,而 release() 釋放一個許可。
如下所示代碼中雖然有30個線程在執行,但是只允許10個並發執行,Semaphore(int permits) 接受一個整形數字,表示可用的許可證數量,Semaphore(10)表示允許10個線程獲取許可證,也就是並發數為10.
13.Phaser
Phaser類的特點是把多個線程協作執行的任務划分成多個階段(phase),在每個階段上都可以有任意個參與者參與。線程可以隨時注冊並參與到某個階段的執行中來。當一個階段中所有的線程都成功完成之后,Phaser類的對象會自動進入下一個階段,如此循環下去,直到Phaser類的對象中不再包含任何參與者,此時它會自動結束。功能強大,可以替代CountDownLatch和CyclicBarrier。 Phaser的構造器可指定初始的參與者的個數。 (1)register 動態添加參與者 (2)arriveAndAwaitAdvance 完成之后等待其他參與者完成,會阻塞直到Phaser類的對象成功進入下一個階段 (3)arriveAndDeregister 執行完成之后取消自己的注冊,不參與下一個階段的執行
示例代碼如下:

public class PhaserDemo { private final Phaser phaser = new Phaser(1); private final Pattern imageUrlPattern = Pattern.compile("src=['\"]?(.*?(\\.jpg|\\.gif|\\.png))['\"]?[\\s>]+", Pattern.CASE_INSENSITIVE); public void download(URL url, final Path path, Charset charset) throws IOException { if (charset == null) { charset = StandardCharsets.UTF_8; } String content = getContent(url, charset); List<URL> imageUrls = extractImageUrls(content); for (final URL imageUrl : imageUrls) { phaser.register(); new Thread() { public void run() { //等待其他線程創建完成 phaser.arriveAndAwaitAdvance(); //進入圖片下載階段 try { InputStream is = imageUrl.openStream(); Files.copy(is, getSavedPath(path, imageUrl), StandardCopyOption.REPLACE_EXISTING); } catch (IOException e) { e.printStackTrace(); } finally { phaser.arriveAndDeregister(); } } }.start(); } //等待其他下載線程創建完成 phaser.arriveAndAwaitAdvance(); //等待下載階段的下載線程執行完成 phaser.arriveAndAwaitAdvance(); //下載完成之后注銷自己 phaser.arriveAndDeregister(); } private String getContent(URL url, Charset charset) throws IOException { InputStream is = url.openStream(); return IOUtils.toString(new InputStreamReader(is, charset.name())); } private List<URL> extractImageUrls(String content) { List<URL> result = new ArrayList<URL>(); Matcher matcher = imageUrlPattern.matcher(content); while (matcher.find()) { try { result.add(new URL(matcher.group(1))); } catch (MalformedURLException e) { //忽略 } } return result; } private Path getSavedPath(Path parent, URL url) { String urlString = url.toString(); int index = urlString.lastIndexOf("/"); String fileName = urlString.substring(index + 1); return parent.resolve(fileName); } public static void main(String[] args) throws IOException { URL url = new URL("http://www.baidu.com"); PhaserDemo downloader = new PhaserDemo(); downloader.download(url, Paths.get("imgs"), Charset.forName("GB2312")); } }
14.fork-join
fork/join是java7更新的一個新的輕量級任務執行框架,其主要目的是要更好滴利用底層平台上的多核CPU和多處理器來進行並行處理,解決問題時通常采用分治(divide and conquer)算法或map/reduce算法來進行。
fork操作是把一個大問題划分為若干較小的問題,一般是遞歸進行。
join操作是把部分解收集並組織起來,得到最終的完整解,也可能是遞歸進行的。
如果某個子問題由於等待另外一個子問題的完成而無法繼續運行,那么處理該子問題的線程會主動尋找其他尚未運行的子問題來執行,減少了線程等待時間,提高了性能。
ForkJoinTask的子類:RecursiveTask(可返回結果)與RecursiveAction。
ForkJoinPool實現了ExecutorService接口。
示例代碼如下:

public class ForkJoinDemo { private static final int RANGE_LENGTH = 2000; private final ForkJoinPool forkJoinPool = new ForkJoinPool(); private static class MaxValueTask extends RecursiveTask<Long> { private final long[] array; private final int start; private final int end; MaxValueTask(long[] array, int start, int end) { this.array = array; this.start = start; this.end = end; } protected Long compute() { long max = Long.MIN_VALUE; if (end - start <= RANGE_LENGTH) { for (int i = start; i < end; i++) { if (array[i] > max) { max = array[i]; } } } else { int mid = (start + end) / 2; MaxValueTask lowTask = new MaxValueTask(array, start, mid); MaxValueTask highTask = new MaxValueTask(array, mid, end); lowTask.fork(); highTask.fork(); max = Math.max(max, lowTask.join()); max = Math.max(max, highTask.join()); } return max; } } public void calculate(long[] array) { MaxValueTask task = new MaxValueTask(array, 0, array.length); Long result = forkJoinPool.invoke(task); System.out.println(result); } public void calculateNormal(long[] array) { long max = Long.MIN_VALUE; for (int i = 0, n = array.length; i < n; i++) { if (array[i] > max) { max = array[i]; } } System.out.println(max); } public static void main(String[] args) { Random random = new Random(); int size = Integer.MAX_VALUE / 256; long[] array = new long[size]; for (int i = 0; i < size; i++) { array[i] = random.nextLong(); } ForkJoinDemo mv = new ForkJoinDemo(); long startTime = System.currentTimeMillis(); mv.calculate(array); long midTime = System.currentTimeMillis(); System.out.println(midTime - startTime); mv.calculateNormal(array); long endTime = System.currentTimeMillis(); System.out.println(endTime - midTime); } }
15.parrelStream
Java8並行流ParallelStream和Stream的區別就是支持並行執行,提高程序運行效率。但是如果使用不當可能會發生線程安全的問題。Demo如下:

public static void concurrentFun() { List<Integer> listOfIntegers = new ArrayList<>(); for (int i = 0; i <100; i++) { listOfIntegers.add(i); } List<Integer> parallelStorage = new ArrayList<>() ; listOfIntegers .parallelStream() .filter(i->i%2==0) .forEach(i->parallelStorage.add(i)); System.out.println(); parallelStorage .stream() .forEachOrdered(e -> System.out.print(e + " ")); System.out.println(); System.out.println("Sleep 5 sec"); try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); } parallelStorage .stream() .forEachOrdered(e -> System.out.print(e + " ")); }