线程
首先了解线程的五大状态:新建,就绪,运行,阻塞,终结。
1、新建状态(New):新创建了一个线程对象。
2、就绪状态(Runnable):线程对象创建后,其他线程调用了该对象的start()方法。该状态的线程位于“可运行线程池”中,变得可运行,只等待获取CPU的使用权。即在就绪状态的进程除CPU之外,其它的运行所需资源都已全部获得。
3、运行状态(Running):就绪状态的线程获取了CPU,执行程序代码。
4、阻塞状态(Blocked):阻塞状态是线程因为某种原因放弃CPU使用权,暂时停止运行。直到线程进入就绪状态,才有机会转到运行状态。
阻塞的情况分三种:
(1)、等待阻塞:运行的线程执行wait()方法,该线程会释放占用的所有资源,JVM会把该线程放入“等待池”中。进入这个状态后,是不能自动唤醒的,必须依靠其他线程调用notify()或notifyAll()方法才能被唤醒,
(2)、同步阻塞:运行的线程在获取对象的同步锁时,若该同步锁被别的线程占用,则JVM会把该线程放入“锁池”中。
(3)、其他阻塞:运行的线程执行sleep()或join()方法,或者发出了I/O请求时,JVM会把该线程置为阻塞状态。当sleep()状态超时、join()等待线程终止或者超时、或者I/O处理完毕时,线程重新转入就绪状态。
5、死亡状态(Dead):线程执行完了或者因异常退出了run()方法,该线程结束生命周期。
线程变化的状态转换图如下:
线程的实现方法
实现有两种方式,一是继承Thread类,二是实现Runnable接口,但不管怎样, 当我们new了这个对象后,线程就进入了初始状态; 当该对象调用了start()方法,就进入就绪状态!
在工作中,因为需要获取返回值的线程,接触到了callable接口
Callable需要实现的是call()方法,而不是run()方法,返回值的类型有Callable的类型参数指定, Callable只能由ExecutorService.submit() 执行,正常结束后将返回一个future对象,实例如下:
ExecutorService pool = Executors.newFixedThreadPool(10);
Callable<String> c1 = new YkuploadRunable(");
Future<String> f1 = pool.submit(c1);
System.out.println(f1.get());
pool.shutdown();
线程的经典问题:生产者和消费者问题
生产者可以将产品放入仓库,消费者则可以从仓库中取走产品,要有保护仓库为空或溢出
解决生产者/消费者问题的方法可分为两类:
1.采用某种机制保护生产者和消费者之间的同步;
2.在生产者和消费者之间建立一个管道。
实现方法有
1.object.wait(),object.notify();
2.condition.await(),condition.signal();
3.LinkedBlockingQueue<Object> list; put(),take()会自动阻塞
方法一的具体代码
仓库.java
public class Storage
{
// 仓库最大存储量
private final int MAX_SIZE = 100;
// 仓库存储的载体
private LinkedList<Object> list = new LinkedList<Object>();
// 生产产品
public void produce(String producer)
{
synchronized (list)
{
// 如果仓库已满
while (list.size() == MAX_SIZE)
{
System.out.println("仓库已满,【"+producer+"】: 暂时不能执行生产任务!");
try
{
// 由于条件不满足,生产阻塞
list.wait();
}
catch (InterruptedException e)
{
e.printStackTrace();
}
}
// 生产产品
list.add(new Object());
System.out.println("【"+producer+"】:生产了一个产品\t【现仓储量为】:" + list.size());
list.notifyAll();
}
}
// 消费产品
public void consume(String consumer)
{
synchronized (list)
{
//如果仓库存储量不足
while (list.size()==0)
{
System.out.println("仓库已空,【"+consumer+"】: 暂时不能执行消费任务!");
try
{
// 由于条件不满足,消费阻塞
list.wait();
}
catch (InterruptedException e)
{
e.printStackTrace();
}
}
list.remove();
System.out.println("【"+consumer+"】:消费了一个产品\t【现仓储量为】:" + list.size());
list.notifyAll();
}
}
public LinkedList<Object> getList()
{
return list;
}
public void setList(LinkedList<Object> list)
{
this.list = list;
}
public int getMAX_SIZE()
{
return MAX_SIZE;
}
}
生产者.java
public class Producer extends Thread
{
private String producer;
private Storage storage;
public Producer(Storage storage)
{
this.storage = storage;
}
@Override
public void run()
{
produce(producer);
}
public void produce(String producer)
{
storage.produce(producer);
}
public String getProducer()
{
return producer;
}
public void setProducer(String producer)
{
this.producer = producer;
}
public Storage getStorage()
{
return storage;
}
public void setStorage(Storage storage)
{
this.storage = storage;
}
}
消费者.java
public class Consumer extends Thread
{
private String consumer;
private Storage storage;
public Consumer(Storage storage)
{
this.storage = storage;
}
@Override
public void run()
{
consume(consumer);
}
public void consume(String consumer)
{
storage.consume(consumer);
}
public Storage getStorage()
{
return storage;
}
public void setStorage(Storage storage)
{
this.storage = storage;
}
public String getConsumer() {
return consumer;
}
public void setConsumer(String consumer) {
this.consumer = consumer;
}
}
方法二的实现如下:注意lock的lock(),unlock()
public class Storage {
// 仓库最大存储量
private final int MAX_SIZE = 100;
// 仓库存储的载体
private LinkedList<Object> list = new LinkedList<Object>();
// 锁
private final Lock lock = new ReentrantLock();
// 仓库满的条件变量
private final Condition full = lock.newCondition();
// 仓库空的条件变量
private final Condition empty = lock.newCondition();
// 生产产品
public void produce(String producer) {
lock.lock();
// 如果仓库已满
while (list.size() == MAX_SIZE) {
System.out.println("仓库已满,【" + producer + "】: 暂时不能执行生产任务!");
try {
// 由于条件不满足,生产阻塞
full.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 生产产品
list.add(new Object());
System.out.println("【" + producer + "】:生产了一个产品\t【现仓储量为】:" + list.size());
empty.signalAll();
// 释放锁
lock.unlock();
}
// 消费产品
public void consume(String consumer) {
// 获得锁
lock.lock();
// 如果仓库存储量不足
while (list.size() == 0) {
System.out.println("仓库已空,【" + consumer + "】: 暂时不能执行消费任务!");
try {
// 由于条件不满足,消费阻塞
empty.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
list.remove();
System.out.println("【" + consumer + "】:消费了一个产品\t【现仓储量为】:" + list.size());
full.signalAll();
// 释放锁
lock.unlock();
}
public LinkedList<Object> getList() {
return list;
}
public void setList(LinkedList<Object> list) {
this.list = list;
}
public int getMAX_SIZE() {
return MAX_SIZE;
}
}
方法三的实现如下:
import java.util.concurrent.LinkedBlockingQueue;
public class Storage {
// 仓库最大存储量
private final int MAX_SIZE = 100;
// 仓库存储的载体
private LinkedBlockingQueue<Object> list = new LinkedBlockingQueue<Object>(100);
// 生产产品
public void produce(String producer) {
// 如果仓库已满
if (list.size() == MAX_SIZE) {
System.out.println("仓库已满,【" + producer + "】: 暂时不能执行生产任务!");
}
// 生产产品
try {
list.put(new Object());
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println("【" + producer + "】:生产了一个产品\t【现仓储量为】:" + list.size());
}
// 消费产品
public void consume(String consumer) {
// 如果仓库存储量不足
if (list.size() == 0) {
System.out.println("仓库已空,【" + consumer + "】: 暂时不能执行消费任务!");
}
try {
list.take();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println("【" + consumer + "】:消费了一个产品\t【现仓储量为】:" + list.size());
}
public LinkedBlockingQueue<Object> getList() {
return list;
}
public void setList(LinkedBlockingQueue<Object> list) {
this.list = list;
}
public int getMAX_SIZE() {
return MAX_SIZE;
}
}
线程同步
同步锁synchronized
上面的wait(),await()
还有一种join方法,使线程顺序进行。
线程实战
因为需要在controller中同时上传多个视频并保存返回的视频id进数据库,所以在controller中写了一个内部类,具体代码如下:
class Ykupload implements Runnable {
@Override
public void run() {
System.out.println(tpVideoLibrary.toString());
YkuploadRunable yp = new YkuploadRunable();
//获取yp的数据并写进数据库
}
}
后记:貌似不能同时上传同一个文件,多个视频上传速度还是取决于你的带宽!.
线程池
线程池,顾名思义存放线程的池子,可以类比数据库的连接池。因为频繁地创建和销毁线程会给服务器带来很大的压力。若能将创建的线程不再销毁而是存放在池中等待下一个任务使用,可以不仅减少了创建和销毁线程所用的时间,提高了性能,同时还减轻了服务器的压力。
初始化线程池有五个核心参数,分别是 corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue。还有两个默认参数 threadFactory, handler
corePoolSize:线程池初始核心线程数。初始化线程池的时候,池内是没有线程,只有在执行任务的时会创建线程。
maximumPoolSize:线程池允许存在的最大线程数。若超过该数字,默认提示RejectedExecutionException异常
keepAliveTime:当前线程数大于核心线程时,该参数生效,其目的是终止多余的空闲线程等待新任务的最长时间。即指定时间内将还未接收任务的线程销毁。
unit:keepAliveTime 的时间单位
workQueue:缓存任务的的队列,一般采用LinkedBlockingQueue。
threadFactory:执行程序创建新线程时使用的工厂,一般采用默认值。
handler:超出线程范围和队列容量而使执行被阻塞时所使用的处理程序,一般采用默认值。
在接收任务前,线程池内是没有线程。只有当任务来了才开始新建线程。当任务数大于核心线程数时,任务进入队列中等待。若队列满了,则线程池新增线程直到最大线程数。再超过则会执行拒绝策略。
举个栗子:
/**
若有Thread1、Thread2、Thread3、Thread4四条线程分别统计C、D、E、F四个盘的大小,所有线程都统计完毕交给Thread5线程去做汇总,应当如何实现
**/
import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ITDragonThreads {
public static void main(String[] args) throws Exception {
// 无缓冲无界线程池
ExecutorService executor = Executors.newFixedThreadPool(8);
/**
* CompletionService与ExecutorService最主要的区别在于
*前者submit的task不一定是按照加入时的顺序完成的。CompletionService对ExecutorService进行了包装,
*内部维护一个保存Future对象的BlockingQueue。
*只有当这个Future对象状态是结束的时候,才会加入到这个Queue中,take()方法其实就是Producer-Consumer中的Consumer。
*它会从Queue中取出Future对象,如果Queue是空的,就会阻塞在那里,直到有 完成的Future对象加入到Queue中。
*所以,先完成的必定先被取出。这样就减少了不必要的等待时间。
**/
CompletionService<Long> completion = new ExecutorCompletionService<Long>(executor);
CountWorker countWorker = null;
for (int i = 0; i < 4; i++) { // 四个线程负责统计
countWorker = new CountWorker(i+1);
completion.submit(countWorker);
}
// 关闭线程池
//shutdown()阻止新来的任务提交,对已经提交了的任务不会产生任何影响。当已经提交的任务执行完后,它会将那些闲置的线程(idleWorks)进行中断,这个过程是异步的。
executor.shutdown();
// 主线程相当于第五个线程,用于汇总数据
long total = 0;
for (int i = 0; i < 4; i++) {
total += completion.take().get();
}
System.out.println(total / 1024 / 1024 / 1024 +"G");
}
}
class CountWorker implements Callable<Long>{
private Integer type;
public CountWorker() {
}
public CountWorker(Integer type) {
this.type = type;
}
@Override
public Long call() throws Exception {
ArrayList<String> paths = new ArrayList<>(Arrays.asList("c:", "d:", "e:", "f:"));
return countDiskSpace(paths.get(type - 1));
}
// 统计磁盘大小
private Long countDiskSpace (String path) {
File file = new File(path);
long totalSpace = file.getTotalSpace();
System.out.println(path + " 总空间大小 : " + totalSpace / 1024 / 1024 / 1024 + "G");
return totalSpace;
}
}
