JDK的多線程與並發庫


1.創建多線程

public class MultiThread {

    public static void main(String[] args) {
        // 通過繼承Thread類
        Thread thread = new Thread(){
            @Override
            public void run() {
                while(true){
                    try {
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("1:" + Thread.currentThread().getName());
                    System.out.println("2:" + this.getName());
                }
            }
        };
        thread.start();
        
        // 通過實現Runnable接口
        Thread thread2 = new Thread(new Runnable(){
            @Override
            public void run() {
                while(true){
                    try {
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("1:" + Thread.currentThread().getName());

                }                
                
            }
        });
        thread2.start();
        
        // 如果既繼承runnable接口又實現了Thread類, 會執行哪個?
        new Thread(
                new Runnable(){
                    public void run() {
                        while(true){
                            try {
                                Thread.sleep(500);
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                            System.out.println("runnable :" + Thread.currentThread().getName());

                        }                            
                    }
                }
        ){
            public void run() {
                while(true){
                    try {
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("thread :" + Thread.currentThread().getName());

                }    
            }
        }.start();
        
    }
}

 

2.定時器Timer

定時任務就是靠多線程實現的

public class TimerTest {
    private static int count = 0;
   
public static void main(String[] args) { class MyTimerTask extends TimerTask{ @Override public void run() { count = (count+1)%2; System.out.println("bombing!"); new Timer().schedule(new MyTimerTask(),2000+2000*count); } } new Timer().schedule(new MyTimerTask(), 2000); while(true){ System.out.println(new Date().getSeconds()); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } }

 

3.互斥 synchronized 

保證線程安全(數據完整性)

public class MultiThreadMutex {

    public static void main(String[] args) {
        new MultiThreadMutex().init();
    }
    
    private void init(){
        final Outputer outputer = new Outputer();
        new Thread(new Runnable(){
            @Override
            public void run() {
                while(true){
                    try {
                        Thread.sleep(10);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    outputer.output("javaIsAPurelyObjectOrientedProgrammingLanguage");
                }
                
            }
        }).start();
        
        new Thread(new Runnable(){
            @Override
            public void run() {
                while(true){
                    try {
                        Thread.sleep(10);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    outputer.output3("c++IsAMulti-paradigmSystems-levelProgrammingLanguage");
                } 
            }
        }).start();
        
    }

    static class Outputer{
        
        public void output(String name){
            int len = name.length();
            synchronized (Outputer.class) 
            {
                for(int i=0;i<len;i++){
                    System.out.print(name.charAt(i));
                }
                System.out.println();
            }
        }
        
        public synchronized void output2(String name){
            int len = name.length();
            for(int i=0;i<len;i++){
                    System.out.print(name.charAt(i));
            }
            System.out.println();
        }
        
        public static synchronized void output3(String name){
            int len = name.length();
            for(int i=0;i<len;i++){
                    System.out.print(name.charAt(i));
            }
            System.out.println();
        }    
    }
}

 

4.同步 wait/notify

保證線程間執行次序

// 1. wait notify成對出現, 並且處於互斥鎖的范圍內
// 2. 要用while(condition)圍住mutex.wait(), 因為存在虛假喚醒
public class MultiThreadSynchronization {

    public static void main(String[] args) {
        
        final Business business = new Business();
        new Thread(
                new Runnable() {
                    @Override
                    public void run() {
                        for(int i=1;i<=50;i++){
                            business.sub(i);
                        }
                    }
                }
        ).start();
        
        for(int i=1;i<=50;i++){
            business.main(i);
        }
    }
}

class Business {
    private boolean bShouldSub = true;

    public synchronized void sub(int i) {
        while (!bShouldSub) {
            try {
                this.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        for (int j = 1; j <= 10; j++) {
            System.out.println("sub thread sequence of " + j + ",loop of " + i);
        }
        bShouldSub = false;
        this.notify();
    }

    public synchronized void main(int i) {
        while (bShouldSub) {
            try {
                this.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        for (int j = 1; j <= 100; j++) {
            System.out.println("main thread sequence of " + j + ",loop of " + i);
        }
        bShouldSub = true;
        this.notify();
    }
}

 5.線程間傳遞參數

共享變量

/ 多個線程共享變量      
// 以類中變量為中介; 以傳入的共同參數為中介; 匿名內部類以主線程main中變量為中介;
public class MultiThreadShareData {

    public static void main(String[] args) {
        // 傳入共享參數  每個線程執行相同的代碼
        ShareData1 data1 = new ShareData1();
        new Thread(data1).start();
        new Thread(data1).start();
        
        // 傳入共享參數
        ShareData2 data2 = new ShareData2();
        new Thread(new MyRunnable1(data2)).start();
        new Thread(new MyRunnable2(data2)).start();
        
        // 匿名內部類實現變量的寫法更簡潔, 不需要傳參
        final ShareData2 data3 = new ShareData2();
        new Thread(new Runnable(){
            @Override
            public void run() {
                data3.decrement();
            }
        }).start();
        new Thread(new Runnable(){
            @Override
            public void run() {
                data3.increment();
            }
        }).start();
    }
}
    
    
// 方式1. 如果每個線程執行相同的代碼 -> 多個Thread共享同一個runnable中的對象    少有可能
class ShareData1 implements Runnable {
    private int count = 100;
    @Override
    public void run() { 
        while (true) {
            synchronized(this) {
                count--;
            }
        }
    }
}


// 方式2.   
class ShareData2 {
    private int j = 0;
    public synchronized void increment() {   
        j++;
    }
    public synchronized void decrement() {
        j--;
    }
}
class MyRunnable1 implements Runnable {
    private ShareData2 data1;
    public MyRunnable1(ShareData2 data1) {
        this.data1 = data1;
    }
    public void run() {
        data1.decrement();
    }
}
class MyRunnable2 implements Runnable {
    private ShareData2 data1;
    public MyRunnable2(ShareData2 data1) {
        this.data1 = data1;
    }
    public void run() {
        data1.increment();
    }
}

管道

public class MultiThreadPipe {
    
    public static void main(String[] args) {
        PipedOutputStream pos = new PipedOutputStream();
        PipedInputStream pis = new PipedInputStream();
        try {
            pos.connect(pis);
        } catch (IOException e) {
            e.printStackTrace();
        }
        new Consumer(pis).start();
        new Producer(pos).start();
    }
}

class Producer extends Thread {
    private PipedOutputStream pos;
    public Producer(PipedOutputStream pos) {
        this.pos = pos;
    }

    public void run() {
        int i = 8;
        try {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            pos.write(i);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

class Consumer extends Thread {
    private PipedInputStream pis;
    public Consumer(PipedInputStream pis) {
        this.pis = pis;
    }
    public void run() {
        try {
            System.out.println(pis.read());
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

 

6.ThreadLocal

該變量形式上共享, 但卻是by線程獨立

public class ThreadLocalExample {

    private static ThreadLocal<Integer> x = new ThreadLocal<Integer>(); 
    public static void main(String[] args) {
        for (int i = 0; i < 2; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    int data = new Random().nextInt();
                    System.out.println(Thread.currentThread().getName()
                            + " has put data :" + data);
                    x.set(data);
                    Person.getInstance().setName("name" + data);
                    Person.getInstance().setAge(data);
                    new A().print();
                    new B().print();
                }
            }).start();
        }
    }
    
    static class A{
        public void print(){
            int data = x.get();
            System.out.println("A from " + Thread.currentThread().getName() 
                    + " get data :" + data);
            Person myData = Person.getInstance();
            System.out.println("A from " + Thread.currentThread().getName() 
                    + " getMyData: " + myData.getName() + "," +
                    myData.getAge());
        }
    }
    
    static class B{
        public void print(){
            int data = x.get();            
            System.out.println("B from " + Thread.currentThread().getName() 
                    + " get data :" + data);
            Person myData = Person.getInstance();
            System.out.println("B from " + Thread.currentThread().getName() 
                    + " getMyData: " + myData.getName() + "," +
                    myData.getAge());            
        }        
    }
}

// javaBean的by線程的單例
class Person {
    private static ThreadLocal<Person> personThreadLocal = new ThreadLocal<Person>();   
    private Person(){}
    public static /*無需synchronized*/ Person getInstance(){
        Person instance = personThreadLocal.get();
        if(instance == null){
            instance = new Person();
            personThreadLocal.set(instance);
        }
        return instance;
    }
    
    private String name;
    private int age;
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
    public int getAge() {
        return age;
    }
    public void setAge(int age) {
        this.age = age;
    }
}

ThreadLocal實現原理

public class ThreadLocalSimulation {

    private static Map<Thread, Integer> threadData = new HashMap<Thread, Integer>();  //核心

    public static void main(String[] args) {
        for (int i = 0; i < 2; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    int data = new Random().nextInt();
                    System.out.println(Thread.currentThread().getName()
                            + " has put data :" + data);
                    threadData.put(Thread.currentThread(), data);
                    new A().get();
                    new B().get();
                }
            }).start();
        }
    }
    
    static class A{
        public void get(){
            int data = threadData.get(Thread.currentThread());
            System.out.println("A from " + Thread.currentThread().getName() 
                    + " get data :" + data);
        }
    }
    
    static class B{
        public void get(){
            int data = threadData.get(Thread.currentThread());            
            System.out.println("B from " + Thread.currentThread().getName() 
                    + " get data :" + data);
        }        
    }
}

 

 7. 線程池

池化技術都是防止頻繁開關來提高系統性能, 代價是必須損耗一定空間來保存池

// 池化技術之線程池
public class ThreadPoolTest {

    public static void main(String[] args) {
        ExecutorService threadPool = Executors.newFixedThreadPool(3);  // 限制線程數量
        //ExecutorService threadPool = Executors.newCachedThreadPool();  // 動態控制線程數量
        //ExecutorService threadPool = Executors.newSingleThreadExecutor();  // 跟一個線程類似, 但可以保證線程掛了有新線程接替
        for(int i=1; i<=10; i++){
            final int task = i;
            threadPool.execute(new Runnable(){
                @Override
                public void run() {
                    for(int j = 1; j <= 10; j++){
                        try {
                            Thread.sleep(20);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        System.out.println(Thread.currentThread().getName() + " task:" + task + " loop:" + j);
                    }
                }
            });
        }
        System.out.println("all of 10 tasks have committed!");
        threadPool.shutdown();    // 如果是shutdownNow方法會停止正在執行的任務
        
        // 帶定時器的線程池 schedule方法:xx時間以后執行; scheduleAtFiexedRate方法:xx時間后每隔yy時間執行
        Executors.newScheduledThreadPool(3).scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                System.out.println("bombing!");

            }
        }, 6, 2, TimeUnit.SECONDS); 
    }

}

 

8. Callable接口與Future

能實現返回線程執行結果 的效果

// 返回結果的任務
public class CallableAndFuture {

    public static void main(String[] args) {
        // 其一
        ExecutorService threadPool = Executors.newSingleThreadExecutor();
        Future<String> future = threadPool.submit(     // submit Callable<resultType>而非execute Runnable
                new Callable<String>() {
                    public String call() throws Exception {
                        // 模擬handling
                        Thread.sleep(2000);
                        return "hello";
                    };
                });
        System.out.println("等待結果");
        
        try {
            System.out.println("拿到結果:" + future.get());    //阻塞等待結果, 還有個get方法的重載版本,帶超時參數, 超時拋異常. future/get的特點在於, 我們可以把任務合理分解, 在需要任務結果時調用get
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        }
        threadPool.shutdown();  //不用該函數主線程是不會退出的
        
        
        // 其二
        // ExecutorCompletionService包裝線程池, take方法返回最先完成的Future任務
        ExecutorService threadPool2 =  Executors.newFixedThreadPool(10);
        CompletionService<Integer> completionService = new ExecutorCompletionService<Integer>(threadPool2);
        for (int i = 1; i <= 10; i++) {
            final int seq = i;
            completionService.submit(new Callable<Integer>() {
                @Override
                public Integer call() throws Exception {
                    // 模擬handling
                    Thread.sleep(new Random().nextInt(5000));
                    return seq;
                }
            });
        }
        for (int i = 0; i < 10; i++) {
            try {
                System.out.println(completionService.take().get());
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
        threadPool2.shutdown();
    }
}

 

9.Lock

ReentrantLock是具有synchronized功能的類
ReentrantReadWriteLock 粒度更細, 讀與讀不互斥, 寫與寫互斥,  讀與寫互斥

// 使用Lock改寫synchronized例子
public class LockTest {

    public static void main(String[] args) {
        new LockTest().init();
    }
    
    private void init(){
        final Outputer outputer = new Outputer();
        new Thread(new Runnable(){
            @Override
            public void run() {
                while(true){
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    outputer.output("javaIsAPurelyObjectOrientedProgrammingLanguage");
                }
                
            }
        }).start();
        
        
        new Thread(new Runnable(){
            @Override
            public void run() {
                while(true){
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    outputer.output("c++IsAMulti-paradigmSystems-levelProgrammingLanguage");
                }
                
            }
        }).start();
        
    }

    static class Outputer {
        Lock lock = new ReentrantLock();

        public void output(String name) {
            int len = name.length();
            lock.lock();
            try {
                for (int i = 0; i < len; i++) {
                    System.out.print(name.charAt(i));
                }
                System.out.println();
            } finally {
                lock.unlock();
            }
        }
    }
}

使用讀寫鎖模擬緩存

// 模擬緩存
// 加鎖解鎖要一致: 解沒加過的鎖會拋出異常; 加鎖不解會造成死鎖
public class CacheSimulation {

    public static void main(String[] args) {
        for (int i = 0; i < 10; ++i) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    String i = (String) getData("key");
                    // out.println()參數為常量無並發問題; 為表達式時存在並發問題
                    System.out.println(i);
                }
            }).start();
        }
    }
    
    private static Map<String, Object> cache = new HashMap<String, Object>();  //保存緩存
    private static ReadWriteLock rwl = new ReentrantReadWriteLock();
    
    public static Object getData(String key) {
        rwl.readLock().lock();       
        Object value = cache.get(key);
        if (value == null) {
            rwl.readLock().unlock(); 
            rwl.writeLock().lock();
            if (cache.get(key) == null) {    // 防止幾個線程都阻塞在writeLock.lock()
                value = "abcde";     // 模擬獲取數據
                System.out.println("get");
                cache.put(key, value);
            }
            rwl.writeLock().unlock();
        }
        return value;
    }
}

 

10.Condition

Condition具有wait/notify功能的類, 同樣要配合Lock使用. 但與synchronized的waitnotify不同, 這里同一個Lock下可以創建多個Condition對象, 來實現粒度更細的控制

一個condition

// 使用Condition改寫線程同步示例, Condition由Lock.newCondition()而來
// Condition.await/signal 對應 Mutex.wait/notify
public class ConditionTest {

    public static void main(String[] args) {

        final Business business = new Business();
        new Thread(new Runnable() {
            @Override
            public void run() {
                for (int i = 1; i <= 50; i++) {
                    business.sub(i);
                }
            }
        }).start();

        for (int i = 1; i <= 30; i++) {
            business.main(i);
        }

    }

    static class Business {
        Lock lock = new ReentrantLock();
        Condition condition = lock.newCondition();
        private boolean bShouldSub = true;

        public void sub(int i) {
            lock.lock();
            try {
                while (!bShouldSub) {
                    try {
                        condition.await();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
                for (int j = 1; j <= 10; j++) {
                    System.out.println("sub thread sequence of " + j + ",loop of " + i);
                }
                bShouldSub = false;
                condition.signal();
            } finally {
                lock.unlock();
            }
        }

        public void main(int i) {
            lock.lock();
            try {
                while (bShouldSub) {
                    try {
                        condition.await();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
                for (int j = 1; j <= 20; j++) {
                    System.out.println("main thread sequence of " + j + ",loop of " + i);
                }
                bShouldSub = true;
                condition.signal();
            } finally {
                lock.unlock();
            }
        }

    }
}

兩個condition,  下面模擬了數組阻塞隊列

// 有界緩沖區/數組阻塞隊列 的模擬
class ArrayBlockingQueueSimulation {
    final Lock lock = new ReentrantLock();
    final Condition notFull = lock.newCondition();
    final Condition notEmpty = lock.newCondition();
    final Object[] items = new Object[100];  // 長度
    int putptr, takeptr, count;  // 初始為0

    public void put(Object x) throws InterruptedException {
        lock.lock();
        try {
            while (count == items.length) {
                notFull.await();
            }
            items[putptr] = x;
            if (++putptr == items.length) {
                putptr = 0;
            }
            ++count;
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    public Object take() throws InterruptedException {
        lock.lock();
        try {
            while (count == 0) {
                notEmpty.await();
            }
            Object x = items[takeptr];
            if (++takeptr == items.length) {
                takeptr = 0;
            }
            --count;
            notFull.signal();
            return x;
        } finally {
            lock.unlock();
        }
    }
}

三個condition,  如下實現了三個線程輪流執行

public class ThreeThreadsSynchronization {

    public static void main(String[] args) {

        final Business business = new Business();
        
        new Thread(new Runnable() {
            @Override
            public void run() {
                for (int i = 1; i <= 50; i++) {
                    business.sub2(i);
                }

            }
        }).start();

        new Thread(new Runnable() {
            @Override
            public void run() {
                for (int i = 1; i <= 50; i++) {
                    business.sub3(i);
                }
            }
        }).start();

        for (int i = 1; i <= 50; i++) {
            business.main(i);
        }

    }

    static class Business {
        Lock lock = new ReentrantLock();
        Condition condition1 = lock.newCondition();
        Condition condition2 = lock.newCondition();
        Condition condition3 = lock.newCondition();
        private int shouldSub = 1;

        public void sub2(int i) {
            lock.lock();
            try {
                while (shouldSub != 2) {
                    try {
                        condition2.await();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
                for (int j = 1; j <= 20; j++) {
                    System.out.println("sub2 thread sequence of " + j + ",loop of " + i);
                }
                shouldSub = 3;
                condition3.signal();
            } finally {
                lock.unlock();
            }
        }

        public void sub3(int i) {
            lock.lock();
            try {
                while (shouldSub != 3) {
                    try {
                        condition3.await();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
                for (int j = 1; j <= 10; j++) {
                    System.out.println("sub3 thread sequence of " + j + ",loop of " + i);
                }
                shouldSub = 1;
                condition1.signal();
            } finally {
                lock.unlock();
            }
        }

        public void main(int i) {
            lock.lock();
            try {
                while (shouldSub != 1) {
                    try {
                        condition1.await();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
                for (int j = 1; j <= 30; j++) {
                    System.out.println("main thread sequence of " + j + ",loop of " + i);
                }
                shouldSub = 2;
                condition2.signal();
            } finally {
                lock.unlock();
            }
        }

    }
}

 

11. Semaphore

Semaphore信號量, 互斥鎖保證多個線程同時訪問同一個資源時的線程安全性, 信號量讓線程動態匹配現有資源數, 來保證同時訪問多個資源時的線程安全性, 並發更高. 

Lock是哪個線程拿哪個線程負責釋放; 信號量可以是一個線程獲取, 另一個線程釋放, 這個特性能用於死鎖恢復.

public class SemaphoreTest {
    
    public static void main(String[] args) {
        ExecutorService service = Executors.newCachedThreadPool();
        final Semaphore semaphore = new Semaphore(3);
        for (int i = 0; i < 10; i++) {
            Runnable runnable = new Runnable() {
                public void run() {
                    try {
                        semaphore.acquire();  //線程進入時獲取信號量
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("線程" + Thread.currentThread().getName() + "進入,當前已有" + (3 - semaphore.availablePermits()) + "個並發");
                    try {
                        Thread.sleep((long) (Math.random() * 1000));
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("線程" + Thread.currentThread().getName() + "即將離開");
                    semaphore.release();   //線程結束時釋放信號量
                    // 下面代碼有時候執行不准確,因為其沒有和上面的代碼合成原子單元
                    System.out.println("線程" + Thread.currentThread().getName() + "已離開,當前已有" + (3 - semaphore.availablePermits()) + "個並發");
                }
            };
            service.execute(runnable);
        }
service.shutdown(); } }

 

12. CyclicBarrier

多個線程階段點同步

public class CyclicBarrierTest {

    public static void main(String[] args) {
        ExecutorService service = Executors.newCachedThreadPool();
        final CyclicBarrier cb = new CyclicBarrier(3);
        for (int i = 0; i < 3; i++) {
            Runnable runnable = new Runnable() {
                public void run() {
                    try {
                        // 模擬handling
                        Thread.sleep((long) (Math.random() * 1000));
                        System.out.println("線程" + Thread.currentThread().getName() + "即將到達集合地點1,當前已有" 
                        + (cb.getNumberWaiting() + 1) + "個已經到達," + (cb.getNumberWaiting() == 2 ? "都到齊了,繼續走" : "正在等候"));
                        cb.await();              //第一個同步點
                        Thread.sleep((long) (Math.random() * 1000));
                        System.out.println("線程" + Thread.currentThread().getName() + "即將到達集合地點2,當前已有"
                        + (cb.getNumberWaiting() + 1) + "個已經到達," + (cb.getNumberWaiting() == 2 ? "都到齊了,繼續走" : "正在等候"));
                        cb.await();              //第二個同步點
                        Thread.sleep((long) (Math.random() * 1000));
                        System.out.println("線程" + Thread.currentThread().getName() + "即將到達集合地點3,當前已有"
                        + (cb.getNumberWaiting() + 1) + "個已經到達," + (cb.getNumberWaiting() == 2 ? "都到齊了,繼續走" : "正在等候"));
                        cb.await();             //第三個同步點
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            };
            service.execute(runnable);
        }
        service.shutdown();
    }
}

 

13.CountDownLatch

線程通過等待計數器歸零來實現同步   實現一個人/多個人等待一個人/多個人的完成

public class CountdownLatchTest {

    public static void main(String[] args) {
        ExecutorService service = Executors.newCachedThreadPool();
        final CountDownLatch cdOrder = new CountDownLatch(1);  //初始計數器的數為1
        final CountDownLatch cdAnswer = new CountDownLatch(3);
        for (int i = 0; i < 3; i++) {
            Runnable runnable = new Runnable() {
                public void run() {
                    try {
                        System.out.println("線程" + Thread.currentThread().getName() + "准備接受執行命令");
                        cdOrder.await();
                        System.out.println("線程" + Thread.currentThread().getName() + "已接到命令, 開始執行");
                        // 模擬handling
                        Thread.sleep((long) (Math.random() * 5000));
                        System.out.println("線程" + Thread.currentThread().getName() + "的分任務完成");
                        cdAnswer.countDown();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            };
            service.execute(runnable);
        }
        try {
            Thread.sleep((long) (Math.random() * 5000));
            System.out.println("線程" + Thread.currentThread().getName() + "即將發送執行命令");
            cdOrder.countDown();
            System.out.println("線程" + Thread.currentThread().getName() + "已發送命令, 任務正在處理");
            cdAnswer.await();
            System.out.println("線程" + Thread.currentThread().getName() + "主管的所有任務完成");
        } catch (Exception e) {
            e.printStackTrace();
        }
        service.shutdown();
    }
}

 

14. Exchanger

兩個線程間互相交換數據

public class ExchangerTest {

    public static void main(String[] args) {
        ExecutorService service = Executors.newCachedThreadPool();
        final Exchanger<String> exchanger = new Exchanger<>();
        service.execute(new Runnable(){
            public void run() {
                try {                
                    String data1 = "王老吉";
                    System.out.println("線程" + Thread.currentThread().getName() + "正在把數據 " + data1 +" 換出去");
                    Thread.sleep((long)(Math.random()*2000));
                    String data2 = (String)exchanger.exchange(data1);
                    System.out.println("線程" + Thread.currentThread().getName() + "換回的數據為 " + data2);
                }catch(Exception e){
                    e.printStackTrace();
                }
            }    
        });
        
        service.execute(new Runnable(){
            public void run() {
                try {                
                    String data1 = "加多寶";
                    System.out.println("線程" + Thread.currentThread().getName() + "正在把數據 " + data1 +" 換出去");
                    Thread.sleep((long)(Math.random()*2000));                    
                    String data2 = (String)exchanger.exchange(data1);
                    System.out.println("線程" + Thread.currentThread().getName() + "換回的數據為 " + data2);
                }catch(Exception e){
                    e.printStackTrace();
                }                
            }    
        });        
        
        service.shutdown();
    }
}

 

15. 阻塞隊列

阻塞隊列實現了BlockingQueue接口,  是生產者消費者模型的典范, 通過鎖實現

put和take方法才具有阻塞功能
阻塞隊列與線程同步      :  兩個大小為1的空/滿阻塞隊列可以實現condition或wait/notify的效果
阻塞隊列與Semaphore :  阻塞隊列是一個線程存入數據, 一個線程取出數據; Semaphore一般用作同一線程獲取和釋放

public class BlockingQueueTest {
    
    public static void main(String[] args) {
        final BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(3);
        for (int i = 0; i < 2; i++) {
            new Thread() {
                public void run() {
                    while (true) {
                        try {
                            Thread.sleep((long) (Math.random() * 1000));
                            System.out.println(Thread.currentThread().getName() + "准備放數據!");
                            queue.put(1);
                            System.out.println(Thread.currentThread().getName() + "已經放入數據, " + "隊列目前有" + queue.size() + "個數據");
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            }.start();
        }

        new Thread() {
            public void run() {
                while (true) {
                    try {
                        Thread.sleep(1000);
                        System.out.println(Thread.currentThread().getName() + "准備取數據!");
                        queue.take();
                        System.out.println(Thread.currentThread().getName()
                                + "已經取走數據, " + "隊列目前有" + queue.size() + "個數據");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }.start();
    }
}

兩個長度為1的空/滿隊列實現condition的效果

public class BlockingQueueImplSynchronization {

    public static void main(String[] args) {

        final Business business = new Business();
        new Thread(new Runnable() {
            @Override
            public void run() {
                for (int i = 1; i <= 20; i++) {
                    business.sub1(i);
                }
            }
        }).start();

        for (int i = 1; i <= 20; i++) {
            business.sub2(i);
        }

    }

    static class Business {

        BlockingQueue<Integer> queue1 = new ArrayBlockingQueue<Integer>(1);
        BlockingQueue<Integer> queue2 = new ArrayBlockingQueue<Integer>(1);
        
        {
            try {
                System.out.println("init");
                queue2.put(1);    //queue1為空  queue為滿
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        public void sub1(int i) {
            try {
                queue1.put(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            for (int j = 1; j <= 10; j++) {
                System.out.println("sub thread sequece of " + j + ", loop of " + i);
            }
            try {
                queue2.take();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        public void sub2(int i) {
            try {
                queue2.put(1);
            } catch (InterruptedException e1) {
                e1.printStackTrace();
            }
            for (int j = 1; j <= 20; j++) {
                System.out.println("main thread sequece of " + j + ", loop of " + i);
            }
            try {
                queue1.take();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

 

16. 線程安全的非阻塞容器

並發集合
在JDK5之前, 多線程中對容器的操作部分需要手動加synchronized塊保證線程安全
稍微輕便點的方式是使用Collections.synchronizedXXX()生成集合. 實現原理:通過裝飾器模式在同名方法前添加synchronized(this), 來達到實現線程安全
但這是不完整的解決方案, 因為
1)裝飾類的迭代器相關的代碼沒有加synchronized. 涉及到迭代還依然需要手動加synchronized塊
2)迭代器遍歷過程中除該迭代器外不能用其他方式增刪元素(單線程在自身循環內, 多線程在不同線程執行不同部分), 否則拋出並發修改異常
3)最重要的, 並發低

// 在集合的迭代器迭代過程中, 除了迭代器外不能對集合進行修改, 否則會拋出ConcurrentModificationException
// ConcurrentModificationException的實現: 樂觀鎖, 記錄一個版本號, 版本號不對拋異常
public class ConcurrentModificationExceptionExample {
    
    public static void main(String[] args) {
        // Collection users = new CopyOnWriteArrayList();  //若使用同步集合, 非迭代器修改就正常
        Collection<User> users = new ArrayList<>();
        users.add(new User("張三", 28));
        users.add(new User("李四", 25));
        users.add(new User("王五", 31));
        Iterator itrUsers = users.iterator();

        while (itrUsers.hasNext()) {
            System.out.println("mark");
            User user = (User) itrUsers.next();
            if ("張三".equals(user.getName())) {
                users.remove(user);   // 非迭代器修改拋出異常
                //itrUsers.remove();  // 若使用迭代器修改, 則正常
            } else {
                System.out.println(user);
            }
        }
    }
} 

JDK5之后提出了很多線程安全的容器, 與前輩synchronized方式比起來, 它們的亮點並不是保證了線程安全, 而是它們在保證線程安全的同時盡量避免並發瓶頸

基本上限制條件多的容器都能實現Concurrent版本, 保持一定的讀寫並發;  像ArrayList LinkedList很難避開並發瓶頸, 退而求其次ArrayList實現了CopyOn保證了讀並發;

 LinkedList只能是通過Collections.synchronizedList()的synchronized方式(讀|讀都有鎖), 盡量用其他集合替代.

ps:Collections.synchronizedList()或Vector的區別:  1.擴容量不同Vector 100%, SynchronizedList 50%.  2.Vector已對迭代器加鎖, SynchronizedList需要手動加鎖

原有集合 並發集合 原理
HashMap ConcurrentHashMap 鎖分段技術
HashSet Collections.newSetFromMap(new ConcurrentHashMap()) 用map版本實現
TreeMap ConcurrentSkipListMap

用SkipList替代紅黑樹, CAS

TreeSet ConcurrentSkipListSet 用map版本實現
Queue接口 ConcurrentLinkedQueue  非阻塞 CAS
ArrayList CopyOnWriteArrayList 寫時復制, 提高了讀並發度;  以空間換取了部分寫並發, 這點好壞需測試
Set接口 CopyOnWriteArraySet 用ArrayList版本實現, 用addIfAbsent()方法實現元素去重,寫時還要復制, 因此寫效率不佳

 

 

 

 

 

 

 

 

CAS原理類似樂觀鎖, 處理器保證底層實現, 理念:多次嘗試肯定有一個能成(版本匹配則操作成功->該操作即具有原子性), 但會做很多無用功; 相比加鎖能提高並發

 

17. 原子類

AtomicInteger等原子類用的是CAS, 並發比加鎖高

實現多線程下安全性的要素:  原子性(不能被其他影響某變量的程序段打斷) +  內存可見性 (一個線程修改, 另一個線程馬上能看到)

synchronized: 實現了原子性和可見性 
volatile: 實現內存可見性   CAS: 實現原子性

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM