多線程高並發


Concurrent並發編程

線程基本知識

1.開啟多線程的兩種方式

  • 繼承Thread類

  • 實現Runnable接口

    public class NewThread {
    public static void main(String[] args) {
    new Thread1().start();
    new Thread(new Thread2()).start();
    }
    }

    class Thread1 extends Thread {
    @Override
    public void run() {
    System.out.println("-----extends Thread-----");
    }
    }

    class Thread2 implements Runnable {
    @Override
    public void run() {
    System.out.println("-----implements Runnable-----");
    }
    }

    JDK關於多線程的開啟方式的說明


    /* There are two ways to create a new thread of execution. One is to
    * declare a class to be a subclass of <code>Thread</code>. This
    * subclass should override the <code>run</code> method of class
    * <code>Thread</code>. An instance of the subclass can then be
    * allocated and started
    ....
    * The other way to create a thread is to declare a class that
    * implements the <code>Runnable</code> interface. That class then
    * implements the <code>run</code> method. An instance of the class can
    * then be allocated, passed as an argument when creating
    * <code>Thread</code>, and started.
    */

2.守護線程

設置線程為守護線程之后,線程會在主線程結束后直接結束,所以finally代碼塊將不再保證能執行

public class DaemonThread {
public static void main(String[] args) throws InterruptedException {
DaemonThread1 daemonThread1 = new DaemonThread1();
daemonThread1.setDaemon(true);
daemonThread1.start();
Thread.sleep(1);
daemonThread1.interrupt();
}
}

class DaemonThread1 extends Thread {
@Override
public void run() {
try {
while (!isInterrupted()){
System.out.println(Thread.currentThread().getId()+"---"+Thread.currentThread().getName());
}
} catch (Exception e) {
e.printStackTrace();
} finally {
System.out.println("finally block");
}
}
}

3.中斷線程

安全中斷線程interrupt(),只是提示線程該中斷了,並不保證一定中斷

public class InterruptThread {
public static void main(String[] args) throws InterruptedException {
InterruptThread1 thread1 = new InterruptThread1();
Thread thread = new Thread(new InterruptThread2());
thread1.start();
thread.start();
Thread.sleep(1);
thread1.interrupt();
thread.interrupt();
       Thread thread = new Thread(new InterruptThread3());
thread.start();
Thread.sleep(1);
thread.interrupt();
}
}
class InterruptThread1 extends Thread {
@Override
public void run() {
//響應中斷
while (!isInterrupted()) {
System.out.println("interrupt1---------");
}
}
}

class InterruptThread2 implements Runnable {
@Override
public void run() {
//runnable響應中斷
while (!Thread.currentThread().isInterrupted()) {
System.out.println("interrupt2---------");
}
}
}

class InterruptThread3 implements Runnable {
@Override
public void run() {
//不響應中斷,interrupt將無法阻止
while (true) {
System.out.println("interrupt3---------");
}
}
}

差異:

isInterrupted()

Thread.interrupted()判斷線程是否被中斷,如果線程處於阻塞狀態,線程在檢查中斷標示時如果發現中斷標示為true,會拋異常,這個方法會將中斷標志位重置為false

4.Sleep對Lock的影響

Sleep不會釋放鎖

public class ThreadSleepOnLock {
private static final Object lock = new Object();

public static void main(String[] args) {
new ThreadSleepOnLock1().start();
new ThreadSleepOnLock2().start();
}

private static class ThreadSleepOnLock1 extends Thread {
@Override
public void run() {
sleepLock();
}
}

private static class ThreadSleepOnLock2 extends Thread {
@Override
public void run() {
sleepLock();
}
}

private static void sleepLock() {
synchronized (lock) {
System.out.println(Thread.currentThread().getName() + " hold the lock,current time:" + System.currentTimeMillis());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " release the lock,current time:" + System.currentTimeMillis());
}
}
}

5.Join

使用join()進行插隊,保證線程執行的有序性

public class ThreadJoin {
public static void main(String[] args) {
Join2 join2 = new Join2();
join2.setName("Join2");
Join1 join1 = new Join1(join2);
join1.setName("Join1");
join1.start();
join2.start();
}
}

class Join1 extends Thread {
private Thread sub;

public Join1(Thread sub) {
this.sub = sub;
}

public Join1() {
}

@Override
public void run() {
System.out.println(getName() + " is running----------");
if (sub != null) {
//插隊,這個sub線程執行完了,才繼續執行任務
try {
sub.join();
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(getName() + " finished----------");
}
}

class Join2 extends Thread {

@Override
public void run() {
System.out.println(getName() + " is running----------");
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(getName() + " finished----------");
}
}

線程間共享與協作

1.線程間共享

  • synchronized 關鍵字

    確保多個線程在同一個時刻,只能有一個線程處於方法或者同步塊中,它保證了線程對變量訪問的可見性和排他性,又稱為內置鎖機制

public class SynchronizedTest {
private int count = 0;
private static final Object lock = new Object();

public int getCount() {
return count;
}

public static void main(String[] args) throws InterruptedException {
SynchronizedTest synchronizedTest = new SynchronizedTest();
SynchronizedThread thread1 = synchronizedTest.new SynchronizedThread();
SynchronizedThread thread2 = synchronizedTest.new SynchronizedThread();
SynchronizedThread thread3 = synchronizedTest.new SynchronizedThread();
thread1.setName("Thread1");
thread2.setName("Thread2");
thread3.setName("Thread3");
thread1.start();
thread2.start();
thread3.start();
Thread.sleep(2000);
System.out.println(synchronizedTest.getCount());
}

private class SynchronizedThread extends Thread {
@Override
public void run() {
synchronized (lock) {
int i = 0;
while (i < 100) {
System.out.println(getName() + " is running...");
count++;
i++;
}
}
}
}
}

對象鎖 鎖的是對象,一個class可以有多個對象,鎖不同的對象,互不干擾

類鎖 加在靜態方法上的鎖,鎖的是整個class類,只有一份

public class SynchronizedThread {
public static void main(String[] args) {
InstanceThread instanceThread1 = new InstanceThread();
InstanceThread instanceThread2 = new InstanceThread();
InstanceThread instanceThread3 = new InstanceThread();
instanceThread1.setName("Thread1");
instanceThread2.setName("Thread2");
instanceThread3.setName("Thread3");
instanceThread1.start();
instanceThread2.start();
instanceThread3.start();
}
}
class InstanceThread extends Thread{
@Override
public void run() {
synchronizedInstance();
synchronizedClass();
}
//類鎖,加在static方法上
private static synchronized void synchronizedClass(){
System.out.println(Thread.currentThread().getName()+" synchronizedClass...");
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+" finished synchronizedClass...");
}
//對象鎖
private synchronized void synchronizedInstance(){
System.out.println(Thread.currentThread().getName()+" synchronizedInstance...");
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+" finished synchronizedInstance...");
}
}

錯誤的加鎖會導致不可預知的結果

public class ErrorSynchronized {
	public static void main(String[] args) throws InterruptedException {
		ErrorSynchronized errorSynchronized = new ErrorSynchronized();
		ErrorSynchronizedThread thread1 = errorSynchronized.new ErrorSynchronizedThread();
		for (int i = 0; i < 500; i++) {
			new Thread(thread1).start();
		}
		System.out.println(thread1.count);
	}
	private class ErrorSynchronizedThread implements Runnable{
		private Integer count=0;
		@Override
		public void run() {
			synchronized (count){
				count++;
			}
		}
	}
}

錯誤分析:

反編譯class

class ErrorSynchronized$ErrorSynchronizedThread
  implements Runnable
{
  private Integer count = Integer.valueOf(0);
  
  private ErrorSynchronized$ErrorSynchronizedThread(ErrorSynchronized paramErrorSynchronized) {}
  
  public void run()
  {
    Integer localInteger1;
    synchronized (this.count)
    {
      localInteger1 = this.count;Integer localInteger2 = this.count = Integer.valueOf(this.count.intValue() + 1);
    }
  }
}
//jdk
//Integer.valueOf():超過±128將會new新對象,導致鎖的是不同的對象,也就是沒鎖住
public static Integer valueOf(int i) {
       if (i >= IntegerCache.low && i <= IntegerCache.high)
           return IntegerCache.cache[i + (-IntegerCache.low)];
       return new Integer(i);
 }
  • volatile 最輕量的同步

    volatile關鍵字是最輕量的同步機制,保證了不同線程對某個變量進行操作時的可見性,但是不保證線程安全 ,例如,可以使用在創建單例鎖類對象時,防止二重鎖.應用場景:一寫多讀

    public class VolatileThread {
    	private volatile static boolean flag = false;
    
    	public static void main(String[] args) throws InterruptedException {
    		for (int i = 0; i < 5; i++) {
    			VolatileThread1 thread = new VolatileThread1();
    			thread.setName("Thread"+i);
    			thread.start();
    		}
    		Thread.sleep(2000);
    		flag=true;
    	}
    
    	public static class VolatileThread1 extends Thread{
    		@Override
    		public void run() {
    			System.out.println(getName()+" arrive....");
    			while (!flag);
    			System.out.println(getName()+" end....");
    		}
    	}
    }
    

    volatile為什么不安全?

    如果有一個變量i = 0用volatile修飾,兩個線程對其進行i++操作,如果線程1從內存中讀取i=0進了緩存,然后把數據讀入寄存器,之后時間片用完了,然后線程2也從內存中讀取i進緩存,因為線程1還未執行寫操作,內存屏障是插入在寫操作之后的指令,意味着還未觸發這個指令,所以緩存行是不會失效的。然后線程2執行完畢,內存中i=1,然后線程1又開始執行,然后將數據寫回緩存再寫回內存,結果還是1

    https://blog.csdn.net/qq_33330687/article/details/80990729

public class VolatileUnSafe {
	private volatile int count = 0;

	public static void main(String[] args) throws InterruptedException {
		VolatileUnSafe volatileUnSafe = new VolatileUnSafe();
		int index=0;
		while (index<10){
			volatileUnSafe.new VolatileUnSafeThread().start();
			index++;
		}
		Thread.sleep(1000);
		System.out.println(volatileUnSafe.count);
	}

	class VolatileUnSafeThread extends Thread {
		@Override
		public void run() {
			int i=0;
			while (i<10000){
				count++;
				i++;
			}
		}
	}
}

2.ThreadLocal

ThreadLocal主要用於線程的隔離,Spring事務中,一個事務可能包含了多個業務邏輯,穿梭於多個Dao,所以回滾是應該是同一個Connection,如果將Connection保存到ThreadLocal中,將十分有效,否則將需要將Connection進行傳遞,比較繁瑣

public class ThreadLocalUse {
	private ThreadLocal<Integer> threadLocal=new InheritableThreadLocal<Integer>(){
		@Override
		protected Integer initialValue() {
			return 0;
		}
	};

	public static void main(String[] args) {
		ThreadLocalUse threadLocalUse = new ThreadLocalUse();
		for (int i = 0; i < 10; i++) {
			ThreadLocalUseThread threadLocalUseThread = threadLocalUse.new ThreadLocalUseThread();
			threadLocalUseThread.setName("Thread["+i+"]");
			threadLocalUseThread.start();
		}
	}
	class ThreadLocalUseThread extends Thread{
		@Override
		public void run() {
			Integer local = threadLocal.get();
			int index=0;
			while (index++<getId()){
				local++;
			}
			System.out.println(getId()+"-"+getName()+":"+local);
		}
	}
}

ThreadLocal應該要保存自己的變量,所以,變量不能定義為static,否則會出現錯誤的結果

ThreadLocal使用不當會引發內存泄漏(該回收的對象沒有得到回收,一直占用內存)

Rather than keep track of all ThreadLocals, you could clear them all at once

protected void afterExecute(Runnable r, Throwable t) { 
 // you need to set this field via reflection.
 Thread.currentThread().threadLocals = null;
}

As a principle, whoever put something in thread local should be responsible to clear it

//set了之后記得remove
threadLocal.set(...);
try {
...
} finally {
threadLocal.remove();
}

3.線程間協作

  • 等待/通知

    是指一個線程A調用了對象O的wait()方法進入等待狀態,而另一個線程B調用了對象O的notify()或者notifyAll()方法,線程A收到通知后從對象O的wait()方法返回,進而執行后續操作。上述兩個線程通過對象O來完成交互,而對象上的wait()和notify/notifyAll()的關系就如同開關信號一樣,用來完成等待方和通知方之間的交互工作

notify():通知一個在對象上等待的線程,使其從wait方法返回,而返回的前提是該線程獲取到了對象的鎖,沒有獲得鎖的線程重新進入WAITING狀態。

notifyAll():通知所有等待在該對象上的線程

wait():調用該方法的線程進入 WAITING狀態,只有等待另外線程的通知或被中斷才會返回.需要注意,調用wait()方法后,會釋放對象的鎖

wait(long):超時等待一段時間,這里的參數時間是毫秒,也就是等待長達n毫秒,如果沒有通知就超時返回

wait (long,int)對於超時時間更細粒度的控制,可以達到納秒

 

  • 等待和通知的標准范式

    等待方遵循如下原則。

    1)獲取對象的鎖。

    2)如果條件不滿足,那么調用對象的wait()方法,被通知后仍要檢查條件。

    3)條件滿足則執行對應的邏輯。

    通知方遵循如下原則。

    1)獲得對象的鎖。

    2)改變條件。

    3)通知所有等待在對象上的線程。

    //等待方
    sychronized(obj){
        while(!condition){
            obj.wait();
        }
    }
    
    //通知方
    sychronized(obj){
        condtion=true;
        obj.notify();
        //or
      	obj.notifyAll();
    }
    

永遠在while循環而不是if語句中使用wait()

對在多線程間共享的那個Object來使用wait()

在調用wait(),notify()系列方法之前,線程必須要獲得該對象的對象級別鎖,即只能在同步方法或同步塊中調用wait(),notify()系列方法

盡量使用notifyAll(),而不是 notify()

public class WaitAndNotify {
	public static void main(String[] args) throws InterruptedException {
		for (int i = 0; i < 10; i++) {
			new ConsumerThread().start();
		}
		Thread.sleep(1000);
		new ProducerThread().start();
	}
}

class Shop {
	public static final List<String> PRODUCTS = new ArrayList<String>();
}

class ProducerThread extends Thread {
	@Override
	public void run() {
		synchronized (Shop.PRODUCTS){
			int index = 0;
			while (index < 100) {
				Shop.PRODUCTS.add("Product-" + index++);
			}
			Shop.PRODUCTS.notifyAll();
		}
	}
}

class ConsumerThread extends Thread {

	@Override
	public void run() {
		synchronized (Shop.PRODUCTS) {
			while (Shop.PRODUCTS.isEmpty()) {
				try {
					System.out.println("no product,"+getName()+" wait....");
					Shop.PRODUCTS.wait();
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
			buy();
		}
	}

	private void buy() {
		synchronized (Shop.PRODUCTS){
			String shop = Shop.PRODUCTS.remove(0);
			System.out.println(getName() + " buy " + shop);
		}
	}
}

並發工具類

1.ForkJoin

處理分而治之類(如歸並排序)的問題

public class WordCountForkJoin {
	public static void main(String[] args) {

		String path = "D:\\apache-tomcat-8.5.34\\logs\\catalina.2019-03-30.log";
		InputStream inputStream = null;
		try {
			inputStream = new FileInputStream(path);
			int len = 0;
			byte[] bytes = new byte[1024];
			StringBuilder stringBuilder = new StringBuilder();
			while ((len = inputStream.read(bytes)) != -1) {
				String str = new String(bytes, 0, len, StandardCharsets.UTF_8);
				stringBuilder.append(str);
			}
			ForkJoinPool pool = new ForkJoinPool();
			Map<String, Integer> map =
					pool.invoke(new WordCount(stringBuilder.toString()
							.replaceAll("[\"\'\\[()]","")
							.replace("\\"," ")
							.replaceAll("[,:?.|+-=]"," ").split("\\s+")));
			Set<Map.Entry<String, Integer>> entrySet = map.entrySet();
			Iterator<Map.Entry<String, Integer>> iterator = entrySet.iterator();
			while (iterator.hasNext()) {
				Map.Entry<String, Integer> entry = iterator.next();
				System.out.println(entry.getKey() + ":" + entry.getValue());
			}
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			IOUtils.closeQuietly(inputStream);
		}
	}
}

class WordCount extends RecursiveTask<Map<String, Integer>> {
	private String[] source;

	public WordCount(String[] source) {
		this.source = source;
	}

	@Override
	protected Map<String, Integer> compute() {

		if (source.length <= 1024) {
			return setMap(source);
		} else {
			String[] left = Arrays.copyOfRange(source, 0, source.length / 2);
			String[] right = Arrays.copyOfRange(source, source.length / 2, source.length);
			WordCount wordCountLeft = new WordCount(left);
			WordCount wordCountRight = new WordCount(right);
			invokeAll(wordCountLeft,wordCountRight);
			Map<String, Integer> joinLeft = wordCountLeft.join();
			Map<String, Integer> joinRight = wordCountRight.join();
			return merge(joinLeft, joinRight);
		}
	}

	private Map<String, Integer> setMap(String[] source) {
		Map<String, Integer> map = new HashMap<>();
		for (String item : source) {
			if (map.containsKey(item)) {
				Integer value = map.get(item);
				map.put(item, ++value);
			} else {
				map.put(item, 1);
			}
		}
		return map;
	}

	private Map<String, Integer> merge(Map<String, Integer> joinLeft, Map<String, Integer> joinRight) {
		Set<Map.Entry<String, Integer>> entrySet = joinRight.entrySet();
		Iterator<Map.Entry<String, Integer>> iterator = entrySet.iterator();
		while (iterator.hasNext()) {
			Map.Entry<String, Integer> entry = iterator.next();
			String key = entry.getKey();
			Integer value = entry.getValue();
			if (joinLeft.containsKey(key)) {
				joinLeft.put(key, joinLeft.get(key) + value);
			}
		}
		return joinLeft;
	}
}
  • ForkJoin標准范式

    我們要使用ForkJoin框架,必須首先創建一個ForkJoin任務。它提供在任務中執行fork和join的操作機制,通常我們不直接繼承ForkjoinTask類,只需要直接繼承其子類。

    1. RecursiveAction,用於沒有返回結果的任務

    2. RecursiveTask,用於有返回值的任務

    task要通過ForkJoinPool來執行,使用submit 或 invoke 提交,兩者的區別是:invoke是同步執行,調用之后需要等待任務完成,才能執行后面的代碼;submit是異步執行

    join()和get方法當任務完成的時候返回計算結果。

    在我們自己實現的compute方法里,首先需要判斷任務是否足夠小,如果足夠小就直接執行任務。如果不足夠小,就必須分割成兩個子任務,每個子任務在調用invokeAll方法時,又會進入compute方法,看看當前子任務是否需要繼續分割成子任務,如果不需要繼續分割,則執行當前子任務並返回結果。使用join方法會等待子任務執行完並得到其結果。

2.CountDownLatch

閉鎖,CountDownLatch這個類能夠使一個線程等待其他線程完成各自的工作后再執行

public class MyCountDownLatch {
	private CountDownLatch countDownLatch = new CountDownLatch(5);

	public static void main(String[] args) throws InterruptedException {
		MyCountDownLatch myCountDownLatch = new MyCountDownLatch();
		for (int i = 0; i < 5; i++) {
			myCountDownLatch.new InitialThread().start();
		}
		//等待countDownLatch=0,再執行后面的事情
		myCountDownLatch.countDownLatch.await();
		System.out.println("do task .... ");
		Thread.sleep(1000);
		System.out.println("task finished.... ");
	}

	private class InitialThread extends Thread {
		@Override
		public void run() {
			System.out.println(getName() + "  prepare initial.....");
			try {
				Thread.sleep(1000);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
			System.out.println(getName() + " initial finished!");
			countDownLatch.countDown();
		}
	}
}

3.CycliBarrier

讓一組線程到達一個屏障(也可以叫同步點)時被阻塞,直到最后一個線程到達屏障時,屏障才會開門,所有被屏障攔截的線程才會繼續運行

區別CountDownLatch:

CountDownLatch的計數器只能使用一次,而CyclicBarrier的計數器可以反復使用

public class MyCyclicBarrier {
	private CyclicBarrier cyclicBarrier = new CyclicBarrier(4);

	public static void main(String[] args) throws Exception {
		MyCyclicBarrier myCyclicBarrier = new MyCyclicBarrier();
		for (int i = 0; i < 4; i++) {
			myCyclicBarrier.new HighAltitudeGame().start();
		}
		Thread.sleep(2000);
		System.out.println("高空項目完成");
		for (int i = 0; i <4; i++) {
			NotOneLessGame notOneLessGame = myCyclicBarrier.new NotOneLessGame();
			new Thread(notOneLessGame).start();
		}
	}

	//高空項目
	private class HighAltitudeGame extends Thread {
		@Override
		public void run() {
			System.out.println(getName() + "完成高空項目.....");
			try {
				System.out.println(getName() +"等待其他人");
				cyclicBarrier.await();
			} catch (InterruptedException | BrokenBarrierException e) {
				e.printStackTrace();
			}
		}
	}

	//一個不能少游戲
	private class NotOneLessGame implements Runnable {
		@Override
		public void run() {
			try {
				System.out.println(Thread.currentThread().getName() + "開始挑戰一個不能少項目");
				cyclicBarrier.await();
				System.out.println(Thread.currentThread().getName()+"完成任務");
			} catch (InterruptedException | BrokenBarrierException e) {
				e.printStackTrace();
			}
		}
	}
}

4.Semaphore

Semaphore信號量:用來控制同時訪問特定資源的線程數量,一般用作流量控制,數據庫連接等有限資源的訪問控制

線程使用Semaphore的acquire()方法獲取一個許可證

使用完之后調用release()方法歸還許可證

用tryAcquire()方法嘗試獲取許可證

public class MySemaphore {
	//兩個理發師
	private static final Semaphore BARBERS = new Semaphore(2);

	public static void main(String[] args) {
		for (int i = 0; i < 10; i++) {
			new Thread(new HairCut()).start();
		}
	}
	//理發線程
	private static class HairCut implements Runnable {
		@Override
		public void run() {
			try {
				BARBERS.acquire();
				System.out.println("當前有理發師空閑,"+Thread.currentThread().getName() +"准備理發....");
				Thread.sleep(1000);
				BARBERS.release();
				System.out.println(Thread.currentThread().getName() + "理發完畢....");
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}

5.Exchanger

Exchanger可以在兩個線程之間交換數據

Exchangers may be useful in applications such as genetic algorithms and pipeline designs.

public class MyExchanger {
	private static final Exchanger<Object> exchanger = new Exchanger<>();

	public static void main(String[] args) throws InterruptedException {
		Consumer consumer = new Consumer();
		Producer producer = new Producer();
		consumer.setName("consumer");
		consumer.setDaemon(true);
		consumer.start();
		producer.setName("producer");
		producer.setDaemon(true);
		producer.start();
		Thread.sleep(2000);
	}

	private static class Consumer extends Thread {
		@Override
		public void run() {
			Random random = new Random();
			try {
				for (int i = 0; i < 10; i++) {
					Object exchange = exchanger.exchange(random.nextInt(100));
					System.out.println(getName()+":"+exchange.toString());
				}
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}

	private static class Producer extends Thread {
		@Override
		public void run() {
			Random random = new Random();
			try {
				for (int i = 0; i < 10; i++) {
					Object exchange = exchanger.exchange("Iphone" + random.nextInt(10));
					System.out.println(getName()+":"+exchange.toString());
				}
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM