Java 多線程並發編程


導讀

  創作不易,禁止轉載!

並發編程簡介

發展歷程

  早起計算機,從頭到尾執行一個程序,這樣就嚴重造成資源浪費。然后操作系統出現了,計算機能運行多個程序,不同的程序在不同的單獨的進程中運行,一個進程有多個線程提高資源的利用率。ok,如果以上你還不了解的話,我這里有2個腦補鏈接(點我直達1點我直達2)

簡介(百度百科)

  所謂並發編程是指在一台處理器上“同時處理多個任務。並發是在同一實體上的多個事件。多個事件在同一時間間隔發生。

目標(百度百科)

  並發編程的目標是充分利用處理器的每一個核,以達到最高處理性能

串行與並行的區別

  可能這個栗子不是很恰當仁者見仁智者見智智者get到點愚者咬文爵字,啊!你這個栗子不行,不切合實際,巴拉巴拉 .....為啥加起來2小時6分鍾吃飯不要時間麽(洗衣服:把要洗的衣服塞到洗衣機,包括倒洗衣液等等3分鍾;做飯:同理),你大爺的,吃飯的時候不能看電影嘛。好了,請出門右轉,這里不歡迎杠精,走之前把門關上!!!通過這個栗子,可以看出做相同的事情,所花費的時間不同(這就是為啥工作中每個人的工作效率有高低了叭)。

什么時候適合並發編程

  1. 任務阻塞線程,導致之后的代碼不能執行:一邊從文件中讀取,一邊進行大量計算
  2. 任務執行時間過長,可以瓜分為分工明確的子任務:分段下載文件
  3. 任務間斷性執行日志打印
  4. 任務協作執行生產者消費者問題

並發編程中的上下文切換

  以下內容,百度百科原話(點我直達)。

  上下文切換指的是內核(操作系統的核心)在CPU上對進程或者線程進行切換。上下文切換過程中的信息被保存在進程控制塊(PCB-Process Control Block)中。PCB又被稱作切換楨(SwitchFrame)。上下文切換的信息會一直被保存在CPU的內存中,直到被再次使用。

  上下文切換 (context switch) , 其實際含義是任務切換, 或者CPU寄存器切換。當多任務內核決定運行另外的任務時, 它保存正在運行任務的當前狀態, 也就是CPU寄存器中的全部內容。這些內容被保存在任務自己的堆棧中, 入棧工作完成后就把下一個將要運行的任務的當前狀況從該任務的棧中重新裝入CPU寄存器, 並開始下一個任務的運行, 這一過程就是context switch。
每個任務都是整個應用的一部分, 都被賦予一定的優先級, 有自己的一套CPU寄存器和棧空間。

  最重要的一句話:上下文頻繁的切換,會帶來一定的性能開銷。

減少上下文切換開銷方法

  • 無鎖並發編程
    • 多線程競爭鎖時,會引起上下文切換,所以多個線程處理數據時,可以用一些辦法來避免使用鎖,如將數據的ID按照Hash算法取模分段,不同的線程處理不同段的數據
  • CAS
    • Java的Atomic包使用CAS算法來更新數據,而不需要加鎖
  • 控制線程數
    • 避免創建過多不需要的線程,當任務少的時候,但是創建很多線程來處理,這樣會造成大量線程都處於等待狀態
  • 協程(GO語言)
    • 在單線程里實現多任務的調度,並在單線程里維持多個任務間的切換。

知乎上,有個人寫的不錯,推薦給大家:點我直達

死鎖(代碼演示)

  第一次執行,沒有發生死鎖第二次執行時,先讓線程A睡眠50毫秒,程序一直卡着不動,發生死鎖你不讓我,我不讓你,爭奪YB_B的資源

查看死鎖(在重要不過啦)(jdk提供的一些工具)

  1. 命令行工具:jps
  2. 查看堆棧:jstack pid
  3. 可視化工具:jconsole

jps&jstack

分析

jconsole

  控制台輸入:jconsole,然后按照gif,看線程->檢測死鎖

代碼拷貝區

package com.yb.thread;

/**
 * @ClassName:DeadLockDemo
 * @Description:死鎖代碼演示
 * @Author:chenyb
 * @Date:2020/9/7 10:23 下午
 * @Versiion:1.0
 */
public class DeadLockDemo {
    private static final Object YB_A=new Object();
    private static final Object YB_B=new Object();

    public static void main(String[] args) {
        new Thread(()->{
            synchronized (YB_A){
                try {
                    //讓線程睡眠50毫秒
                    Thread.sleep(50);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                synchronized (YB_B){
                    System.out.println("線程-AAAAAAAAAAAAA");
                }
            }
        }).start();
        new Thread(()->{
            synchronized (YB_B){
                synchronized (YB_A){
                    System.out.println("線程-BBBBBBBBBBBBB");
                }
            }
        }).start();
    }
}

線程基礎

進程與線程的區別

  進程:是系統進行分配管理資源基本單位

  線程:進程的一個執行單元,是進程內調度的實體、是CPU調度和分派的基本單位,是比進程更小的獨立運行的基本單位。線程也被稱為輕量級進程,線程程序執行最小單位

  一個程序至少一個進程,一個進程至少一個線程。

線程的狀態(枚舉)

  • 初始化(NEW)
    • 新建了一個線程對象,但還沒有調用start()方法
  • 運行(RUNNABLE)
    • 處於可運行狀態的線程正在JVM中執行,但他可能正在等待來自操作系統的其他資源
  • 阻塞(BLOCKED)
    • 線程阻塞與synchronized鎖,等待獲取synchronized鎖的狀態
  • 等待(WAITING)
    • Object.wait()、join()、LockSupport.part(),進入該狀態的線程需要等待其他線程做出一些特定動作(通知|中斷)
  • 超時等待(TIME_WAITING)
    • Object.wait(long)、Thread.join()、LockSupport.parkNanos()、LockSupport.parkUntil,該狀態不同於WAITING
  • 終止(TERMINATED)
    • 該線程已經執行完畢

創建線程

方式一

方式二(推薦)

好處

  1. java只能單繼承,但是接口可以繼承多個
  2. 增加程序的健壯性,代碼可以共享

注意事項

方式三(匿名內部類)

方式四(Lambada)

方式五(線程池)

  注意:程序還未關閉!!!!

線程的掛起與恢復

方式一(不推薦)

  不推薦使用,會造成死鎖~

方式二(推薦)

wait():暫停執行,放棄已獲得的鎖,進入等待狀態

notify():隨機喚醒一個在等待鎖的線程

notifyAll():喚醒所有在等待鎖的線程,自行搶占CPU資源

線程的中斷

方式一(不推薦)

  注意:使用stop()可以中斷線程,但是會帶來線程不安全問題(stop被調用,線程立刻停止),理論上numA和numB都是1,結果numB=0;還是沒搞明白的,給你個眼神,自己體會~

方式二(推薦)

方式三(更推薦)

線程優先級

  線程的優先級告訴程序該線程重要程度有多大。如果有大量線程都被阻塞,都在等候運行,程序會盡可能地先運行優先級的那個線程。但是,這並不表示優先級較低的線程不會運行。若線程的優先級較低,只不過表示它被准許的機會小一些而已。

線程的優先級

  1. 最小=1
  2. 最大=10
  3. 默認=5

驗證

  可以看出,打印線程2的幾率比較大,因為線程優先級高。線程優先級,推薦使用(不同平台對線程的優先級支持不同):1、5、10

守護線程(不建議使用)

  任何一個守護線程都是整個程序中所有用戶線程的守護者,只要有活着的用戶線程,守護線程就活着。

線程安全性

synchronized

點我直達

修改普通方法:鎖住對象的實例

修飾靜態方法:鎖住整個類

修改代碼塊:鎖住一個對象synchronized (lock)

volatile

  修飾變量,保證該對象的可見性(多線程共享的變量),不保證原子性

用途

  1. 線程開關
  2. 單例修改對象的實例

lock的使用

lock與synchronized區別

  lock:需要手動設置加鎖和釋放鎖

  synchronized:托管給jvm執行

查看lock的實現類有哪些

多線程下調試

  注意看圖,線程1、2、3的狀態:Runnable|wailting,還沒get到點的話,你真的要反思一下了

讀寫鎖

  讀寫互斥、寫寫互斥、讀讀不互斥

  如果要想debug調試查看效果,可開2個線程,一個自增,一個輸出

package com.yb.thread.lock;

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
 * @ClassName:ReentrantReadWriteLockDemo
 * @Description:讀寫鎖
 * @Author:chenyb
 * @Date:2020/9/26 3:14 下午
 * @Versiion:1.0
 */
public class ReentrantReadWriteLockDemo {
    private int num_1 = 0;
    private int num_2 = 0;
    private ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    //讀鎖
    private Lock readLock = lock.readLock();
    //寫鎖
    private Lock writeLock = lock.writeLock();

    public void out() {
        readLock.lock();
        try {
            System.out.println(Thread.currentThread().getName() + "num1====>" + num_1 + ";num_2======>" + num_2);
        } finally {
            readLock.unlock();
        }
    }

    public void inCreate() {
        writeLock.lock();
        try {
            num_1++;
            try {
                Thread.sleep(500L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            num_2++;
        } finally {
            writeLock.unlock();
        }
    }

    public static void main(String[] args) {
        ReentrantReadWriteLockDemo rd = new ReentrantReadWriteLockDemo();
//        for(int x=0;x<3;x++){
//            new Thread(()->{
//                rd.inCreate();
//                rd.out();
//            }).start();
//        }

        //=========讀寫互斥
        new Thread(() -> {
            rd.inCreate();
        }, "寫").start();
        new Thread(() -> {
            rd.out();
        }, "讀").start();

        //========寫寫互斥
        new Thread(() -> {
            rd.inCreate();
        }, "寫1").start();
        new Thread(() -> {
            rd.inCreate();
        }, "寫2").start();

        //==========讀讀不互斥
        new Thread(() -> {
            rd.out();
        }, "讀1").start();
        new Thread(() -> {
            rd.out();
        }, "讀2").start();
    }
}

鎖降級

  寫線程獲取寫鎖后可以獲取讀鎖,然后釋放寫鎖,這樣寫鎖變成了讀鎖,從而實現鎖降級。

  注:鎖降級之后,寫鎖不會直接降級成讀鎖,不會隨着讀鎖的釋放而釋放,因此要顯示地釋放寫鎖

用途

  用於對數據比較敏感,需要在對數據修改之后,獲取到修改后的值,並進行接下來的其他操作。理論上已經會輸入依據:“num=1”,實際多線程下沒輸出,此時可以用鎖降級解決。給你個眼神,自己體會

package com.yb.thread.lock;

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
 * @ClassName:LockDegradeDemo
 * @Description:鎖降級demo
 * @Author:chenyb
 * @Date:2020/9/26 10:53 下午
 * @Versiion:1.0
 */
public class LockDegradeDemo {
    private int num = 0;
    //讀寫鎖
    private ReentrantReadWriteLock readWriteLOck = new ReentrantReadWriteLock();
    Lock readLock = readWriteLOck.readLock();
    Lock writeLock = readWriteLOck.writeLock();

    public void doSomething() {
        //寫鎖
        writeLock.lock();
        //讀鎖
        readLock.lock();
        try {
            num++;
        } finally {
            //釋放寫鎖
             writeLock.unlock();
        }
        //模擬其他復雜操作
        try {
            Thread.sleep(2000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        try {
            if (num == 1) {
                System.out.println("num=" + num);
            } else {
                System.out.println(num);
            }
        } finally {
            //釋放度鎖
             readLock.unlock();
        }
    }

    public static void main(String[] args) {
        LockDegradeDemo ld = new LockDegradeDemo();
        for (int i = 0; i < 4; i++) {
            new Thread(() -> {
                ld.doSomething();
            }).start();
        }
    }
}

鎖升級?

  注:從圖可以看出,線程卡着,驗證不存在先讀后寫,從而不存在鎖升級這種說法

StampedLock鎖

簡介

  一般應用,都是讀多寫少,ReentrantReadWriteLock,因為讀寫互斥,所以讀時阻塞寫性能提不上去。可能會使寫線程飢餓

特點

  1. 不可重入:一個線程已經持有寫鎖,再去獲取寫鎖的話,就會造成死鎖
  2. 支持鎖升級、降級
  3. 樂觀讀也可以悲觀讀
  4. 使用有限次自旋,增加鎖獲得的幾率,避免上下文切換帶來的開銷,樂觀讀不阻塞寫操作,悲觀讀,阻塞寫

優點

  相比於ReentrantReadWriteLock,吞吐量大幅提升

缺點

  1. api復雜,容易用錯
  2. 實現原理相比於ReentrantReadWriteLock復雜的多

demo

package com.yb.thread.lock;

import java.util.concurrent.locks.StampedLock;

/**
 * @ClassName:StampedLockDemo
 * @Description:官方例子
 * @Author:chenyb
 * @Date:2020/9/26 11:37 下午
 * @Versiion:1.0
 */
public class StampedLockDemo {
    //成員變量
    private double x, y;
    //鎖實例
    private final StampedLock sl = new StampedLock();

    //排它鎖-寫鎖(writeLock)
    void move(double deltaX, double deltaY) {
        long stamp = sl.writeLock();
        try {
            x += deltaX;
            y += deltaY;
        } finally {
            sl.unlockWrite(stamp);
        }
    }

    //樂觀讀鎖
    double distanceFromOrigin() {
        //嘗試獲取樂觀鎖1
        long stam = sl.tryOptimisticRead();
        //將全部變量拷貝到方法體棧內2
        double currentX = x, currentY = y;
        //檢查在1獲取到讀鎖票據后,鎖有沒被其他寫線程排他性搶占3
        if (!sl.validate(stam)) {
            //如果被搶占則獲取一個共享讀鎖(悲觀獲取)4
            stam = sl.readLock();
            try {
                //將全部變量拷貝到方法體棧內5
                currentX = x;
                currentY = y;
            } finally {
                //釋放共享讀鎖6
                sl.unlockRead(stam);
            }
        }
        //返回計算結果7
        return Math.sqrt(currentX * currentX + currentY * currentY);
    }

    //使用悲觀鎖獲取讀鎖,並嘗試轉換為寫鎖
    void moveIfAtOrigin(double newX, double newY) {
        //這里可以使用樂觀讀鎖替換1
        long stamp = sl.readLock();
        try {
            //如果當前點遠點則移動2
            while (x == 0.0 && y == 0.0) {
                //嘗試將獲取的讀鎖升級為寫鎖3
                long ws = sl.tryConvertToWriteLock(stamp);
                //升級成功后,則更新票據,並設置坐標值,然后退出循環4
                if (ws != 0L) {
                    stamp = ws;
                    x = newX;
                    y = newY;
                    break;
                } else {
                    //讀鎖升級寫鎖失敗則釋放讀鎖,顯示獲取獨占寫鎖,然后循環重試5
                    sl.unlockRead(stamp);
                    stamp = sl.writeLock();
                }
            }
        } finally {
            //釋放鎖6
            sl.unlock(stamp);
        }
    }
}

生產者消費者模型

Consumer.java

package com.yb.thread.communication;

/**
 * 消費者
 */
public class Consumer implements Runnable {
    private Medium medium;

    public Consumer(Medium medium) {
        this.medium = medium;
    }

    @Override
    public void run() {
        while (true) {
            medium.take();
        }
    }
}

Producer.java

package com.yb.thread.communication;

/**
 * 生產者
 */
public class Producer implements Runnable {
    private Medium medium;

    public Producer(Medium medium) {
        this.medium = medium;
    }

    @Override
    public void run() {
        while (true) {
            medium.put();
        }
    }
}

Medium.java

package com.yb.thread.communication;


/**
 * 中間商
 */
public class Medium {
    //生產個數
    private int num = 0;
    //最多生產數
    private static final int TOTAL = 20;

    /**
     * 接受生產數據
     */
    public synchronized void put() {
        //判斷當前庫存,是否最大庫存容量
        //如果不是,生產完成之后,通知消費者消費
        //如果是,通知生產者進行等待
        if (num < TOTAL) {
            System.out.println("新增庫存--------當前庫存" + ++num);
            //喚醒所有線程
            notifyAll();
        } else {
            try {
                System.out.println("新增庫存-----庫存已滿" + num);
                wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 獲取消費數據
     */
    public synchronized void take() {
        //判斷當前庫存是否不足
        //如果充足,在消費完成之后,通知生產者進行生產
        //如果不足,通知消費者暫停消費
        if (num > 0) {
            System.out.println("消費庫存-------當前庫存容量" + --num);
            //喚醒所有線程
            notifyAll();
        } else {
            System.out.println("消費庫存--------庫存不足" + num);
            try {
                wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

測試

管道流通信

  以內存為媒介,用於線程之間的數據傳輸

  面向字節:PipedOutputStream、PipedInputStream

  面向字符:PipedReader、PipedWriter

Reader.java

package com.yb.thread.communication.demo;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PipedInputStream;
import java.util.stream.Collectors;

/**
 * @ClassName:Reader
 * @Description:TODO
 * @Author:chenyb
 * @Date:2020/9/27 10:22 下午
 * @Versiion:1.0
 */
public class Reader implements Runnable{
    private PipedInputStream pipedInputStream;
    public Reader(PipedInputStream pipedInputStream){
        this.pipedInputStream=pipedInputStream;
    }
    @Override
    public void run() {
        if (pipedInputStream!=null){
            String collect = new BufferedReader(new InputStreamReader(pipedInputStream)).lines().collect(Collectors.joining("\n"));
            System.out.println(collect);
        }
        //關閉流
        try {
            pipedInputStream.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

Main.java

package com.yb.thread.communication.demo;

import java.io.*;

/**
 * @ClassName:Main
 * @Description:TODO
 * @Author:chenyb
 * @Date:2020/9/27 10:22 下午
 * @Versiion:1.0
 */
public class Main {
    public static void main(String[] args) {
        PipedInputStream pipedInputStream = new PipedInputStream();
        PipedOutputStream pipedOutputStream = new PipedOutputStream();
        try {
            pipedOutputStream.connect(pipedInputStream);
        } catch (IOException e) {
            e.printStackTrace();
        }
        new Thread(new Reader(pipedInputStream)).start();
        BufferedReader bufferedReader = null;
        try {
            bufferedReader = new BufferedReader(new InputStreamReader(System.in));
            pipedOutputStream.write(bufferedReader.readLine().getBytes());
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                pipedOutputStream.close();
                if (bufferedReader!=null){
                    bufferedReader.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }

        }
    }
}

測試

Thread.join

  線程A執行一半,需要數據,這個數據需要線程B去執行修改,B修改完成后,A才繼續操作

演示

ThreadLocal

線程變量,是一個以ThreadLocal對象為鍵、任意對象為值的存儲結構。

1、ThreadLocal.get: 獲取ThreadLocal中當前線程共享變量的值。
2、ThreadLocal.set: 設置ThreadLocal中當前線程共享變量的值。
3、ThreadLocal.remove: 移除ThreadLocal中當前線程共享變量的值。
4、ThreadLocal.initialValue: ThreadLocal沒有被當前線程賦值時或當前線程剛調用remove方法后調用get方法,返回此方法值。

原子類

概念

  對多線程訪問同一個變量,我們需要加鎖,而鎖是比較消耗性能的,JDK1.5之后,新增的原子操作類提供了一種用法簡單、性能高效、線程安全地更新一個變量的方式,這些類同樣位於JUC包下的atomic包下,發展到JDK1.8,該包下共有17個類,囊括了原子更新基本類型、原子更新數組、原子更新屬性、原子更新引用

1.8新增的原子類

  1. DoubleAccumulator
  2. DoubleAdder
  3. LongAccumulator
  4. LongAdder
  5. Striped64

原子更新基本類型

JDK1.8之前有以下幾個

  1. AtomicBoolean
  2. AtomicInteger
  3. AtomicLong
  4. DoubleAccumulator
  5. DoubleAdder
  6. LongAccumulator
  7. LongAdder

大致3類

  1. 元老級的原子更新,方法幾乎一模一樣:AtomicBoolean、AtomicInteger、AtomicLong
  2. 對Double、Long原子更新性能進行優化提升:DoubleAdder、LongAdder
  3. 支持自定義運算:DoubleAccumulator、LongAccumulator

演示

元老級

自定義運算

原子更新數組

JDK1.8之前大概有以下幾個

  1. AtomicIntegerArray
  2. AtomicLongArray
  3. AtomicReferenceArray

原子更新屬性

  1. AtomicIntegerFieldUpdater
  2. AtomicLongFieldUpdater
  3. AtomicStampedReference
  4. AtomicReferenceFieldUpdater

原子更新引用

  1. AtomicReference:用於對引用的原子更新
  2. AtomicMarkableReference:帶版本戳的原子引用類型,版本戳為boolean類型
  3. AtomicStampedReference:帶版本戳的原子引用類型,版本戳為int類型

容器

同步容器

   Vector、HashTable:JDK提供的同步容器類

  Collections.SynchronizedXXX:對相應容器進行包裝

缺點

  在單獨使用里面的方法的時候,可以保證線程安全,但是,復合操作需要額外加鎖來保證線程安全,使用Iterator迭代容器或使用for-each遍歷容器,在迭代過程中修改容器會拋ConcurrentModificationException異常。想要避免出現這個異常,就必須在迭代過程持有容器的鎖。但是若容器較大,則迭代的時間也會較長。那么需要訪問該容器的其他線程將會長時間等待。從而極大降低性能。

  若不希望在迭代期間對容器加鎖,可以使用“克隆”容器的方式。使用線程封閉,由於其他線程不會對容器進行修改,可以避免ConcurrentModificationException。但是在創建副本的時候,存在較大性能開銷。toString、hashCode、equalse、containsAll、removeAll、retainAll等方法都會隱式的Iterate,也即可能拋出ConcurrentModificationException。

package com.yb.thread.container;

import java.util.Iterator;
import java.util.Vector;

/**
 * @ClassName:VectorDemo
 * @Description:TODO
 * @Author:chenyb
 * @Date:2020/9/29 9:35 下午
 * @Versiion:1.0
 */
public class VectorDemo {
    public static void main(String[] args) {
        Vector<String> strings = new Vector<>();
        for (int i = 0; i <1000 ; i++) {
            strings.add("demo"+i);
        }
        //錯誤遍歷
//        strings.forEach(e->{
//            if (e.equals("demo3")){
//                strings.remove(e);
//            }
//            System.out.println(e);
//        });

        //正確迭代---->單線程
//        Iterator<String> iterator = strings.iterator();
//        while (iterator.hasNext()){
//            String next = iterator.next();
//            if (next.equals("demo3")){
//                iterator.remove();
//            }
//            System.out.println(next);
//        }

        //正確迭代--->多線程
        Iterator<String> iterator = strings.iterator();
        for (int i = 0; i < 4; i++) {
            new Thread(()->{
                synchronized (iterator){
                    while (iterator.hasNext()){
                        String next = iterator.next();
                        if (next.equals("demo3")){
                            iterator.remove();
                        }
                    }
                }
            }).start();
        }
    }
}

並發容器

  CopyOnWrite、Concurrent、BlockingQueue:根據具體場景進行設計,盡量避免使用鎖,提高容器的並發訪問性。

  ConcurrentBlockingQueue:基於queue實現的FIFO的隊列。隊列為空,去操作會被阻塞

  ConcurrentLinkedQueue:隊列為空,取得時候就直接返回空

package com.yb.thread.container;

import java.util.Iterator;
import java.util.concurrent.CopyOnWriteArrayList;

/**
 * @ClassName:Demo
 * @Description:TODO
 * @Author:chenyb
 * @Date:2020/9/29 9:50 下午
 * @Versiion:1.0
 */
public class Demo {
    public static void main(String[] args) {
        CopyOnWriteArrayList<String> strings=new CopyOnWriteArrayList<>();
        for (int i = 0; i < 1000; i++) {
            strings.add("demo"+i);
        }
        //正常操作--->單線程
//        strings.forEach(e->{
//            if (e.equals("demo2")){
//                strings.remove(e);
//            }
//        });

        //錯誤操作,不支持迭代器移除元素,直接拋異常
//        Iterator<String> iterator = strings.iterator();
//        while (iterator.hasNext()){
//            String next = iterator.next();
//            if (next.equals("demo2")){
//                iterator.remove();
//            }
//        }

        //正常操作--->多線程
        for (int i = 0; i < 4; i++) {
            new Thread(()->{
                strings.forEach(e -> {
                    if (e.equals("demo2")) {
                        strings.remove(e);
                    }
                });
            }).start();
        }
    }
}

LinkedBlockingQueue

  可以作為生產者消費者的中間商(使用put、take)。

package com.yb.thread.container;

import java.util.concurrent.LinkedBlockingDeque;

/**
 * @ClassName:Demo2
 * @Description:TODO
 * @Author:chenyb
 * @Date:2020/9/29 10:05 下午
 * @Versiion:1.0
 */
public class Demo2 {
    public static void main(String[] args) {
        LinkedBlockingDeque<String> strings = new LinkedBlockingDeque<>();
        //添加元素,3種方式
        strings.add("陳彥斌"); //隊列滿的時候,會拋異常
        strings.offer("陳彥斌"); //如果隊列滿了,直接入隊失敗
        try {
            strings.put("陳彥斌"); //隊列滿,進入阻塞狀態
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        //從隊列中取元素,3種方式
        String remove = strings.remove(); //會拋出異常
        strings.poll(); //在隊列為空的時候,直接返回null
        try {
            strings.take(); //隊列為空的時候,會進入等待狀態
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

並發工具類

CountDownLatch

  1. await():進入等待狀態
  2. countDown:計算器減一

應用場景

  1. 啟動三個線程計算,需要對結果進行累加

package com.yb.thread.tool;

import java.util.concurrent.CountDownLatch;

/**
 * @ClassName:CountDownLatchDemo
 * @Description:TODO
 * @Author:chenyb
 * @Date:2020/9/29 10:26 下午
 * @Versiion:1.0
 */
public class CountDownLatchDemo {
    public static void main(String[] args) {
        //模擬場景,學校比較,800米,跑完之后,有跨欄
        //需要先將800米跑完,在布置跨欄,要不然跑800米的選手會被累死
        CountDownLatch countDownLatch = new CountDownLatch(8);
        new Thread(()->{
            try {
                countDownLatch.await();
                System.out.println("800米比賽結束,准備清跑道,並進行跨欄比賽");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
        for (int i = 0; i < 8; i++) {
            int finalI = i;
            new Thread(()->{
                try {
                    Thread.sleep(finalI *1000L);
                    System.out.println(Thread.currentThread().getName()+",到達終點");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {
                    countDownLatch.countDown();
                }
            }).start();
        }
    }
}

CyclicBarrier

  允許一組線程相互等待達到一個公共的障礙點,之后繼續執行

區別

  1. CountDownLatch一般用於某個線程等待若干個其他線程執行完任務之后,他才執行:不可重復使用
  2. CyclicBarrier一般用於一組線程相互等待至某個狀態,然后這一組線程再同時執行:可重用

package com.yb.thread.tool;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

/**
 * @ClassName:CyclicBarrierDemo
 * @Description:TODO
 * @Author:chenyb
 * @Date:2020/9/29 10:42 下午
 * @Versiion:1.0
 */
public class CyclicBarrierDemo {
    public static void main(String[] args) {
        //模擬場景:學校800米跑步,等到所有選手全部到齊后,一直跑
        CyclicBarrier cyclicBarrier=new CyclicBarrier(8);
        for (int i = 0; i < 8; i++) {
            int finalI = i;
            new Thread(()->{
                try {
                    Thread.sleep(finalI *1000L);
                    System.out.println(Thread.currentThread().getName()+",准備就緒");
                    cyclicBarrier.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
                System.out.println("選手已到齊,開始比賽");
            }).start();
        }
    }
}

Semaphore(信號量)

  控制線程並發數量

應用場景

  1. 接口限流

package com.yb.thread.tool;

import java.util.concurrent.Semaphore;

/**
 * @ClassName:SemaphoreDemo
 * @Description:TODO
 * @Author:chenyb
 * @Date:2020/9/29 11:11 下午
 * @Versiion:1.0
 */
public class SemaphoreDemo {
    public static void main(String[] args) {
        Semaphore semaphore = new Semaphore(8);
        for (int i = 0; i < 20; i++) {
            new Thread(() -> {

                try {
                    semaphore.acquire();
                    System.out.println(Thread.currentThread().getName() + ",開始執行");
                    Thread.sleep(2000L);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    //釋放
                    semaphore.release();
                }
            }).start();
        }
    }
}

Exchange

  它提供一個同步點,在這個同步點兩個線程可以交換彼此的數據(成對)。

應用場景

  1. 交換數據

package com.yb.thread.tool;

import java.util.concurrent.Exchanger;

/**
 * @ClassName:ExchangerDemo
 * @Description:TODO
 * @Author:chenyb
 * @Date:2020/9/29 11:21 下午
 * @Versiion:1.0
 */
public class ExchangerDemo {
    public static void main(String[] args) {
        Exchanger<String> stringExchanger=new Exchanger<>();
        String str1="陳彥斌";
        String str2="ybchen";
        new Thread(()->{
            System.out.println(Thread.currentThread().getName()+"--------------初始值:"+str1);
            try {
                String exchange = stringExchanger.exchange(str1);
                System.out.println(Thread.currentThread().getName()+"--------------交換:"+exchange);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        },"線程A").start();
        new Thread(()->{
            System.out.println(Thread.currentThread().getName()+"--------------初始值:"+str2);
            try {
                String exchange = stringExchanger.exchange(str2);
                System.out.println(Thread.currentThread().getName()+"--------------交換:"+exchange);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        },"線程B").start();
    }
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM