Java線程間怎么實現同步


1、Object#wait(), Object#notify()讓兩個線程依次執行

/**
 * 類AlternatePrintDemo.java的實現描述:交替打印
 */
class NumberPrint implements Runnable {
    private int       number;
    public byte       res[];
    public static int count = 5;

    public NumberPrint(int number, byte a[]) {
        this.number = number;
        res = a;
    }

    public void run() {
        synchronized (res) {
            while (count-- > 0) {
                try {
                    res.notify();//喚醒等待res資源的線程,把鎖交給線程(該同步鎖執行完畢自動釋放鎖)
                    System.out.println(" " + number);
                    res.wait();//釋放CPU控制權,釋放res的鎖,本線程阻塞,等待被喚醒。
                    System.out.println("------線程" + Thread.currentThread().getName() + "獲得鎖,wait()后的代碼繼續運行:" + number);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

public class AlternatePrintDemo {
    public static void main(String args[]) {
        final byte a[] = { 0 };//以該對象為共享資源
        new Thread(new NumberPrint(1, a), "1").start();
        new Thread(new NumberPrint(2, a), "2").start();
    }
}

 

2、Condition#signal(), Condition#wait()讓兩個線程依次執行

/**
 * 
 * 類ConditionDemo.java的實現描述:Condition 將 Object 監視器方法(wait、notify 和 notifyAll)分解成截然不同的對象,以便通過將這些對象與任意 Lock 實現組合使用,
 * 為每個對象提供多個等待 set (wait-set)。其中,Lock 替代了 synchronized 方法和語句的使用,Condition 替代了 Object 監視器方法的使用。
 */
public class ConditionDemo {
    public static void main(String[] args) {
        final Business business = new Business();
        new Thread(new Runnable() {
            @Override
            public void run() {
                threadExecute(business, "sub");
            }
        }).start();
        threadExecute(business, "main");
    }

    public static void threadExecute(Business business, String threadType) {
        for (int i = 0; i < 10; i++) {
            try {
                if ("main".equals(threadType)) {
                    business.main(i);
                } else {
                    business.sub(i);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

class Business {
    private boolean   bool      = true;
    private Lock      lock      = new ReentrantLock();
    private Condition condition = lock.newCondition();

    public /* synchronized */ void main(int loop) throws InterruptedException {
        lock.lock();
        try {
            while (bool) {
                condition.await();//this.wait();
            }
            System.out.println("main thread seq  loop of " + loop);
            
            bool = true;
            condition.signal();//this.notify();
        } finally {
            lock.unlock();
        }
    }

    public /* synchronized */ void sub(int loop) throws InterruptedException {
        lock.lock();
        try {
            while (!bool) {
                condition.await();//this.wait();
            }

            System.out.println("sub thread seq loop of " + loop);
   
            bool = false;
            condition.signal();//this.notify();
        } finally {
            lock.unlock();
        }
    }
}

 

Lock.Condition同理

import java.util.concurrent.locks.*;

class BoundedBuffer {
    final Lock      lock     = new ReentrantLock();                          //鎖對象
    final Condition notFull  = lock.newCondition();                          //寫線程條件 
    final Condition notEmpty = lock.newCondition();                          //讀線程條件 

    final Object[]  items    = new Object[100];                              //緩存隊列
    int             putptr/* 寫索引 */, takeptr/* 讀索引 */, count/* 隊列中存在的數據個數 */;

    public void put(Object x) throws InterruptedException {
        lock.lock();
        try {
            while (count == items.length)//如果隊列滿了 
                notFull.await();//阻塞寫線程
            items[putptr] = x;//賦值 
            if (++putptr == items.length)
                putptr = 0;//如果寫索引寫到隊列的最后一個位置了,那么置為0
            ++count;//個數++
            notEmpty.signal();//喚醒讀線程
        } finally {
            lock.unlock();
        }
    }

    public Object take() throws InterruptedException {
        lock.lock();
        try {
            while (count == 0)//如果隊列為空
                notEmpty.await();//阻塞讀線程
            Object x = items[takeptr];//取值 
            if (++takeptr == items.length)
                takeptr = 0;//如果讀索引讀到隊列的最后一個位置了,那么置為0
            --count;//個數--
            notFull.signal();//喚醒寫線程
            return x;
        } finally {
            lock.unlock();
        }
    }
}

 

3、兩個線程使用Object#wait(), Object#notify()實現生產消費者模式。

/**
 * 
 * 類ProducerConsumerDemo.java的實現描述:生產消費者模式
 */
public class ProducerConsumerDemo {

    public static void main(String args[]) {

        final Queue<Integer> sharedQ = new LinkedList<>();

        Thread producer = new Producer(sharedQ);
        Thread consumer = new Consumer(sharedQ);

        producer.start();
        consumer.start();

    }
}

class Producer extends Thread {
    private static final int MAX_COUNT = 10;
    private Queue<Integer>   sharedQ;

    public Producer(Queue<Integer> sharedQ) {
        super("Producer");
        this.sharedQ = sharedQ;
    }

    @Override
    public void run() {
        for (int i = 0; i < MAX_COUNT; i++) {
            synchronized (sharedQ) {
                //waiting condition - wait until Queue is not empty
                while (sharedQ.size() >= 1) {
                    try {
                        System.out.println("Queue is full, waiting");
                        sharedQ.wait();
                    } catch (InterruptedException ex) {
                        ex.printStackTrace();
                    }
                }
                System.out.println("producing : " + i);
                sharedQ.add(i);
                sharedQ.notify();
            }
        }
    }
}

class Consumer extends Thread {
    private Queue<Integer> sharedQ;

    public Consumer(Queue<Integer> sharedQ) {
        super("Consumer");
        this.sharedQ = sharedQ;
    }

    @Override
    public void run() {
        while (true) {
            synchronized (sharedQ) {
                //waiting condition - wait until Queue is not empty
                while (sharedQ.size() == 0) {
                    try {
                        System.out.println("Queue is empty, waiting");
                        sharedQ.wait();
                    } catch (InterruptedException ex) {
                        ex.printStackTrace();
                    }
                }
                int number = (int) sharedQ.poll();
                System.out.println("consuming : " + number);
                sharedQ.notify();

                //termination condition
                if (number == 3) {
                    break;
                }
            }
        }
    }
}

 

4、CountDownLatch實現類似計數器的功能。

/**
 * 
 * 類CountDownLatchDemo.java的實現描述:CountDownLatch類位於java.util.concurrent包下,利用它可以實現類似計數器的功能.
 * 調用await()方法的線程會被掛起,它會等待直到count值為0才繼續執行
 */
public class CountDownLatchDemo {
    public static void main(String[] args) {
        final CountDownLatch latch = new CountDownLatch(2);

        new Thread() {
            public void run() {
                try {
                    System.out.println("子線程" + Thread.currentThread().getName() + "正在執行");
                    Thread.sleep(3000);
                    System.out.println("子線程" + Thread.currentThread().getName() + "執行完畢");
                    latch.countDown();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            };
        }.start();

        new Thread() {
            public void run() {
                try {
                    System.out.println("子線程" + Thread.currentThread().getName() + "正在執行");
                    Thread.sleep(3000);
                    System.out.println("子線程" + Thread.currentThread().getName() + "執行完畢");
                    latch.countDown();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            };
        }.start();

        try {
            System.out.println("等待2個子線程執行完畢...");
            latch.await();
            System.out.println("2個子線程已經執行完畢");
            System.out.println("繼續執行主線程");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

 

5、 CyclicBarrier(回環柵欄)可以實現讓一組線程等待至某個狀態之后再全部同時執行。

/**
 * 類CyclicBarrierDemo.java的實現描述:字面意思回環柵欄,通過它可以實現讓一組線程等待至某個狀態之后再全部同時執行。
 * 叫做回環是因為當所有等待線程都被釋放以后,CyclicBarrier可以被重用。我們暫且把這個狀態就叫做barrier,當調用await()方法之后,
 * 線程就處於barrier了。
 */
public class CyclicBarrierDemo {
    public static void main(String[] args) {
        int N = 4;
        //所有線程寫入操作完之后,進行額外的其他操作可以為CyclicBarrier提供Runnable參數
        CyclicBarrier barrier = new CyclicBarrier(N, new Runnable() {
            @Override
            public void run() {
                System.out.println("當前線程" + Thread.currentThread().getName());
            }
        });
        for (int i = 0; i < N; i++) {
            if (i < N - 1) {
                new Writer(barrier).start();
            } else {
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                new Writer(barrier).start();
            }
        }

        System.out.println("CyclicBarrier重用");

        for (int i = 0; i < N; i++) {
            new Writer(barrier).start();
        }
    }

    static class Writer extends Thread {
        private CyclicBarrier cyclicBarrier;

        public Writer(CyclicBarrier cyclicBarrier) {
            this.cyclicBarrier = cyclicBarrier;
        }

        @Override
        public void run() {
            System.out.println("線程" + Thread.currentThread().getName() + "正在寫入數據...");
            try {
                Thread.sleep(5000); //以睡眠來模擬寫入數據操作
                System.out.println("線程" + Thread.currentThread().getName() + "寫入數據完畢,等待其他線程寫入完畢");
                try {
                    cyclicBarrier.await(2000, TimeUnit.MILLISECONDS);
                } catch (TimeoutException e) {
                    e.printStackTrace();
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
            System.out.println("所有線程寫入完畢,繼續處理其他任務...");
        }
    }
}

 

6、Semaphore用來控制同時訪問某一資源的操作數量,或控制同時執行某個指定操作的數量。

/**
 * 類SemaphoreDemo.java的實現描述:Semaphore用來控制同時訪問某一資源的操作數量,或控制同時執行某個指定操作的數量。
 * 主要通過控制一組虛擬的“許可”,當需要執行操作時首先申請獲取許可,如果還有剩余的許可 並且獲取成功,就執行操作;如果剩余許可為0,就阻塞當前線程;
 * 操作執行完成后釋放許可,排隊的阻塞線程可以被喚醒重新獲取許可繼續執行。這里提到排隊,其實就是利用AQS的隊列進行排隊。
 */
public class SemaphoreDemo {
    public static void main(String[] args) {
        // 線程池
        ExecutorService exec = Executors.newCachedThreadPool();

        // 只能5個線程同時訪問
        final Semaphore semp = new Semaphore(5);

        // 模擬20個客戶端訪問
        for (int index = 0; index < 20; index++) {
            final int NO = index;
            Runnable run = new Runnable() {
                public void run() {
                    try {
                        // 獲取許可
                        semp.acquire();
                        System.out.println("Accessing: " + NO);
                        Thread.sleep((long) (Math.random() * 10000));
                        // 訪問完后,釋放
                        semp.release();
                    } catch (InterruptedException e) {
                    }
                }
            };
            exec.execute(run);
        }

        // 退出線程池
        exec.shutdown();
    }
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM