創建線程
new MyThread().start();
new Thread(new MyRun()).start();
new Thread(()->{
System.out.println("Hello Lambda!");
}).start();
常見方法
- sleep
- yield 讓出線程,從running到ready狀態
- join 線程A中調用B.join(),表示A讓B先執行
線程狀態
總共6大塊,分別是:new,runnable,timedwaiting,waiting,blocked,terminated
synchronize
原理:對象頭(mark word)上有兩位(00,01),表示不同類型的鎖
同一個對象中的兩個非靜態synchronize方法,不能同時執行,由此可見,非靜態的synchronize方法鎖住的對象是當前對象,也就是和synchronize(this)等效
synchronize是可重入鎖:兩個synchronize方法A和B,A可以調用B,B也可以調用A
鎖定的區域發生異常,默認情況下鎖會被釋放,寫業務代碼時要小心
synchronize的底層實現:
JDK早期,重量級鎖
后來有鎖升級的概念,第一次只在對象頭記錄線程的id,叫偏向鎖;如果線程爭用,升級為自旋鎖;自旋10次之后,升級為重量級鎖,也就是系統鎖;
鎖無法降級;
自旋鎖是用戶態,加鎖和解鎖快;重量級鎖是內核態,效率低;
什么情況用自旋鎖,什么情況用系統鎖?運行時間長、線程多用重量級鎖;反之用自旋鎖
volatile
- 線程可見性
- 禁止指令重排
單例模式雙重檢查鎖需要加volatile,答案是需要,原因就是指令重排(對象創建步驟可能重排序,如3和2交換:1、創建對象並賦初始值 2、賦值 3、變量指向堆內存)
其他並發控制類
- ReentrantLock
可以替換synchronize,功能比synchronize強大
lock&unlock 不能自動解鎖,所以unlock一定要在finally中
trylock - CountDownLatch
功能和join一樣
latch.countDown()
latch.await() - CyclicBarrier
滿足一定數據量的線程后,一起啟動 - ReentrantReadWriteLock
讀鎖=共享鎖 readWriteLock.readLock();
寫鎖=排它鎖=互斥鎖 readWriteLock.writeLock();
讀的時候,其他線程也可以讀;寫的時候,其他寫線程都阻塞 - Semaphore
信號量,燈亮了可以執行,滅了不能執行
Semaphore s = new Semaphore(2);可以限流,最多2個線程執行
AQS
ReentrantLock、CountDownLatch、CyclicBarrier都是AQS的實現,AQS的核心是含有一個volatile修飾的state,這個state由實現類維護,和一個包含Thread的雙向鏈表(Node)
面試題:兩個線程交替打印A1B2C3D4(可重入鎖特性)
public class App {
public static void main(String[] args) {
App app = new App();
Thread t1 = new Thread(app::printABC);
Thread t2 = new Thread(app::print123);
t1.start();
t2.start();
}
public synchronized void printABC() {
String str = "ABCDEFG";
for (int i = 0; i < str.length(); i++) {
System.out.println(str.charAt(i));
this.notify();
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
this.notify();
}
public synchronized void print123() {
String str = "1234567";
for (int i = 0; i < str.length(); i++) {
System.out.println(str.charAt(i));
this.notify();
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
this.notify();
}
}
生產者消費者實現
public class MyContainer2<T> {
final private LinkedList<T> lists = new LinkedList<>();
final private int MAX = 10; //最多10個元素
private int count = 0;
private Lock lock = new ReentrantLock();
private Condition producer = lock.newCondition();
private Condition consumer = lock.newCondition();
public void put(T t) {
try {
lock.lock();
while (lists.size() == MAX) { //想想為什么用while而不是用if?
producer.await();
}
lists.add(t);
++count;
consumer.signalAll(); //通知消費者線程進行消費
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public T get() {
T t = null;
try {
lock.lock();
while (lists.size() == 0) {
consumer.await();
}
t = lists.removeFirst();
count--;
producer.signalAll(); //通知生產者進行生產
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
return t;
}
public static void main(String[] args) {
MyContainer2<String> c = new MyContainer2<>();
//啟動消費者線程
for (int i = 0; i < 1; i++) {
new Thread(() -> {
while (true) {
System.out.println(c.get() + " OUT:" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " left:" + c.count);
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
//生產者延遲2秒啟動
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
//啟動生產者線程
for (int i = 0; i < 2; i++) {
new Thread(() -> {
while (true) {
c.put(Thread.currentThread().getName() + " IN:" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "product" + i).start();
}
}
}
自定義線程池
public class App3 {
public static void main(String[] args) {
App3 app3 = new App3();
app3.test();
}
public void test() {
ThreadPoolExecutor pool = new ThreadPoolExecutor(
3, 3, 60, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(10000),
new MyThreadFactory("MyThreadGroup"),
new MyRejectedExecutionHandler());
for (int i = 1; i <= 10; i++) {
pool.execute(new MyThread(i));
}
pool.shutdown();
}
class MyThread implements Runnable {
int id;
public MyThread(int id) {
this.id = id;
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " id=" + id);
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public String toString() {
return "id=" + id;
}
}
class MyThreadFactory implements ThreadFactory {
String threadGroup;
AtomicInteger nextId = new AtomicInteger(1);
MyThreadFactory(String threadGroup) {
this.threadGroup = threadGroup;
}
@Override
public Thread newThread(Runnable r) {
String threadName = threadGroup + "-" + nextId.getAndIncrement();
Thread thread = new Thread(r, threadName);
return thread;
}
}
class MyRejectedExecutionHandler implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.out.println("任務被拒絕 " + r);
}
}
}