Java 並發與多線程


Java 並發與多線程

基本概念

並發與並行

  1. 並發:指兩個或多個事件在同一時間間隔內發生 。當有多個線程在操作時,如果系統只有一個CPU,則它根本不可能真正同時進行一個以上的線程,它只能把CPU運行時間划分成若干個時間段,再將時間 段分配給各個線程執行,在一個時間段的線程代碼運行時,其它線程處於掛起狀。這種方式稱之為並發(Concurrent)
  2. 並行:指兩個或者多個事件在同一時刻發生 。當系統有一個以上CPU時,則線程的操作有可能非並發。當一個CPU執行一個線程時,另一個CPU可以執行另一個線程,兩個線程互不搶占CPU資源,可以同時進行,這種方式稱之為並行(Parallel)

進程與線程

  1. 一個程序可能有多個進程,一個進程由多個線程和共享資源組成
  2. 進程:擁有資源的基本單位
  3. 線程:獨立調度分派的基本單位

線程

創建線程

Thread

  1. 繼承 Thread 類(Thread 實現了 Runnable 接口)
  2. 重寫 run 方法
  3. start() 方法啟動線程

Runnable

  1. 實現 Runnable 接口
  2. 重寫 run 方法
  3. new Thread(Runnable target),new Thread(Runnable target,String name)

多個 Thread 實例共用一個 Runnable,這些線程的 run 方法相同,可以共享相同的數據

但是存在線程同步問題

public class RunnableTest implements Runnable
{
    private int ticket = 10;
    public void run()
    {
        while (true)
        {
            if (ticket > 0)
            {
                System.out.println(Thread.currentThread().getName() + "售出" + ticket + "號票");
                ticket--;
            }
            else System.exit(0);
        }
    }
    public static void main(String[] args)
    {
        RunnableTest rt = new RunnableTest();
        Thread t1 = new Thread(rt, "1號窗口");
        Thread t2 = new Thread(rt, "2號窗口");
        t1.start();
        t2.start();
    }
}

print

1號窗口售出10號票
1號窗口售出9號票
1號窗口售出8號票
1號窗口售出7號票
2號窗口售出7號票
2號窗口售出5號票
1號窗口售出6號票
2號窗口售出4號票
1號窗口售出3號票
2號窗口售出2號票
1號窗口售出1號票

匿名類

匿名類可以方便的訪問方法的局部變量,但是必須聲明為 final,因為匿名類和普通局部變量生命周期不一致

jdk7 中已不再需要顯示聲明為 final,實際上被虛擬機自動隱式聲明了

public static void main(String[] args)
{
    new Thread( )
    {
        public void run( )
        {
            //內容
        }
    }.start( );
    new Thread(new Runnable( )
    {
        public void run( )
        {
            //內容
        }
	}).start( );
}

Callable

  1. 創建 Callable 的實現類,並沖寫 call() 方法,該方法為線程執行體,並且該方法有返回值

  2. 創建 Callable 實現類的實例,並用 FutuerTask 類來包裝 Callable 對象,該 FutuerTask 封裝了 Callable 對象 call() 方法的返回值

  3. 實例化 FutuerTask 類,參數為 FutuerTask 接口實現類的對象來啟動線程

  4. 通過 FutuerTask 類的對象的 get() 方法來獲取線程結束后的返回值

    public class CallableTest implements Callable<Integer>
    {
        //重寫執行體 call( )
        public Integer call( ) throws Exception
        {
            int i = 0;
            for (; i < 10; i++)
            {
               //
            }
            return i;
        }
        public static void main(String[] args)
        {
            Callable call = new CallableTest( );
            FutureTask<Integer> f = new FutureTask<Integer>(call);
            Thread t = new Thread(f);
            t.start( );
            //得到返回值
            try
            {
                System.out.println("返回值:" + f.get( ));
            }
            catch (Exception e)
            {
                e.printStackTrace( );
            }
        }
    }
    

    print

    返回值:10
    

線程方法

  1. 線程執行體:run()

  2. 啟動線程:start()

  3. Thread 類方法

    方法 描述
    public final void setName(String name) 改變線程名稱
    public final void setPriority(int priority) 設置優先級
    public final void setDaemon(boolean on) 設為守護線程,當只剩下守護線程時自動結束
    public final boolean isAlive() 測試線程是否處於活動狀態
    public static void yield() 暫停當前線程(回到就緒狀態)
    public static void sleep(long millisec) 進入休眠狀態
    public final void join() 暫停當前線程,等待調用該方法線程執行完畢
    public final void join(long millisec) 暫停當前線程指定時間
    public static Thread currentThread() 返回對當前正在執行的線程對象的引用

線程狀態

  1. 就緒狀態:

    • start() 方法進入就緒狀態,等待虛擬機調度
    • 運行狀態調用 yield 方法會進入就緒狀態
    • lock 池中的線程獲得鎖后進入就緒狀態
  2. 運行狀態:就緒狀態經過線程調度進去運行狀態

  3. 阻塞狀態:

    • 休眠:調用 sleep 方法
    • 對象 wait 池:調用 wait 或 join 方法,被 notify 后進入 lock 池
    • 對象 lock 池:未獲得鎖
  4. 死亡狀態:run 方法執行完畢

    graph TB T(新線程)--start方法-->A(就緒狀態) A--線程調度-->B(運行狀態) B--yield方法-->A B--sleep方法-->D(阻塞:休眠) B--wait或join方法-->E(阻塞:wait池) B--未獲得鎖-->F(阻塞:lock池) B--run方法執行完-->C(死亡狀態) D--時間到-->A E--notify方法-->F F--獲得鎖-->A

線程同步

保證程序原子性、可見性、有序性的過程

阻塞同步

基於加鎖爭用的悲觀並發策略

synchronized

  1. synchronized 含義

    • 使用 synchronized 可以鎖住某一對象, 當其他線程也想鎖住該對象以執行某段代碼時,必須等待已經持有鎖的線程釋放鎖

    • 釋放鎖的方式有互斥代碼執行完畢、拋出異常、鎖對象調用 wait 方法

  2. 不同的使用方式代表不同的鎖粒度

    • 修飾普通方法 = synchronized(this)
    • 修飾靜態方法 = synchronized(X.class)
    • 修飾代碼塊(對象 extends Object)

ReentrantLock

  1. 創建 Lock 鎖

    ReentrantLock 實現了 Lock 接口, Lock lock = new ReentrantLock()

  2. Lock 含義

    • 使用 lock() 方法表示當前線程占有 lock 對象

    • 釋放該對象要顯示掉用 unlock() 方法 ,多在 finally 塊中進行釋放

  3. trylock 方法

    • synchronized 會一直等待鎖,而 Lock 提供了 trylock 方法,在指定時間內試圖占用
    • 使用 trylock, 釋放鎖時要判斷,若占用失敗,unlock 會拋出異常
  4. Lock 的線程交互

    • 通過 lock 對象得到一個 Condition 對象,Condition condition = lock.newCondition()

    • 調用這個Condition對象的:await,signal,signalAll 方法

  5. 示例

    public class LockTest
    {
        public static void log(String msg)//日志方法
        {
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            Date date = new Date( );
            String dateStr = sdf.format(date);
            System.out.println(dateStr + " " + Thread.currentThread( ).getName( ) + " " + msg);
        }
        public static void main(String[] args)
        {
            Lock lock = new ReentrantLock( );
            new Thread("t1")
            {
                public void run( )
                {
                    boolean flag = false;
                    try
                    {
                        log("線程已啟動");
                        log("嘗試占有lock");
                        flag = lock.tryLock(1, TimeUnit.SECONDS);
                        if (flag)
                        {
                            log("成功占有lock");
                            log("執行3秒業務操作");
                            Thread.sleep(3000);
                        }
                        else
                        {
                            log("經過1秒鍾嘗試,占有lock失敗,放棄占有");
                        }
                    }
                    catch (InterruptedException e)
                    {
                        e.printStackTrace( );
                    }
                    finally
                    {
                        if (flag)
                        {
                            log("釋放lock");
                            lock.unlock( );
                        }
                    }
                    log("線程結束");
                }
            }.start( );
            try
            {
                //先讓 t1 先執行兩秒
                Thread.sleep(2000);
            }
            catch (InterruptedException e1)
            {
                e1.printStackTrace( );
            }
            new Thread("t2")
            {
                public void run( )
                {
                    boolean flag = false;
                    try
                    {
                        log("線程啟動");
                        log("嘗試占有lock");
    
                        flag = lock.tryLock(1, TimeUnit.SECONDS);
                        if (flag)
                        {
                            log("成功占有lock");
                            log("執行3秒的業務操作");
                            Thread.sleep(3000);
                        }
                        else
                        {
                            log("經過1秒鍾的嘗試,占有lock失敗,放棄占有");
                        }
                    }
                    catch (InterruptedException e)
                    {
                        e.printStackTrace( );
                    }
                    finally
                    {
                        if (flag)
                        {
                            log("釋放lock");
                            lock.unlock( );
                        }
                    }
                    log("線程結束");
                }
            }.start( );
        }
    }
    

    print

    2019-11-07 15:50:01 t1 線程已啟動
    2019-11-07 15:50:01 t1 嘗試占有lock
    2019-11-07 15:50:01 t1 成功占有lock
    2019-11-07 15:50:01 t1 執行3秒業務操作
    2019-11-07 15:50:03 t2 線程啟動
    2019-11-07 15:50:03 t2 嘗試占有lock
    2019-11-07 15:50:04 t2 經過1秒鍾的嘗試,占有lock失敗,放棄占有
    2019-11-07 15:50:04 t2 線程結束
    2019-11-07 15:50:04 t1 釋放lock
    2019-11-07 15:50:04 t1 線程結束
    
  6. synchronized 和 Lock 區別

    • synchronized 是關鍵字,Lock 是接口, synchronized是內置的語言實現,Lock是代碼層面的實現
    • synchronized 執行完畢自動釋放鎖,Lock 需要顯示 unlock()
    • synchronized 會一直等待,嘗試占用鎖,Lock 可以使用 trylock,在一段時間內嘗試占用,時間到占用失敗則放棄

非阻塞同步

非阻塞同步是一種基於沖突檢測和數據更新的樂觀並發策略

actomic 類

  1. 原子操作

    • 原子操作是不可中斷的操作,必須一次性執行完成
    • 賦值操作是原子操作,但 a++ 不是原子操作, 而是取值、加一、賦值三個步驟
    • 一個線程取 i 的值后,還沒來得及加一,第二個線程也來取值,就產生了線程安全問題
  2. actomic 類的使用

    • jdk6 以后,新增包 java.util.concurrent.atomic,里面有各種原子類,比如 AtomicInteger
    • AtomicInteger 提供了各種自增,自減等方法,這些方法都是原子性的。換句話說,自增方法 incrementAndGet 是線程安全的
    • 10000 個線程做 value 加一的操作,用 a++ 方式得出不准確的結果,用原子類 AtomicInteger 的 addAndGet() 方法得出正確結果
    public class ThreadTest
    {
        static int value1 = 0;
        static AtomicInteger value2 = new AtomicInteger(0);//原子整型類
        public static void main(String[] args)
        {
            for (int i = 0; i < 100000; i++)
            {
                new Thread( )
                {
                    public void run( )
                    {
                        value1++;
                    }
                }.start( );
                new Thread( )
                {
                    public void run( )
                    {
                        value2.addAndGet(1);//value++的原子操作
                    }
                }.start( );
            }
            while (Thread.activeCount( ) > 2)
            {
                Thread.yield( );
            }
            System.out.println(value1);
            System.out.println(value2);
        }
    }
    

    print

    99996
    100000
    

無同步方案

如果一個方法不涉及共享數據,那么他天生就是線程安全的

可重入代碼

可以在代碼執行的任何時刻中斷它,轉而去執行另外一段代碼,在控制權返回之后,原來的程序不會出現任何的錯誤

  1. 一個方法返回結果是可以預測的,輸入了相同的數據,就能返回相同的結果,那這個方法就具有可重入性,也就是線程安全的

  2. 棧封閉是一種可重用代碼

    多個線程訪問同一個方法的局部變量時,不會出現線程安全問題,因為局部變量保存在虛擬機棧中,屬於線程的私有區域,所以不會出現線程安全性

    public class ThreadTest
    {
        static void add( )
        {
            int value = 0;
            for (int i = 0; i < 1000; i++)
            {
                value++;
            }
            System.out.println(value);
        }
    
        public static void main(String[] args)
        {
            ExecutorService threadPool = Executors.newCachedThreadPool( );
            threadPool.execute(( ) -> add( ));
            threadPool.execute(( ) -> add( ));
            threadPool.shutdown( );
        }
    }
    

    print

    1000
    1000
    

線程本地存儲

  1. 把共享數據的可見范圍限制在同一個線程之內,即便無同步也能做到避免數據爭用

  2. 使用 java.lang.ThreadLocal 類來實現線程本地存儲功能

    • ThreadLocal 變量是一個不同線程可以擁有不同值的變量,所有的線程可以共享一個ThreadLocal對象
    • 任意一個線程的 ThreadLocal 值發生變化,不會影響其他的線程
    • 用set()和get()方法對ThreadLocal變量進行賦值和查看其值
    public class ThreadLocalDemo
    {
        public static void main(String[] args)
        {
            ThreadLocal threadLocal1 = new ThreadLocal( );
            Thread t1 = new Thread(( ) ->
            {
                threadLocal1.set(1);
                try
                {
                    Thread.sleep(3000);
                }
                catch (InterruptedException e)
                {
                    e.printStackTrace( );
                }
                System.out.println(threadLocal1.get( ));
            });
            Thread t2 = new Thread(( ) -> threadLocal1.set(2));
            t1.start( );
            t2.start( );
        }
    }
    

    print

    1
    
  3. ThreadLocal 原理

    • 每個線程都有一個 ThreadLocal.ThreadLocalMap 對象,調用 threadLocal1.set(T value) 方法時,將 threadLoacl1 和 value 鍵值對存入 map
    • ThreadLocalMap 底層數據結構可能導致內存泄露,盡可能在使用 ThreadLocal 后調用 remove()方法

死鎖

死鎖條件

  1. 互斥條件
  2. 請求與保持條件
  3. 不可剝奪條件
  4. 循環等待條件(環路條件)

Java死鎖示例

public static void main(String[] args)
{
    Object o1 = new Object( );
    Object o2 = new Object( );

    Thread t1 = new Thread( )
    {
        public void run( )
        {
            synchronized (o1)//占有 o1
            {
                System.out.println("t1 已占有 O1");
                try
                {
                    Thread.sleep(1000);//停頓1000毫秒,另一個線程有足夠的時間占有 o1
                }
                catch (InterruptedException e)
                {
                    e.printStackTrace( );
                }
                System.out.println("t1 試圖占有 o2");
                System.out.println("t1 等待中");
                synchronized (o2)
                {
                    System.out.println("t1 已占有 O2");
                }
            }
        }
    };
    Thread t2 = new Thread( )
    {
        public void run( )
        {
            synchronized (o2)  //占有 o2
            {
                System.out.println("t2 已占有 o2");
                try
                {
                    Thread.sleep(1000);//停頓1000毫秒,另一個線程有足夠的時間占有 o2
                }
                catch (InterruptedException e)
                {
                    e.printStackTrace( );
                }
                System.out.println("t2 試圖占有 o1");
                System.out.println("t2 等待中");
                synchronized (o1)
                {
                    System.out.println("t2 已占有 O1");
                }
            }
        }
    };
    t1.start( );
    t2.start( );
}

print

t1 已占有 O1
t2 已占有 o2
t1 試圖占有 o2
t1 等待中
t2 試圖占有 o1
t2 等待中

線程通信

  1. Object 類方法

    方法 描述
    wait() 線程進入等待池
    notify() 喚醒等待當前線程鎖的線程
    notifyAll() 喚醒所有線程,優先級高的優先喚醒

    為什么這些方法設置在 Object 對象上?

    表面上看,因為任何對象都可以加鎖

    底層上說,java 多線程同步的 Object Monitor 機制,每個對象上都設置有類似於集合的數據結構,儲存當前獲得鎖的線程、等待獲得鎖的線程(lock set)、等待被喚醒的線程(wait set)

  2. 生產者消費者模型

    • sleep 方法,讓出 cpu,但不放下鎖
    • wait 方法,進入鎖對象的等待池,放下鎖
public class ProducerAndConsumer
{
    public static void main(String[] args)
    {
        Goods goods = new Goods();
        Thread producer = new Thread()//生產者線程
        {
            public void run()
            {
                while (true) goods.put();
            }
        };
        Thread consumer = new Thread()//消費者線程
        {
            public void run()
            {
                while (true) goods.take();
            }
        };
        consumer.start();
        producer.start();
    }
}
class Goods//商品類
{
    int num = 0;//商品數目
    int space = 10;//空位總數
    public synchronized void put()
    {
        if (num < space)//有空位可放,可以生產
        {
            num++;
            System.out.println("放入一個商品,現有" + num + "個商品," + (space - num) + "個空位");
            notify();//喚醒等待該鎖的線程
        }
        else//無空位可放,等待空位
        {
            try
            {
                System.out.println("沒有空位可放,等待拿出");
                wait();//進入該鎖對象的等待池
            }
            catch (InterruptedException e)
            {
                e.printStackTrace();
            }
        }
    }
    public synchronized void take()
    {
        if (num > 0)//有商品可拿
        {
            num--;
            System.out.println("拿出一個商品,現有" + num + "個商品," + (space - num) + "個空位");
            notify();//喚醒等待該鎖的線程
        }
        else///等待生產產品
        {
            try
            {
                System.out.println("沒有商品可拿,等待放入");
                wait();//進入該鎖對象的等待池
            }
            catch (InterruptedException e)
            {
                e.printStackTrace();
            }
        }
    }
}

print

沒有商品可拿,等待放入
放入一個商品,現有1個商品,9個空位
放入一個商品,現有2個商品,8個空位
拿出一個商品,現有1個商品,9個空位
放入一個商品,現有2個商品,8個空位
放入一個商品,現有3個商品,7個空位
放入一個商品,現有4個商品,6個空位
拿出一個商品,現有3個商品,7個空位
放入一個商品,現有4個商品,6個空位
···

線程池

線程的啟動和結束都是比較消耗時間和占用資源的,如果在系統中用到了很多的線程,大量的啟動和結束動作會嚴重影響性能

線程池很像生產者消費者模式,消費的對象是一個一個的能夠運行的任務

  1. 設計思路

    • 准備任務容器,可用 List,存放任務
    • 線程池類構造方法中創建多個執行者線程
    • 任務容器為空時,所有線程 wait
    • 當外部線程向任務容器加入任務,就會有執行者線程被 notify
    • 執行任務完畢后,沒有接到新任務,就回歸等待狀態
  2. 實現一個線程池

    public class ThreadPool
    {
        int poolSize;// 線程池大小
        LinkedList<Runnable> tasks = new LinkedList<Runnable>();// 任務容器
        public ThreadPool(int poolSize)
        {
            this.poolSize = poolSize;
            synchronized (tasks)//啟動 poolSize 個任務執行者線程
            {
                for (int i = 0; i < poolSize; i++)
                {
                    new ExecuteThread("執行者線程 " + i).start();
                }
            }
        }
        public void add(Runnable r)//添加任務
        {
            synchronized (tasks)
            {
                tasks.add(r);
                System.out.println("加入新任務");
                tasks.notifyAll();// 喚醒等待的任務執行者線程
            }
        }
        class ExecuteThread extends Thread//等待執行任務的線程
        {
            Runnable task;
            public ExecuteThread(String name)
            {
                super(name);
            }
            public void run()
            {
                System.out.println("啟動:" + this.getName());
                while (true)
                {
                    synchronized (tasks)
                    {
                        while (tasks.isEmpty())
                        {
                            try
                            {
                                tasks.wait();
                            }
                            catch (InterruptedException e)
                            {
                                e.printStackTrace();
                            }
                        }
                        task = tasks.removeLast();
                        tasks.notifyAll(); // 允許添加任務的線程可以繼續添加任務
                    }
                    System.out.println(this.getName() + " 接到任務");
                    task.run();//執行任務
                }
            }
        }
        public static void main(String[] args)
        {
            ThreadPool pool = new ThreadPool(3);
            for (int i = 0; i < 5; i++)
            {
                Runnable task = new Runnable()//創建任務
                {
                    public void run()//任務內容
                    {
                        System.out.println(Thread.currentThread().getName()+" 執行任務");
                    }
                };
                pool.add(task);//加入任務
                try
                {
                    Thread.sleep(1000);
                }
                catch (InterruptedException e)
                {
                    e.printStackTrace();
                }
            }
        }
    }
    

    print

    main 加入新任務
    啟動:執行者線程 0
    執行者線程 0 接到任務
    執行者線程 0 執行任務
    啟動:執行者線程 1
    啟動:執行者線程 2
    main 加入新任務
    執行者線程 2 接到任務
    執行者線程 2 執行任務
    main 加入新任務
    執行者線程 2 接到任務
    執行者線程 2 執行任務
    
  3. java 線程池類

    • 默認線程池類 ThreadPoolExecutor 在 java.util.concurrent 包下

      ThreadPoolExecutor threadPool= new ThreadPoolExecutor(10, 15, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
      /*
      第一個參數 int 類型, 10 表示這個線程池初始化了 10 個線程在里面工作
      第二個參數 int 類型, 15 表示如果 10 個線程不夠用了,就會自動增加到最多 15個 線程
      第三個參數 60 結合第四個參數 TimeUnit.SECONDS,表示經過 60 秒,多出來的線程還沒有接到任務,就會回收,最后保持池子里就 10 個
      第五個參數 BlockingQueue 類型,new LinkedBlockingQueue() 用來放任務的集合
      */
      
    • execute() 方法添加新任務

      public class TestThread 
      {   
          public static void main(String[] args) throws InterruptedException 
          {
              ThreadPoolExecutor threadPool= new ThreadPoolExecutor(10, 15, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
              threadPool.execute(new Runnable()
              {//添加任務
                  public void run() 
                  {
                      System.out.println("執行任務");
                  }    
              });
          }
      }
      
  4. java 中幾種線程池

    java 線程池的頂級接口是 Executor ,子接口是 ExecutorService ,子接口使用更廣泛

    Executors 類提供了一系列工廠方法用於創建線程池,返回的線程池實現了 ExecutorService 接口

    • newCachedThreadPool有緩沖的線程池,線程數 JVM 控制,有線程可使用時不會創建新線程
    • newFixedThreadPool,固定大小的線程池,任務量超過線程數時,任務存入等待隊列
    • newScheduledThreadPool,創建一個線程池,可安排在給定延遲后運行命令或者定期地執行
    • newSingleThreadExecutor,只有一個線程,順序執行多個任務,若意外終止,則會新創建一個
    ExecutorService threadPool = null;
    threadPool = Executors.newCachedThreadPool();//緩沖線程池
    threadPool = Executors.newFixedThreadPool(3);//固定大小的線程池
    threadPool = Executors.newScheduledThreadPool(2);//定時任務線程池
    threadPool = Executors.newSingleThreadExecutor();//單線程的線程池
    threadPool = new ThreadPoolExecutor(···);//默認線程池,多個可控參數
    

線程安全類

  1. StringBuffer:內部方法用 synchronized 修飾
  2. Vetort:繼承於 AbstractList
  3. Stack:繼承於 Vector
  4. HashTable:繼承於 Dictionary,實現了 Map 接口
  5. Property:繼承於 HashTable,實現了 Map 接口
  6. concurrentHashMap:分段加鎖機制


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM