java中線程同步的幾種方法


1.同步關鍵字

   Synchronized

2.並發包中鎖

   Lock

3.object對象等待通知

   ObjectMonitor

   wait

   notify


4.鎖對應的條件變量

   並發包中鎖的條件變量

   condition

   await

   signal


5.並發包中的阻塞隊列

   BlockingQueue
6.並發包中的原子操作

   Atomic

7.volatile


8.final

 

9.Thread.join


10.CountDownLatch

    

相當於線程中的方法join方法,不過功能更多。CountDownLatch能夠使一個線程在等待另外一些線程完成各自工作之后,再繼續執行。使用一個計數器進行實現。
計數器初始值為線程的數量。當每一個線程完成自己任務后,計數器的值就會減一。當計數器的值為0時,表示所有的線程都已經完成了任務,然后在CountDownLatch上等待的線程就可以恢復執行任務。

 


11.CyclicBarrier

CyclicBarrier 的字面意思是可循環使用(Cyclic)的屏障(Barrier)。它要做的事情是,讓一組線程到達一個屏障(也可以叫同步點)時被阻塞,直到最后一個線程到達屏障時,屏障才會開門,
所有被屏障攔截的線程才會繼續干活。CyclicBarrier默認的構造方法是CyclicBarrier(int parties),其參數表示屏障攔截的線程數量,每個線程調用await方法告訴CyclicBarrier我已經到達了屏障,然后當前線程被阻塞。

 


12.Semaphore

控制並發線程數的semaphore, Semaphore翻譯成字面意思為 信號量,Semaphore可以控同時訪問的線程個數,通過 acquire() 獲取一個許可,如果沒有就等待,而 release() 釋放一個許可。
如下所示代碼中雖然有30個線程在執行,但是只允許10個並發執行,Semaphore(int permits) 接受一個整形數字,表示可用的許可證數量,Semaphore(10)表示允許10個線程獲取許可證,也就是並發數為10.

 


13.Phaser

Phaser類的特點是把多個線程協作執行的任務划分成多個階段(phase),在每個階段上都可以有任意個參與者參與。線程可以隨時注冊並參與到某個階段的執行中來。當一個階段中所有的線程都成功完成之后,Phaser類的對象會自動進入下一個階段,如此循環下去,直到Phaser類的對象中不再包含任何參與者,此時它會自動結束。功能強大,可以替代CountDownLatch和CyclicBarrier。

Phaser的構造器可指定初始的參與者的個數。

(1)register
動態添加參與者

(2)arriveAndAwaitAdvance
完成之后等待其他參與者完成,會阻塞直到Phaser類的對象成功進入下一個階段

(3)arriveAndDeregister
執行完成之后取消自己的注冊,不參與下一個階段的執行

 

示例代碼如下:

public class PhaserDemo {
 
    private final Phaser phaser = new Phaser(1);
    private final Pattern imageUrlPattern = Pattern.compile("src=['\"]?(.*?(\\.jpg|\\.gif|\\.png))['\"]?[\\s>]+", Pattern.CASE_INSENSITIVE);
    public void download(URL url, final Path path, Charset charset) throws IOException {
        if (charset == null) {
            charset = StandardCharsets.UTF_8;
        }
        String content = getContent(url, charset);
        List<URL> imageUrls = extractImageUrls(content);
        for (final URL imageUrl : imageUrls) {
            phaser.register();
            new Thread() {
                public void run() {
                    //等待其他線程創建完成
                    phaser.arriveAndAwaitAdvance();
                    //進入圖片下載階段
                    try {
                        InputStream is = imageUrl.openStream();
                        Files.copy(is, getSavedPath(path, imageUrl), StandardCopyOption.REPLACE_EXISTING);
                    } catch (IOException e) {
                        e.printStackTrace();
                    } finally {
                        phaser.arriveAndDeregister();
                    }
                }
            }.start();
        }
        //等待其他下載線程創建完成
        phaser.arriveAndAwaitAdvance();
        //等待下載階段的下載線程執行完成
        phaser.arriveAndAwaitAdvance();
        //下載完成之后注銷自己
        phaser.arriveAndDeregister();
    }
    private String getContent(URL url, Charset charset) throws IOException {
        InputStream is = url.openStream();
        return IOUtils.toString(new InputStreamReader(is, charset.name()));
    }
    private List<URL> extractImageUrls(String content) {
        List<URL> result = new ArrayList<URL>();
        Matcher matcher = imageUrlPattern.matcher(content);
        while (matcher.find()) {
            try {
                result.add(new URL(matcher.group(1)));
            } catch (MalformedURLException e) {
                //忽略
            }
        }
        return result;
    }
    private Path getSavedPath(Path parent, URL url) {
        String urlString = url.toString();
        int index = urlString.lastIndexOf("/");
        String fileName = urlString.substring(index + 1);
        return parent.resolve(fileName);
    }
 
    public static void main(String[] args) throws IOException {
        URL url = new URL("http://www.baidu.com");
        PhaserDemo downloader = new PhaserDemo();
        downloader.download(url, Paths.get("imgs"), Charset.forName("GB2312"));
    }
 
}
View Code

 


14.fork-join

fork/join是java7更新的一個新的輕量級任務執行框架,其主要目的是要更好滴利用底層平台上的多核CPU和多處理器來進行並行處理,解決問題時通常采用分治(divide and conquer)算法或map/reduce算法來進行。

fork操作是把一個大問題划分為若干較小的問題,一般是遞歸進行。

join操作是把部分解收集並組織起來,得到最終的完整解,也可能是遞歸進行的。

如果某個子問題由於等待另外一個子問題的完成而無法繼續運行,那么處理該子問題的線程會主動尋找其他尚未運行的子問題來執行,減少了線程等待時間,提高了性能。

ForkJoinTask的子類:RecursiveTask(可返回結果)與RecursiveAction。

ForkJoinPool實現了ExecutorService接口。

示例代碼如下:

public class ForkJoinDemo {
 
    private static final int RANGE_LENGTH = 2000;
    private final ForkJoinPool forkJoinPool = new ForkJoinPool();
 
    private static class MaxValueTask extends RecursiveTask<Long> {
        private final long[] array;
        private final int start;
        private final int end;
 
        MaxValueTask(long[] array, int start, int end) {
            this.array = array;
            this.start = start;
            this.end = end;
        }
 
        protected Long compute() {
            long max = Long.MIN_VALUE;
            if (end - start <= RANGE_LENGTH) {
                for (int i = start; i < end; i++) {
                    if (array[i] > max) {
                        max = array[i];
                    }
                }
            } else {
                int mid = (start + end) / 2;
                MaxValueTask lowTask = new MaxValueTask(array, start, mid);
                MaxValueTask highTask = new MaxValueTask(array, mid, end);
                lowTask.fork();
                highTask.fork();
                max = Math.max(max, lowTask.join());
                max = Math.max(max, highTask.join());
            }
            return max;
        }
    }
 
    public void calculate(long[] array) {
        MaxValueTask task = new MaxValueTask(array, 0, array.length);
        Long result = forkJoinPool.invoke(task);
        System.out.println(result);
    }
 
    public void calculateNormal(long[] array) {
        long max = Long.MIN_VALUE;
        for (int i = 0, n = array.length; i < n; i++) {
            if (array[i] > max) {
                max = array[i];
            }
        }
        System.out.println(max);
    }
 
    public static void main(String[] args) {
        Random random = new Random();
        int size = Integer.MAX_VALUE / 256;
        long[] array = new long[size];
        for (int i = 0; i < size; i++) {
            array[i] = random.nextLong();
        }
        ForkJoinDemo mv = new ForkJoinDemo();
        long startTime = System.currentTimeMillis();
        mv.calculate(array);
        long midTime = System.currentTimeMillis();
        System.out.println(midTime - startTime);
        mv.calculateNormal(array);
        long endTime = System.currentTimeMillis();
        System.out.println(endTime - midTime);
    }
 
}
View Code

 


15.parrelStream

 

Java8並行流ParallelStream和Stream的區別就是支持並行執行,提高程序運行效率。但是如果使用不當可能會發生線程安全的問題。Demo如下:
public static void concurrentFun() {
        List<Integer> listOfIntegers =
                new ArrayList<>();
        for (int i = 0; i <100; i++) {
            listOfIntegers.add(i);
        }
        List<Integer> parallelStorage = new ArrayList<>() ;
        listOfIntegers
                .parallelStream()
                .filter(i->i%2==0)
                .forEach(i->parallelStorage.add(i));
        System.out.println();

        parallelStorage
                .stream()
                .forEachOrdered(e -> System.out.print(e + " "));

        System.out.println();
        System.out.println("Sleep 5 sec");
        try {
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        parallelStorage
                .stream()
                .forEachOrdered(e -> System.out.print(e + " "));
    }
View Code

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM