線程基本知識
1.開啟多線程的兩種方式
-
繼承Thread類
-
實現Runnable接口
public class NewThread {
public static void main(String[] args) {
new Thread1().start();
new Thread(new Thread2()).start();
}
}
class Thread1 extends Thread {
JDK關於多線程的開啟方式的說明
/* There are two ways to create a new thread of execution. One is to
* declare a class to be a subclass of <code>Thread</code>. This
* subclass should override the <code>run</code> method of class
* <code>Thread</code>. An instance of the subclass can then be
* allocated and started
....
* The other way to create a thread is to declare a class that
* implements the <code>Runnable</code> interface. That class then
* implements the <code>run</code> method. An instance of the class can
* then be allocated, passed as an argument when creating
* <code>Thread</code>, and started.
*/
2.守護線程
設置線程為守護線程之后,線程會在主線程結束后直接結束,所以finally代碼塊將不再保證能執行
public class DaemonThread {
public static void main(String[] args) throws InterruptedException {
DaemonThread1 daemonThread1 = new DaemonThread1();
daemonThread1.setDaemon(true);
daemonThread1.start();
Thread.sleep(1);
daemonThread1.interrupt();
}
}
class DaemonThread1 extends Thread {
3.中斷線程
安全中斷線程interrupt(),只是提示線程該中斷了,並不保證一定中斷
public class InterruptThread {
public static void main(String[] args) throws InterruptedException {
InterruptThread1 thread1 = new InterruptThread1();
Thread thread = new Thread(new InterruptThread2());
thread1.start();
thread.start();
Thread.sleep(1);
thread1.interrupt();
thread.interrupt();
Thread thread = new Thread(new InterruptThread3());
thread.start();
Thread.sleep(1);
thread.interrupt();
}
}
class InterruptThread1 extends Thread {
差異:
isInterrupted()
Thread.interrupted()判斷線程是否被中斷,如果線程處於阻塞狀態,線程在檢查中斷標示時如果發現中斷標示為true,會拋異常,這個方法會將中斷標志位重置為false
4.Sleep對Lock的影響
Sleep不會釋放鎖
public class ThreadSleepOnLock {
private static final Object lock = new Object();
public static void main(String[] args) {
new ThreadSleepOnLock1().start();
new ThreadSleepOnLock2().start();
}
private static class ThreadSleepOnLock1 extends Thread {
5.Join
使用join()進行插隊,保證線程執行的有序性
public class ThreadJoin {
public static void main(String[] args) {
Join2 join2 = new Join2();
join2.setName("Join2");
Join1 join1 = new Join1(join2);
join1.setName("Join1");
join1.start();
join2.start();
}
}
class Join1 extends Thread {
private Thread sub;
public Join1(Thread sub) {
this.sub = sub;
}
public Join1() {
}
線程間共享與協作
1.線程間共享
-
synchronized 關鍵字
確保多個線程在同一個時刻,只能有一個線程處於方法或者同步塊中,它保證了線程對變量訪問的可見性和排他性,又稱為內置鎖機制
public class SynchronizedTest {
private int count = 0;
private static final Object lock = new Object();
public int getCount() {
return count;
}
public static void main(String[] args) throws InterruptedException {
SynchronizedTest synchronizedTest = new SynchronizedTest();
SynchronizedThread thread1 = synchronizedTest.new SynchronizedThread();
SynchronizedThread thread2 = synchronizedTest.new SynchronizedThread();
SynchronizedThread thread3 = synchronizedTest.new SynchronizedThread();
thread1.setName("Thread1");
thread2.setName("Thread2");
thread3.setName("Thread3");
thread1.start();
thread2.start();
thread3.start();
Thread.sleep(2000);
System.out.println(synchronizedTest.getCount());
}
private class SynchronizedThread extends Thread {
對象鎖 鎖的是對象,一個class可以有多個對象,鎖不同的對象,互不干擾
類鎖 加在靜態方法上的鎖,鎖的是整個class類,只有一份
public class SynchronizedThread {
public static void main(String[] args) {
InstanceThread instanceThread1 = new InstanceThread();
InstanceThread instanceThread2 = new InstanceThread();
InstanceThread instanceThread3 = new InstanceThread();
instanceThread1.setName("Thread1");
instanceThread2.setName("Thread2");
instanceThread3.setName("Thread3");
instanceThread1.start();
instanceThread2.start();
instanceThread3.start();
}
}
class InstanceThread extends Thread{
錯誤的加鎖會導致不可預知的結果
public class ErrorSynchronized {
public static void main(String[] args) throws InterruptedException {
ErrorSynchronized errorSynchronized = new ErrorSynchronized();
ErrorSynchronizedThread thread1 = errorSynchronized.new ErrorSynchronizedThread();
for (int i = 0; i < 500; i++) {
new Thread(thread1).start();
}
System.out.println(thread1.count);
}
private class ErrorSynchronizedThread implements Runnable{
private Integer count=0;
@Override
public void run() {
synchronized (count){
count++;
}
}
}
}
錯誤分析:
反編譯class
class ErrorSynchronized$ErrorSynchronizedThread
implements Runnable
{
private Integer count = Integer.valueOf(0);
private ErrorSynchronized$ErrorSynchronizedThread(ErrorSynchronized paramErrorSynchronized) {}
public void run()
{
Integer localInteger1;
synchronized (this.count)
{
localInteger1 = this.count;Integer localInteger2 = this.count = Integer.valueOf(this.count.intValue() + 1);
}
}
}
//jdk
//Integer.valueOf():超過±128將會new新對象,導致鎖的是不同的對象,也就是沒鎖住
public static Integer valueOf(int i) {
if (i >= IntegerCache.low && i <= IntegerCache.high)
return IntegerCache.cache[i + (-IntegerCache.low)];
return new Integer(i);
}
-
volatile 最輕量的同步
volatile關鍵字是最輕量的同步機制,保證了不同線程對某個變量進行操作時的可見性,但是不保證線程安全,例如,可以使用在創建單例鎖類對象時,防止二重鎖.應用場景:一寫多讀public class VolatileThread { private volatile static boolean flag = false; public static void main(String[] args) throws InterruptedException { for (int i = 0; i < 5; i++) { VolatileThread1 thread = new VolatileThread1(); thread.setName("Thread"+i); thread.start(); } Thread.sleep(2000); flag=true; } public static class VolatileThread1 extends Thread{ @Override public void run() { System.out.println(getName()+" arrive...."); while (!flag); System.out.println(getName()+" end...."); } } }volatile為什么不安全?
如果有一個變量i = 0用volatile修飾,兩個線程對其進行i++操作,如果線程1從內存中讀取i=0進了緩存,然后把數據讀入寄存器,之后時間片用完了,然后線程2也從內存中讀取i進緩存,因為線程1還未執行寫操作,內存屏障是插入在寫操作之后的指令,意味着還未觸發這個指令,所以緩存行是不會失效的。然后線程2執行完畢,內存中i=1,然后線程1又開始執行,然后將數據寫回緩存再寫回內存,結果還是1
https://blog.csdn.net/qq_33330687/article/details/80990729
public class VolatileUnSafe {
private volatile int count = 0;
public static void main(String[] args) throws InterruptedException {
VolatileUnSafe volatileUnSafe = new VolatileUnSafe();
int index=0;
while (index<10){
volatileUnSafe.new VolatileUnSafeThread().start();
index++;
}
Thread.sleep(1000);
System.out.println(volatileUnSafe.count);
}
class VolatileUnSafeThread extends Thread {
@Override
public void run() {
int i=0;
while (i<10000){
count++;
i++;
}
}
}
}
2.ThreadLocal
ThreadLocal主要用於線程的隔離,Spring事務中,一個事務可能包含了多個業務邏輯,穿梭於多個Dao,所以回滾是應該是同一個Connection,如果將Connection保存到ThreadLocal中,將十分有效,否則將需要將Connection進行傳遞,比較繁瑣
public class ThreadLocalUse {
private ThreadLocal<Integer> threadLocal=new InheritableThreadLocal<Integer>(){
@Override
protected Integer initialValue() {
return 0;
}
};
public static void main(String[] args) {
ThreadLocalUse threadLocalUse = new ThreadLocalUse();
for (int i = 0; i < 10; i++) {
ThreadLocalUseThread threadLocalUseThread = threadLocalUse.new ThreadLocalUseThread();
threadLocalUseThread.setName("Thread["+i+"]");
threadLocalUseThread.start();
}
}
class ThreadLocalUseThread extends Thread{
@Override
public void run() {
Integer local = threadLocal.get();
int index=0;
while (index++<getId()){
local++;
}
System.out.println(getId()+"-"+getName()+":"+local);
}
}
}
ThreadLocal應該要保存自己的變量,所以,變量不能定義為static,否則會出現錯誤的結果
ThreadLocal使用不當會引發內存泄漏(該回收的對象沒有得到回收,一直占用內存)Rather than keep track of all ThreadLocals, you could clear them all at once
protected void afterExecute(Runnable r, Throwable t) { // you need to set this field via reflection. Thread.currentThread().threadLocals = null; }As a principle, whoever put something in thread local should be responsible to clear it
//set了之后記得remove threadLocal.set(...); try { ... } finally { threadLocal.remove(); }
3.線程間協作
-
等待/通知
是指一個線程A調用了對象O的wait()方法進入等待狀態,而另一個線程B調用了對象O的notify()或者notifyAll()方法,線程A收到通知后從對象O的wait()方法返回,進而執行后續操作。上述兩個線程通過對象O來完成交互,而對象上的wait()和notify/notifyAll()的關系就如同開關信號一樣,用來完成等待方和通知方之間的交互工作
notify():通知一個在對象上等待的線程,使其從wait方法返回,而返回的前提是該線程獲取到了對象的鎖,沒有獲得鎖的線程重新進入WAITING狀態。
notifyAll():通知所有等待在該對象上的線程
wait():調用該方法的線程進入 WAITING狀態,只有等待另外線程的通知或被中斷才會返回.需要注意,調用wait()方法后,會釋放對象的鎖
wait(long):超時等待一段時間,這里的參數時間是毫秒,也就是等待長達n毫秒,如果沒有通知就超時返回
wait (long,int)對於超時時間更細粒度的控制,可以達到納秒
-
等待和通知的標准范式
等待方遵循如下原則。
1)獲取對象的鎖。
2)如果條件不滿足,那么調用對象的wait()方法,被通知后仍要檢查條件。
3)條件滿足則執行對應的邏輯。
通知方遵循如下原則。
1)獲得對象的鎖。
2)改變條件。
3)通知所有等待在對象上的線程。
//等待方 sychronized(obj){ while(!condition){ obj.wait(); } } //通知方 sychronized(obj){ condtion=true; obj.notify(); //or obj.notifyAll(); }
永遠在
while循環而不是if語句中使用wait()對在多線程間共享的那個Object來使用wait()
在調用wait(),notify()系列方法之前,線程必須要獲得該對象的對象級別鎖,即只能在同步方法或同步塊中調用wait(),notify()系列方法
盡量使用notifyAll(),而不是 notify()
public class WaitAndNotify {
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 10; i++) {
new ConsumerThread().start();
}
Thread.sleep(1000);
new ProducerThread().start();
}
}
class Shop {
public static final List<String> PRODUCTS = new ArrayList<String>();
}
class ProducerThread extends Thread {
@Override
public void run() {
synchronized (Shop.PRODUCTS){
int index = 0;
while (index < 100) {
Shop.PRODUCTS.add("Product-" + index++);
}
Shop.PRODUCTS.notifyAll();
}
}
}
class ConsumerThread extends Thread {
@Override
public void run() {
synchronized (Shop.PRODUCTS) {
while (Shop.PRODUCTS.isEmpty()) {
try {
System.out.println("no product,"+getName()+" wait....");
Shop.PRODUCTS.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
buy();
}
}
private void buy() {
synchronized (Shop.PRODUCTS){
String shop = Shop.PRODUCTS.remove(0);
System.out.println(getName() + " buy " + shop);
}
}
}
並發工具類
1.ForkJoin
處理分而治之類(如歸並排序)的問題
public class WordCountForkJoin {
public static void main(String[] args) {
String path = "D:\\apache-tomcat-8.5.34\\logs\\catalina.2019-03-30.log";
InputStream inputStream = null;
try {
inputStream = new FileInputStream(path);
int len = 0;
byte[] bytes = new byte[1024];
StringBuilder stringBuilder = new StringBuilder();
while ((len = inputStream.read(bytes)) != -1) {
String str = new String(bytes, 0, len, StandardCharsets.UTF_8);
stringBuilder.append(str);
}
ForkJoinPool pool = new ForkJoinPool();
Map<String, Integer> map =
pool.invoke(new WordCount(stringBuilder.toString()
.replaceAll("[\"\'\\[()]","")
.replace("\\"," ")
.replaceAll("[,:?.|+-=]"," ").split("\\s+")));
Set<Map.Entry<String, Integer>> entrySet = map.entrySet();
Iterator<Map.Entry<String, Integer>> iterator = entrySet.iterator();
while (iterator.hasNext()) {
Map.Entry<String, Integer> entry = iterator.next();
System.out.println(entry.getKey() + ":" + entry.getValue());
}
} catch (Exception e) {
e.printStackTrace();
} finally {
IOUtils.closeQuietly(inputStream);
}
}
}
class WordCount extends RecursiveTask<Map<String, Integer>> {
private String[] source;
public WordCount(String[] source) {
this.source = source;
}
@Override
protected Map<String, Integer> compute() {
if (source.length <= 1024) {
return setMap(source);
} else {
String[] left = Arrays.copyOfRange(source, 0, source.length / 2);
String[] right = Arrays.copyOfRange(source, source.length / 2, source.length);
WordCount wordCountLeft = new WordCount(left);
WordCount wordCountRight = new WordCount(right);
invokeAll(wordCountLeft,wordCountRight);
Map<String, Integer> joinLeft = wordCountLeft.join();
Map<String, Integer> joinRight = wordCountRight.join();
return merge(joinLeft, joinRight);
}
}
private Map<String, Integer> setMap(String[] source) {
Map<String, Integer> map = new HashMap<>();
for (String item : source) {
if (map.containsKey(item)) {
Integer value = map.get(item);
map.put(item, ++value);
} else {
map.put(item, 1);
}
}
return map;
}
private Map<String, Integer> merge(Map<String, Integer> joinLeft, Map<String, Integer> joinRight) {
Set<Map.Entry<String, Integer>> entrySet = joinRight.entrySet();
Iterator<Map.Entry<String, Integer>> iterator = entrySet.iterator();
while (iterator.hasNext()) {
Map.Entry<String, Integer> entry = iterator.next();
String key = entry.getKey();
Integer value = entry.getValue();
if (joinLeft.containsKey(key)) {
joinLeft.put(key, joinLeft.get(key) + value);
}
}
return joinLeft;
}
}
-
ForkJoin標准范式
我們要使用ForkJoin框架,必須首先創建一個ForkJoin任務。它提供在任務中執行fork和join的操作機制,通常我們不直接繼承ForkjoinTask類,只需要直接繼承其子類。
-
RecursiveAction,用於沒有返回結果的任務
-
RecursiveTask,用於有返回值的任務
task要通過ForkJoinPool來執行,使用submit 或 invoke 提交,兩者的區別是:invoke是同步執行,調用之后需要等待任務完成,才能執行后面的代碼;submit是異步執行
join()和get方法當任務完成的時候返回計算結果。
在我們自己實現的compute方法里,首先需要判斷任務是否足夠小,如果足夠小就直接執行任務。如果不足夠小,就必須分割成兩個子任務,每個子任務在調用invokeAll方法時,又會進入compute方法,看看當前子任務是否需要繼續分割成子任務,如果不需要繼續分割,則執行當前子任務並返回結果。使用join方法會等待子任務執行完並得到其結果。
-
2.CountDownLatch
閉鎖,CountDownLatch這個類能夠使一個線程等待其他線程完成各自的工作后再執行
public class MyCountDownLatch {
private CountDownLatch countDownLatch = new CountDownLatch(5);
public static void main(String[] args) throws InterruptedException {
MyCountDownLatch myCountDownLatch = new MyCountDownLatch();
for (int i = 0; i < 5; i++) {
myCountDownLatch.new InitialThread().start();
}
//等待countDownLatch=0,再執行后面的事情
myCountDownLatch.countDownLatch.await();
System.out.println("do task .... ");
Thread.sleep(1000);
System.out.println("task finished.... ");
}
private class InitialThread extends Thread {
@Override
public void run() {
System.out.println(getName() + " prepare initial.....");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(getName() + " initial finished!");
countDownLatch.countDown();
}
}
}
3.CycliBarrier
讓一組線程到達一個屏障(也可以叫同步點)時被阻塞,直到最后一個線程到達屏障時,屏障才會開門,所有被屏障攔截的線程才會繼續運行
區別CountDownLatch:
CountDownLatch的計數器只能使用一次,而CyclicBarrier的計數器可以反復使用
public class MyCyclicBarrier {
private CyclicBarrier cyclicBarrier = new CyclicBarrier(4);
public static void main(String[] args) throws Exception {
MyCyclicBarrier myCyclicBarrier = new MyCyclicBarrier();
for (int i = 0; i < 4; i++) {
myCyclicBarrier.new HighAltitudeGame().start();
}
Thread.sleep(2000);
System.out.println("高空項目完成");
for (int i = 0; i <4; i++) {
NotOneLessGame notOneLessGame = myCyclicBarrier.new NotOneLessGame();
new Thread(notOneLessGame).start();
}
}
//高空項目
private class HighAltitudeGame extends Thread {
@Override
public void run() {
System.out.println(getName() + "完成高空項目.....");
try {
System.out.println(getName() +"等待其他人");
cyclicBarrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
}
//一個不能少游戲
private class NotOneLessGame implements Runnable {
@Override
public void run() {
try {
System.out.println(Thread.currentThread().getName() + "開始挑戰一個不能少項目");
cyclicBarrier.await();
System.out.println(Thread.currentThread().getName()+"完成任務");
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
}
}
4.Semaphore
Semaphore信號量:用來控制同時訪問特定資源的線程數量,一般用作流量控制,數據庫連接等有限資源的訪問控制
線程使用Semaphore的acquire()方法獲取一個許可證
使用完之后調用release()方法歸還許可證
用tryAcquire()方法嘗試獲取許可證
public class MySemaphore {
//兩個理發師
private static final Semaphore BARBERS = new Semaphore(2);
public static void main(String[] args) {
for (int i = 0; i < 10; i++) {
new Thread(new HairCut()).start();
}
}
//理發線程
private static class HairCut implements Runnable {
@Override
public void run() {
try {
BARBERS.acquire();
System.out.println("當前有理發師空閑,"+Thread.currentThread().getName() +"准備理發....");
Thread.sleep(1000);
BARBERS.release();
System.out.println(Thread.currentThread().getName() + "理發完畢....");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
5.Exchanger
Exchanger可以在兩個線程之間交換數據
Exchangers may be useful in applications such as genetic algorithms and pipeline designs.
public class MyExchanger {
private static final Exchanger<Object> exchanger = new Exchanger<>();
public static void main(String[] args) throws InterruptedException {
Consumer consumer = new Consumer();
Producer producer = new Producer();
consumer.setName("consumer");
consumer.setDaemon(true);
consumer.start();
producer.setName("producer");
producer.setDaemon(true);
producer.start();
Thread.sleep(2000);
}
private static class Consumer extends Thread {
@Override
public void run() {
Random random = new Random();
try {
for (int i = 0; i < 10; i++) {
Object exchange = exchanger.exchange(random.nextInt(100));
System.out.println(getName()+":"+exchange.toString());
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
private static class Producer extends Thread {
@Override
public void run() {
Random random = new Random();
try {
for (int i = 0; i < 10; i++) {
Object exchange = exchanger.exchange("Iphone" + random.nextInt(10));
System.out.println(getName()+":"+exchange.toString());
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
