基於馬士兵老師的高並發筆記


一、分析下面程序輸出:

/**
 * 分析一下這個程序的輸出
 * @author mashibing
 */

package yxxy.c_005;

public class T implements Runnable {

    private int count = 10;
    
    public synchronized void run() { 
        count--;
        System.out.println(Thread.currentThread().getName() + " count = " + count);
    }
    
    public static void main(String[] args) {
        T t = new T();
        for(int i=0; i<5; i++) {
            new Thread(t, "THREAD" + i).start();
        }
    }
    
}
THREAD0 count = 9
THREAD4 count = 8
THREAD1 count = 7
THREAD3 count = 6
THREAD2 count = 5

分析:

啟動了5個線程,thread0先拿到這把鎖,開始執行,thread1-4都在等待准備搶這把鎖;thread0執行完之后,釋放鎖;thread4率先搶到了這把鎖,開始執行;執行完之后thread1又搶到了這把鎖,開始執行....;
所以看到每次線程訪問一次,count-1;而且thread執行的先后順序每次執行的結果不同,因為你不知道哪個線程先執行了;

二、對比上一個程序,分析這個程序的輸出:

/**
 * 對比上面一個小程序,分析一下這個程序的輸出
 * @author mashibing
 */

package yxxy.c_006;

public class T implements Runnable {

    private int count = 10;
    
    public synchronized void run() { 
        count--;
        System.out.println(Thread.currentThread().getName() + " count = " + count);
    }
    
    public static void main(String[] args) {
        
        for(int i=0; i<5; i++) {
            T t = new T();
            new Thread(t, "THREAD" + i).start();
        }
    }
    
}
THREAD0 count = 9
THREAD4 count = 9
THREAD3 count = 9
THREAD1 count = 9
THREAD2 count = 9

分析:

啟動了5個線程,因為每次都是new了一個t,每個線程都能鎖住t,一共有5個t,5個count;所以這里5個線程執行完,count都是9;
但是因為不知道哪個線程先被cpu執行,所以thread名字的順序是隨機的;

三、同步和非同步方法是否可以同時調用?

/**
 * 同步和非同步方法是否可以同時調用?
 * @author mashibing
 */

package yxxy.c_007;

public class T {

    public synchronized void m1() { 
        System.out.println(Thread.currentThread().getName() + " m1 start...");
        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName() + " m1 end");
    }
    
    public void m2() {
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName() + " m2 ");
    }
    
    public static void main(String[] args) {
        T t = new T();
        
        new Thread(()->t.m1(), "t1").start();
        new Thread(()->t.m2(), "t2").start();
    }
    
}
t1 m1 start...
t2 m2 
t1 m1 end

分析:

t1線程執行m1方法,開始睡10s,在這過程之中,t2線程執行m2方法,5s之后打印了m2;由此可見在m1執行的過程之中,m2是可以運行的。
同步方法的執行過程中,非同步方法是可以執行的。只有synchronized這樣的方法在運行時候才需要申請那把鎖,而別的方法是不需要申請那把鎖的。
new Thread(()->t.m1())這個寫法是java8里面的Lambda表達式,一種簡寫,還可以寫成這樣:

 

public static void main(String[] args) {
        T t = new T();
        
        new Thread(new Runnable(){
            @Override
            public void run() {
                t.m1();
            }
        }, "t1").start();
        
        new Thread(new Runnable(){
            @Override
            public void run() {
                t.m2();
            }
        }, "t2").start();
    }
public static void main(String[] args) {
        T t = new T();

        new Thread(t::m1, "t1").start();
        new Thread(t::m2, "t2").start();*/
    
}

四:對業務寫方法加鎖,對業務讀方法不加鎖,容易產生臟讀問題(dirtyRead)

臟讀:讀到沒有寫過程中沒有完成的數據

 

/**
 * 對業務寫方法加鎖
 * 對業務讀方法不加鎖
 * 容易產生臟讀問題(dirtyRead)
 */

package yxxy.c_008;

import java.util.concurrent.TimeUnit;

public class Account {
    String name;
    double balance;
    
    public synchronized void set(String name, double balance) {
        this.name = name;
        
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        
        this.balance = balance;
    }
    
    public double getBalance(String name) {
        return this.balance;
    }
    
    
    public static void main(String[] args) {
        Account a = new Account();
        new Thread(()->a.set("zhangsan", 100.0)).start();
        
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        
        System.out.println(a.getBalance("zhangsan"));
        
        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        
        System.out.println(a.getBalance("zhangsan"));
    }
}
0.0
100.0

  

分析:

主線程里面第一次讀zhangsan里面的錢是0.0,第二次讀是100.0;原因是set修改錢的時候過程中,sleep了2s鍾;為什么sleep 2s就是放大了在線程的執行過程之中的時間差,set錢方法里面this.name=name和this.balance=balance之間可能是會被別的程序執行的;
在線程的執行過程set錢之中,盡管寫的這個方法set加上了synchronized鎖定了這個對象,鎖定這個對象過程之中,它仍然有可能被那些非鎖定的方法/非同步方法訪問的;
盡管對寫進行了加鎖,但是由於沒有對讀加鎖,那么有可能會讀到在寫的過程中還沒有完成的數據,產生了臟讀問題;
 
解決:
對讀方法枷鎖:
public synchronized double getBalance(String name) {
    return this.balance;
}

五、一個同步方法可以調用另外一個同步方法:  

一個線程已經擁有某個對象的鎖,再次申請的時候仍然會得到該對象的鎖.

 
/**
 * 一個同步方法可以調用另外一個同步方法,一個線程已經擁有某個對象的鎖,再次申請的時候仍然會得到該對象的鎖.
 * 也就是說synchronized獲得的鎖是可重入的.(可重入的意思就是獲得鎖之后還可以再獲得一遍)
 * @author mashibing
 */
package yxxy.c_009;

import java.util.concurrent.TimeUnit;

public class T {
    synchronized void m1() {
        System.out.println("m1 start");
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        m2();
    }
    
    synchronized void m2() {
        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("m2");
    }
}

  

分析:

對t執行m1的時候,需要在t上面加把鎖,拿到這個鎖了,開始執行,執行鎖定的過程之中,調用了m2();
調用m2的過程中,發現m2也是需要申請一把鎖,而申請的這把鎖就是當前自己已經持有的這把鎖;
嚴格來講,這把鎖m1已經持有了,m2還能持有嗎?由於是在同一個線程里面,這個是沒關系的。它可以再去申請我自己已經擁有的這把鎖,實際上就在這把鎖上加個數字,從1變成2,鎖定了2次。總而言之,再去申請當前持有的這把鎖沒問題,仍然會得到該對象的鎖。

 

六、重入鎖的另外一種情形,繼承中子類的同步方法調用父類的同步方法

/**
 * 一個同步方法可以調用另外一個同步方法,一個線程已經擁有某個對象的鎖,再次申請的時候仍然會得到該對象的鎖.
 * 也就是說synchronized獲得的鎖是可重入的
 * 這里是繼承中有可能發生的情形,子類調用父類的同步方法
 * @author mashibing
 */
package yxxy.c_010;

import java.util.concurrent.TimeUnit;

public class T {
    synchronized void m() {
        System.out.println("m start");
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("m end");
    }
    
    public static void main(String[] args) {
        new TT().m();
    }
    
}

class TT extends T {
    @Override
    synchronized void m() {
        System.out.println("child m start");
        super.m();
        System.out.println("child m end");
    }
}

七、synchronized同步方法如果遇到異常,鎖就會被釋放

/**
 * 程序在執行過程中,如果出現異常,默認情況鎖會被釋放
 * 所以,在並發處理的過程中,有異常要多加小心,不然可能會發生不一致的情況。
 * 比如,在一個web app處理過程中,多個servlet線程共同訪問同一個資源,這時如果異常處理不合適,
 * 在第一個線程中拋出異常,其他線程就會進入同步代碼區,有可能會訪問到異常產生時的數據。
 * 因此要非常小心的處理同步業務邏輯中的異常
 * @author mashibing
 */
package yxxy.c_011;

import java.util.concurrent.TimeUnit;

public class T {
    int count = 0;
    synchronized void m() {
        System.out.println(Thread.currentThread().getName() + " start");
        while(true) {
            count ++;
            System.out.println(Thread.currentThread().getName() + " count = " + count);
            try {
                TimeUnit.SECONDS.sleep(1);
                
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            
            if(count == 5) {
                int i = 1/0; //此處拋出異常,鎖將被釋放,要想不被釋放,可以在這里進行catch,然后讓循環繼續
            }
        }
    }
    
    public static void main(String[] args) {
        T t = new T();
        
        new Thread(()->t.m(), "t1").start();
        
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        
        new Thread(()->t.m(), "t2").start();
    }
    
}

  執行結果

t1 start
t1 count = 1
t1 count = 2
t1 count = 3
t1 count = 4
t1 count = 5
t2 start
t2 count = 6
Exception in thread "t1" java.lang.ArithmeticException: / by zero
    at yxxy.c_011.T.m(T.java:28)
    at yxxy.c_011.T.lambda$0(T.java:36)
    at java.lang.Thread.run(Thread.java:745)
t2 count = 7
t2 count = 8
t2 count = 9 
分析:
t1線程啟動后,如果int i=1/0這里拋了異常后,鎖不被釋放的話,t2線程就永遠啟動不起來,永遠執行不了;
但是拋出異常之后,鎖被釋放了,t2得到了執行;
 
解決:
處理異常,鎖不被釋放,循環繼續,t2線程永遠執行不了:
/**
 * 程序在執行過程中,如果出現異常,默認情況鎖會被釋放
 * 所以,在並發處理的過程中,有異常要多加小心,不然可能會發生不一致的情況。
 * 比如,在一個web app處理過程中,多個servlet線程共同訪問同一個資源,這時如果異常處理不合適,
 * 在第一個線程中拋出異常,其他線程就會進入同步代碼區,有可能會訪問到異常產生時的數據。
 * 因此要非常小心的處理同步業務邏輯中的異常
 * @author mashibing
 */
package yxxy.c_011;

import java.util.concurrent.TimeUnit;

public class T {
    int count = 0;
    synchronized void m() {
        System.out.println(Thread.currentThread().getName() + " start");
        while(true) {
            count ++;
            System.out.println(Thread.currentThread().getName() + " count = " + count);
            try {
                TimeUnit.SECONDS.sleep(1);
                
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            
            if(count == 5) {
                try{
                    int i = 1/0; //此處拋出異常,鎖將被釋放,要想不被釋放,可以在這里進行catch,然后讓循環繼續
                }catch(Exception e){
                    System.out.println(e.getMessage());
                }
            }
        }
    }
    
    public static void main(String[] args) {
        T t = new T();
        
        new Thread(()->t.m(), "t1").start();
        
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        
        new Thread(()->t.m(), "t2").start();
    }
    
}
t1 start
t1 count = 1
t1 count = 2
t1 count = 3
t1 count = 4
t1 count = 5
/ by zero
t1 count = 6
t1 count = 7
t1 count = 8
t1 count = 9
t1 count = 10
t1 count = 11
t1 count = 12

  

八、volatile關鍵字

/**
 * volatile 關鍵字,使一個變量在多個線程間可見
 * A B線程都用到一個變量,java默認是A線程中保留一份copy,這樣如果B線程修改了該變量,則A線程未必知道
 * 使用volatile關鍵字,會讓所有線程都會讀到變量的修改值
 * 
 * 在下面的代碼中,running是存在於堆內存的t對象中
 * 當線程t1開始運行的時候,會把running值從內存中讀到t1線程的工作區,在運行過程中直接使用這個copy,並不會每次都去
 * 讀取堆內存,這樣,當主線程修改running的值之后,t1線程感知不到,所以不會停止運行
 * 
 * 使用volatile,將會強制所有線程都去堆內存中讀取running的值
 * 
 * 可以閱讀這篇文章進行更深入的理解
 * http://www.cnblogs.com/nexiyi/p/java_memory_model_and_thread.html
 * 
 * volatile並不能保證多個線程共同修改running變量時所帶來的不一致問題,也就是說volatile不能替代synchronized
 * @author mashibing
 */
package yxxy.c_012;

import java.util.concurrent.TimeUnit;

public class T {
    volatile boolean running = true; //對比一下有無volatile的情況下,整個程序運行結果的區別
    void m() {
        System.out.println("m start");
        while(running) {
        }
        System.out.println("m end!");
    }
    
    public static void main(String[] args) {
        T t = new T();
        
        new Thread(t::m, "t1").start();
        
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        
        t.running = false;
    }
    
}

  

分析:

不加volatile是不行的,線程1沒法結束,那么volatile到底是干嘛的?
線程之間要讓running這個值進行可見,這里要涉及到java的內存模型,java對於線程處理的內存模型;
在jmm(java memory model)里面有個內存它叫主內存,我們所熟識的棧內存,堆內存都可以認為是主內存;每一個線程在執行的過程之中,它有一個線程自己的一塊內存,(實際上不能認為這塊是內存,有可能它是內存,還有cpu上的緩沖區,是一個統稱,就是線程存放它自己變量的一塊內存),如果兩個cpu在運行不同線程的話,每個線程上都有自己的一塊緩沖區,緩沖區就是把主內存JMM里面的內容讀過來在緩沖區里面進行修改,如果+1,+1加了好多次再寫回去;
現在有個running在主內存里面,值是true,占一個字節;
第一個線程啟動的時候會把這個字節copy到自己的緩沖區里面,cpu在處理的過程之中就不再去主內存里面讀了;它在運行這個線程的過程之中,由於這個cpu非常的忙,在while(running)里面,沒空再去主線程里面去刷一下running值了;它一直讀自己緩存里面的內容,running永遠是true;
第二個主線程里面,它首先也是把running讀到它自己的緩沖區,然后把running改成false,發現running已經改了那就把running寫回到主內存里面去;寫回到主內存之后,但是第一個線程它沒有在主內存重新讀啊,所以第一個線程永遠結束不了;
 
加了volatile,第一個線程運行中,不是要求你每次while(running)循環的時候都要到主內存里面讀一次running的值,而是說一旦主內存running這個值發生改變后會通知別的線程,說你們的緩沖區里面內容過期了請重新讀一下,第一個線程再去讀的時候running已經改了,所以線程結束了。
加了volatile的意思就是當running改了后會通知其他的所有線程的緩沖區,說你們那邊的值已經過期了,請你們再去主內存里面重新讀一下。
而並不是通知所有的線程cpu執行的時候每次用的時候都要去主內存讀一下,不是,是寫完之后進行緩存過期通知。
 
要保證線程之間的可見性,那么需要對兩個線程共同訪問的變量加上volatile;如果不想加volatile那只能用synchronized;但volatile的效率要比synchronized高的多;所以在很多高並發的框架里面好多的volatile關鍵字都在用;比如JDK的並發容器的源碼;能用volatile的時候就不要加鎖,程序的並發性就要提高很多;

 圖:

九、volatile並不能保證多個線程共同修改running變量時所帶來的不一致問題,也就是說volatile不能替代synchronized

/**
 * volatile並不能保證多個線程共同修改running變量時所帶來的不一致問題,也就是說volatile不能替代synchronized
 * 運行下面的程序,並分析結果
 * @author mashibing
 */
package yxxy.c_013;

import java.util.ArrayList;
import java.util.List;

public class T {
    volatile int count = 0; 
    void m() {
        for(int i=0; i<10000; i++) count++;
    }
    
    public static void main(String[] args) {
        T t = new T();
        
        List<Thread> threads = new ArrayList<Thread>();
        
        for(int i=0; i<10; i++) {
            threads.add(new Thread(t::m, "thread-"+i));
        }
        
        threads.forEach((o)->o.start());
        
        threads.forEach((o)->{
            try {
                o.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        
        System.out.println(t.count);
        
    }
    
}

  

volatile和synchronized區別?

volatile只保證可見性,並不保證原子性;
synchronized既保證可見性,又保證原子性;但效率要比volatile低不少;
如果只需要保證可見性的時候,使用volatile,不要使用synchronized;
 
Thread.join()方法簡單解釋:插隊,等待該線程完成后執行該線程
 

十、對比上一個程序,可以用synchronized解決

/**
 * 解決同樣的問題的更高效的方法,使用AtomXXX類
 * AtomXXX類本身方法都是原子性的,但不能保證多個方法連續調用是原子性的
 * @author mashibing
 */
package yxxy.c_015;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class T {
    AtomicInteger count = new AtomicInteger(0); 

    void m() {
        for (int i = 0; i < 10000; i++)
            count.incrementAndGet();  //count++
    }

    public static void main(String[] args) {
        T t = new T();

        List<Thread> threads = new ArrayList<Thread>();

        for (int i = 0; i < 10; i++) {
            threads.add(new Thread(t::m, "thread-" + i));
        }

        threads.forEach((o) -> o.start());

        threads.forEach((o) -> {
            try {
                o.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        System.out.println(t.count);
    }
}

  

運行結果:100000

AtomicInteger:原子性操作的int類型;
incrementAndGet(): 原子方法,你可以認為它是加了synchronized的,當然它內部實現不是用synchronized的而是用系統相當底層的實現來去完成的;它的效率要比synchronized高很多;

十二、synchronized優化

/**
 * 鎖定某對象o,如果o的屬性發生改變,不影響鎖的使用
 * 但是如果o變成另外一個對象,則鎖定的對象發生改變
 * 應該避免將鎖定對象的引用變成另外的對象
 * @author mashibing
 */
package yxxy.c_017;

import java.util.concurrent.TimeUnit;

public class T {
    Object o = new Object();

    void m() {
        synchronized(o) {
            while(true) {
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName());
            }
        }
    }
    
    public static void main(String[] args) {
        T t = new T();
        //啟動第一個線程
        new Thread(t::m, "t1").start();
        
        try {
            TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //創建第二個線程
        Thread t2 = new Thread(t::m, "t2");
        
        t.o = new Object(); //鎖對象發生改變,所以t2線程得以執行,如果注釋掉這句話,線程2將永遠得不到執行機會
        
        t2.start();
    }
}

  

分析:

m2()的並發效率要比m1()高不少;細粒度的鎖執行效率要比粗粒度的鎖執行效率要高不少;

十三、避免將鎖定對象的引用變成另外的對象,例子:

 

/**
 * 鎖定某對象o,如果o的屬性發生改變,不影響鎖的使用
 * 但是如果o變成另外一個對象,則鎖定的對象發生改變
 * 應該避免將鎖定對象的引用變成另外的對象
 * @author mashibing
 */
package yxxy.c_017;

import java.util.concurrent.TimeUnit;

public class T {
    Object o = new Object();

    void m() {
        synchronized(o) {
            while(true) {
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName());
            }
        }
    }
    
    public static void main(String[] args) {
        T t = new T();
        //啟動第一個線程
        new Thread(t::m, "t1").start();
        
        try {
            TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //創建第二個線程
        Thread t2 = new Thread(t::m, "t2");
        
        t.o = new Object(); //鎖對象發生改變,所以t2線程得以執行,如果注釋掉這句話,線程2將永遠得不到執行機會
        
        t2.start();
    }
}

  

分析:

t.o = new Object();鎖的對象發生改變,就不需要鎖原來的對象,直接鎖新對象就行了;而新對象還沒有鎖的,所以t2線程就被執行了;
所以,這就證明這個鎖是鎖在什么地方?是鎖在堆內存里new出來的對象上,不是鎖在棧內存里頭o的引用,不是鎖的引用,而是鎖new出來的真正的對象;
鎖的信息是記錄在堆內存里的。


十四、不要以字符串常量作為鎖定對象

/**
 * 不要以字符串常量作為鎖定對象
 * 在下面的例子中,m1和m2其實鎖定的是同一個對象
 * 這種情況還會發生比較詭異的現象,比如你用到了一個類庫,在該類庫中代碼鎖定了字符串“Hello”,
 * 但是你讀不到源碼,所以你在自己的代碼中也鎖定了"Hello",這時候就有可能發生非常詭異的死鎖阻塞,
 * 因為你的程序和你用到的類庫不經意間使用了同一把鎖
 * 
 * jetty
 * 
 * @author mashibing
 */
package yxxy.c_018;

public class T {
    
    String s1 = "Hello";
    String s2 = "Hello";

    void m1() {
        synchronized(s1) {
            
        }
    }
    
    void m2() {
        synchronized(s2) {
            
        }
    }
}

  

 十五:分析一道面試題

/**
 * 曾經的面試題:(淘寶?)
 * 實現一個容器,提供兩個方法,add,size
 * 寫兩個線程,線程1添加10個元素到容器中,線程2實現監控元素的個數,當個數到5個時,線程2給出提示並結束
 * 
 * 分析下面這個程序,能完成這個功能嗎?
 * @author mashibing
 */
package yxxy.c_019;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;


public class MyContainer1 {

    List lists = new ArrayList();

    public void add(Object o) {
        lists.add(o);
    }

    public int size() {
        return lists.size();
    }
    
    public static void main(String[] args) {
        MyContainer1 c = new MyContainer1();

        new Thread(() -> {
            for(int i=0; i<10; i++) {
                c.add(new Object());
                System.out.println("add " + i);
                
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "t1").start();
        
        new Thread(() -> {
            while(true) {
                if(c.size() == 5) {
                    break;
                }
            }
            System.out.println("t2 結束");
        }, "t2").start();
    }
}

  

分析:
不能完成這個功能;
添加volatile關鍵字,修改為如下:
添加volatile:
/**
 * 曾經的面試題:(淘寶?)
 * 實現一個容器,提供兩個方法,add,size
 * 寫兩個線程,線程1添加10個元素到容器中,線程2實現監控元素的個數,當個數到5個時,線程2給出提示並結束
 * 
 * 給lists添加volatile之后,t2能夠接到通知,但是,t2線程的死循環很浪費cpu,如果不用死循環,該怎么做呢?
 * @author mashibing
 */
package yxxy.c_019;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;


public class MyContainer2 {

    //添加volatile,使t2能夠得到通知
    volatile List lists = new ArrayList();

    public void add(Object o) {
        lists.add(o);
    }

    public int size() {
        return lists.size();
    }
    
    public static void main(String[] args) {
        MyContainer2 c = new MyContainer2();

        new Thread(() -> {
            for(int i=0; i<10; i++) {
                c.add(new Object());
                System.out.println("add " + i);
                
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "t1").start();
        
        new Thread(() -> {
            while(true) {
                if(c.size() == 5) {
                    break;
                }
            }
            System.out.println("t2 結束");
        }, "t2").start();
    }
}

  

但是上面代碼還存在兩個問題:

1)由於沒加同步,c.size()等於5的時候,假如另外一個線程又往上增加了1個,實際上這時候已經等於6了才break,所以不是很精確;

2)浪費CPU,t2線程的死循環很浪費cpu

 

 使用wait和notify

 

/**
 * 曾經的面試題:(淘寶?)
 * 實現一個容器,提供兩個方法,add,size
 * 寫兩個線程,線程1添加10個元素到容器中,線程2實現監控元素的個數,當個數到5個時,線程2給出提示並結束
 * 
 * 給lists添加volatile之后,t2能夠接到通知,但是,t2線程的死循環很浪費cpu,如果不用死循環,該怎么做呢?
 * 
 * 這里使用wait和notify做到,wait會釋放鎖,而notify不會釋放鎖
 * 需要注意的是,運用這種方法,必須要保證t2先執行,也就是首先讓t2監聽才可以
 * 
 * 閱讀下面的程序,並分析輸出結果
 * 可以讀到輸出結果並不是size=5時t2退出,而是t1結束時t2才接收到通知而退出
 * 想想這是為什么?
 * @author mashibing
 */
package yxxy.c_019;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;


public class MyContainer3 {

    //添加volatile,使t2能夠得到通知
    volatile List lists = new ArrayList();

    public void add(Object o) {
        lists.add(o);
    }

    public int size() {
        return lists.size();
    }
    
    public static void main(String[] args) {
        MyContainer3 c = new MyContainer3();
        
        final Object lock = new Object();
        
        new Thread(() -> {
            synchronized(lock) {
                System.out.println("t2啟動");
                if(c.size() != 5) {
                    try {
                        lock.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                System.out.println("t2 結束");
            }
            
        }, "t2").start();
        
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e1) {
            e1.printStackTrace();
        }

        new Thread(() -> {
            System.out.println("t1啟動");
            synchronized(lock) {
                for(int i=0; i<10; i++) {
                    c.add(new Object());
                    System.out.println("add " + i);
                    
                    if(c.size() == 5) {
                        lock.notify();
                    }
                    
                    try {
                        TimeUnit.SECONDS.sleep(1);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }, "t1").start();
        
        
    }
}

  

分析:

1)解釋wait和notify,notifyAll方法:

 wait:讓正在運行的線程進入等待狀態,並且釋放鎖

notify:喚醒某個正在等待的線程,不能精確換新某個線程

notifyAll:喚醒所有正在等待的線程

2)為什么size=5了,t2線程沒有結束?
由於notify不會釋放鎖,即便你通知了t2,讓它起來了,它起來之后想往下運行,wait了之后想重新繼續往下運行是需要重新得到lock這把鎖的,可是很不幸的是t1已經把這個鎖鎖定了,所以只有等t1執行完了,t2才會繼續執行。

 

/**
 * 曾經的面試題:(淘寶?)
 * 實現一個容器,提供兩個方法,add,size
 * 寫兩個線程,線程1添加10個元素到容器中,線程2實現監控元素的個數,當個數到5個時,線程2給出提示並結束
 * 
 * 給lists添加volatile之后,t2能夠接到通知,但是,t2線程的死循環很浪費cpu,如果不用死循環,該怎么做呢?
 * 
 * 這里使用wait和notify做到,wait會釋放鎖,而notify不會釋放鎖
 * 需要注意的是,運用這種方法,必須要保證t2先執行,也就是首先讓t2監聽才可以
 * 
 * 閱讀下面的程序,並分析輸出結果
 * 可以讀到輸出結果並不是size=5時t2退出,而是t1結束時t2才接收到通知而退出
 * 想想這是為什么?
 * 
 * notify之后,t1必須釋放鎖,t2退出后,也必須notify,通知t1繼續執行
 * 整個通信過程比較繁瑣
 * @author mashibing
 */
package yxxy.c_019;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;


public class MyContainer4 {

    //添加volatile,使t2能夠得到通知
    volatile List lists = new ArrayList();

    public void add(Object o) {
        lists.add(o);
    }

    public int size() {
        return lists.size();
    }
    
    public static void main(String[] args) {
        MyContainer4 c = new MyContainer4();
        
        final Object lock = new Object();
        
        new Thread(() -> {
            synchronized(lock) {
                System.out.println("t2啟動");
                if(c.size() != 5) {
                    try {
                        lock.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                System.out.println("t2 結束");
                //通知t1繼續執行
                lock.notify();
            }
            
        }, "t2").start();
        
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e1) {
            e1.printStackTrace();
        }

        new Thread(() -> {
            System.out.println("t1啟動");
            synchronized(lock) {
                for(int i=0; i<10; i++) {
                    c.add(new Object());
                    System.out.println("add " + i);
                    
                    if(c.size() == 5) {
                        lock.notify();
                        //釋放鎖,讓t2得以執行
                        try {
                            lock.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    
                    try {
                        TimeUnit.SECONDS.sleep(1);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }, "t1").start();
        
        
    }
}

  流程圖:

 

使用門閂
CountDownLatch(1),CountDown往下數,當1變為0的時候門閂就開了,latch.countDown()調用一次數就往下-1;
latch.await(),門閂的等待是不需要鎖定任何對象的;

 

/**
 * 曾經的面試題:(淘寶?)
 * 實現一個容器,提供兩個方法,add,size
 * 寫兩個線程,線程1添加10個元素到容器中,線程2實現監控元素的個數,當個數到5個時,線程2給出提示並結束
 * 
 * 給lists添加volatile之后,t2能夠接到通知,但是,t2線程的死循環很浪費cpu,如果不用死循環,該怎么做呢?
 * 
 * 這里使用wait和notify做到,wait會釋放鎖,而notify不會釋放鎖
 * 需要注意的是,運用這種方法,必須要保證t2先執行,也就是首先讓t2監聽才可以
 * 
 * 閱讀下面的程序,並分析輸出結果
 * 可以讀到輸出結果並不是size=5時t2退出,而是t1結束時t2才接收到通知而退出
 * 想想這是為什么?
 * 
 * notify之后,t1必須釋放鎖,t2退出后,也必須notify,通知t1繼續執行
 * 整個通信過程比較繁瑣
 * 
 * 使用Latch(門閂)替代wait notify來進行通知
 * 好處是通信方式簡單,同時也可以指定等待時間
 * 使用await和countdown方法替代wait和notify
 * CountDownLatch不涉及鎖定,當count的值為零時當前線程繼續運行
 * 當不涉及同步,只是涉及線程通信的時候,用synchronized + wait/notify就顯得太重了
 * 這時應該考慮countdownlatch/cyclicbarrier/semaphore
 * @author mashibing
 */
package yxxy.c_019;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class MyContainer5 {

    // 添加volatile,使t2能夠得到通知
    volatile List lists = new ArrayList();

    public void add(Object o) {
        lists.add(o);
    }

    public int size() {
        return lists.size();
    }

    public static void main(String[] args) {
        MyContainer5 c = new MyContainer5();

        CountDownLatch latch = new CountDownLatch(1);

        new Thread(() -> {
            System.out.println("t2啟動");
            if (c.size() != 5) {
                try {
                    latch.await();
                    
                    //也可以指定等待時間
                    //latch.await(5000, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            System.out.println("t2 結束");

        }, "t2").start();

        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e1) {
            e1.printStackTrace();
        }

        new Thread(() -> {
            System.out.println("t1啟動");
            for (int i = 0; i < 10; i++) {
                c.add(new Object());
                System.out.println("add " + i);

                if (c.size() == 5) {
                    // 打開門閂,讓t2得以執行
                    latch.countDown();
                }

                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

        }, "t1").start();

    }
}

  

十六:ReentrantLock

jdk里面提供了一個新的鎖,是手工鎖,它是用來替代synchronized的,叫ReentrantLock,重入鎖,其實synchronized也是可重入的,但是這把鎖是和synchronized是有區別的,ReentrantLock是用新的同步方法寫的時候經常用的一個工具;
復習之前講的synchronized同步:
/**
 * reentrantlock用於替代synchronized
 * 本例中由於m1鎖定this,只有m1執行完畢的時候,m2才能執行
 * 這里是復習synchronized最原始的語義
 * @author mashibing
 */
package yxxy.c_020;

import java.util.concurrent.TimeUnit;

public class ReentrantLock1 {
    synchronized void m1() {
        for(int i=0; i<10; i++) {
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(i);
        }
        
    }
    
    synchronized void m2() {
        System.out.println("m2 ...");
    }
    
    public static void main(String[] args) {
        ReentrantLock1 rl = new ReentrantLock1();
        new Thread(rl::m1).start();
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        new Thread(rl::m2).start();
    }
}

 使用ReentrantLock完成同樣功能

 

/**
 * reentrantlock用於替代synchronized
 * 使用reentrantlock可以完成同樣的功能
 * 需要注意的是,必須要必須要必須要手動釋放鎖(重要的事情說三遍)
 * 使用syn鎖定的話如果遇到異常,jvm會自動釋放鎖,但是lock必須手動釋放鎖,因此經常在finally中進行鎖的釋放
 * @author mashibing
 */
package yxxy.c_020;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class ReentrantLock2 {
    Lock lock = new ReentrantLock();

    void m1() {
        try {
            lock.lock(); //synchronized(this)
            for (int i = 0; i < 10; i++) {
                TimeUnit.SECONDS.sleep(1);

                System.out.println(i);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    void m2() {
        lock.lock();
        System.out.println("m2 ...");
        lock.unlock();
    }

    public static void main(String[] args) {
        ReentrantLock2 rl = new ReentrantLock2();
        new Thread(rl::m1).start();
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        new Thread(rl::m2).start();
    }
}

 

 

 十七:RenntrantLock的tryLock

 

/**
 * 使用reentrantlock可以進行“嘗試鎖定”tryLock,這樣無法鎖定,或者在指定時間內無法鎖定,線程可以決定是否繼續等待
 * @author mashibing
 */
package yxxy.c_020;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class ReentrantLock3 {
    Lock lock = new ReentrantLock();

    void m1() {
        try {
            lock.lock();
            for (int i = 0; i < 10; i++) {
                TimeUnit.SECONDS.sleep(1);

                System.out.println(i);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    /**
     * 使用tryLock進行嘗試鎖定,不管鎖定與否,方法都將繼續執行
     * 可以根據tryLock的返回值來判定是否鎖定
     * 也可以指定tryLock的時間,由於tryLock(time)拋出異常,所以要注意unclock的處理,必須放到finally中
     */
    void m2() {
        /*
        boolean locked = lock.tryLock();
        System.out.println("m2 ..." + locked);
        if(locked) lock.unlock();
        */
        
        boolean locked = false;
        
        try {
            locked = lock.tryLock(5, TimeUnit.SECONDS);
            System.out.println("m2 ..." + locked);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            if(locked) lock.unlock();
        }
        
    }

    public static void main(String[] args) {
        ReentrantLock3 rl = new ReentrantLock3();
        new Thread(rl::m1).start();
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        new Thread(rl::m2).start();
    }
}
1
3
5
m2 ...false
7
9

十八:ReentrantLock的lockInterruptibly方法

/**
 * 使用ReentrantLock還可以調用lockInterruptibly方法,可以對線程interrupt方法做出響應,
 * 在一個線程等待鎖的過程中,可以被打斷
 * 
 * @author mashibing
 */
package yxxy.c_020;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;

public class ReentrantLock4 {
        
    public static void main(String[] args) {
        Lock lock = new ReentrantLock();
        
        
        Thread t1 = new Thread(()->{
            try {
                lock.lock();
                System.out.println("t1 start");
                TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);
                System.out.println("t1 end");
            } catch (InterruptedException e) {
                System.out.println("interrupted!");
            } finally {
                lock.unlock();
            }
        });
        t1.start();
        
        Thread t2 = new Thread(()->{
            try {
                //lock.lock();
                lock.lockInterruptibly(); //可以對interrupt()方法做出響應
                System.out.println("t2 start");
            } catch (InterruptedException e) {
                System.out.println("interrupted!");
            } finally {
                lock.unlock();
            }
        });
        t2.start();
        
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        t2.interrupt(); //打斷線程2的等待
        
    }
}
t1 start
interrupted!
Exception in thread "Thread-1" java.lang.IllegalMonitorStateException
    at java.util.concurrent.locks.ReentrantLock$Sync.tryRelease(ReentrantLock.java:151)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.release(AbstractQueuedSynchronizer.java:1261)
    at java.util.concurrent.locks.ReentrantLock.unlock(ReentrantLock.java:457)
    at yxxy.c_020.ReentrantLock4.lambda$1(ReentrantLock4.java:42)
    at java.lang.Thread.run(Thread.java:745)

 

 

 分析:

t1線程牢牢的拿到鎖之后,一直sleep不會釋放,如果t2線程中的run方法使用lock.lock(),那么t2線程就會一直傻傻的等着這把鎖,不能被其他線程打斷;

而使用lockInterruptibly()方法是可以被打斷的,主線程main調用t2.interrupt()來打斷t2,告訴他是不會拿到這把鎖的,別等了;

報錯是因為lock.unlock()這個方法報錯的,因為都沒有拿到鎖,無法unlock();是代碼的問題,應該判斷有鎖,已經鎖定的情況下才lock.unlock();

 

 十九:ReentrantLock還可以指定為公平鎖

公平鎖:等待時間長的線程先執行

競爭鎖:多個線程一起競爭一個鎖

競爭鎖相對效率高

/**
 * ReentrantLock還可以指定為公平鎖
 * 
 * @author mashibing
 */
package yxxy.c_020;

import java.util.concurrent.locks.ReentrantLock;

public class ReentrantLock5 extends Thread {
        
    private static ReentrantLock lock = new ReentrantLock(); //參數為true表示為公平鎖,請對比輸出結果
    
    public void run() {
        for(int i=0; i<100; i++) {
            lock.lock();
            try{
                System.out.println(Thread.currentThread().getName()+"獲得鎖");
            }finally{
                lock.unlock();
            }
        }
    }
    public static void main(String[] args) {
        ReentrantLock5 rl=new ReentrantLock5();
        Thread th1=new Thread(rl);
        Thread th2=new Thread(rl);
        th1.start();
        th2.start();
    }
}

 

二十:面試經典(生產者消費者問題) 

要求:寫一個固定容量同步容器,擁有put和get方法,以及getCount方法,能夠支持2個生產者線程以及10個消費者線程的阻塞調用

同步容器:多個線程共同訪問的時候,不能出問題,就是要加鎖了,下面這個是阻塞式的同步容器;
代碼:
復制代碼
/**
 * 面試題:寫一個固定容量同步容器,擁有put和get方法,以及getCount方法,
 * 能夠支持2個生產者線程以及10個消費者線程的阻塞調用
 * 
 * 使用wait和notify/notifyAll來實現
 * 
 * @author mashibing
 */
package yxxy.c_021;

import java.util.LinkedList;
import java.util.concurrent.TimeUnit;

public class MyContainer1<T> {
    final private LinkedList<T> lists = new LinkedList<>();
    final private int MAX = 10; //最多10個元素
    private int count = 0;
    
    
    public synchronized void put(T t) {
        while(lists.size() == MAX) { //想想為什么用while而不是用if?
            try {
                this.wait(); //effective java
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        
        lists.add(t);
        ++count;
        this.notifyAll(); //通知消費者線程進行消費
    }
    
    public synchronized T get() {
        T t = null;
        while(lists.size() == 0) {
            try {
                this.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        t = lists.removeFirst();
        count --;
        this.notifyAll(); //通知生產者進行生產
        return t;
    }
    
    public static void main(String[] args) {
        MyContainer1<String> c = new MyContainer1<>();
        //啟動消費者線程
        for(int i=0; i<10; i++) {
            new Thread(()->{
                for(int j=0; j<5; j++) System.out.println(c.get());
            }, "c" + i).start();
        }
        
        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        
        //啟動生產者線程
        for(int i=0; i<2; i++) {
            new Thread(()->{
                for(int j=0; j<25; j++) c.put(Thread.currentThread().getName() + " " + j);
            }, "p" + i).start();
        }
    }
}

 

1.為什么用while而不用if?

假設容器中已經滿了,如果用的是if,這個線程A發現list.size()==max已經滿了,就this.wait()住了;
如果容器中被拿走了元素,線程A被叫醒了,它會從this.wait()開始繼續往下運行,准備執行lists.add(),可是它被叫醒了之后還沒有往里扔的時候,另外一個線程往list里面扔了一個,線程A拿到鎖之后不再進行if判斷,而是繼續執行lists.add()就會出問題了;
如果用while,this.wait()繼續往下執行的時候需要在while中再檢查一遍,就不會出問題;
 
2.put()方法中為什么使用notifyAll而不是notify?
如果使用notify,notify是叫醒一個線程,那么就有可能叫醒的一個線程又是生產者,整個程序可能不動了,都wait住了;

使用wati和notify寫線程程序的時候寫起來會比較費勁,使用Lock和Condition

/**
 * 面試題:寫一個固定容量同步容器,擁有put和get方法,以及getCount方法,
 * 能夠支持2個生產者線程以及10個消費者線程的阻塞調用
 * 
 * 使用Lock和Condition來實現
 * 對比兩種方式,Condition的方式可以更加精確的指定哪些線程被喚醒
 * 
 * @author mashibing
 */
package yxxy.c_021;

import java.util.LinkedList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class MyContainer2<T> {
    final private LinkedList<T> lists = new LinkedList<>();
    final private int MAX = 10; //最多10個元素
    private int count = 0;
    
    private Lock lock = new ReentrantLock();
    private Condition producer = lock.newCondition();
    private Condition consumer = lock.newCondition();
    
    public void put(T t) {
        try {
            lock.lock();
            while(lists.size() == MAX) {
                producer.await();
            }
            
            lists.add(t);
            ++count;
            consumer.signalAll(); //通知消費者線程進行消費
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
    
    public T get() {
        T t = null;
        try {
            lock.lock();
            while(lists.size() == 0) {
                consumer.await();
            }
            t = lists.removeFirst();
            count --;
            producer.signalAll(); //通知生產者進行生產
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
        return t;
    }
    
    public static void main(String[] args) {
        MyContainer2<String> c = new MyContainer2<>();
        //啟動消費者線程
        for(int i=0; i<10; i++) {
            new Thread(()->{
                for(int j=0; j<5; j++){
                    System.out.println(c.get());
                }
            }, "c" + i).start();
        }
        
        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        
        //啟動生產者線程
        for(int i=0; i<2; i++) {
            new Thread(()->{
                for(int j=0; j<25; j++) {
                    c.put(Thread.currentThread().getName() + " " + j);
                }
            }, "p" + i).start();
        }
    }
}

使用lock和condition好處在於可以精確的通知那些線程被叫醒,哪些線程不必被叫醒,這個效率顯然要比notifyAll把所有線程全叫醒要高很多。

 

 二十一:ThreadLocal

/**
 * ThreadLocal線程局部變量
 */
package yxxy.c_022;

import java.util.concurrent.TimeUnit;

public class ThreadLocal1 {
    volatile static Person p = new Person();
    
    public static void main(String[] args) {
                
        new Thread(()->{
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            
            System.out.println(p.name);
        }).start();
        
        new Thread(()->{
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            p.name = "lisi";
        }).start();
    }
}

class Person {
    String name = "zhangsan";
}
現在這兩個線程是互相影響的;第二個線程改了名字之后,第一個線程就能讀的到了;
有的時候就想線程2的改變,不想讓線程1知道,這時候怎么做?
/**
 * ThreadLocal線程局部變量
 *
 * ThreadLocal是使用空間換時間,synchronized是使用時間換空間
 * 比如在hibernate中session就存在與ThreadLocal中,避免synchronized的使用
 *
 * 運行下面的程序,理解ThreadLocal
 */
package yxxy.c_022;

import java.util.concurrent.TimeUnit;

public class ThreadLocal2 {
    static ThreadLocal<Person> tl = new ThreadLocal<>();
    
    public static void main(String[] args) {
                
        new Thread(()->{
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            
            System.out.println(tl.get());
        }).start();
        
        new Thread(()->{
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            tl.set(new Person());
        }).start(); 
    }
    
    static class Person {
        String name = "zhangsan";
    }
}

 

console輸出:null

 

ThreadLocal的意思就是,tl里面的變量,自己的線程自己用;你別的線程里要想用的話,不好意思你自己往里扔;不能用我線程里面放的東西;相當於每個線程都有自己的變量,互相之間不會產生沖突;
可以理解為person對象每個線程里面拷貝了一份,改的都是自己那份,都是自己線程本地的變量,所以空間換時間;ThreadLocal在效率上會更高一些;
有一些需要加鎖的對象,如果它們在使用的時候自己進行的改變,自己維護這個狀態,不用通知其他線程,那么這個時候可以使用ThreadLocal;

 

 二十二:高並發容器

 

一、需求背景:          

有N張火車票,每張票都有一個編號,同時有10個窗口對外售票, 請寫一個模擬程序。

分析下面的程序可能會產生哪些問題?重復銷售?超量銷售?

/**
 * 有N張火車票,每張票都有一個編號
 * 同時有10個窗口對外售票
 * 請寫一個模擬程序
 * 
 * 分析下面的程序可能會產生哪些問題?
 * 重復銷售?超量銷售?
 * 
 * @author 馬士兵
 */
package yxxy.c_024;

import java.util.ArrayList;
import java.util.List;

public class TicketSeller1 {
    static List<String> tickets = new ArrayList<>();
    
    static {
        for(int i=0; i<10000; i++) tickets.add("票編號:" + i);
    }
    
    public static void main(String[] args) {
        for(int i=0; i<10; i++) {
            new Thread(()->{
                while(tickets.size() > 0) {
                    System.out.println("銷售了--" + tickets.remove(0));
                }
            }).start();
        }
    }
}

 可能賣重;一張票可能對多個線程同時remove(0),所以可能一張票被賣出去多次;也可能最后一張票的時候都被多個線程remove(),程序會報錯,總之,不加鎖是不行的。

ArrayList不是同步的,remove、add等各種方法全都不是同步的;一定會出問題;

二、使用Vector

/**
 * 使用Vector或者Collections.synchronizedXXX
 * 分析一下,這樣能解決問題嗎?
 * 
 * @author 馬士兵
 */
package yxxy.c_024;

import java.util.Vector;
import java.util.concurrent.TimeUnit;

public class TicketSeller2 {
    static Vector<String> tickets = new Vector<>();
    
    static {
        for(int i=0; i<1000; i++) tickets.add("票 編號:" + i);
    }
    
    public static void main(String[] args) {
        
        for(int i=0; i<10; i++) {
            new Thread(()->{
                while(tickets.size() > 0) {
                    
                    try {
                        TimeUnit.MILLISECONDS.sleep(10);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    
                    System.out.println("銷售了--" + tickets.remove(0));
                }
            }).start();
        }
    }
}

 

 Vector是一個同步容器,所有的方法都是加鎖的;

雖然說在Vector里面remove方法是原子的,但是while條件中判斷和remove是分離的;如果在while條件和remove之間被打斷的話,問題依舊;(假設剩下最后一張票,多個線程爭搶同一張票,每一個線程判斷的size大於0, 雖然size和remove都是原子性的,但是在判斷和remove中間的這段過程中,還是可能被打斷,A線程判斷了size>0,還沒有remove的時候被打斷了,B線程把票拿走了,A線程繼續往下執行的時候再remove就出問題了。)
所以只是把List換成同步容器Vector,問題依舊;

 三、使用synchronized加鎖:

 

/**
 * 就算操作A和B都是同步的,但A和B組成的復合操作也未必是同步的,仍然需要自己進行同步
 * 就像這個程序,判斷size和進行remove必須是一整個的原子操作
 * 
 * @author 馬士兵
 */
package yxxy.c_024;

import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class TicketSeller3 {
    static List<String> tickets = new LinkedList<>();
    
    
    static {
        for(int i=0; i<1000; i++) tickets.add("票 編號:" + i);
    }
    
    public static void main(String[] args) {
        
        for(int i=0; i<10; i++) {
            new Thread(()->{
                while(true) {
                    synchronized(tickets) {
                        if(tickets.size() <= 0) break;
                        
                        try {
                            TimeUnit.MILLISECONDS.sleep(10);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        
                        System.out.println("銷售了--" + tickets.remove(0));
                    }
                }
            }).start();
        }
    }
}
相當於把判斷和銷售都加到了一個原子操作里去了;可以解決問題;
不過加鎖后效率並不是很高;每銷售一張票的時候都要把整個隊列tickets鎖定;

 四、使用ConcurrentLinkedQueue提供並發性

/**
 * 使用ConcurrentQueue提高並發性
 * 
 * @author 馬士兵
 */
package yxxy.c_024;

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

public class TicketSeller4 {
    static Queue<String> tickets = new ConcurrentLinkedQueue<>();
    
    static {
        for(int i=0; i<1000; i++) tickets.add("票 編號:" + i);
    }
    
    public static void main(String[] args) {
        
        for(int i=0; i<10; i++) {
            new Thread(()->{
                while(true) {
                    String s = tickets.poll();
                    if(s == null) {
                        break;
                    }else {
                        System.out.println("銷售了--" + s);
                    }
                }
            }).start();
        }
    }
}
這里面沒有加鎖,同樣的也有判斷,但是這個不會出問題;為什么?
因為在做了s==null判斷后,再也沒有對隊列進行修改操作;(上個程序都是做了判斷之后,需要對隊列進行修改操作remove一下)
假如A線程執行完String s = tickets.poll(),還沒有來得及執行if(s==null) break就被打斷了,另外一個線程把隊列拿空了,大不了while(true)返過頭來再拿一遍得到null,所以不會出問題;

 五、ConcurrentHashMap

 

復制代碼
/**
 * http://blog.csdn.net/sunxianghuang/article/details/52221913 
 * http://www.educity.cn/java/498061.html
 * 閱讀concurrentskiplistmap
 */
package yxxy.c_025;

import java.util.Arrays;
import java.util.Hashtable;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CountDownLatch;

public class T01_ConcurrentMap {
    public static void main(String[] args) {
//        Map<String, String> map = new ConcurrentHashMap<>();
        Map<String, String> map = new ConcurrentSkipListMap<>(); //高並發並且排序
        
//        Map<String, String> map = new Hashtable<>();
        //Map<String, String> map = new HashMap<>(); //Collections.synchronizedXXX
        //TreeMap
        Random r = new Random();
        Thread[] ths = new Thread[100];
        CountDownLatch latch = new CountDownLatch(ths.length);
        long start = System.currentTimeMillis();
        for(int i=0; i<ths.length; i++) {
            ths[i] = new Thread(()->{
                for(int j=0; j<10000; j++) map.put("a" + r.nextInt(100000), "a" + r.nextInt(100000));
                latch.countDown();
            });
        }
        
        Arrays.asList(ths).forEach(t->t.start());
        try {
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        
        long end = System.currentTimeMillis();
        System.out.println(end - start);
    }
}
不同的Map容器執行完這段代碼的時間:
HashTable:445;
ConcurrentHashMap:402;
 
多線程的環境下ConcurrentHashMap的效率要比hashTable高一些,高在哪?
hashTable往里加任何一個數據的時候,都是要鎖定整個hashTable對象,而concurrentHashMap默認的是把容器分成16段,每次往里插數據的時候只鎖定16段其中的一個部分;把鎖細化了;當很多線程共同往里插數據的時候,線程A插的是其中一段,線程B是往另一段里插,那么這兩個線程就可以同時並發的往里插;因此多線程環境下要比hashTable高;
 
ConcurrentSkipListMap:是支持排序的,所以插入的時候慢了一些;
Collections.synchronizedList/Collections.synchronizedMap(Map<K, V>):往里面傳一個不加鎖的Map,將它包裝一下,返回一個加了鎖的Map;
 
注:以上所有的map,都可以換成set;因為set只是使用了map的key。
注2:currentHashMap不是絕對的線程安全(在put的時候get會出問題(生產環境實驗所得))

 六:copyOnWriteList

復制代碼
/**
 * 寫時復制容器 copy on write
 * 多線程環境下,寫時效率低,讀時效率高
 * 適合寫少讀多的環境
 * @author 馬士兵
 */
package yxxy.c_025;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.Vector;
import java.util.concurrent.CopyOnWriteArrayList;

public class T02_CopyOnWriteList {
    public static void main(String[] args) {
        List<String> lists = 
                //new ArrayList<>(); //這個會出並發問題!
                //new Vector();
                new CopyOnWriteArrayList<>();
        Random r = new Random();
        Thread[] ths = new Thread[100];
        
        for(int i=0; i<ths.length; i++) {
            Runnable task = new Runnable() {
    
                @Override
                public void run() {
                    for(int i=0; i<1000; i++) lists.add("a" + r.nextInt(10000));
                }
                
            };
            ths[i] = new Thread(task);
        }
        
        runAndComputeTime(ths);
        
        System.out.println(lists.size());
    }
    
    static void runAndComputeTime(Thread[] ths) {
        long s1 = System.currentTimeMillis();
        Arrays.asList(ths).forEach(t->t.start());
        Arrays.asList(ths).forEach(t->{
            try {
                t.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        long s2 = System.currentTimeMillis();
        System.out.println(s2 - s1);
    }
}

 

用於讀少寫多的場景 

 七、ConcurrentLinkedQueue:

package yxxy.c_025;

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

public class T04_ConcurrentQueue {
    public static void main(String[] args) {
        Queue<String> strs = new ConcurrentLinkedQueue<>();
        
        for(int i=0; i<10; i++) {
            strs.offer("a" + i);  //add
        }
        
        System.out.println(strs);
        
        System.out.println(strs.size());
        
        System.out.println(strs.poll());
        System.out.println(strs.size());
        
        System.out.println(strs.peek());
        System.out.println(strs.size());
        
        //雙端隊列Deque
    }
}
[a0, a1, a2, a3, a4, a5, a6, a7, a8, a9]
10
a0
9
a1
9
Queue:隊列,在並發容器里面最重要的也是應用的最多的容器;有很多種實現,ConcurrentLinkedQueue,BlockingQueue;
常見操作:
offer: 類似於add方法,但是add方法加的時候會出問題,如果有容量的限制話add就會拋異常;offer不會拋異常,返回值boolean代表是否加成功;
poll(): 從頭部拿出來一個元素,同時把原來的刪掉;
peek(): 從頭部拿出來一個,但是原來的不刪;

 

八、LinkedBlockingQueue和ArrayBlockingQueue
package yxxy.c_025;

import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class T05_LinkedBlockingQueue {

    static BlockingQueue<String> strs = new LinkedBlockingQueue<>();

    static Random r = new Random();

    public static void main(String[] args) {
        new Thread(() -> {
            for (int i = 0; i < 100; i++) {
                try {
                    strs.put("a" + i); //如果滿了,就會等待
                    TimeUnit.MILLISECONDS.sleep(r.nextInt(1000));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "p1").start();

        for (int i = 0; i < 5; i++) {
            new Thread(() -> {
                for (;;) {
                    try {
                        System.out.println(Thread.currentThread().getName() + " take -" + strs.take()); //如果空了,就會等待
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }, "c" + i).start();

        }
    }
}
復制代碼
Queue在高並發的情況下可以使用兩種隊列:
ConcurrentLinkedQueue:內部加鎖的
BlockingQueue:阻塞式隊列,如LinkedBlockingQueue,ArrayBlockingQueue。阻塞式的意思是,生產者消費者模式中生產者已經生產滿了直接等待wait,消費如果空了消費者就會直接等待。
LinkedBockingQueue是鏈表實現的阻塞式容器,是無界隊列(往里扔多少個元素都可以,內存滿足的情況下)
ArrayBlockingQueue:有界隊列
package yxxy.c_025;

import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class T06_ArrayBlockingQueue {

    static BlockingQueue<String> strs = new ArrayBlockingQueue<>(10); //有界隊列,最多裝10個元素

    static Random r = new Random();

    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < 10; i++) {
            strs.put("a" + i);
        }
        
        strs.put("aaa"); //滿了就會等待,程序阻塞,無限制的阻塞下去
        //strs.add("aaa");  //報異常,Queue full
        //strs.offer("aaa"); //不會報異常,但是加不進去;boolean帶表是否加成功;這是add和offer的區別 
        //strs.offer("aaa", 1, TimeUnit.SECONDS); //1s鍾之后加不進去就加不進了;按時間段阻塞
        
        System.out.println(strs);
    }
}
復制代碼

 九:DelayQueue

 

package yxxy.c_025;

import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class T07_DelayQueue {

    static BlockingQueue<MyTask> tasks = new DelayQueue<>();

    static Random r = new Random();
    
    static class MyTask implements Delayed {
        long runningTime;
        
        MyTask(long rt) {
            this.runningTime = rt;
        }

        @Override
        public int compareTo(Delayed o) {
            if(this.getDelay(TimeUnit.MILLISECONDS) < o.getDelay(TimeUnit.MILLISECONDS))
                return -1;
            else if(this.getDelay(TimeUnit.MILLISECONDS) > o.getDelay(TimeUnit.MILLISECONDS)) 
                return 1;
            else 
                return 0;
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(runningTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }
        
        @Override
        public String toString() {
            return "" + runningTime;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        long now = System.currentTimeMillis();
        MyTask t1 = new MyTask(now + 1000);
        MyTask t2 = new MyTask(now + 2000);
        MyTask t3 = new MyTask(now + 1500);
        MyTask t4 = new MyTask(now + 2500);
        MyTask t5 = new MyTask(now + 500);
        
        tasks.put(t1);
        tasks.put(t2);
        tasks.put(t3);
        tasks.put(t4);
        tasks.put(t5);
        
        System.out.println(tasks);
        
        for(int i=0; i<5; i++) {
            System.out.println(tasks.take());
        }
    }
}

 

console

[1534606492700, 1534606493200, 1534606493700, 1534606494700, 1534606494200]
1534606492700
1534606493200
1534606493700
1534606494200
1534606494700
DelayQueue:無界隊列,加進去的每一個元素,如果理解為一個任務的話,這個元素什么時候可以讓消費者往外拿呢?每一個元素記載着我還有多長時間可以從隊列中被消費者拿走;這個隊列默認是排好順序的,等待的時間最長的排在最前面,先往外拿;
DelayQueue往里添加的元素是要實現Delayed接口;

 可以用來執行定時任務;

 

十、TransferQueue:  
TransferQueue:提供了transfer方法,一般是這種情形,有一個隊列,消費者線程先啟動,然后生產者生產一個東西的時候不是往隊列里頭仍,它首先去找有沒有消費者,如果有消費者,生產的東西不往隊列里扔了而是直接給消費者消費;如果沒有消費者的話,調用transfer線程就會阻塞;
 
比如場景:坦克大戰中多個坦克客戶端鏈接服務器,坦克A移動了,服務端需要把A移動的位置消息發送給其他客戶端,服務端存在一個消息隊列,消息都交給不同的線程處理,有一種是都往消息隊列里扔,然后再往外拿,不過這種太慢了;假如有一大推消費者線程等着,那么直接把消息扔給消費者線程就行了,不要再往隊列里扔了,效率會更高一些;所以TransferQueue是用在更高的並發的情況下。
 
例子程序:
1.先起消費者,在起生產者transfer,程序正常:
package yxxy.c_025;

import java.util.concurrent.LinkedTransferQueue;

public class T08_TransferQueue {
    public static void main(String[] args) throws InterruptedException {
        LinkedTransferQueue<String> strs = new LinkedTransferQueue<>();
        
        new Thread(() -> {
            try {
                System.out.println(strs.take());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
        
        strs.transfer("aaa");
    }
}
復制代碼

 2.如果先起生產者transfer,然后再起消費者take,程序就會阻塞住了:

package yxxy.c_025;

import java.util.concurrent.LinkedTransferQueue;

public class T08_TransferQueue {
    public static void main(String[] args) throws InterruptedException {
        LinkedTransferQueue<String> strs = new LinkedTransferQueue<>();
        
        strs.transfer("aaa");

        new Thread(() -> {
            try {
                System.out.println(strs.take());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
    }
}

3.如果transfer換成put(或者add、offer),也不會有問題,因為不會阻塞:

package yxxy.c_025;

import java.util.concurrent.LinkedTransferQueue;

public class T08_TransferQueue {
    public static void main(String[] args) throws InterruptedException {
        LinkedTransferQueue<String> strs = new LinkedTransferQueue<>();
        
        //strs.transfer("aaa");
        
        strs.put("aaa");

        new Thread(() -> {
            try {
                System.out.println(strs.take());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
    }
}

十一、SynchronousQueue

package yxxy.c_025;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;

public class T09_SynchronusQueue { //容量為0
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> strs = new SynchronousQueue<>();
        
        new Thread(()->{
            try {
                System.out.println(strs.take());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
        
        strs.put("aaa"); //阻塞等待消費者消費
        //strs.add("aaa");
        System.out.println(strs.size());
    }
}
SynchronousQueue:同步隊列,一種特殊的transferQueue,前面說的TransferQueue如果生產者生產了東西,這時候沒有消費者,如果使用put/add,還可以扔到隊列里,這個隊列還是有一定的容量的;
而SynchronousQueue叫做沒有容量的隊列,容量為0,生產者生產的東西必須馬上消費掉,如果不消費掉就會出問題;調add拋異常(Queue full),調put程序阻塞;

 

總結:
1:對於map/set的選擇使用
HashMap 不需要多線程的情況下使用
TreeMap 不需要多線程的情況下使用
LinkedHashMap 不需要多線程的情況下使用

Hashtable 並發量比較小
Collections.sychronizedXXX 並發量比較小

ConcurrentHashMap 高並發
ConcurrentSkipListMap 高並發同時要求排好順序

2:隊列
ArrayList 不需要同步的情況
LinkedList 不需要同步的情況
Collections.synchronizedXXX 並發量低
Vector 並發量低
CopyOnWriteList 寫的時候少,讀時候多
Queue
CocurrentLinkedQueue //concurrentArrayQueue 高並發隊列
BlockingQueue 阻塞式
LinkedBQ 無界
ArrayBQ 有界
TransferQueue 直接給消費者線程,如果沒有消費者阻塞
SynchronusQueue 特殊的transferQueue,容量0
DelayQueue執行定時任務

 

 二十三:高並發線程池

一、認識Executor、ExecutorService、Callable、Executors

/**
 * 認識Executor
 */
package yxxy.c_026;

import java.util.concurrent.Executor;

public class T01_MyExecutor implements Executor {

    public static void main(String[] args) {
        new T01_MyExecutor().execute(new Runnable(){

            @Override
            public void run() {
                System.out.println("hello executor");
            }
            
        });
    }

    @Override
    public void execute(Runnable command) {
        //new Thread(command).run();
        command.run();
    }

}
Executor執行器是一個接口,只有一個方法execute執行任務,在java的線程池的框架里邊,這個是最頂層的接口;
ExecutorService:從Executor接口繼承。
Callable:里面call方法,和Runnable接口很像,設計出來都是被其他線程調用的;但是Runnable接口里面run方法是沒有返回值的也不能拋出異常;而call方法有返回值可以拋異常;
Executors: 操作Executor的一個工具類;以及操作ExecutorService,ThreadFactory,Callable等;

 二、ThreadPool:

 

/**
 * 線程池的概念
 */
package yxxy.c_026;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class T05_ThreadPool {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService service = Executors.newFixedThreadPool(5); //execute submit
        for (int i = 0; i < 6; i++) {
            service.execute(() -> {
                try {
                    TimeUnit.MILLISECONDS.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName());
            });
        }
        System.out.println(service);
        
        service.shutdown();
        System.out.println(service.isTerminated());
        System.out.println(service.isShutdown());
        System.out.println(service);
        
        TimeUnit.SECONDS.sleep(5);
        System.out.println(service.isTerminated());
        System.out.println(service.isShutdown());
        System.out.println(service);
    }
}
創建了一個線程池,扔了5個線程,接下來要執行6個任務,扔進去線程池里面就啟一個線程幫你執行一個,因為這里最多就起5個線程,接下來扔第6個任務的時候,不好意思,它排隊了,排在線程池所維護的一個任務隊列里面,任務隊列大多數使用的都是BlockingQueue,這是線程池的概念;
有什么好處?好處在於如果這個任務執行完了,這個線程不會消失,它執行完任務空閑下來了,如果有新的任務來的時候,直接交給這個線程來運行就行了,不需要新啟動線程;從這個概念上講,如果你的任務和線程池線程數量控制的比較好的情況下,你不需要啟動新的線程就能執行很多很多的任務,效率會比較高,並發性好;
 
service.shutdown():關閉線程池,shutdown是正常的關閉,它會等所有的任務都執行完才會關閉掉;還有一個是shutdownNow,二話不說直接就給關了,不管線程有沒有執行完;
service.isTerminated(): 代表的是這里所有執行的任務是不是都執行完了。isShutdown()為true,注意它關了但並不代表它執行完了,只是代表正在關閉的過程之中(注意打印Shutting down)
打印5個線程名字,而且第一個線程執行完了之后,第6個任務來了,第1個線程繼續執行,不會有線程6;
 
當所有線程全部執行完畢之后,線程池的狀態為Terminated,表示正常結束,complete tasks=6
 
線程池里面維護了很多線程,等着你往里扔任務,而扔任務的時候它可以維護着一個任務列表,還沒有被執行的任務列表,同樣的它還維護着另外一個隊列,complete tasks,結束的任務隊列,任務執行結束扔到這個隊列里,所以,一個線程池維護着兩個隊列;

 三、Future 

/**
 * 認識future
 */
package yxxy.c_026;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;

public class T06_Future {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        /*FutureTask<Integer> task = new FutureTask<Integer>(new Callable<Integer>(){
            @Override
            public Integer call() throws Exception {
                TimeUnit.MILLISECONDS.sleep(3000);
                return 1000;
            }
        });*/
        
        FutureTask<Integer> task = new FutureTask<>(()->{
            TimeUnit.MILLISECONDS.sleep(3000);
            return 1000;
        });
        
        new Thread(task).start();
        
        System.out.println(task.get()); //阻塞
        
        //*******************************
        ExecutorService service = Executors.newFixedThreadPool(5);
        Future<Integer> f = service.submit(()->{
            TimeUnit.MILLISECONDS.sleep(5000);
            return 1;
        });
        System.out.println(f.isDone());
        System.out.println(f.get());
        System.out.println(f.isDone());
        
    }
}

 

console

1000
false
1
true


 Future: ExecutorService里面有submit方法,它的返回值是Future類型,因為你扔一個任務進去需要執行一段時間,未來的某一個時間點上,任務執行完了產生給你一個結果,這個Future代表的就是那個Callable的返回值;

四、並行計算的例子:

/**
 * 線程池的概念
 * nasa
 */
package yxxy.c_026;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class T07_ParallelComputing {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        long start = System.currentTimeMillis();
        List<Integer> results = getPrime(1, 200000); 
        long end = System.currentTimeMillis();
        System.out.println(end - start);
        
        final int cpuCoreNum = 4;
        
        ExecutorService service = Executors.newFixedThreadPool(cpuCoreNum);
        
        MyTask t1 = new MyTask(1, 80000); //1-5 5-10 10-15 15-20(越大的數計算是不是素數的時間越長)
        MyTask t2 = new MyTask(80001, 130000);
        MyTask t3 = new MyTask(130001, 170000);
        MyTask t4 = new MyTask(170001, 200000);
        
        Future<List<Integer>> f1 = service.submit(t1);
        Future<List<Integer>> f2 = service.submit(t2);
        Future<List<Integer>> f3 = service.submit(t3);
        Future<List<Integer>> f4 = service.submit(t4);
        
        start = System.currentTimeMillis();
        f1.get();
        f2.get();
        f3.get();
        f4.get();
        end = System.currentTimeMillis();
        System.out.println(end - start);
    }
    
    static class MyTask implements Callable<List<Integer>> {
        int startPos, endPos;
        
        MyTask(int s, int e) {
            this.startPos = s;
            this.endPos = e;
        }
        
        @Override
        public List<Integer> call() throws Exception {
            List<Integer> r = getPrime(startPos, endPos);
            return r;
        }
        
    }
    
    //判斷是否是質數
    static boolean isPrime(int num) {
        for(int i=2; i<=num/2; i++) {
            if(num % i == 0) return false;
        }
        return true;
    }
    
    static List<Integer> getPrime(int start, int end) {
        List<Integer> results = new ArrayList<>();
        for(int i=start; i<=end; i++) {
            if(isPrime(i)) results.add(i);
        }
        
        return results;
    }
}
復制代碼

 

 

 console:

2280
818

五、CachedThreadPool 

package yxxy.c_026;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class T08_CachedPool {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService service = Executors.newCachedThreadPool();
        System.out.println(service);
        
        for (int i = 0; i < 2; i++) {
            service.execute(() -> {
                try {
                    TimeUnit.MILLISECONDS.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName());
            });
        }
        
        System.out.println(service);
        
        TimeUnit.SECONDS.sleep(80); //cachedthreadPool里面的線程空閑狀態默認60s后銷毀,這里保險起見
        
        System.out.println(service);
        
        
    }
}

 

 console

java.util.concurrent.ThreadPoolExecutor@7852e922[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
java.util.concurrent.ThreadPoolExecutor@7852e922[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0]
pool-1-thread-2
pool-1-thread-1
java.util.concurrent.ThreadPoolExecutor@7852e922[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 2]

 

 

FixedThreadPool為固定個數的線程池;
CachedThreadPool:剛開始一個線程都沒有,來一個任務就起一個線程,假設起了兩個線程A,B,如果來了第三個任務,這時候恰好線程B任務執行完了,線程池里面有空閑的,這時候直接讓線程池里空閑的線程B來執行;最多起多少個線程?你的系統能支撐多少個為止;默認的情況下,只要一個線程空閑的狀態超過60s,這個線程就自動的銷毀了,alivetime=60s;這個值也可以自己指定。

 

 六、SingleThreadExecutor

 

package yxxy.c_026;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class T09_SingleThreadPool {
    public static void main(String[] args) {
        ExecutorService service = Executors.newSingleThreadExecutor();
        for(int i=0; i<5; i++) {
            final int j = i;
            service.execute(()->{
                
                System.out.println(j + " " + Thread.currentThread().getName());
            });
        }
    }
}
0 pool-1-thread-1
1 pool-1-thread-1
2 pool-1-thread-1
3 pool-1-thread-1
4 pool-1-thread-1
SingleThreadExecutor:線程池里就1個線程;扔5個任務,也永遠只有1個線程執行;
它能保證任務前后一定是順序執行,先扔的任務一定先執行完;只有等第一個任務執行完才執行第二個任務
用於順序執行任務

七、ScheduledThreadPool 

package yxxy.c_026;

import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class T10_ScheduledPool {
    public static void main(String[] args) {
        ScheduledExecutorService service = Executors.newScheduledThreadPool(4);
        service.scheduleAtFixedRate(()->{
            try {
                TimeUnit.MILLISECONDS.sleep(new Random().nextInt(1000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName());
        }, 0, 500, TimeUnit.MILLISECONDS);
    }
}

 

用於定時重復執行 某個任務

 八、WorkStealingPool(工作竊取線程池,為精靈線程)

 

/**
 *
 */
package yxxy.c_026;

import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class T11_WorkStealingPool {
    public static void main(String[] args) throws IOException {
        ExecutorService service = Executors.newWorkStealingPool();
        int count = Runtime.getRuntime().availableProcessors();    //看cpu多少核的;如果是4核,默認就幫你起4個線程
        System.out.println(count);    
        
        service.execute(new R(1000));
        for(int i=0; i<count; i++){
            service.execute(new R(2000));
        }
        
        //由於產生的是精靈線程(守護線程、后台線程),主線程不阻塞的話,看不到輸出
        System.in.read(); 
    }

    static class R implements Runnable {
        int time;

        R(int t) {
            this.time = t;
        }

        @Override
        public void run() {
            try {
                TimeUnit.MILLISECONDS.sleep(time);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            
            System.out.println(time  + " " + Thread.currentThread().getName());
        }
    }
}

 

console

1000 ForkJoinPool-1-worker-1
ForkJoinPool-1-worker-2
ForkJoinPool-1-worker-0
ForkJoinPool-1-worker-5
ForkJoinPool-1-worker-3
ForkJoinPool-1-worker-6
ForkJoinPool-1-worker-7
ForkJoinPool-1-worker-4
ForkJoinPool-1-worker-1

 

 工作竊取線程池:本來執行完自己的線程應該變為等待狀態,但是這個會去別的線程里面拿任務執行

 workStealing用於什么場景:就說任務分配的不是很均勻,有的線程維護的任務隊列比較長,有些線程執行完任務就結束了不太合適,所以他執行完了之后可以去別的線程維護的隊列里去偷任務,這樣效率更高。

 九、ForkJoinPool(有點類似歸並)

 ForkJoinPool: forkjoin的意思就是如果有一個難以完成的大任務,需要計算量特別大,時間特別長,可以把大任務切分成一個個小任務,如果小任務還是太大,它還可以繼續分,至於分成多少你可以自己指定,... 分完之后,把結果進行合並,最后合並到一起join一起,產生一個總的結果。而里面任務的切分你可以自己指定,線程的啟動根據你任務切分的規則,由ForkJoinPool這個線程池自己來維護。

 

 

package yxxy.c_026;

import java.io.IOException;
import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.RecursiveTask;

public class T12_ForkJoinPool {
    static int[] nums = new int[1000000];
    static final int MAX_NUM = 50000;
    static Random r = new Random();
    
    static {
        for(int i=0; i<nums.length; i++) {
            nums[i] = r.nextInt(100);
        }
        
        System.out.println(Arrays.stream(nums).sum()); //stream api 
    }
    
    
    static class AddTask extends RecursiveAction { 
        
        int start, end;
        
        AddTask(int s, int e) {
            start = s;
            end = e;
        }

        @Override
        protected void compute() {
            
            if(end-start <= MAX_NUM) {
                long sum = 0L;
                for(int i=start; i<end; i++) sum += nums[i];
                System.out.println("from:" + start + " to:" + end + " = " + sum);
            } else {
                int middle = start + (end-start)/2;
                AddTask subTask1 = new AddTask(start, middle);
                AddTask subTask2 = new AddTask(middle, end);
                subTask1.fork();
                subTask2.fork();
            }
        }
    }
    
    public static void main(String[] args) throws IOException {
        ForkJoinPool fjp = new ForkJoinPool();
        AddTask task = new AddTask(0, nums.length);
        fjp.execute(task);
        
        System.in.read();
        
    }
}
復制代碼
49494882
from:906250 to:937500 = 1545274
from:968750 to:1000000 = 1537201
from:593750 to:625000 = 1548289
from:718750 to:750000 = 1546396
from:468750 to:500000 = 1550373
from:843750 to:875000 = 1543421
from:218750 to:250000 = 1549856
from:93750 to:125000 = 1548384
from:562500 to:593750 = 1541814
from:812500 to:843750 = 1547885
from:187500 to:218750 = 1546831
from:687500 to:718750 = 1554064
from:437500 to:468750 = 1547434
from:937500 to:968750 = 1547676
from:875000 to:906250 = 1551839
from:62500 to:93750 = 1548576
from:531250 to:562500 = 1550943
from:656250 to:687500 = 1544991
from:156250 to:187500 = 1548367
from:406250 to:437500 = 1539881
from:125000 to:156250 = 1548128
from:500000 to:531250 = 1545229
from:781250 to:812500 = 1544296
from:625000 to:656250 = 1545283
from:375000 to:406250 = 1553931
from:31250 to:62500 = 1544024
from:750000 to:781250 = 1543573
from:343750 to:375000 = 1546407
from:0 to:31250 = 1539743
from:281250 to:312500 = 1549470
from:312500 to:343750 = 1552190
from:250000 to:281250 = 1543113
例子解釋:
對數組中100萬個數求和計算,第一種方式是普通的將所有數加在一起(for循環);
第二種方式使用ForkJoinPool計算,分而治之,它里面執行的任務必須是ForkJoinTask,這個任務可以自動進行切分,一般用的時候從RecursiveAction或RecursiveTask繼承,RecursiveTask遞歸任務,因為它切分任務還可以在切分。RecursiveAction沒有返回值,RecursiveTask有返回值。
 
例子2:
package yxxy.c_026;

import java.io.IOException;
import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.RecursiveTask;

public class T12_ForkJoinPool {
    static int[] nums = new int[1000000];
    static final int MAX_NUM = 50000;
    static Random r = new Random();
    
    static {
        for(int i=0; i<nums.length; i++) {
            nums[i] = r.nextInt(100);
        }
        
        System.out.println(Arrays.stream(nums).sum()); //stream api 
    }
    
    
    static class AddTask extends RecursiveTask<Long> {
        
        int start, end;
        
        AddTask(int s, int e) {
            start = s;
            end = e;
        }

        @Override
        protected Long compute() {
            
            if(end-start <= MAX_NUM) {
                long sum = 0L;
                for(int i=start; i<end; i++) sum += nums[i];
                return sum;
            } 
            
            int middle = start + (end-start)/2;
            
            AddTask subTask1 = new AddTask(start, middle);
            AddTask subTask2 = new AddTask(middle, end);
            subTask1.fork();
            subTask2.fork();
            
            return subTask1.join() + subTask2.join();
        }
    }
    
    public static void main(String[] args) throws IOException {
        ForkJoinPool fjp = new ForkJoinPool();
        AddTask task = new AddTask(0, nums.length);
        fjp.execute(task);
        
        long result = task.join();
        System.out.println(result);
    }
}
49498457
49498457

 

和例子1差不多,唯一的區別是有返回值了,RecursiveTask<V>中的V泛型就是返回值類型。
long result = task.join(),因為join本身就是阻塞的,只有等所有的都執行完了,最后才得出總的執行結果。所以不需要System.in.read了;

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM