Java基礎——多線程


Java基礎-多線程

多個線程一起做同一件事情,縮短時間,提升效率
提高資源利用率
加快程序響應,提升用戶體驗

創建線程

1. 繼承Thread類

  • 步驟

    • 繼承Thread類,重寫run方法

    • 調用的時候,直接new一個對象,然后調start()方法啟動線程

  • 特點

    • 由於是繼承方式,所以不建議使用,因為Java是單繼承的,不夠靈活

    • Thread類本質也是實現Runnable接口(public class Thread implements Runnable)

  

2. 實現Runnable接口

  • 步驟

    • 實現Runnable接口,重寫run()方法

    • 創建Runnable實現類的實例,並用這個實例作為Thread的target來創建Thread對象

    • 調用Thread類實例對象的start()方法啟動線程

  • 特點

    • 只是實現,保留了繼承的其他類的能力

    • 如果需要訪問當前線程,必須使用Thread.currentThread()方法

  

3. 實現 Callable接口

  • 步驟

    • 實現Callable接口,重寫call()方法

    • 創建Callable實現類的實例,使用FutureTask類來包裝Callable對象

    • 並用FutureTask實例作為Thread的target來創建Thread對象

    • 調用Thread類實例對象的start()方法啟動線程

    • 調用FutureTask類實例對象的get()方法獲取異步返回值

  • 特點

    • call方法可以拋出異常

    • 只是實現,保留了繼承的其他類的能力

    • 如果需要訪問當前線程,必須使用Thread.currentThread()方法

    • 通過FutureTask對象可以了解任務執行情況,可取消任務的執行,還可獲取執行結果

  

4. 匿名內部類實現

  • 說明

    • 本質還是前面的方法,只是使用了匿名內部類來實現,簡化代碼

    • Callable接口之所以把FutureTask類的實例化寫出來,是因為需要通過task對象獲取返回值

  

參數傳遞

1. 通過構造方法傳遞數據

  通過前面的學習,可以看到,不管何種創建對象的方式,都需要新建立實例,所以我們可以通過構造函數傳入參數,並將傳入的數據使用類變量保存起來

  • 特點

    • 在線程運行之前,數據就已經傳入了

    • 使用構造參數,當參數較多時,使用不方便

    • 不同參數條件需要不同的構造方法,使得構造方法較多

2. 通過變量和方法傳遞數據

  在線程類里面定義一些列的變量,然后定義set方法,在新建實例之后,調用set方法傳遞參數

  • 特點

    • 在參數較多時使用方便,按需傳遞參數

3. 通過回調函數傳遞數據

  使用線程方法自己產生的變量值作為參數,去調取外部的方法,獲取返回數據的方式

  • 特點

    • 擁有獲取數據的主動權
線程同步

  要跨線程維護正確的可見性,只要在幾個線程之間共享非 final 變量,就必須使用線程同步

1. ThreadLocal

  ThreadLocal利用空間換時間,通過為每個線程提供一個獨立的變量副本,避免了資源等待,解決了變量並發訪問的沖突問題,提高了並發量。實現了線程間的數據隔離,但是線程間無法共享同一個資源

public class StudyThread {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        SyncTest syncTest = new SyncTest();
        ConcurrentHashMap<String, String> testConMap = new ConcurrentHashMap<>();
        for (int i = 0; i < 10; i++) {
            ThreadTest2 threadTest2 = new ThreadTest2();
            threadTest2.setSyncTest(syncTest);
            Thread threadTest = new Thread(threadTest2);
            threadTest.start();
        }
    }
}

//實現Runnable
class ThreadTest2 implements Runnable {
    private SyncTest syncTest;

    public void setSyncTest(SyncTest syncTest) {
        this.syncTest = syncTest;
    }

    @Override
    public void run() {
        syncTest.threadLocalTest(Thread.currentThread().getName());
    }
}

class SyncTest {
    private static ThreadLocal<String> threadLocal = new ThreadLocal<String>();

    public void threadLocalTest(String name) {
        try {
            System.out.println(name + "進入了threadLocal方法!");
            threadLocal.set(name);
            Thread.currentThread().sleep(100);
            System.out.println(threadLocal.get() + "離開了threadLocal方法!");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

2. synchronized

  不管synchronized是用來修飾方法,還是修飾代碼塊,其本質都是鎖定某一個對象。修飾方法時,鎖上的是調用這個方法的對象,即this;修飾代碼塊時,鎖上的是括號里的那個對象。每一個Java對象都有一個內置鎖,訪問synchronized代碼塊或synchronized方法的時候,線程都需要首先獲取到對象關聯的內置鎖,對於static方法,線程獲取的是類對象的內置鎖。

  • 特點

    • 鎖的對象越小越好
public class StudyThread {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        SyncTest syncTest = new SyncTest();
        ConcurrentHashMap<String, String> testConMap = new ConcurrentHashMap<>();
        for (int i = 0; i < 10; i++) {
            ThreadTest2 threadTest2 = new ThreadTest2();
            threadTest2.setSyncTest(syncTest);
            threadTest2.setTestConMap(testConMap);
            Thread threadTest = new Thread(threadTest2);
            threadTest.start();
        }
    }

}
//實現Runnable
class ThreadTest2 implements Runnable {
    private ConcurrentHashMap<String, String> testConMap;
    private SyncTest syncTest;

    public void setTestConMap(ConcurrentHashMap<String, String> testConMap) {
        this.testConMap = testConMap;
    }

    public void setSyncTest(SyncTest syncTest) {
        this.syncTest = syncTest;
    }

    @Override
    public void run() {
        //三個方法需要單獨測試,因為testConMap會相互影響

        //測試同步方法,鎖住的對象是syncTest
        //syncTest.testSyncMethod(testConMap,Thread.currentThread().getName());
        //測試同步代碼塊,鎖住的對象是testConMap
        //syncTest.testSyncObject(testConMap, Thread.currentThread().getName());
        //測試沒有鎖時執行請求是多么的混亂!!!
        //syncTest.testNoneSyncObject(testConMap, Thread.currentThread().getName());
    }
}
//同步測試方法類
class SyncTest {
    public synchronized void testSyncMethod(ConcurrentHashMap<String, String> testConMap, String name) {
        try {
            System.out.println(name + "進入了同步方法!");
            testConMap.put("name", name);
            Thread.currentThread().sleep(10);
            System.out.println(testConMap.get("name") + "離開了同步方法!");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public void testSyncObject(ConcurrentHashMap<String, String> testConMap, String name) {
        synchronized (testConMap) {
            try {
                System.out.println(name + "進入了同步代碼塊!");
                testConMap.put("name", name);
                Thread.currentThread().sleep(10);
                System.out.println(testConMap.get("name") + "離開了同步代碼塊!");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public void testNoneSyncObject(ConcurrentHashMap<String, String> testConMap, String name) {
        try {
            System.out.println(name + "進入了無人管轄區域!");
            testConMap.put("name", name);
            Thread.currentThread().sleep(10);
            System.out.println(testConMap.get("name") + "離開了無人管轄區域!");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

3. volatile

  • 特點

    • 保證可見性,有序性,不保證原子性

    • 它會強制將對緩存的修改操作立即寫入主存

    • volatile不適合復合操作(對變量的寫操作不依賴於當前值),否則需要保證只有單一線程能夠修改變量的值

    • 使用volatile關鍵字,可以禁止指令重排序(單例雙重檢查鎖)

public class StudyThread {
    static int v = 1;//volatile能夠保證變量的可見性

    public static void main(String[] args) {

        //改動線程
        new Thread(new Runnable() {
            public void run() {
                for (int i = 0; i < 10; i++) {
                    v++;//確保只有一個線程修改變量值
                    try {
                        Thread.currentThread().sleep(100);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }).start();
        //檢測線程
        new Thread(new Runnable() {
            @Override
            public void run() {
                int old = 0;
                while (old < 11) {
                    if (old != v) {
                        old = v;
                        System.out.println("檢測線程:v的值變動為" + old);
                    }
                }
            }
        }).start();
    }
}

4. ReentrantLock

  • 說明
    • 目前ReentrantLock和synchronized性能上沒有什么差別

    • ReentrantLock需要手動加鎖和解鎖,且解鎖的操作盡量要放在finally代碼塊中,保證線程正確釋放鎖

    • ReentrantLock可以實現公平鎖,在鎖上等待時間最長的線程將獲得鎖的使用權,性能沒有非公平鎖性能好

    • ReentrantLock提供了一個可以響應中斷的獲取鎖的方法lockInterruptibly(),可以用來解決死鎖問題

    • ReentrantLock還提供了獲取鎖限時等待的方法tryLock(),使用該方法配合失敗重試機制來更好的解決死鎖問題

    • ReentrantLock結合Condition接口可以實現等待通知機制

public class StudyThread {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Lock lock = new ReentrantLock();
        Condition condition = lock.newCondition();
        //三段代碼逐一測試
        
        //測試tryLock
        Thread threadTest00 = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    while(!lock.tryLock()){
                        System.out.println(Thread.currentThread().getName() + "沒有拿到鎖,繼續等待!");
                        Thread.sleep(50);
                    }
                    System.out.println(Thread.currentThread().getName() + "進入了lock代碼塊!");
                    Thread.currentThread().sleep(300);
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    System.out.println(Thread.currentThread().getName() + "准備釋放鎖,並離開lock代碼塊!");
                    lock.unlock();
                }
            }
        });
        threadTest00.start();
        Thread threadTest01 = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    while(!lock.tryLock()){
                        System.out.println(Thread.currentThread().getName() + "沒有拿到鎖,繼續等待!");
                        Thread.sleep(50);
                    }
                    System.out.println(Thread.currentThread().getName() + "進入了lock代碼塊!");
                    Thread.currentThread().sleep(300);
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    System.out.println(Thread.currentThread().getName() + "准備釋放鎖,並離開lock代碼塊!");
                    lock.unlock();
                }
            }
        });
        threadTest01.start();

        //測試中斷鎖
        Thread threadTest02 = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    lock.lockInterruptibly();
                    System.out.println(Thread.currentThread().getName() + "進入了lock代碼塊!");
                    Thread.currentThread().sleep(2000);
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    System.out.println(Thread.currentThread().getName() + "准備釋放鎖,並離開lock代碼塊!");
                    lock.unlock();
                }
            }
        });
        threadTest02.start();
        Thread threadTest03 = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    lock.lockInterruptibly();
                    System.out.println(Thread.currentThread().getName() + "進入了lock代碼塊!");
                    Thread.currentThread().sleep(2000);
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    System.out.println(Thread.currentThread().getName() + "准備釋放鎖,並離開lock代碼塊!");
                    lock.unlock();
                }
            }
        });
        threadTest03.start();
        Thread.currentThread().sleep(20);
        threadTest02.interrupt();

        //測試condition
        Thread threadTest04 = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    lock.lock();
                    System.out.println(Thread.currentThread().getName() + "進入了lock代碼塊,等待通知!");
                    condition.await();
                    System.out.println(Thread.currentThread().getName() + "收到通知,繼續執行!");
                    Thread.currentThread().sleep(1000);
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    System.out.println(Thread.currentThread().getName() + "准備釋放鎖,並離開lock代碼塊!");
                    lock.unlock();
                }
            }
        });
        threadTest04.start();
        Thread threadTest05 = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    lock.lock();
                    System.out.println(Thread.currentThread().getName() + "進入了lock代碼塊!");
                    Thread.currentThread().sleep(1000);
                    condition.signal();
                    System.out.println(Thread.currentThread().getName() + "發出通知!");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    System.out.println(Thread.currentThread().getName() + "准備釋放鎖,並離開lock代碼塊!");
                    lock.unlock();
                }
            }
        });
        threadTest05.start();
    }

}

5. 線程安全的類

  Java中很多類說的線程安全指的是,它的每個方法單獨調用(即原子操作)都是線程安全的,但是代碼總體的互斥性並不受控制

  • 線程安全的類有以下幾類

    • Concurrentxxx

    • ThreadPoolExecutor

    • BlockingQueue和BlockingDeque

    • 原子類Atomicxxx—包裝類的線程安全類

    • CopyOnWriteArrayList和CopyOnWriteArraySet

    • 通過synchronized 關鍵字給方法加上內置鎖來實現線程安全:Timer,TimerTask,Vector,Stack,HashTable,StringBuffer

    • Collections中的synchronizedCollection(Collection c)方法可將一個集合變為線程安全:

      Map m=Collections.synchronizedMap(new HashMap());

線程池

線程池只能放入實現Runable或callable類線程,不能直接放入繼承Thread的類

1. Executors線程池的實現

  • 要點

    • 可能導致資源耗盡,OOM問題出現

    • 線程池不允許使用Executors去創建,而是通過ThreadPoolExecutor的方式(阿里巴巴java開發)

public class StudyThread {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        //創建一個線程池,該線程池重用固定數量的從共享無界隊列中運行的線程
        //ExecutorService threadPool = Executors.newFixedThreadPool(20);
        //創建一個維護足夠的線程以支持給定的並行級別的線程池,線程的實際數量可以動態增長和收縮,工作竊取池不保證執行提交的任務的順序
        //ExecutorService threadPool = Executors.newWorkStealingPool(8);
        //創建一個使用從無界隊列運行的單個工作線程的執行程序。
        //ExecutorService threadPool = Executors.newSingleThreadExecutor();
        //創建一個根據需要創建新線程的線程池,但在可用時將重新使用以前構造的線程。如果沒有可用的線程,將創建一個新的線程並將其添加到該池中。未使用六十秒的線程將被終止並從緩存中刪除
        ExecutorService threadPool = Executors.newCachedThreadPool();
        //放入Runnable類線程
        for (int i = 0; i < 10; i++) {
            threadPool.execute(new Runnable() {
                @Override
                public void run() {
                    System.out.println("線程名:" + Thread.currentThread().getName());
                }
            });
        }
        //Thread.currentThread().sleep(1000);
        //放入Callable類線程
        List<Future<String>> futures = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            Future<String> future = threadPool.submit(new Callable<String>() {
                @Override
                public String call() throws Exception {
                    System.out.println("線程名:" + Thread.currentThread().getName());
                    return Thread.currentThread().getName();
                }
            });
            futures.add(future);
        }
        threadPool.shutdown();
        for (Future future : futures) {
            System.out.println(future.get());
        }
    }
}

2. ThreadPoolExecutor創建線程池

  

  • 要點

    • 線程池空閑大小和最大線程數根據實際情況確定

    • keepAliveTime一般設置為0

    • unit一般設置為TimeUnit.SECONDS(其他的也行,反正是0)

    • 任務隊列需要指定大小,不要使用無界隊列,容易造成OOM-> new ArrayBlockingQueue<>(512)

    • ThreadFactory threadFactory使用系統默認的

    • 拒絕策略:

      • AbortPolicy:拋出RejectedExecutionException(該異常是非受檢異常,要記得捕獲)

      • DiscardPolicy:什么也不做,直接忽略

      • DiscardOldestPolicy:丟棄執行隊列中最老的任務,嘗試為當前提交的任務騰出位置

      • CallerRunsPolicy:直接由提交任務者執行這個任務

public class StudyThread {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        int poolSize = Runtime.getRuntime().availableProcessors() * 2;
        BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(512);
        RejectedExecutionHandler policy = new ThreadPoolExecutor.DiscardPolicy();
        ExecutorService executorService = new ThreadPoolExecutor(poolSize, poolSize,
                0, TimeUnit.SECONDS,
                queue,
                policy);
        //放入Runnable類線程
        for (int i = 0; i < 10; i++) {
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    System.out.println("線程名:" + Thread.currentThread().getName());
                }
            });
        }

        //放入Callable類線程
        List<Future<String>> futures = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            Future<String> future = executorService.submit(new Callable<String>() {
                @Override
                public String call() throws Exception {
                    System.out.println("線程名:" + Thread.currentThread().getName());
                    return Thread.currentThread().getName();
                }
            });
            futures.add(future);
        }
        for (Future future:futures) {
            System.out.println(future.get());
        }

        //放入Callable類線程
        //使用CompletionService簡化獲取結果的操作,執行完一個任務,獲取一個結果,結果順序和執行順序相同
        CompletionService<String> ecs = new ExecutorCompletionService<String>(executorService);
        for (int i = 0; i < 10; i++) {
            Future<String> future = ecs.submit(new Callable<String>() {
                @Override
                public String call() throws Exception {
                    System.out.println("線程名:" + Thread.currentThread().getName());
                    return Thread.currentThread().getName();
                }
            });
        }
        for (int i = 0; i < 10; i++) {
            System.out.println(ecs.take().get());
        }
    }
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM