Java實現生產者消費者問題與讀者寫者問題


摘要: Java實現生產者消費者問題與讀者寫者問題

1、生產者消費者問題

    生產者消費者問題是研究多線程程序時繞不開的經典問題之一,它描述是有一塊緩沖區作為倉庫,生產者可以將產品放入倉庫,消費者則可以從倉庫中取走產品。解決生產者/消費者問題的方法可分為兩類:(1)采用某種機制保護生產者和消費者之間的同步;(2)在生產者和消費者之間建立一個管道。第一種方式有較高的效率,並且易於實現,代碼的可控制性較好,屬於常用的模式。第二種管道緩沖區不易控制,被傳輸數據對象不易於封裝等,實用性不強。

    同步問題核心在於:如何保證同一資源被多個線程並發訪問時的完整性。常用的同步方法是采用信號或加鎖機制,保證資源在任意時刻至多被一個線程訪問。Java語言在多線程編程上實現了完全對象化,提供了對同步機制的良好支持。在Java中一共有五種方法支持同步,其中前四個是同步方法,一個是管道方法。

  • wait() / notify()方法
  • await() / signal()方法
  • BlockingQueue阻塞隊列方法
  • Semaphore方法
  • PipedInputStream / PipedOutputStream

1.1 wait() / notify()方法

wait() / nofity()方法是基類Object的兩個方法,也就意味着所有Java類都會擁有這兩個方法,這樣,我們就可以為任何對象實現同步機制。

wait()方法:當緩沖區已滿/空時,生產者/消費者線程停止自己的執行,放棄鎖,使自己處於等等狀態,讓其他線程執行。

notify()方法:當生產者/消費者向緩沖區放入/取出一個產品時,向其他等待的線程發出可執行的通知,同時放棄鎖,使自己處於等待狀態。

各起了4個生產者,4個消費者 

 

package test;

public class Hosee {
    private static Integer count = 0;
    private final Integer  FULL  = 10;
    private static String  LOCK  = "LOCK";

    class Producer implements Runnable {
        @Override
        public void run() {
            for (int i = 0; i < 10; i++) {
                try {
                    Thread.sleep(3000);
                } catch (Exception e) {
                    e.printStackTrace();
                }
                synchronized (LOCK) {
                    while (count == FULL) {
                        try {
                            LOCK.wait();
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                    count++;
                    System.out.println(Thread.currentThread().getName() + "生產者生產,目前總共有" + count);
                    LOCK.notifyAll();
                }
            }
        }
    }

    class Consumer implements Runnable {
        @Override
        public void run() {
            for (int i = 0; i < 10; i++) {
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e1) {
                    e1.printStackTrace();
                }
                synchronized (LOCK) {
                    while (count == 0) {
                        try {
                            LOCK.wait();
                        } catch (Exception e) {
                        }
                    }
                    count--;
                    System.out.println(Thread.currentThread().getName() + "消費者消費,目前總共有" + count);
                    LOCK.notifyAll();
                }
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Hosee hosee = new Hosee();
        new Thread(hosee.new Producer()).start();
        new Thread(hosee.new Consumer()).start();
        new Thread(hosee.new Producer()).start();
        new Thread(hosee.new Consumer()).start();
        new Thread(hosee.new Producer()).start();
        new Thread(hosee.new Consumer()).start();
        new Thread(hosee.new Producer()).start();
        new Thread(hosee.new Consumer()).start();
    }
}

 

(需要注意的是,用什么加鎖就用什么notify和wait,實例中使用的是LOCK)

部分打印結果:

由於生產者和消費者說明一致,所以最多都是在2左右,當減少一個消費者時,則會加到10。

1.2 await() / signal()方法

首先,我們先來看看await()/signal()與wait()/notify()的區別:

  1. wait()和notify()必須在synchronized的代碼塊中使用 因為只有在獲取當前對象的鎖時才能進行這兩個操作 否則會報異常 而await()和signal()一般與Lock()配合使用。
  2. wait是Object的方法,而await只有部分類有,如Condition。
  3. await()/signal()和新引入的鎖定機制Lock直接掛鈎,具有更大的靈活性。

那么為什么有了synchronized還要提出Lock呢?

1.2.1 對synchronized的改進

    synchronized並不完美,它有一些功能性的限制 —— 它無法中斷一個正在等候獲得鎖的線程,也無法通過投票得到鎖,如果不想等下去,也就沒法得到鎖。同步還要求鎖的釋放只能在與獲得鎖所在的堆棧幀相同的堆棧幀中進行,多數情況下,這沒問題(而且與異常處理交互得很好),但是,確實存在一些非塊結構的鎖定更合適的情況。

1.2.2 ReentrantLock 類

java.util.concurrent.lock 中的 Lock 框架是鎖定的一個抽象,它允許把鎖定的實現作為 Java 類,而不是作為語言的特性來實現(更加面向對象)。這就為 Lock 的多種實現留下了空間,各種實現可能有不同的調度算法、性能特性或者鎖定語義。 ReentrantLock 類實現了 Lock ,它擁有與 synchronized 相同的並發性和內存語義,但是添加了類似鎖投票、定時鎖等候和可中斷鎖等候的一些特性。此外,它還提供了在激烈爭用情況下更佳的性能。(換句話說,當許多線程都想訪問共享資源時,JVM 可以花更少的時候來調度線程,把更多時間用在執行線程上。)

reentrant 鎖意味着什么呢?簡單來說,它有一個與鎖相關的獲取計數器,如果擁有鎖的某個線程再次得到鎖,那么獲取計數器就加1,然后鎖需要被釋放兩次才能獲得真正釋放(重入鎖)。這模仿了 synchronized 的語義;如果線程進入由線程已經擁有的監控器保護的 synchronized 塊,就允許線程繼續進行,當線程退出第二個(或者后續) synchronized 塊的時候,不釋放鎖,只有線程退出它進入的監控器保護的第一個synchronized 塊時,才釋放鎖。

簡單解釋下重入鎖:

 

public class Child extends Father implements Runnable{
    final static Child child = new Child();//為了保證鎖唯一
    public static void main(String[] args) {
        for (int i = 0; i < 50; i++) {
            new Thread(child).start();
        }
    }

    public synchronized void doSomething() {
        System.out.println("1child.doSomething()");
        doAnotherThing(); // 調用自己類中其他的synchronized方法
    }

    private synchronized void doAnotherThing() {
        super.doSomething(); // 調用父類的synchronized方法
        System.out.println("3child.doAnotherThing()");
    }

    @Override
    public void run() {
        child.doSomething();
    }
}
class Father {
    public synchronized void doSomething() {
        System.out.println("2father.doSomething()");
    }
}

 

上述代碼的鎖都是child對象,當執行child.doSomething時,該線程獲得child對象的鎖,在doSomething方法內執行doAnotherThing時再次請求child對象的鎖,因為synchronized是重入鎖,所以可以得到該鎖,繼續在doAnotherThing里執行父類的doSomething方法時第三次請求child對象的鎖,同理可得到,如果不是重入鎖的話,那這后面這兩次請求鎖將會被一直阻塞,從而導致死鎖。

在查看下面代碼示例時,可以看到 Lock 和 synchronized 有一點明顯的區別 —— lock 必須在 finally 塊中釋放。否則,如果受保護的代碼將拋出異常,鎖就有可能永遠得不到釋放!這一點區別看起來可能沒什么,但是實際上,它極為重要。忘記在 finally 塊中釋放鎖,可能會在程序中留下一個定時炸彈,當有一天炸彈爆炸時,您要花費很大力氣才有找到源頭在哪。而使用同步,JVM 將確保鎖會獲得自動釋放。

Lock lock = new ReentrantLock();
lock.lock();
try { 
  // update object state
}
finally {
  lock.unlock(); 
}

 

除此之外,與目前的 synchronized 實現相比,爭用下的 ReentrantLock 實現更具可伸縮性。(在未來的 JVM 版本中,synchronized 的爭用性能很有可能會獲得提高。)這意味着當許多線程都在爭用同一個鎖時,使用 ReentrantLock 的總體開支通常要比 synchronized 少得多。

 

1.2.3 什么時候選擇用 ReentrantLock 代替 synchronized

在 Java1.5 中,synchronized 是性能低效的。因為這是一個重量級操作,需要調用操作接口,導致有可能加鎖消耗的系統時間比加鎖以外的操作還多。相比之下使用 Java 提供的 Lock 對象,性能更高一些。但是到了 Java1.6,發生了變化。synchronized 在語義上很清晰,可以進行很多優化,有適應自旋,鎖消除,鎖粗化,輕量級鎖,偏向鎖等等。導致在 Java1.6 上 synchronized 的性能並不比 Lock 差。官方也表示,他們也更支持 synchronized,在未來的版本中還有優化余地。

所以在確實需要一些 synchronized 所沒有的特性的時候,比如時間鎖等候、可中斷鎖等候、無塊結構鎖、多個條件變量或者鎖投票使用ReentrantLock。ReentrantLock 還具有可伸縮性的好處,應當在高度爭用的情況下使用它,但是請記住,大多數 synchronized 塊幾乎從來沒有出現過爭用,所以可以把高度爭用放在一邊。我建議用 synchronized 開發,直到確實證明 synchronized 不合適,而不要僅僅是假設如果使用 ReentrantLock “性能會更好”。請記住,這些是供高級用戶使用的高級工具。(而且,真正的高級用戶喜歡選擇能夠找到的最簡單工具,直到他們認為簡單的工具不適用為止。)。一如既往,首先要把事情做好,然后再考慮是不是有必要做得更快。

1.2.4 接下來我們使用ReentrantLock來實現生產者消費者問題

package test;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class Hosee {
    private static Integer count = 0;
    private final Integer FULL = 10;
    final Lock lock = new ReentrantLock();
    final Condition NotFull = lock.newCondition();
    final Condition NotEmpty = lock.newCondition();

    class Producer implements Runnable {
        @Override
        public void run() {
            for (int i = 0; i < 10; i++) {
                try {
                    Thread.sleep(3000);
                } catch (Exception e) {
                    e.printStackTrace();
                }
                lock.lock();
                try {
                    while (count == FULL) {
                        try {
                            NotFull.await();
                        } catch (InterruptedException e) {
                            // TODO Auto-generated catch block
                            e.printStackTrace();
                        }
                    }
                    count++;
                    System.out.println(Thread.currentThread().getName()
                            + "生產者生產,目前總共有" + count);
                    NotEmpty.signal();
                } finally {
                    lock.unlock();
                }

            }
        }
    }

    class Consumer implements Runnable {

        @Override
        public void run() {
            for (int i = 0; i < 10; i++) {
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e1) {
                    e1.printStackTrace();
                }
                lock.lock();
                try {
                    while (count == 0) {
                        try {
                            NotEmpty.await();
                        } catch (Exception e) {
                            // TODO: handle exception
                            e.printStackTrace();
                        }
                    }
                    count--;
                    System.out.println(Thread.currentThread().getName()
                            + "消費者消費,目前總共有" + count);
                    NotFull.signal();
                } finally {
                    lock.unlock();
                }

            }

        }

    }

    public static void main(String[] args) throws Exception {
        Hosee hosee = new Hosee();
        new Thread(hosee.new Producer()).start();
        new Thread(hosee.new Consumer()).start();
        new Thread(hosee.new Producer()).start();
        new Thread(hosee.new Consumer()).start();

        new Thread(hosee.new Producer()).start();
        new Thread(hosee.new Consumer()).start();
        new Thread(hosee.new Producer()).start();
        new Thread(hosee.new Consumer()).start();
    }

}

 

運行結果與第一個類似。上述代碼用了兩個Condition,其實用一個也是可以的,只不過要signalall()。

1.3 BlockingQueue阻塞隊列方法

    BlockingQueue是JDK5.0的新增內容,它是一個已經在內部實現了同步的隊列,實現方式采用的是我們第2種await() / signal()方法。它可以在生成對象時指定容量大小。它用於阻塞操作的是put()和take()方法。

put()方法:類似於我們上面的生產者線程,容量達到最大時,自動阻塞。

take()方法:類似於我們上面的消費者線程,容量為0時,自動阻塞。

package test;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class Hosee {
    private static Integer count = 0;
    final BlockingQueue<Integer> bq = new ArrayBlockingQueue<Integer>(10);
    class Producer implements Runnable {
        @Override
        public void run() {
            for (int i = 0; i < 10; i++) {
                try {
                    Thread.sleep(3000);
                } catch (Exception e) {
                    e.printStackTrace();
                }
                try {
                    bq.put(1);
                    count++;
                    System.out.println(Thread.currentThread().getName()
                            + "生產者生產,目前總共有" + count);
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
        }
    }

    class Consumer implements Runnable {

        @Override
        public void run() {
            for (int i = 0; i < 10; i++) {
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e1) {
                    e1.printStackTrace();
                }
                try {
                    bq.take();
                    count--;
                    System.out.println(Thread.currentThread().getName()
                            + "消費者消費,目前總共有" + count);
                } catch (Exception e) {
                    // TODO: handle exception
                    e.printStackTrace();
                }
            }
        }

    }

    public static void main(String[] args) throws Exception {
        Hosee hosee = new Hosee();
        new Thread(hosee.new Producer()).start();
        new Thread(hosee.new Consumer()).start();
        new Thread(hosee.new Producer()).start();
        new Thread(hosee.new Consumer()).start();

        new Thread(hosee.new Producer()).start();
        new Thread(hosee.new Consumer()).start();
        new Thread(hosee.new Producer()).start();
        new Thread(hosee.new Consumer()).start();
    }

}

 

 

其實這個BlockingQueue比較難用代碼來演示,因為put()與take()方法無法與輸出語句保證同步,當然你可以自己去實現 BlockingQueue(BlockingQueue是用await()/signal() 實現的)。所以在輸出結果上你會發現不匹配。

例如:當緩沖區已滿,生產者在put()操作時,put()內部調用了await()方法,放棄了線程的執行,然后消費者線程執行,調用take()方法,take()內部調用了signal()方法,通知生產者線程可以執行,致使在消費者的println()還沒運行的情況下生產者的println()先被執行,所以有了輸出不匹配的情況。

對於BlockingQueue大家可以放心使用,這可不是它的問題,只是在它和別的對象之間的同步有問題。

1.4 Semaphore方法

Semaphore 信號量,就是一個允許實現設置好的令牌。也許有1個,也許有10個或更多。  
誰拿到令牌(acquire)就可以去執行了,如果沒有令牌則需要等待。  
執行完畢,一定要歸還(release)令牌,否則令牌會被很快用光,別的線程就無法獲得令牌而執行下去了。
package test;

import java.util.concurrent.Semaphore;

public class Hosee
{
    int count = 0;
    final Semaphore notFull = new Semaphore(10);
    final Semaphore notEmpty = new Semaphore(0);
    final Semaphore mutex = new Semaphore(1);

    class Producer implements Runnable
    {
        @Override
        public void run()
        {
            for (int i = 0; i < 10; i++)
            {
                try
                {
                    Thread.sleep(3000);
                }
                catch (Exception e)
                {
                    e.printStackTrace();
                }
                try
                {
                    notFull.acquire();//順序不能顛倒,否則會造成死鎖。
                    mutex.acquire();
                    count++;
                    System.out.println(Thread.currentThread().getName()
                            + "生產者生產,目前總共有" + count);
                }
                catch (Exception e)
                {
                    e.printStackTrace();
                }
                finally
                {
                    mutex.release();
                    notEmpty.release();
                }

            }
        }
    }

    class Consumer implements Runnable
    {

        @Override
        public void run()
        {
            for (int i = 0; i < 10; i++)
            {
                try
                {
                    Thread.sleep(3000);
                }
                catch (InterruptedException e1)
                {
                    e1.printStackTrace();
                }
                try
                {
                    notEmpty.acquire();//順序不能顛倒,否則會造成死鎖。
                    mutex.acquire();
                    count--;
                    System.out.println(Thread.currentThread().getName()
                            + "消費者消費,目前總共有" + count);
                }
                catch (Exception e)
                {
                    e.printStackTrace();
                }
                finally
                {
                    mutex.release();
                    notFull.release();
                }

            }

        }

    }

    public static void main(String[] args) throws Exception
    {
        Hosee hosee = new Hosee();
        new Thread(hosee.new Producer()).start();
        new Thread(hosee.new Consumer()).start();
        new Thread(hosee.new Producer()).start();
        new Thread(hosee.new Consumer()).start();

        new Thread(hosee.new Producer()).start();
        new Thread(hosee.new Consumer()).start();
        new Thread(hosee.new Producer()).start();
        new Thread(hosee.new Consumer()).start();
    }

}

 

注意notFull.acquire()與mutex.acquire()的位置不能互換,如果先得到互斥鎖再發生等待,會造成死鎖。

1.5 PipedInputStream / PipedOutputStream

這個類位於java.io包中,是解決同步問題的最簡單的辦法,一個線程將數據寫入管道,另一個線程從管道讀取數據,這樣便構成了一種生產者/消費者的緩沖區編程模式。PipedInputStream/PipedOutputStream只能用於多線程模式,用於單線程下可能會引發死鎖。

 

package test;

import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;

public class Hosee {
    final PipedInputStream pis = new PipedInputStream();
    final PipedOutputStream pos = new PipedOutputStream();
    {
        try {
            pis.connect(pos);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    class Producer implements Runnable {
        @Override
        public void run() {
            try{
                while(true){
                    int b = (int) (Math.random() * 255);
                    System.out.println("Producer: a byte, the value is " + b);
                    pos.write(b);
                    pos.flush();
                }
            }catch(Exception e){
                e.printStackTrace();
            }finally{
                try{
                    pos.close();
                    pis.close();
                }catch(IOException e){
                    System.out.println(e);
                }
            }
        }
    }

    class Consumer implements Runnable {

        @Override
        public void run() {
            try{
                while(true){
                    int b = pis.read();
                    System.out.println("Consumer: a byte, the value is " + String.valueOf(b));
                }
            }catch(Exception e){
                e.printStackTrace();
            }finally{
                try{
                    pos.close();
                    pis.close();
                }catch(IOException e){
                    System.out.println(e);
                }
            }
        }

    }

    public static void main(String[] args) throws Exception {
        Hosee hosee = new Hosee();
        new Thread(hosee.new Producer()).start();
        new Thread(hosee.new Consumer()).start();
    }

}

 

與阻塞隊列一樣,由於read()/write()方法與輸出方法不一定同步,輸出結果方面會發生不匹配現象,為了使結果更加明顯,這里只有1個消費者和1個生產者。

2、讀者寫者問題

讀者—寫者問題(Readers-Writers problem)也是一個經典的並發程序設計問題,是經常出現的一種同步問題。計算機系統中的數據(文件、記錄)常被多個進程共享,但其中某些進程可能只要求讀數據(稱為讀者Reader);另一些進程則要求修改數據(稱為寫者Writer)。就共享數據而言,Reader和Writer是兩組並發進程共享一組數據區,要求:

(1)允許多個讀者同時執行讀操作;

(2)不允許讀者、寫者同時操作;

(3)不允許多個寫者同時操作。

Reader和Writer的同步問題分為讀者優先、弱寫者優先(公平競爭)和強寫者優先三種情況,它們的處理方式不同。

首先我們都只考慮公平競爭的情況下,看看Java有哪些方法可以實現讀者寫者問題

2.1 讀寫鎖

ReentrantReadWriteLock會使用兩把鎖來解決問題,一個讀鎖,一個寫鎖
線程進入讀鎖的前提條件:
    沒有其他線程的寫鎖,
    沒有寫請求或者有寫請求,但調用線程和持有鎖的線程是同一個
線程進入寫鎖的前提條件:
    沒有其他線程的讀鎖
    沒有其他線程的寫鎖

到ReentrantReadWriteLock,首先要做的是與ReentrantLock划清界限。它和后者都是單獨的實現,彼此之間沒有繼承或實現的關系。然后就是總結這個鎖機制的特性了: 

  1. 重入(在上文ReentrantLock處已經介紹了)方面其內部的WriteLock可以獲取ReadLock,但是反過來ReadLock想要獲得WriteLock則永遠都不要想。 
  2. WriteLock可以降級為ReadLock,順序是:先獲得WriteLock再獲得ReadLock,然后釋放WriteLock,這時候線程將保持Readlock的持有。反過來ReadLock想要升級為WriteLock則不可能,為什么?參看(1),呵呵. 
  3. ReadLock可以被多個線程持有並且在作用時排斥任何的WriteLock,而WriteLock則是完全的互斥。這一特性最為重要,因為對於高讀取頻率而相對較低寫入的數據結構,使用此類鎖同步機制則可以提高並發量。 
  4. 不管是ReadLock還是WriteLock都支持Interrupt,語義與ReentrantLock一致。 
  5. WriteLock支持Condition並且與ReentrantLock語義一致,而ReadLock則不能使用Condition,否則拋出UnsupportedOperationException異常。 

看下ReentrantReadWriteLock這個類的兩個構造函數

public ReentrantReadWriteLock() {
        this(false);
    }

    /**
     * Creates a new {@code ReentrantReadWriteLock} with
     * the given fairness policy.
     *
     * @param fair {@code true} if this lock should use a fair ordering policy
     */
    public ReentrantReadWriteLock(boolean fair) {
        sync = (fair)? new FairSync() : new NonfairSync();
        readerLock = new ReadLock(this);
        writerLock = new WriteLock(this);
    }

 

fair這個參數表示是否是創建一個公平的讀寫鎖,還是非公平的讀寫鎖。也就是搶占式還是非搶占式。

公平和非公平:公平表示獲取的鎖的順序是按照線程加鎖的順序來分配獲取到鎖的線程時最先加鎖的線程,是按照FIFO的順序來分配鎖的;非公平表示獲取鎖的順序是無需的,后來加鎖的線程可能先獲得鎖,這種情況就導致某些線程可能一直沒獲取到鎖。

公平鎖為啥會影響性能,從code上來看看公平鎖僅僅是多了一項檢查是否在隊首會影響性能,如不是,那么又是在什么地方影響的?假如是闖入的線程,會排在隊尾並睡覺(parking)等待前任節點喚醒,這樣勢必會比非公平鎖添加很多paking和unparking的操作

一般的應用場景是: 如果有多個讀線程,一個寫線程,而且寫線程在操作的時候需要阻塞讀線程,那么此時就需要使用公平鎖,要不然可能寫線程一直獲取不到鎖,導致線程餓死。

 

再簡單說下鎖降級

重入還允許從寫入鎖降級為讀取鎖,其實現方式是:先獲取寫入鎖,然后獲取讀取鎖,最后釋放寫入鎖。但是,從讀取鎖升級到寫入鎖是不可能的。
rwl.readLock().lock();
      if (!cacheValid) {
         // Must release read lock before acquiring write lock
         rwl.readLock().unlock();
         rwl.writeLock().lock();
        
         if (!cacheValid) {
           data = ...
           cacheValid = true;
         }
       
         rwl.readLock().lock();
         rwl.writeLock().unlock(); // 降級:先獲取讀鎖再釋放寫鎖
      }

 

下面我們用讀寫鎖來實現讀者寫者問題

import java.util.Random;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class ReadWriteLockTest {
    public static void main(String[] args) {
        final Queue3 q3 = new Queue3();
        for (int i = 0; i < 3; i++) {
            new Thread() {
                public void run() {
                    while (true) {
                        q3.get();
                    }
                }
            }.start();
        }
        for (int i = 0; i < 3; i++) {
            new Thread() {
                public void run() {
                    while (true) {
                        q3.put(new Random().nextInt(10000));
                    }
                }
            }.start();
        }
    }
}

class Queue3 {
    private Object data = null;// 共享數據,只能有一個線程能寫該數據,但可以有多個線程同時讀該數據。
    private ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();

    public void get() {
        rwl.readLock().lock();// 上讀鎖,其他線程只能讀不能寫
        System.out.println(Thread.currentThread().getName()
                + " be ready to read data!");
        try {
            Thread.sleep((long) (Math.random() * 1000));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName()
                + "have read data :" + data);
        rwl.readLock().unlock(); // 釋放讀鎖,最好放在finnaly里面
    }

    public void put(Object data) {
        rwl.writeLock().lock();// 上寫鎖,不允許其他線程讀也不允許寫
        System.out.println(Thread.currentThread().getName()
                + " be ready to write data!");
        try {
            Thread.sleep((long) (Math.random() * 1000));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        this.data = data;
        System.out.println(Thread.currentThread().getName()
                + " have write data: " + data);
        rwl.writeLock().unlock();// 釋放寫鎖
    }
}

 

運行結果:

 

Thread-0 be ready to read data!
Thread-1 be ready to read data!
Thread-2 be ready to read data!
Thread-0have read data :null
Thread-2have read data :null
Thread-1have read data :null
Thread-5 be ready to write data!
Thread-5 have write data: 6934
Thread-5 be ready to write data!
Thread-5 have write data: 8987
Thread-5 be ready to write data!
Thread-5 have write data: 8496

 

2.2 Semaphore信號量

在1.4中已經介紹了用信號量來實現生產者消費者問題,現在我們將用信號量來實現讀者寫者問題,信號量的相關知識不再重復,直接看代碼

package test;

import java.util.Random;
import java.util.concurrent.Semaphore;

public class ReadWrite
{
    public static void main(String[] args)
    {
        final Queue3 q3 = new Queue3();
        for (int i = 0; i < 3; i++)
        {
            new Thread()
            {
                public void run()
                {
                    while (true)
                    {
                        try
                        {
                            Thread.sleep((long) (Math.random() * 1000));
                        }
                        catch (InterruptedException e)
                        {
                            e.printStackTrace();
                        }
                        q3.get();
                    }
                }
            }.start();
            
        }
        for (int i = 0; i < 3; i++)
        {
            new Thread()
            {
                public void run()
                {
                    while (true)
                    {
                        try
                        {
                            Thread.sleep((long) (Math.random() * 1000));
                        }
                        catch (InterruptedException e)
                        {
                            e.printStackTrace();
                        }
                        q3.put(new Random().nextInt(10000));
                    }
                }
            }.start();
        }
    }
}

class Queue3
{
    private Object data = null;// 共享數據,只能有一個線程能寫該數據,但可以有多個線程同時讀該數據。
    private Semaphore wmutex = new Semaphore(1);
    private Semaphore rmutex = new Semaphore(2);
    private int count = 0;

    public void get()
    {
        try
        {
            rmutex.acquire();
            if (count == 0)
                wmutex.acquire();// 當第一讀進程欲讀數據庫時,阻止寫進程寫
            count++;
            System.out.println(Thread.currentThread().getName()
                    + " be ready to read data!");
            try
            {
                Thread.sleep((long) (Math.random() * 1000));
            }
            catch (InterruptedException e)
            {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName()
                    + "have read data :" + data);
            count--;
            if (count == 0)
                wmutex.release();
            rmutex.release();
        }
        catch (Exception e)
        {
            e.printStackTrace();
        }
    }

    public void put(Object data)
    {
        try
        {
            wmutex.acquire();
            System.out.println(Thread.currentThread().getName()
                    + " be ready to write data!");
            try
            {
                Thread.sleep((long) (Math.random() * 1000));
            }
            catch (InterruptedException e)
            {
                e.printStackTrace();
            }
            this.data = data;
            System.out.println(Thread.currentThread().getName()
                    + " have write data: " + data);
        }
        catch (Exception e)
        {
            e.printStackTrace();
        }
        finally
        {
            wmutex.release();
        }
    }
}

 

單純使用信號量不能解決讀者與寫者問題,必須引入計數器count(可以用CountDownLatch代替 )對讀進程計數; count與wmutex結合使用,使讀讀能同時進行,讀寫排斥。count為0時表示讀進程開始,此時寫進程阻塞(wmutex被讀進程獲取),當count不為0時,表示有多個讀進程,就不用操作 wmutex了,因為第一個讀進程已經獲得了 wmutex。count表示有多少個讀進程在讀,每次有一個就+1,讀完了-1,當count==0時,表示讀進程都結束了。此時 wmutex釋放,寫進程才有機會獲得wmutex。為了使讀進程不要一直占有 wmutex,最好讓讀進程sleep一下,讓寫進程有機會獲得wmutex,使效果更明顯。

總結:

就此用Java實現生產者消費者問題(5種)和讀者寫者問題(2種)已經闡述完了,歡迎大家討論以及給出不同的解決方案。如有什么差錯,請留言或者私信我。如有什么新的想法,我會及時補充。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM