JUC並發包基本使用


一、簡介

  傳統的Java多線程開發中,wait、notify、synchronized等如果不注意使用的話,很容易引起死鎖、臟讀問題。Java1.5 版本開始增加 java.util.concurrent 並發編程包,簡化了多線程開發難度。添加了很多的多線程操作工具類,可根據實際需求去選擇使用。

  JUC 常用工具類:

  Semaphore - 信號量

  ReentrantLock - 可重入鎖。之前有做過簡介使用,詳見 https://www.cnblogs.com/eric-fang/p/8991208.html

  ReadWriteLock - 讀寫鎖

  BlockingQueue - 阻塞隊列。詳見 https://www.cnblogs.com/eric-fang/p/8989860.html

  CountDownLatch - 計數器。在計數器歸零后,允許之前阻塞的若干線程繼續執行

  CyclicBarrier - 柵欄。在某一條件達成之前,所有線程均阻塞等待

  AtomicXXXXXXX - 原子操作類,常見的有:AtomicInteger、AtomicLong、AtomicBoolean。

  TimeUnit - 時間枚舉類,提供一些時間的便捷操作

  Executor、ExecutorService、Future : 之前有做過簡介使用,詳見 https://www.cnblogs.com/eric-fang/p/9004020.html

二、使用

  2.1、信號量Semaphore 

    一般用於限定同時訪問操作的線程數量。例如:有時候可以很好的限制公共資源的使用,例如如果開啟幾十個線程去讀取一些文件,然后讀取到的數據需要入庫的話,由於數據庫的連接資源是稀缺資源,可能遠小於讀取文件的線程數,這時候可以利用信號量去限制每次並發獲取數據庫連接資源的線程數。

    如下示例代碼,雖然同時有10個線程執行,但是只能允許2個線程的並發執行。

package com.cfang.prebo.thread;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class TestSemaphore2 {
    
    private static Semaphore semaphore = new Semaphore(2);
    
    private static ExecutorService executorService = Executors.newFixedThreadPool(10);

    public static void main(String[] args) {
        for(int i = 0; i < 10; i++) {
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        //申請通行證
                        semaphore.acquire();    
                        // 模擬業務邏輯
                        TimeUnit.SECONDS.sleep(2);
                        log.info("{} 處理完成", Thread.currentThread().getName());
                        //釋放通行證
                        semaphore.release();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
        executorService.shutdown();
    }   
}

  2.2、計數器CountDownLatch 

     同步計數器,構造方法傳值,用來限定計數器的次數。

    countDown方法每次調用,計數器值減 1。CountDownLatch會一直阻塞着調用await方法的線程,直到計數器值變為0。

package com.cfang.prebo.thread;

import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class TestCountDownLatch {

    private static CountDownLatch countDownLatch = new CountDownLatch(4);
    
    private static AtomicInteger integerVal = new AtomicInteger();
    
    public static void main(String[] args) throws Exception{
        for(int i = 0; i < 4; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    //業務處理邏輯
                    try {
                        int size = new Random().nextInt(100);
                        integerVal.getAndAdd(size);
                        TimeUnit.SECONDS.sleep(2);
                        log.info("{} 處理完成,{}", Thread.currentThread().getName(), size);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    countDownLatch.countDown();
                }
            }, "thread-" + i).start();
        }
        String threadName = Thread.currentThread().getName();
        log.info("{} thread waiting...", threadName);
        countDownLatch.await();
        log.info("{} doing, value: {}",threadName, integerVal.get());
    }
    
    
}

  2.3、柵欄CyclicBarrier 

  柵欄屏障,構造方法傳值來設定一個閾值。線程調用 await 方法的時候,線程就會被阻塞。當阻塞的線程數達到閾值的時候,所有阻塞線程全部放行。可重置重復使用。

package com.cfang.prebo.thread;

import java.util.Random;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class TestCyclicBarrier implements Runnable{

    private CyclicBarrier barrier = new CyclicBarrier(4, this);
    
    private static AtomicInteger integerVal = new AtomicInteger();
    
    public void count() {
        for(int i = 0; i < 4; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    //業務處理邏輯
                    try {
                        int size = new Random().nextInt(100);
                        integerVal.getAndAdd(size);
                        TimeUnit.SECONDS.sleep(2);
                        log.info("{} 處理完成,{}", Thread.currentThread().getName(), size);
                        barrier.await();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }, "thread-" + i).start();
        }
    }
    
    @Override
    public void run() {
        //業務邏輯處理完成后調用
        log.info("{} 統計完成,{}", Thread.currentThread().getName(), integerVal.get());
    }
    
    public static void main(String[] args) {
        TestCyclicBarrier testCyclicBarrier = new TestCyclicBarrier();
        testCyclicBarrier.count();
    }
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM