java並發編程實踐——王寶令(極客時間)學習筆記


1、並發

分工:如何高效地拆解任務並分配給線程

同步:線程之間如何協作

互斥:保證同一時刻只允許一個線程訪問共享資源

Fork/Join 框架就是一種分工模式,CountDownLatch 就是一種典型的同步方式,而可重入鎖則是一種互斥手段。

2、可見性、原子性、有序性

(1)可見性:緩存導致

(2)原子性:線程切換

count+=1

 

(3)有序性:編譯優化

3、java內存模型

(1)可見性:緩存導致-----按需禁用緩存

(2)有序性:編譯優化-----按需禁用

volatile int x=0;(該變量的讀寫,不使用cpu緩存,直接使用內存讀取或者寫入)

(3)原子性:同一時刻,只有一個線程執行,互斥。

synchronized

4、死鎖

死鎖發生的條件:

(1)互斥,共享資源x和y只能被一個線程占用

(2)占有且等待,線程 T1 已經取得共享資源 X,在等待共享資源 Y 的時候,不釋放共享資源 X;

破壞占用且等待條件:一次性申請所有資源

(3)不可搶占,其他線程不能強行搶占線程 T1 占有的資源;

破壞不可強占條件

(4)循環等待,線程 T1 等待線程 T2 占有的資源,線程 T2 等待線程 T1 占有的資源,就是循環等待。

 破壞循環等待條件:

 

wait和sleep區別
1:wait釋放資源,sleep不釋放資源
2:wait需要被喚醒,sleep不需要
3:wait需要獲取到監視器,否則拋異常,sleep不需要
4:wait是object頂級父類的方法,sleep則是Thread的方法

5.CountDownLatch和CyclicBarrier:如何讓多線程步調一致?(主線程等待子線程結束)

Thread t1 = new Thread(() -> {
            try {
                Thread.sleep(1000);
                System.out.println(Thread.currentThread().getName());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        t1.start();
        Thread t2 = new Thread(() -> {
            try {
                Thread.sleep(2000);
                System.out.println(Thread.currentThread().getName());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        t2.start();
//實現等待 t1.join();
t2.join(); System.out.println(
"=============");

 線程池

Executor executor = Executors.newFixedThreadPool(2);
        CountDownLatch latch = new CountDownLatch(2);
        executor.execute(()->{
            try {
                Thread.sleep(1000);
                System.out.println(Thread.currentThread().getName());
                latch.countDown();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        });
        executor.execute(()->{
            try {
                Thread.sleep(2000);
                System.out.println(Thread.currentThread().getName());
                latch.countDown();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        latch.await();
        System.out.println("=============");

CountDownLatch 主要用來解決一個線程等待多個線程的場景。(CountDownLatch 的計數器是不能循環利用的,也就是說一旦計數器減到 0,再有線程調用 await(),該線程會直接通過。)

CyclicBarrier   ---------- A線程執行,B線程執行,A、B其中一個線程等到AB執行完成再執行(不是主線程,且是異步的)

參考:https://www.cnblogs.com/dolphin0520/p/3920397.html

6.並發容器

List、Map、Set、Queue

非線程安全:ArrayList、HashMap

 

 

 7.原子類

8.線程池、Executor

ThreadPoolExecutor

線程池實際上是生產者 - 消費者模式

ThreadPoolExecutor(
  int corePoolSize,
  int maximumPoolSize,
  long keepAliveTime,
  TimeUnit unit,
  BlockingQueue<Runnable> workQueue,
  ThreadFactory threadFactory,
  RejectedExecutionHandler handler) 

9.Future

ExecutorService executor 
  = Executors.newFixedThreadPool(1);
// 創建 Result 對象 r
Result r = new Result();
r.setAAA(a);
// 提交任務
Future<Result> future = 
  executor.submit(new Task(r), r);  
Result fr = future.get();
// 下面等式成立
fr === r;
fr.getAAA() === a;
fr.getXXX() === x

class Task implements Runnable{
  Result r;
  // 通過構造函數傳入 result
  Task(Result r){
    this.r = r;
  }
  void run() {
    // 可以操作 result
    a = r.getAAA();
    r.setXXX(x);
  }
}

 

// 創建 FutureTask
FutureTask<Integer> futureTask
  = new FutureTask<>(()-> 1+2);
// 創建線程池
ExecutorService es = 
  Executors.newCachedThreadPool();
// 提交 FutureTask 
es.submit(futureTask);
// 獲取計算結果
Integer result = futureTask.get();

 

// 創建 FutureTask
FutureTask<Integer> futureTask
  = new FutureTask<>(()-> 1+2);
// 創建並啟動線程
Thread T1 = new Thread(futureTask);
T1.start();
// 獲取計算結果
Integer result = futureTask.get();

// 創建任務 T2 的 FutureTask
FutureTask<String> ft2
  = new FutureTask<>(new T2Task());
// 創建任務 T1 的 FutureTask
FutureTask<String> ft1
  = new FutureTask<>(new T1Task(ft2));
// 線程 T1 執行任務 ft1
Thread T1 = new Thread(ft1);
T1.start();
// 線程 T2 執行任務 ft2
Thread T2 = new Thread(ft2);
T2.start();
// 等待線程 T1 執行結果
System.out.println(ft1.get());

// T1Task 需要執行的任務:
// 洗水壺、燒開水、泡茶
class T1Task implements Callable<String>{
  FutureTask<String> ft2;
  // T1 任務需要 T2 任務的 FutureTask
  T1Task(FutureTask<String> ft2){
    this.ft2 = ft2;
  }
  @Override
  String call() throws Exception {
    System.out.println("T1: 洗水壺...");
    TimeUnit.SECONDS.sleep(1);
    
    System.out.println("T1: 燒開水...");
    TimeUnit.SECONDS.sleep(15);
    // 獲取 T2 線程的茶葉  
    String tf = ft2.get();
    System.out.println("T1: 拿到茶葉:"+tf);

    System.out.println("T1: 泡茶...");
    return " 上茶:" + tf;
  }
}
// T2Task 需要執行的任務:
// 洗茶壺、洗茶杯、拿茶葉
class T2Task implements Callable<String> {
  @Override
  String call() throws Exception {
    System.out.println("T2: 洗茶壺...");
    TimeUnit.SECONDS.sleep(1);

    System.out.println("T2: 洗茶杯...");
    TimeUnit.SECONDS.sleep(2);

    System.out.println("T2: 拿茶葉...");
    TimeUnit.SECONDS.sleep(1);
    return " 龍井 ";
  }
}
// 一次執行結果:
T1: 洗水壺...
T2: 洗茶壺...
T1: 燒開水...
T2: 洗茶杯...
T2: 拿茶葉...
T1: 拿到茶葉: 龍井
T1: 泡茶...
上茶: 龍井

10.CompletableFuture

// 任務 1:洗水壺 -> 燒開水
CompletableFuture<Void> f1 = 
  CompletableFuture.runAsync(()->{
  System.out.println("T1: 洗水壺...");
  sleep(1, TimeUnit.SECONDS);

  System.out.println("T1: 燒開水...");
  sleep(15, TimeUnit.SECONDS);
});
// 任務 2:洗茶壺 -> 洗茶杯 -> 拿茶葉
CompletableFuture<String> f2 = 
  CompletableFuture.supplyAsync(()->{
  System.out.println("T2: 洗茶壺...");
  sleep(1, TimeUnit.SECONDS);

  System.out.println("T2: 洗茶杯...");
  sleep(2, TimeUnit.SECONDS);

  System.out.println("T2: 拿茶葉...");
  sleep(1, TimeUnit.SECONDS);
  return " 龍井 ";
});
// 任務 3:任務 1 和任務 2 完成后執行:泡茶
CompletableFuture<String> f3 = 
  f1.thenCombine(f2, (__, tf)->{
    System.out.println("T1: 拿到茶葉:" + tf);
    System.out.println("T1: 泡茶...");
    return " 上茶:" + tf;
  });
// 等待任務 3 執行結果
System.out.println(f3.join());

void sleep(int t, TimeUnit u) {
  try {
    u.sleep(t);
  }catch(InterruptedException e){}
}
// 一次執行結果:
T1: 洗水壺...
T2: 洗茶壺...
T1: 燒開水...
T2: 洗茶杯...
T2: 拿茶葉...
T1: 拿到茶葉: 龍井
T1: 泡茶...
上茶: 龍井

11.CompletionService

參考: http://blog.csdn.net/lmj623565791/article/details/27250059   https://www.cnblogs.com/hrhguanli/p/3998865.html

普通情況下,我們使用Runnable作為主要的任務表示形式,可是Runnable是一種有非常大局限的抽象,run方法中僅僅能記錄日志,打印,或者把數據匯總入某個容器(一方面內存消耗大,還有一方面須要控制同步,效率非常大的限制),總之不能返回運行的結果;比方同一時候1000個任務去網絡上抓取數據,然后將抓取到的數據進行處理(處理方式不定),我認為最好的方式就是提供回調接口,把處理的方式最為回調傳進去;可是如今我們有了更好的方式實現:CompletionService + Callable

Callable的call方法能夠返回運行的結果;

CompletionService將Executor(線程池)和BlockingQueue(堵塞隊列)結合在一起,同一時候使用Callable作為任務的基本單元,整個過程就是生產者不斷把Callable任務放入堵塞對了,Executor作為消費者不斷把任務取出來運行,並返回結果;

優勢:

a、堵塞隊列防止了內存中排隊等待的任務過多,造成內存溢出(畢竟一般生產者速度比較快,比方爬蟲准備好網址和規則,就去運行了,運行起來(消費者)還是比較慢的)

b、CompletionService能夠實現,哪個任務先運行完畢就返回,而不是按順序返回,這樣能夠極大的提升效率;

package com.zhy.concurrency.completionService;

import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingDeque;

/**
 * 將Executor和BlockingQueue功能融合在一起,能夠將Callable的任務提交給它來運行, 然后使用take()方法獲得已經完畢的結果
 * 
 * @author zhy
 * 
 */
public class CompletionServiceDemo
{

    public static void main(String[] args) throws InterruptedException,
            ExecutionException
    {
        /**
         * 內部維護11個線程的線程池
         */
        ExecutorService exec = Executors.newFixedThreadPool(11);
        /**
         * 容量為10的堵塞隊列
         */
        final BlockingQueue<Future<Integer>> queue = new LinkedBlockingDeque<Future<Integer>>(
                10);
        //實例化CompletionService
        final CompletionService<Integer> completionService = new ExecutorCompletionService<Integer>(
                exec, queue);

        /**
         * 模擬瞬間產生10個任務,且每一個任務運行時間不一致
         */
        for (int i = 0; i < 10; i++)
        {
            completionService.submit(new Callable<Integer>()
            {
                @Override
                public Integer call() throws Exception
                {
                    int ran = new Random().nextInt(1000);
                    Thread.sleep(ran);
                    System.out.println(Thread.currentThread().getName()
                            + " 歇息了 " + ran);
                    return ran;
                }
            });
        }
        
        /**
         * 馬上輸出結果
         */
        for (int i = 0; i < 10; i++)
        {
            try
            {    
                //誰最先運行完畢,直接返回
                Future<Integer> f = completionService.take();
                System.out.println(f.get());
            } catch (InterruptedException e)
            {
                e.printStackTrace();
            } catch (ExecutionException e)
            {
                e.printStackTrace();
            }
        }

        exec.shutdown();

    }

}
輸出結果:
pool-1-thread-4 歇息了 52
52
pool-1-thread-1 歇息了 59
59
pool-1-thread-10 歇息了 215
215
pool-1-thread-9 歇息了 352
352
pool-1-thread-5 歇息了 389
389
pool-1-thread-3 歇息了 589
589
pool-1-thread-2 歇息了 794
794
pool-1-thread-7 歇息了 805
805
pool-1-thread-6 歇息了 909
909
pool-1-thread-8 歇息了 987
987

2.ExecutorService.invokeAll

ExecutorService的invokeAll方法也能批量運行任務,並批量返回結果,可是呢,有個我認為非常致命的缺點,必須等待全部的任務運行完畢后統一返回,一方面內存持有的時間長;還有一方面響應性也有一定的影響,畢竟大家都喜歡看看刷刷的運行結果輸出,而不是苦苦的等待;

package com.zhy.concurrency.executors;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class TestInvokeAll
{

    public static void main(String[] args) throws InterruptedException,
            ExecutionException
    {
        ExecutorService exec = Executors.newFixedThreadPool(10);

        List<Callable<Integer>> tasks = new ArrayList<Callable<Integer>>();
        Callable<Integer> task = null;
        for (int i = 0; i < 10; i++)
        {
            task = new Callable<Integer>()
            {
                @Override
                public Integer call() throws Exception
                {
                    int ran = new Random().nextInt(1000);
                    Thread.sleep(ran);
                    System.out.println(Thread.currentThread().getName()+" 歇息了 " + ran );
                    return ran;
                }
            };

            tasks.add(task);
        }
        
        long s = System.currentTimeMillis();
        
        
        List<Future<Integer>> results = exec.invokeAll(tasks);
        
        System.out.println("運行任務消耗了 :" + (System.currentTimeMillis() - s) +"毫秒");
        
        for (int i = 0; i < results.size(); i++)
        {
            try
            {
                System.out.println(results.get(i).get());
            } catch (Exception e)
            {
                e.printStackTrace();
            }
        }

        
        exec.shutdown();

    }

}

運行結果:
pool-1-thread-10 歇息了 1
pool-1-thread-5 歇息了 59
pool-1-thread-6 歇息了 128
pool-1-thread-1 歇息了 146
pool-1-thread-3 歇息了 158
pool-1-thread-7 歇息了 387
pool-1-thread-9 歇息了 486
pool-1-thread-8 歇息了 606
pool-1-thread-4 歇息了 707
pool-1-thread-2 歇息了 817
運行任務消耗了 :819毫秒
146
817
158
707
59
128
387
606
486
1

12.Fork/Join 單機版本的MapReduce

分治任務

static void main(String[] args){
  // 創建分治任務線程池  
  ForkJoinPool fjp = 
    new ForkJoinPool(4);
  // 創建分治任務
  Fibonacci fib = 
    new Fibonacci(30);   
  // 啟動分治任務  
  Integer result = 
    fjp.invoke(fib);
  // 輸出結果  
  System.out.println(result);
}
// 遞歸任務
static class Fibonacci extends 
    RecursiveTask<Integer>{
  final int n;
  Fibonacci(int n){this.n = n;}
  protected Integer compute(){
    if (n <= 1)
      return n;
    Fibonacci f1 = 
      new Fibonacci(n - 1);
    // 創建子任務  
    f1.fork();
    Fibonacci f2 = 
      new Fibonacci(n - 2);
    // 等待子任務結果,並合並結果  
    return f2.compute() + f1.join();
  }
}

 13.ThreadLocal

http://www.iocoder.cn/JUC/sike/ThreadLocal/


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM