線程基本知識
1.開啟多線程的兩種方式
-
繼承Thread類
-
實現Runnable接口
public class NewThread {
public static void main(String[] args) {
new Thread1().start();
new Thread(new Thread2()).start();
}
}
class Thread1 extends Thread {
JDK關於多線程的開啟方式的說明
/* There are two ways to create a new thread of execution. One is to
* declare a class to be a subclass of <code>Thread</code>. This
* subclass should override the <code>run</code> method of class
* <code>Thread</code>. An instance of the subclass can then be
* allocated and started
....
* The other way to create a thread is to declare a class that
* implements the <code>Runnable</code> interface. That class then
* implements the <code>run</code> method. An instance of the class can
* then be allocated, passed as an argument when creating
* <code>Thread</code>, and started.
*/
2.守護線程
設置線程為守護線程之后,線程會在主線程結束后直接結束,所以finally
代碼塊將不再保證能執行
public class DaemonThread {
public static void main(String[] args) throws InterruptedException {
DaemonThread1 daemonThread1 = new DaemonThread1();
daemonThread1.setDaemon(true);
daemonThread1.start();
Thread.sleep(1);
daemonThread1.interrupt();
}
}
class DaemonThread1 extends Thread {
3.中斷線程
安全中斷線程interrupt()
,只是提示線程該中斷了,並不保證一定中斷
public class InterruptThread {
public static void main(String[] args) throws InterruptedException {
InterruptThread1 thread1 = new InterruptThread1();
Thread thread = new Thread(new InterruptThread2());
thread1.start();
thread.start();
Thread.sleep(1);
thread1.interrupt();
thread.interrupt();
Thread thread = new Thread(new InterruptThread3());
thread.start();
Thread.sleep(1);
thread.interrupt();
}
}
class InterruptThread1 extends Thread {
差異:
isInterrupted()
Thread.interrupted()
判斷線程是否被中斷,如果線程處於阻塞狀態,線程在檢查中斷標示時如果發現中斷標示為true,會拋異常,這個方法會將中斷標志位重置為false
4.Sleep對Lock的影響
Sleep不會釋放鎖
public class ThreadSleepOnLock {
private static final Object lock = new Object();
public static void main(String[] args) {
new ThreadSleepOnLock1().start();
new ThreadSleepOnLock2().start();
}
private static class ThreadSleepOnLock1 extends Thread {
5.Join
使用join()
進行插隊,保證線程執行的有序性
public class ThreadJoin {
public static void main(String[] args) {
Join2 join2 = new Join2();
join2.setName("Join2");
Join1 join1 = new Join1(join2);
join1.setName("Join1");
join1.start();
join2.start();
}
}
class Join1 extends Thread {
private Thread sub;
public Join1(Thread sub) {
this.sub = sub;
}
public Join1() {
}
線程間共享與協作
1.線程間共享
-
synchronized 關鍵字
確保多個線程在同一個時刻,只能有一個線程處於方法或者同步塊中,它保證了線程對變量訪問的可見性和排他性,又稱為內置鎖機制
public class SynchronizedTest {
private int count = 0;
private static final Object lock = new Object();
public int getCount() {
return count;
}
public static void main(String[] args) throws InterruptedException {
SynchronizedTest synchronizedTest = new SynchronizedTest();
SynchronizedThread thread1 = synchronizedTest.new SynchronizedThread();
SynchronizedThread thread2 = synchronizedTest.new SynchronizedThread();
SynchronizedThread thread3 = synchronizedTest.new SynchronizedThread();
thread1.setName("Thread1");
thread2.setName("Thread2");
thread3.setName("Thread3");
thread1.start();
thread2.start();
thread3.start();
Thread.sleep(2000);
System.out.println(synchronizedTest.getCount());
}
private class SynchronizedThread extends Thread {
對象鎖 鎖的是對象,一個class可以有多個對象,鎖不同的對象,互不干擾
類鎖 加在靜態方法上的鎖,鎖的是整個class類,只有一份
public class SynchronizedThread {
public static void main(String[] args) {
InstanceThread instanceThread1 = new InstanceThread();
InstanceThread instanceThread2 = new InstanceThread();
InstanceThread instanceThread3 = new InstanceThread();
instanceThread1.setName("Thread1");
instanceThread2.setName("Thread2");
instanceThread3.setName("Thread3");
instanceThread1.start();
instanceThread2.start();
instanceThread3.start();
}
}
class InstanceThread extends Thread{
錯誤的加鎖會導致不可預知的結果
public class ErrorSynchronized { public static void main(String[] args) throws InterruptedException { ErrorSynchronized errorSynchronized = new ErrorSynchronized(); ErrorSynchronizedThread thread1 = errorSynchronized.new ErrorSynchronizedThread(); for (int i = 0; i < 500; i++) { new Thread(thread1).start(); } System.out.println(thread1.count); } private class ErrorSynchronizedThread implements Runnable{ private Integer count=0; @Override public void run() { synchronized (count){ count++; } } } }
錯誤分析:
反編譯class
class ErrorSynchronized$ErrorSynchronizedThread implements Runnable { private Integer count = Integer.valueOf(0); private ErrorSynchronized$ErrorSynchronizedThread(ErrorSynchronized paramErrorSynchronized) {} public void run() { Integer localInteger1; synchronized (this.count) { localInteger1 = this.count;Integer localInteger2 = this.count = Integer.valueOf(this.count.intValue() + 1); } } } //jdk //Integer.valueOf():超過±128將會new新對象,導致鎖的是不同的對象,也就是沒鎖住 public static Integer valueOf(int i) { if (i >= IntegerCache.low && i <= IntegerCache.high) return IntegerCache.cache[i + (-IntegerCache.low)]; return new Integer(i); }
-
volatile 最輕量的同步
volatile
關鍵字是最輕量的同步機制
,保證了不同線程對某個變量進行操作時的可見性,但是不保證線程安全
,例如,可以使用在創建單例鎖類對象時,防止二重鎖.應用場景:一寫多讀public class VolatileThread { private volatile static boolean flag = false; public static void main(String[] args) throws InterruptedException { for (int i = 0; i < 5; i++) { VolatileThread1 thread = new VolatileThread1(); thread.setName("Thread"+i); thread.start(); } Thread.sleep(2000); flag=true; } public static class VolatileThread1 extends Thread{ @Override public void run() { System.out.println(getName()+" arrive...."); while (!flag); System.out.println(getName()+" end...."); } } }
volatile為什么不安全?
如果有一個變量i = 0用volatile修飾,兩個線程對其進行i++操作,如果線程1從內存中讀取i=0進了緩存,然后把數據讀入寄存器,之后時間片用完了,然后線程2也從內存中讀取i進緩存,因為線程1還未執行寫操作,內存屏障是插入在寫操作之后的指令,意味着還未觸發這個指令,所以緩存行是不會失效的。然后線程2執行完畢,內存中i=1,然后線程1又開始執行,然后將數據寫回緩存再寫回內存,結果還是1
https://blog.csdn.net/qq_33330687/article/details/80990729
public class VolatileUnSafe { private volatile int count = 0; public static void main(String[] args) throws InterruptedException { VolatileUnSafe volatileUnSafe = new VolatileUnSafe(); int index=0; while (index<10){ volatileUnSafe.new VolatileUnSafeThread().start(); index++; } Thread.sleep(1000); System.out.println(volatileUnSafe.count); } class VolatileUnSafeThread extends Thread { @Override public void run() { int i=0; while (i<10000){ count++; i++; } } } }
2.ThreadLocal
ThreadLocal
主要用於線程的隔離,Spring事務中,一個事務可能包含了多個業務邏輯,穿梭於多個Dao,所以回滾是應該是同一個Connection
,如果將Connection
保存到ThreadLocal
中,將十分有效,否則將需要將Connection
進行傳遞,比較繁瑣
public class ThreadLocalUse { private ThreadLocal<Integer> threadLocal=new InheritableThreadLocal<Integer>(){ @Override protected Integer initialValue() { return 0; } }; public static void main(String[] args) { ThreadLocalUse threadLocalUse = new ThreadLocalUse(); for (int i = 0; i < 10; i++) { ThreadLocalUseThread threadLocalUseThread = threadLocalUse.new ThreadLocalUseThread(); threadLocalUseThread.setName("Thread["+i+"]"); threadLocalUseThread.start(); } } class ThreadLocalUseThread extends Thread{ @Override public void run() { Integer local = threadLocal.get(); int index=0; while (index++<getId()){ local++; } System.out.println(getId()+"-"+getName()+":"+local); } } }
ThreadLocal
應該要保存自己的變量,所以,變量不能定義為static
,否則會出現錯誤的結果
ThreadLocal
使用不當會引發內存泄漏
(該回收的對象沒有得到回收,一直占用內存)Rather than keep track of all ThreadLocals, you could clear them all at once
protected void afterExecute(Runnable r, Throwable t) { // you need to set this field via reflection. Thread.currentThread().threadLocals = null; }As a principle, whoever put something in thread local should be responsible to clear it
//set了之后記得remove threadLocal.set(...); try { ... } finally { threadLocal.remove(); }
3.線程間協作
-
等待/通知
是指一個線程A調用了對象O的wait()方法進入等待狀態,而另一個線程B調用了對象O的notify()或者notifyAll()方法,線程A收到通知后從對象O的wait()方法返回,進而執行后續操作。上述兩個線程通過對象O來完成交互,而對象上的wait()和notify/notifyAll()的關系就如同開關信號一樣,用來完成等待方和通知方之間的交互工作
notify()
:通知一個在對象上等待的線程,使其從wait方法返回,而返回的前提是該線程獲取到了對象的鎖,沒有獲得鎖的線程重新進入WAITING狀態。
notifyAll()
:通知所有等待在該對象上的線程
wait()
:調用該方法的線程進入 WAITING狀態,只有等待另外線程的通知或被中斷才會返回.需要注意,調用wait()方法后,會釋放對象的鎖
wait(long)
:超時等待一段時間,這里的參數時間是毫秒,也就是等待長達n毫秒,如果沒有通知就超時返回
wait (long,int)
對於超時時間更細粒度的控制,可以達到納秒
-
等待和通知的標准范式
等待方遵循如下原則。
1)獲取對象的鎖。
2)如果條件不滿足,那么調用對象的wait()方法,被通知后仍要檢查條件。
3)條件滿足則執行對應的邏輯。
通知方遵循如下原則。
1)獲得對象的鎖。
2)改變條件。
3)通知所有等待在對象上的線程。
//等待方 sychronized(obj){ while(!condition){ obj.wait(); } } //通知方 sychronized(obj){ condtion=true; obj.notify(); //or obj.notifyAll(); }
永遠在
while
循環而不是if
語句中使用wait()對在多線程間共享的那個Object來使用wait()
在調用wait(),notify()系列方法之前,線程必須要獲得該對象的對象級別鎖,即只能在同步方法或同步塊中調用wait(),notify()系列方法
盡量使用notifyAll(),而不是 notify()
public class WaitAndNotify { public static void main(String[] args) throws InterruptedException { for (int i = 0; i < 10; i++) { new ConsumerThread().start(); } Thread.sleep(1000); new ProducerThread().start(); } } class Shop { public static final List<String> PRODUCTS = new ArrayList<String>(); } class ProducerThread extends Thread { @Override public void run() { synchronized (Shop.PRODUCTS){ int index = 0; while (index < 100) { Shop.PRODUCTS.add("Product-" + index++); } Shop.PRODUCTS.notifyAll(); } } } class ConsumerThread extends Thread { @Override public void run() { synchronized (Shop.PRODUCTS) { while (Shop.PRODUCTS.isEmpty()) { try { System.out.println("no product,"+getName()+" wait...."); Shop.PRODUCTS.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } buy(); } } private void buy() { synchronized (Shop.PRODUCTS){ String shop = Shop.PRODUCTS.remove(0); System.out.println(getName() + " buy " + shop); } } }
並發工具類
1.ForkJoin
處理分而治之類(如歸並排序)的問題
public class WordCountForkJoin { public static void main(String[] args) { String path = "D:\\apache-tomcat-8.5.34\\logs\\catalina.2019-03-30.log"; InputStream inputStream = null; try { inputStream = new FileInputStream(path); int len = 0; byte[] bytes = new byte[1024]; StringBuilder stringBuilder = new StringBuilder(); while ((len = inputStream.read(bytes)) != -1) { String str = new String(bytes, 0, len, StandardCharsets.UTF_8); stringBuilder.append(str); } ForkJoinPool pool = new ForkJoinPool(); Map<String, Integer> map = pool.invoke(new WordCount(stringBuilder.toString() .replaceAll("[\"\'\\[()]","") .replace("\\"," ") .replaceAll("[,:?.|+-=]"," ").split("\\s+"))); Set<Map.Entry<String, Integer>> entrySet = map.entrySet(); Iterator<Map.Entry<String, Integer>> iterator = entrySet.iterator(); while (iterator.hasNext()) { Map.Entry<String, Integer> entry = iterator.next(); System.out.println(entry.getKey() + ":" + entry.getValue()); } } catch (Exception e) { e.printStackTrace(); } finally { IOUtils.closeQuietly(inputStream); } } } class WordCount extends RecursiveTask<Map<String, Integer>> { private String[] source; public WordCount(String[] source) { this.source = source; } @Override protected Map<String, Integer> compute() { if (source.length <= 1024) { return setMap(source); } else { String[] left = Arrays.copyOfRange(source, 0, source.length / 2); String[] right = Arrays.copyOfRange(source, source.length / 2, source.length); WordCount wordCountLeft = new WordCount(left); WordCount wordCountRight = new WordCount(right); invokeAll(wordCountLeft,wordCountRight); Map<String, Integer> joinLeft = wordCountLeft.join(); Map<String, Integer> joinRight = wordCountRight.join(); return merge(joinLeft, joinRight); } } private Map<String, Integer> setMap(String[] source) { Map<String, Integer> map = new HashMap<>(); for (String item : source) { if (map.containsKey(item)) { Integer value = map.get(item); map.put(item, ++value); } else { map.put(item, 1); } } return map; } private Map<String, Integer> merge(Map<String, Integer> joinLeft, Map<String, Integer> joinRight) { Set<Map.Entry<String, Integer>> entrySet = joinRight.entrySet(); Iterator<Map.Entry<String, Integer>> iterator = entrySet.iterator(); while (iterator.hasNext()) { Map.Entry<String, Integer> entry = iterator.next(); String key = entry.getKey(); Integer value = entry.getValue(); if (joinLeft.containsKey(key)) { joinLeft.put(key, joinLeft.get(key) + value); } } return joinLeft; } }
-
ForkJoin標准范式
我們要使用ForkJoin框架,必須首先創建一個ForkJoin任務。它提供在任務中執行fork和join的操作機制,通常我們不直接繼承ForkjoinTask類,只需要直接繼承其子類。
-
RecursiveAction,用於沒有返回結果的任務
-
RecursiveTask,用於有返回值的任務
task要通過ForkJoinPool來執行,使用submit 或 invoke 提交,兩者的區別是:invoke是同步執行,調用之后需要等待任務完成,才能執行后面的代碼;submit是異步執行
join()和get方法當任務完成的時候返回計算結果。
在我們自己實現的compute方法里,首先需要判斷任務是否足夠小,如果足夠小就直接執行任務。如果不足夠小,就必須分割成兩個子任務,每個子任務在調用invokeAll方法時,又會進入compute方法,看看當前子任務是否需要繼續分割成子任務,如果不需要繼續分割,則執行當前子任務並返回結果。使用join方法會等待子任務執行完並得到其結果。
-
2.CountDownLatch
閉鎖,CountDownLatch這個類能夠使一個線程等待其他線程完成各自的工作后再執行
public class MyCountDownLatch { private CountDownLatch countDownLatch = new CountDownLatch(5); public static void main(String[] args) throws InterruptedException { MyCountDownLatch myCountDownLatch = new MyCountDownLatch(); for (int i = 0; i < 5; i++) { myCountDownLatch.new InitialThread().start(); } //等待countDownLatch=0,再執行后面的事情 myCountDownLatch.countDownLatch.await(); System.out.println("do task .... "); Thread.sleep(1000); System.out.println("task finished.... "); } private class InitialThread extends Thread { @Override public void run() { System.out.println(getName() + " prepare initial....."); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(getName() + " initial finished!"); countDownLatch.countDown(); } } }
3.CycliBarrier
讓一組線程到達一個屏障(也可以叫同步點)時被阻塞,直到最后一個線程到達屏障時,屏障才會開門,所有被屏障攔截的線程才會繼續運行
區別CountDownLatch:
CountDownLatch的計數器只能使用一次,而CyclicBarrier的計數器可以反復使用
public class MyCyclicBarrier { private CyclicBarrier cyclicBarrier = new CyclicBarrier(4); public static void main(String[] args) throws Exception { MyCyclicBarrier myCyclicBarrier = new MyCyclicBarrier(); for (int i = 0; i < 4; i++) { myCyclicBarrier.new HighAltitudeGame().start(); } Thread.sleep(2000); System.out.println("高空項目完成"); for (int i = 0; i <4; i++) { NotOneLessGame notOneLessGame = myCyclicBarrier.new NotOneLessGame(); new Thread(notOneLessGame).start(); } } //高空項目 private class HighAltitudeGame extends Thread { @Override public void run() { System.out.println(getName() + "完成高空項目....."); try { System.out.println(getName() +"等待其他人"); cyclicBarrier.await(); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } } } //一個不能少游戲 private class NotOneLessGame implements Runnable { @Override public void run() { try { System.out.println(Thread.currentThread().getName() + "開始挑戰一個不能少項目"); cyclicBarrier.await(); System.out.println(Thread.currentThread().getName()+"完成任務"); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } } } }
4.Semaphore
Semaphore信號量:用來控制同時訪問
特定資源的線程數量,一般用作流量控制,數據庫連接等有限資源的訪問控制
線程使用Semaphore的acquire()方法獲取一個許可證
使用完之后調用release()方法歸還許可證
用tryAcquire()方法嘗試獲取許可證
public class MySemaphore { //兩個理發師 private static final Semaphore BARBERS = new Semaphore(2); public static void main(String[] args) { for (int i = 0; i < 10; i++) { new Thread(new HairCut()).start(); } } //理發線程 private static class HairCut implements Runnable { @Override public void run() { try { BARBERS.acquire(); System.out.println("當前有理發師空閑,"+Thread.currentThread().getName() +"准備理發...."); Thread.sleep(1000); BARBERS.release(); System.out.println(Thread.currentThread().getName() + "理發完畢...."); } catch (InterruptedException e) { e.printStackTrace(); } } }
5.Exchanger
Exchanger可以在兩個線程之間交換數據
Exchangers may be useful in applications such as genetic algorithms and pipeline designs.
public class MyExchanger { private static final Exchanger<Object> exchanger = new Exchanger<>(); public static void main(String[] args) throws InterruptedException { Consumer consumer = new Consumer(); Producer producer = new Producer(); consumer.setName("consumer"); consumer.setDaemon(true); consumer.start(); producer.setName("producer"); producer.setDaemon(true); producer.start(); Thread.sleep(2000); } private static class Consumer extends Thread { @Override public void run() { Random random = new Random(); try { for (int i = 0; i < 10; i++) { Object exchange = exchanger.exchange(random.nextInt(100)); System.out.println(getName()+":"+exchange.toString()); } } catch (InterruptedException e) { e.printStackTrace(); } } } private static class Producer extends Thread { @Override public void run() { Random random = new Random(); try { for (int i = 0; i < 10; i++) { Object exchange = exchanger.exchange("Iphone" + random.nextInt(10)); System.out.println(getName()+":"+exchange.toString()); } } catch (InterruptedException e) { e.printStackTrace(); } } } }