线程基本知识
1.开启多线程的两种方式
-
继承Thread类
-
实现Runnable接口
public class NewThread {
public static void main(String[] args) {
new Thread1().start();
new Thread(new Thread2()).start();
}
}
class Thread1 extends Thread {
JDK关于多线程的开启方式的说明
/* There are two ways to create a new thread of execution. One is to
* declare a class to be a subclass of <code>Thread</code>. This
* subclass should override the <code>run</code> method of class
* <code>Thread</code>. An instance of the subclass can then be
* allocated and started
....
* The other way to create a thread is to declare a class that
* implements the <code>Runnable</code> interface. That class then
* implements the <code>run</code> method. An instance of the class can
* then be allocated, passed as an argument when creating
* <code>Thread</code>, and started.
*/
2.守护线程
设置线程为守护线程之后,线程会在主线程结束后直接结束,所以finally
代码块将不再保证能执行
public class DaemonThread {
public static void main(String[] args) throws InterruptedException {
DaemonThread1 daemonThread1 = new DaemonThread1();
daemonThread1.setDaemon(true);
daemonThread1.start();
Thread.sleep(1);
daemonThread1.interrupt();
}
}
class DaemonThread1 extends Thread {
3.中断线程
安全中断线程interrupt()
,只是提示线程该中断了,并不保证一定中断
public class InterruptThread {
public static void main(String[] args) throws InterruptedException {
InterruptThread1 thread1 = new InterruptThread1();
Thread thread = new Thread(new InterruptThread2());
thread1.start();
thread.start();
Thread.sleep(1);
thread1.interrupt();
thread.interrupt();
Thread thread = new Thread(new InterruptThread3());
thread.start();
Thread.sleep(1);
thread.interrupt();
}
}
class InterruptThread1 extends Thread {
差异:
isInterrupted()
Thread.interrupted()
判断线程是否被中断,如果线程处于阻塞状态,线程在检查中断标示时如果发现中断标示为true,会抛异常,这个方法会将中断标志位重置为false
4.Sleep对Lock的影响
Sleep不会释放锁
public class ThreadSleepOnLock {
private static final Object lock = new Object();
public static void main(String[] args) {
new ThreadSleepOnLock1().start();
new ThreadSleepOnLock2().start();
}
private static class ThreadSleepOnLock1 extends Thread {
5.Join
使用join()
进行插队,保证线程执行的有序性
public class ThreadJoin {
public static void main(String[] args) {
Join2 join2 = new Join2();
join2.setName("Join2");
Join1 join1 = new Join1(join2);
join1.setName("Join1");
join1.start();
join2.start();
}
}
class Join1 extends Thread {
private Thread sub;
public Join1(Thread sub) {
this.sub = sub;
}
public Join1() {
}
线程间共享与协作
1.线程间共享
-
synchronized 关键字
确保多个线程在同一个时刻,只能有一个线程处于方法或者同步块中,它保证了线程对变量访问的可见性和排他性,又称为内置锁机制
public class SynchronizedTest {
private int count = 0;
private static final Object lock = new Object();
public int getCount() {
return count;
}
public static void main(String[] args) throws InterruptedException {
SynchronizedTest synchronizedTest = new SynchronizedTest();
SynchronizedThread thread1 = synchronizedTest.new SynchronizedThread();
SynchronizedThread thread2 = synchronizedTest.new SynchronizedThread();
SynchronizedThread thread3 = synchronizedTest.new SynchronizedThread();
thread1.setName("Thread1");
thread2.setName("Thread2");
thread3.setName("Thread3");
thread1.start();
thread2.start();
thread3.start();
Thread.sleep(2000);
System.out.println(synchronizedTest.getCount());
}
private class SynchronizedThread extends Thread {
对象锁 锁的是对象,一个class可以有多个对象,锁不同的对象,互不干扰
类锁 加在静态方法上的锁,锁的是整个class类,只有一份
public class SynchronizedThread {
public static void main(String[] args) {
InstanceThread instanceThread1 = new InstanceThread();
InstanceThread instanceThread2 = new InstanceThread();
InstanceThread instanceThread3 = new InstanceThread();
instanceThread1.setName("Thread1");
instanceThread2.setName("Thread2");
instanceThread3.setName("Thread3");
instanceThread1.start();
instanceThread2.start();
instanceThread3.start();
}
}
class InstanceThread extends Thread{
错误的加锁会导致不可预知的结果
public class ErrorSynchronized { public static void main(String[] args) throws InterruptedException { ErrorSynchronized errorSynchronized = new ErrorSynchronized(); ErrorSynchronizedThread thread1 = errorSynchronized.new ErrorSynchronizedThread(); for (int i = 0; i < 500; i++) { new Thread(thread1).start(); } System.out.println(thread1.count); } private class ErrorSynchronizedThread implements Runnable{ private Integer count=0; @Override public void run() { synchronized (count){ count++; } } } }
错误分析:
反编译class
class ErrorSynchronized$ErrorSynchronizedThread implements Runnable { private Integer count = Integer.valueOf(0); private ErrorSynchronized$ErrorSynchronizedThread(ErrorSynchronized paramErrorSynchronized) {} public void run() { Integer localInteger1; synchronized (this.count) { localInteger1 = this.count;Integer localInteger2 = this.count = Integer.valueOf(this.count.intValue() + 1); } } } //jdk //Integer.valueOf():超过±128将会new新对象,导致锁的是不同的对象,也就是没锁住 public static Integer valueOf(int i) { if (i >= IntegerCache.low && i <= IntegerCache.high) return IntegerCache.cache[i + (-IntegerCache.low)]; return new Integer(i); }
-
volatile 最轻量的同步
volatile
关键字是最轻量的同步机制
,保证了不同线程对某个变量进行操作时的可见性,但是不保证线程安全
,例如,可以使用在创建单例锁类对象时,防止二重锁.应用场景:一写多读public class VolatileThread { private volatile static boolean flag = false; public static void main(String[] args) throws InterruptedException { for (int i = 0; i < 5; i++) { VolatileThread1 thread = new VolatileThread1(); thread.setName("Thread"+i); thread.start(); } Thread.sleep(2000); flag=true; } public static class VolatileThread1 extends Thread{ @Override public void run() { System.out.println(getName()+" arrive...."); while (!flag); System.out.println(getName()+" end...."); } } }
volatile为什么不安全?
如果有一个变量i = 0用volatile修饰,两个线程对其进行i++操作,如果线程1从内存中读取i=0进了缓存,然后把数据读入寄存器,之后时间片用完了,然后线程2也从内存中读取i进缓存,因为线程1还未执行写操作,内存屏障是插入在写操作之后的指令,意味着还未触发这个指令,所以缓存行是不会失效的。然后线程2执行完毕,内存中i=1,然后线程1又开始执行,然后将数据写回缓存再写回内存,结果还是1
https://blog.csdn.net/qq_33330687/article/details/80990729
public class VolatileUnSafe { private volatile int count = 0; public static void main(String[] args) throws InterruptedException { VolatileUnSafe volatileUnSafe = new VolatileUnSafe(); int index=0; while (index<10){ volatileUnSafe.new VolatileUnSafeThread().start(); index++; } Thread.sleep(1000); System.out.println(volatileUnSafe.count); } class VolatileUnSafeThread extends Thread { @Override public void run() { int i=0; while (i<10000){ count++; i++; } } } }
2.ThreadLocal
ThreadLocal
主要用于线程的隔离,Spring事务中,一个事务可能包含了多个业务逻辑,穿梭于多个Dao,所以回滚是应该是同一个Connection
,如果将Connection
保存到ThreadLocal
中,将十分有效,否则将需要将Connection
进行传递,比较繁琐
public class ThreadLocalUse { private ThreadLocal<Integer> threadLocal=new InheritableThreadLocal<Integer>(){ @Override protected Integer initialValue() { return 0; } }; public static void main(String[] args) { ThreadLocalUse threadLocalUse = new ThreadLocalUse(); for (int i = 0; i < 10; i++) { ThreadLocalUseThread threadLocalUseThread = threadLocalUse.new ThreadLocalUseThread(); threadLocalUseThread.setName("Thread["+i+"]"); threadLocalUseThread.start(); } } class ThreadLocalUseThread extends Thread{ @Override public void run() { Integer local = threadLocal.get(); int index=0; while (index++<getId()){ local++; } System.out.println(getId()+"-"+getName()+":"+local); } } }
ThreadLocal
应该要保存自己的变量,所以,变量不能定义为static
,否则会出现错误的结果
ThreadLocal
使用不当会引发内存泄漏
(该回收的对象没有得到回收,一直占用内存)Rather than keep track of all ThreadLocals, you could clear them all at once
protected void afterExecute(Runnable r, Throwable t) { // you need to set this field via reflection. Thread.currentThread().threadLocals = null; }As a principle, whoever put something in thread local should be responsible to clear it
//set了之后记得remove threadLocal.set(...); try { ... } finally { threadLocal.remove(); }
3.线程间协作
-
等待/通知
是指一个线程A调用了对象O的wait()方法进入等待状态,而另一个线程B调用了对象O的notify()或者notifyAll()方法,线程A收到通知后从对象O的wait()方法返回,进而执行后续操作。上述两个线程通过对象O来完成交互,而对象上的wait()和notify/notifyAll()的关系就如同开关信号一样,用来完成等待方和通知方之间的交互工作
notify()
:通知一个在对象上等待的线程,使其从wait方法返回,而返回的前提是该线程获取到了对象的锁,没有获得锁的线程重新进入WAITING状态。
notifyAll()
:通知所有等待在该对象上的线程
wait()
:调用该方法的线程进入 WAITING状态,只有等待另外线程的通知或被中断才会返回.需要注意,调用wait()方法后,会释放对象的锁
wait(long)
:超时等待一段时间,这里的参数时间是毫秒,也就是等待长达n毫秒,如果没有通知就超时返回
wait (long,int)
对于超时时间更细粒度的控制,可以达到纳秒
-
等待和通知的标准范式
等待方遵循如下原则。
1)获取对象的锁。
2)如果条件不满足,那么调用对象的wait()方法,被通知后仍要检查条件。
3)条件满足则执行对应的逻辑。
通知方遵循如下原则。
1)获得对象的锁。
2)改变条件。
3)通知所有等待在对象上的线程。
//等待方 sychronized(obj){ while(!condition){ obj.wait(); } } //通知方 sychronized(obj){ condtion=true; obj.notify(); //or obj.notifyAll(); }
永远在
while
循环而不是if
语句中使用wait()对在多线程间共享的那个Object来使用wait()
在调用wait(),notify()系列方法之前,线程必须要获得该对象的对象级别锁,即只能在同步方法或同步块中调用wait(),notify()系列方法
尽量使用notifyAll(),而不是 notify()
public class WaitAndNotify { public static void main(String[] args) throws InterruptedException { for (int i = 0; i < 10; i++) { new ConsumerThread().start(); } Thread.sleep(1000); new ProducerThread().start(); } } class Shop { public static final List<String> PRODUCTS = new ArrayList<String>(); } class ProducerThread extends Thread { @Override public void run() { synchronized (Shop.PRODUCTS){ int index = 0; while (index < 100) { Shop.PRODUCTS.add("Product-" + index++); } Shop.PRODUCTS.notifyAll(); } } } class ConsumerThread extends Thread { @Override public void run() { synchronized (Shop.PRODUCTS) { while (Shop.PRODUCTS.isEmpty()) { try { System.out.println("no product,"+getName()+" wait...."); Shop.PRODUCTS.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } buy(); } } private void buy() { synchronized (Shop.PRODUCTS){ String shop = Shop.PRODUCTS.remove(0); System.out.println(getName() + " buy " + shop); } } }
并发工具类
1.ForkJoin
处理分而治之类(如归并排序)的问题
public class WordCountForkJoin { public static void main(String[] args) { String path = "D:\\apache-tomcat-8.5.34\\logs\\catalina.2019-03-30.log"; InputStream inputStream = null; try { inputStream = new FileInputStream(path); int len = 0; byte[] bytes = new byte[1024]; StringBuilder stringBuilder = new StringBuilder(); while ((len = inputStream.read(bytes)) != -1) { String str = new String(bytes, 0, len, StandardCharsets.UTF_8); stringBuilder.append(str); } ForkJoinPool pool = new ForkJoinPool(); Map<String, Integer> map = pool.invoke(new WordCount(stringBuilder.toString() .replaceAll("[\"\'\\[()]","") .replace("\\"," ") .replaceAll("[,:?.|+-=]"," ").split("\\s+"))); Set<Map.Entry<String, Integer>> entrySet = map.entrySet(); Iterator<Map.Entry<String, Integer>> iterator = entrySet.iterator(); while (iterator.hasNext()) { Map.Entry<String, Integer> entry = iterator.next(); System.out.println(entry.getKey() + ":" + entry.getValue()); } } catch (Exception e) { e.printStackTrace(); } finally { IOUtils.closeQuietly(inputStream); } } } class WordCount extends RecursiveTask<Map<String, Integer>> { private String[] source; public WordCount(String[] source) { this.source = source; } @Override protected Map<String, Integer> compute() { if (source.length <= 1024) { return setMap(source); } else { String[] left = Arrays.copyOfRange(source, 0, source.length / 2); String[] right = Arrays.copyOfRange(source, source.length / 2, source.length); WordCount wordCountLeft = new WordCount(left); WordCount wordCountRight = new WordCount(right); invokeAll(wordCountLeft,wordCountRight); Map<String, Integer> joinLeft = wordCountLeft.join(); Map<String, Integer> joinRight = wordCountRight.join(); return merge(joinLeft, joinRight); } } private Map<String, Integer> setMap(String[] source) { Map<String, Integer> map = new HashMap<>(); for (String item : source) { if (map.containsKey(item)) { Integer value = map.get(item); map.put(item, ++value); } else { map.put(item, 1); } } return map; } private Map<String, Integer> merge(Map<String, Integer> joinLeft, Map<String, Integer> joinRight) { Set<Map.Entry<String, Integer>> entrySet = joinRight.entrySet(); Iterator<Map.Entry<String, Integer>> iterator = entrySet.iterator(); while (iterator.hasNext()) { Map.Entry<String, Integer> entry = iterator.next(); String key = entry.getKey(); Integer value = entry.getValue(); if (joinLeft.containsKey(key)) { joinLeft.put(key, joinLeft.get(key) + value); } } return joinLeft; } }
-
ForkJoin标准范式
我们要使用ForkJoin框架,必须首先创建一个ForkJoin任务。它提供在任务中执行fork和join的操作机制,通常我们不直接继承ForkjoinTask类,只需要直接继承其子类。
-
RecursiveAction,用于没有返回结果的任务
-
RecursiveTask,用于有返回值的任务
task要通过ForkJoinPool来执行,使用submit 或 invoke 提交,两者的区别是:invoke是同步执行,调用之后需要等待任务完成,才能执行后面的代码;submit是异步执行
join()和get方法当任务完成的时候返回计算结果。
在我们自己实现的compute方法里,首先需要判断任务是否足够小,如果足够小就直接执行任务。如果不足够小,就必须分割成两个子任务,每个子任务在调用invokeAll方法时,又会进入compute方法,看看当前子任务是否需要继续分割成子任务,如果不需要继续分割,则执行当前子任务并返回结果。使用join方法会等待子任务执行完并得到其结果。
-
2.CountDownLatch
闭锁,CountDownLatch这个类能够使一个线程等待其他线程完成各自的工作后再执行
public class MyCountDownLatch { private CountDownLatch countDownLatch = new CountDownLatch(5); public static void main(String[] args) throws InterruptedException { MyCountDownLatch myCountDownLatch = new MyCountDownLatch(); for (int i = 0; i < 5; i++) { myCountDownLatch.new InitialThread().start(); } //等待countDownLatch=0,再执行后面的事情 myCountDownLatch.countDownLatch.await(); System.out.println("do task .... "); Thread.sleep(1000); System.out.println("task finished.... "); } private class InitialThread extends Thread { @Override public void run() { System.out.println(getName() + " prepare initial....."); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(getName() + " initial finished!"); countDownLatch.countDown(); } } }
3.CycliBarrier
让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续运行
区别CountDownLatch:
CountDownLatch的计数器只能使用一次,而CyclicBarrier的计数器可以反复使用
public class MyCyclicBarrier { private CyclicBarrier cyclicBarrier = new CyclicBarrier(4); public static void main(String[] args) throws Exception { MyCyclicBarrier myCyclicBarrier = new MyCyclicBarrier(); for (int i = 0; i < 4; i++) { myCyclicBarrier.new HighAltitudeGame().start(); } Thread.sleep(2000); System.out.println("高空项目完成"); for (int i = 0; i <4; i++) { NotOneLessGame notOneLessGame = myCyclicBarrier.new NotOneLessGame(); new Thread(notOneLessGame).start(); } } //高空项目 private class HighAltitudeGame extends Thread { @Override public void run() { System.out.println(getName() + "完成高空项目....."); try { System.out.println(getName() +"等待其他人"); cyclicBarrier.await(); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } } } //一个不能少游戏 private class NotOneLessGame implements Runnable { @Override public void run() { try { System.out.println(Thread.currentThread().getName() + "开始挑战一个不能少项目"); cyclicBarrier.await(); System.out.println(Thread.currentThread().getName()+"完成任务"); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } } } }
4.Semaphore
Semaphore信号量:用来控制同时访问
特定资源的线程数量,一般用作流量控制,数据库连接等有限资源的访问控制
线程使用Semaphore的acquire()方法获取一个许可证
使用完之后调用release()方法归还许可证
用tryAcquire()方法尝试获取许可证
public class MySemaphore { //两个理发师 private static final Semaphore BARBERS = new Semaphore(2); public static void main(String[] args) { for (int i = 0; i < 10; i++) { new Thread(new HairCut()).start(); } } //理发线程 private static class HairCut implements Runnable { @Override public void run() { try { BARBERS.acquire(); System.out.println("当前有理发师空闲,"+Thread.currentThread().getName() +"准备理发...."); Thread.sleep(1000); BARBERS.release(); System.out.println(Thread.currentThread().getName() + "理发完毕...."); } catch (InterruptedException e) { e.printStackTrace(); } } }
5.Exchanger
Exchanger可以在两个线程之间交换数据
Exchangers may be useful in applications such as genetic algorithms and pipeline designs.
public class MyExchanger { private static final Exchanger<Object> exchanger = new Exchanger<>(); public static void main(String[] args) throws InterruptedException { Consumer consumer = new Consumer(); Producer producer = new Producer(); consumer.setName("consumer"); consumer.setDaemon(true); consumer.start(); producer.setName("producer"); producer.setDaemon(true); producer.start(); Thread.sleep(2000); } private static class Consumer extends Thread { @Override public void run() { Random random = new Random(); try { for (int i = 0; i < 10; i++) { Object exchange = exchanger.exchange(random.nextInt(100)); System.out.println(getName()+":"+exchange.toString()); } } catch (InterruptedException e) { e.printStackTrace(); } } } private static class Producer extends Thread { @Override public void run() { Random random = new Random(); try { for (int i = 0; i < 10; i++) { Object exchange = exchanger.exchange("Iphone" + random.nextInt(10)); System.out.println(getName()+":"+exchange.toString()); } } catch (InterruptedException e) { e.printStackTrace(); } } } }