實戰體會多線程


線程

首先了解線程的五大狀態:新建,就緒,運行,阻塞,終結。

1、新建狀態(New):新創建了一個線程對象。

2、就緒狀態(Runnable):線程對象創建后,其他線程調用了該對象的start()方法。該狀態的線程位於“可運行線程池”中,變得可運行,只等待獲取CPU的使用權。即在就緒狀態的進程除CPU之外,其它的運行所需資源都已全部獲得。

3、運行狀態(Running):就緒狀態的線程獲取了CPU,執行程序代碼。

4、阻塞狀態(Blocked):阻塞狀態是線程因為某種原因放棄CPU使用權,暫時停止運行。直到線程進入就緒狀態,才有機會轉到運行狀態。
阻塞的情況分三種:
(1)、等待阻塞:運行的線程執行wait()方法,該線程會釋放占用的所有資源,JVM會把該線程放入“等待池”中。進入這個狀態后,是不能自動喚醒的,必須依靠其他線程調用notify()或notifyAll()方法才能被喚醒,
(2)、同步阻塞:運行的線程在獲取對象的同步鎖時,若該同步鎖被別的線程占用,則JVM會把該線程放入“鎖池”中。
(3)、其他阻塞:運行的線程執行sleep()或join()方法,或者發出了I/O請求時,JVM會把該線程置為阻塞狀態。當sleep()狀態超時、join()等待線程終止或者超時、或者I/O處理完畢時,線程重新轉入就緒狀態。

5、死亡狀態(Dead):線程執行完了或者因異常退出了run()方法,該線程結束生命周期。
線程變化的狀態轉換圖如下:

線程的實現方法

實現有兩種方式,一是繼承Thread類,二是實現Runnable接口,但不管怎樣, 當我們new了這個對象后,線程就進入了初始狀態; 當該對象調用了start()方法,就進入就緒狀態!

在工作中,因為需要獲取返回值的線程,接觸到了callable接口

Callable需要實現的是call()方法,而不是run()方法,返回值的類型有Callable的類型參數指定, Callable只能由ExecutorService.submit() 執行,正常結束后將返回一個future對象,實例如下:

ExecutorService pool = Executors.newFixedThreadPool(10); 
Callable<String> c1 = new YkuploadRunable(");
Future<String> f1 = pool.submit(c1);  
System.out.println(f1.get());  		 
pool.shutdown();

線程的經典問題:生產者和消費者問題

生產者可以將產品放入倉庫,消費者則可以從倉庫中取走產品,要有保護倉庫為空或溢出

解決生產者/消費者問題的方法可分為兩類:

1.采用某種機制保護生產者和消費者之間的同步;

2.在生產者和消費者之間建立一個管道。

實現方法有

1.object.wait(),object.notify();

2.condition.await(),condition.signal();

3.LinkedBlockingQueue<Object> list;  put(),take()會自動阻塞

方法一的具體代碼

倉庫.java

public class Storage
{
    // 倉庫最大存儲量
    private final int MAX_SIZE = 100;

    // 倉庫存儲的載體
    private LinkedList<Object> list = new LinkedList<Object>();

    // 生產產品
    public void produce(String producer)
    {
        synchronized (list)
        {
            // 如果倉庫已滿
            while (list.size() == MAX_SIZE)
            {
                System.out.println("倉庫已滿,【"+producer+"】: 暫時不能執行生產任務!");
                try
                {
                    // 由於條件不滿足,生產阻塞
                    list.wait();
                }
                catch (InterruptedException e)
                {
                    e.printStackTrace();
                }
            }

            // 生產產品            
            list.add(new Object());            

            System.out.println("【"+producer+"】:生產了一個產品\t【現倉儲量為】:" + list.size());

            list.notifyAll();
        }
    }

    // 消費產品
    public void consume(String consumer)
    {
        synchronized (list)
        {
            //如果倉庫存儲量不足
            while (list.size()==0)
            {
                System.out.println("倉庫已空,【"+consumer+"】: 暫時不能執行消費任務!");
                try
                {
                    // 由於條件不滿足,消費阻塞
                    list.wait();
                }
                catch (InterruptedException e)
                {
                    e.printStackTrace();
                }
            }
            
            list.remove();
            System.out.println("【"+consumer+"】:消費了一個產品\t【現倉儲量為】:" + list.size());
            list.notifyAll();
        }
    }

    public LinkedList<Object> getList()
    {
        return list;
    }

    public void setList(LinkedList<Object> list)
    {
        this.list = list;
    }

    public int getMAX_SIZE()
    {
        return MAX_SIZE;
    }
}

生產者.java 

public class Producer extends Thread
{
    private String producer;
    private Storage storage;

    public Producer(Storage storage)
    {
        this.storage = storage;
    }

    @Override
    public void run()
    {
        produce(producer);
    }

    public void produce(String producer)
    {
        storage.produce(producer);
    }

    public String getProducer()
    {
        return producer;
    }

    public void setProducer(String producer)
    {
        this.producer = producer;
    }

    public Storage getStorage()
    {
        return storage;
    }

    public void setStorage(Storage storage)
    {
        this.storage = storage;
    }
}

消費者.java

public class Consumer extends Thread
{
    private String consumer;
    private Storage storage;

    public Consumer(Storage storage)
    {
        this.storage = storage;
    }

    @Override
    public void run()
    {
        consume(consumer);
    }

    public void consume(String consumer)
    {
        storage.consume(consumer);
    }

    public Storage getStorage()
    {
        return storage;
    }

    public void setStorage(Storage storage)
    {
        this.storage = storage;
    }

    public String getConsumer() {
        return consumer;
    }

    public void setConsumer(String consumer) {
        this.consumer = consumer;
    }
}

方法二的實現如下:注意lock的lock(),unlock()

public class Storage {
    // 倉庫最大存儲量
    private final int MAX_SIZE = 100;

    // 倉庫存儲的載體
    private LinkedList<Object> list = new LinkedList<Object>();
    // 鎖
    private final Lock lock = new ReentrantLock();

    // 倉庫滿的條件變量
    private final Condition full = lock.newCondition();

    // 倉庫空的條件變量
    private final Condition empty = lock.newCondition();

    // 生產產品
    public void produce(String producer) {
        lock.lock();
        // 如果倉庫已滿
        while (list.size() == MAX_SIZE) {
            System.out.println("倉庫已滿,【" + producer + "】: 暫時不能執行生產任務!");
            try {
                // 由於條件不滿足,生產阻塞
                full.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        // 生產產品
        list.add(new Object());

        System.out.println("【" + producer + "】:生產了一個產品\t【現倉儲量為】:" + list.size());

        empty.signalAll();

        // 釋放鎖
        lock.unlock();

    }

    // 消費產品
    public void consume(String consumer) {
        // 獲得鎖
        lock.lock();

        // 如果倉庫存儲量不足
        while (list.size() == 0) {
            System.out.println("倉庫已空,【" + consumer + "】: 暫時不能執行消費任務!");
            try {
                // 由於條件不滿足,消費阻塞
                empty.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        list.remove();
        System.out.println("【" + consumer + "】:消費了一個產品\t【現倉儲量為】:" + list.size());
        full.signalAll();
        
        // 釋放鎖
        lock.unlock();

    }

    public LinkedList<Object> getList() {
        return list;
    }

    public void setList(LinkedList<Object> list) {
        this.list = list;
    }

    public int getMAX_SIZE() {
        return MAX_SIZE;
    }
}

方法三的實現如下:

import java.util.concurrent.LinkedBlockingQueue;

public class Storage {
    // 倉庫最大存儲量
    private final int MAX_SIZE = 100;

    // 倉庫存儲的載體
    private LinkedBlockingQueue<Object> list = new LinkedBlockingQueue<Object>(100);  

    // 生產產品
    public void produce(String producer) {
        // 如果倉庫已滿
        if (list.size() == MAX_SIZE) {
            System.out.println("倉庫已滿,【" + producer + "】: 暫時不能執行生產任務!");            
        }

        // 生產產品
        try {
            list.put(new Object());
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

        System.out.println("【" + producer + "】:生產了一個產品\t【現倉儲量為】:" + list.size());
    }

    // 消費產品
    public void consume(String consumer) {
        // 如果倉庫存儲量不足
        if (list.size() == 0) {
            System.out.println("倉庫已空,【" + consumer + "】: 暫時不能執行消費任務!");            
        }

        try {
            list.take();
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        System.out.println("【" + consumer + "】:消費了一個產品\t【現倉儲量為】:" + list.size());        

    }

    public LinkedBlockingQueue<Object> getList() {
        return list;
    }

    public void setList(LinkedBlockingQueue<Object> list) {
        this.list = list;
    }
    public int getMAX_SIZE() {
        return MAX_SIZE;
    }
}

線程同步

同步鎖synchronized

上面的wait(),await()

還有一種join方法,使線程順序進行。

線程實戰

因為需要在controller中同時上傳多個視頻並保存返回的視頻id進數據庫,所以在controller中寫了一個內部類,具體代碼如下:

class Ykupload implements Runnable {

@Override
public void run() {
System.out.println(tpVideoLibrary.toString());
YkuploadRunable yp = new YkuploadRunable();
//獲取yp的數據並寫進數據庫
}
}

后記:貌似不能同時上傳同一個文件,多個視頻上傳速度還是取決於你的帶寬!.

線程池

線程池,顧名思義存放線程的池子,可以類比數據庫的連接池。因為頻繁地創建和銷毀線程會給服務器帶來很大的壓力。若能將創建的線程不再銷毀而是存放在池中等待下一個任務使用,可以不僅減少了創建和銷毀線程所用的時間,提高了性能,同時還減輕了服務器的壓力。

初始化線程池有五個核心參數,分別是 corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue。還有兩個默認參數 threadFactory, handler
corePoolSize:線程池初始核心線程數。初始化線程池的時候,池內是沒有線程,只有在執行任務的時會創建線程。
maximumPoolSize:線程池允許存在的最大線程數。若超過該數字,默認提示RejectedExecutionException異常
keepAliveTime:當前線程數大於核心線程時,該參數生效,其目的是終止多余的空閑線程等待新任務的最長時間。即指定時間內將還未接收任務的線程銷毀。
unit:keepAliveTime 的時間單位
workQueue:緩存任務的的隊列,一般采用LinkedBlockingQueue。
threadFactory:執行程序創建新線程時使用的工廠,一般采用默認值。
handler:超出線程范圍和隊列容量而使執行被阻塞時所使用的處理程序,一般采用默認值。

在接收任務前,線程池內是沒有線程。只有當任務來了才開始新建線程。當任務數大於核心線程數時,任務進入隊列中等待。若隊列滿了,則線程池新增線程直到最大線程數。再超過則會執行拒絕策略。

舉個栗子:

/**
若有Thread1、Thread2、Thread3、Thread4四條線程分別統計C、D、E、F四個盤的大小,所有線程都統計完畢交給Thread5線程去做匯總,應當如何實現
**/

import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ITDragonThreads {  
    
    public static void main(String[] args) throws Exception {  
        // 無緩沖無界線程池
        ExecutorService executor = Executors.newFixedThreadPool(8); 
        /**
         * CompletionService與ExecutorService最主要的區別在於 
         *前者submit的task不一定是按照加入時的順序完成的。CompletionService對ExecutorService進行了包裝, 
         *內部維護一個保存Future對象的BlockingQueue。 
         *只有當這個Future對象狀態是結束的時候,才會加入到這個Queue中,take()方法其實就是Producer-Consumer中的Consumer。 
         *它會從Queue中取出Future對象,如果Queue是空的,就會阻塞在那里,直到有 完成的Future對象加入到Queue中。 
         *所以,先完成的必定先被取出。這樣就減少了不必要的等待時間。 
        **/
        CompletionService<Long> completion = new ExecutorCompletionService<Long>(executor);  
        CountWorker countWorker = null;  
        for (int i = 0; i < 4; i++) { // 四個線程負責統計
            countWorker = new CountWorker(i+1);  
            completion.submit(countWorker);  
        }  
        // 關閉線程池
       //shutdown()阻止新來的任務提交,對已經提交了的任務不會產生任何影響。當已經提交的任務執行完后,它會將那些閑置的線程(idleWorks)進行中斷,這個過程是異步的。
        executor.shutdown();  
        // 主線程相當於第五個線程,用於匯總數據
        long total = 0;  
        for (int i = 0; i < 4; i++) { 
            total += completion.take().get(); 
        }  
        System.out.println(total / 1024 / 1024 / 1024 +"G");  
    }  
}  
  
class CountWorker implements Callable<Long>{  
    private Integer type;  
    public CountWorker() {
    }
    public CountWorker(Integer type) {
        this.type = type;
    }

    @Override  
    public Long call() throws Exception {  
        ArrayList<String> paths = new ArrayList<>(Arrays.asList("c:", "d:", "e:", "f:"));
        return countDiskSpace(paths.get(type - 1));  
    }  
    
    // 統計磁盤大小
    private Long countDiskSpace (String path) {  
        File file = new File(path);  
        long totalSpace = file.getTotalSpace();  
        System.out.println(path + " 總空間大小 : " + totalSpace / 1024 / 1024 / 1024 + "G");  
        return totalSpace;
    }  
}  

  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM