多線程與高並發


創建線程

new MyThread().start();
new Thread(new MyRun()).start();
new Thread(()->{
System.out.println("Hello Lambda!");
}).start();

常見方法

  • sleep
  • yield 讓出線程,從running到ready狀態
  • join 線程A中調用B.join(),表示A讓B先執行

線程狀態

總共6大塊,分別是:new,runnable,timedwaiting,waiting,blocked,terminated

synchronize

原理:對象頭(mark word)上有兩位(00,01),表示不同類型的鎖
同一個對象中的兩個非靜態synchronize方法,不能同時執行,由此可見,非靜態的synchronize方法鎖住的對象是當前對象,也就是和synchronize(this)等效
synchronize是可重入鎖:兩個synchronize方法A和B,A可以調用B,B也可以調用A
鎖定的區域發生異常,默認情況下鎖會被釋放,寫業務代碼時要小心
synchronize的底層實現:
JDK早期,重量級鎖
后來有鎖升級的概念,第一次只在對象頭記錄線程的id,叫偏向鎖;如果線程爭用,升級為自旋鎖;自旋10次之后,升級為重量級鎖,也就是系統鎖;
鎖無法降級;
自旋鎖是用戶態,加鎖和解鎖快;重量級鎖是內核態,效率低;
什么情況用自旋鎖,什么情況用系統鎖?運行時間長、線程多用重量級鎖;反之用自旋鎖

volatile

  • 線程可見性
  • 禁止指令重排
    單例模式雙重檢查鎖需要加volatile,答案是需要,原因就是指令重排(對象創建步驟可能重排序,如3和2交換:1、創建對象並賦初始值 2、賦值 3、變量指向堆內存)

其他並發控制類

  • ReentrantLock
    可以替換synchronize,功能比synchronize強大
    lock&unlock 不能自動解鎖,所以unlock一定要在finally中
    trylock
  • CountDownLatch
    功能和join一樣
    latch.countDown()
    latch.await()
  • CyclicBarrier
    滿足一定數據量的線程后,一起啟動
  • ReentrantReadWriteLock
    讀鎖=共享鎖 readWriteLock.readLock();
    寫鎖=排它鎖=互斥鎖 readWriteLock.writeLock();
    讀的時候,其他線程也可以讀;寫的時候,其他寫線程都阻塞
  • Semaphore
    信號量,燈亮了可以執行,滅了不能執行
    Semaphore s = new Semaphore(2);可以限流,最多2個線程執行

AQS

ReentrantLock、CountDownLatch、CyclicBarrier都是AQS的實現,AQS的核心是含有一個volatile修飾的state,這個state由實現類維護,和一個包含Thread的雙向鏈表(Node)

面試題:兩個線程交替打印A1B2C3D4(可重入鎖特性)

public class App {
    public static void main(String[] args) {
        App app = new App();
        Thread t1 = new Thread(app::printABC);
        Thread t2 = new Thread(app::print123);
        t1.start();
        t2.start();
    }

    public synchronized void printABC() {
        String str = "ABCDEFG";
        for (int i = 0; i < str.length(); i++) {
            System.out.println(str.charAt(i));
            this.notify();
            try {
                this.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        this.notify();
    }

    public synchronized void print123() {
        String str = "1234567";
        for (int i = 0; i < str.length(); i++) {
            System.out.println(str.charAt(i));
            this.notify();
            try {
                this.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        this.notify();
    }
}

生產者消費者實現

public class MyContainer2<T> {
    final private LinkedList<T> lists = new LinkedList<>();
    final private int MAX = 10; //最多10個元素
    private int count = 0;

    private Lock lock = new ReentrantLock();
    private Condition producer = lock.newCondition();
    private Condition consumer = lock.newCondition();

    public void put(T t) {
        try {
            lock.lock();
            while (lists.size() == MAX) { //想想為什么用while而不是用if?
                producer.await();
            }
            lists.add(t);
            ++count;
            consumer.signalAll(); //通知消費者線程進行消費
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public T get() {
        T t = null;
        try {
            lock.lock();
            while (lists.size() == 0) {
                consumer.await();
            }
            t = lists.removeFirst();
            count--;
            producer.signalAll(); //通知生產者進行生產
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
        return t;
    }

    public static void main(String[] args) {
        MyContainer2<String> c = new MyContainer2<>();
        //啟動消費者線程
        for (int i = 0; i < 1; i++) {
            new Thread(() -> {
                while (true) {
                    System.out.println(c.get() + " OUT:" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " left:" + c.count);
                    try {
                        TimeUnit.SECONDS.sleep(1);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }).start();
        }

        //生產者延遲2秒啟動
        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        //啟動生產者線程
        for (int i = 0; i < 2; i++) {
            new Thread(() -> {
                while (true) {
                    c.put(Thread.currentThread().getName() + " IN:" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
                    try {
                        TimeUnit.SECONDS.sleep(1);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }, "product" + i).start();
        }
    }
}

自定義線程池

public class App3 {
    public static void main(String[] args) {
        App3 app3 = new App3();
        app3.test();
    }

    public void test() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                3, 3, 60, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(10000),
                new MyThreadFactory("MyThreadGroup"),
                new MyRejectedExecutionHandler());

        for (int i = 1; i <= 10; i++) {
            pool.execute(new MyThread(i));
        }
        pool.shutdown();
    }

    class MyThread implements Runnable {
        int id;

        public MyThread(int id) {
            this.id = id;
        }

        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName() + " id=" + id);
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        @Override
        public String toString() {
            return "id=" + id;
        }
    }

    class MyThreadFactory implements ThreadFactory {
        String threadGroup;
        AtomicInteger nextId = new AtomicInteger(1);

        MyThreadFactory(String threadGroup) {
            this.threadGroup = threadGroup;
        }

        @Override
        public Thread newThread(Runnable r) {
            String threadName = threadGroup + "-" + nextId.getAndIncrement();
            Thread thread = new Thread(r, threadName);
            return thread;
        }
    }

    class MyRejectedExecutionHandler implements RejectedExecutionHandler {

        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            System.out.println("任務被拒絕 " + r);
        }
    }
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM