簡介:
生產者、消費者模型是多線程編程的常見問題,最簡單的一個生產者、一個消費者線程模型大多數人都能夠寫出來,但是一旦條件發生變化,我們就很容易掉進多線程的bug中。這篇文章主要講解了生產者和消費者的數量,商品緩存位置數量,商品數量等多個條件的不同組合下,寫出正確的生產者消費者模型的方法。
歡迎探討,如有錯誤敬請指正
如需轉載,請注明出處 http://www.cnblogs.com/nullzx/
定義商品類
package demo;
/*定義商品*/
public class Goods {
public final String name;
public final int price;
public final int id;
public Goods(String name, int price, int id){
this.name = name; /*類型*/
this.price = price; /*價格*/
this.id = id; /*商品序列號*/
}
@Override
public String toString(){
return "name: " + name + ", price:"+ price + ", id: " + id;
}
}
基本要求:
1)生產者不能重復生產一個商品,也就是說不能有兩個id相同的商品
2)生產者不能覆蓋一個商品(當前商品還未被消費,就被下一個新商品覆蓋)。也就是說消費商品時,商品的id屬性可以不連續,但不能出現缺號的情況
3)消費者不能重復消費一個商品
1. 生產者線程無限生產,消費者線程無限消費 的模式
1.1使用線程對象,一個生產者線程,一個消費者線程,一個商品存儲位置
package demo;
import java.util.Random;
/*使用線程對象,一個緩存位置,一個生產者,一個消費者,無限生產商品消費商品*/
public class ProducterComsumerDemo1 {
/*定義一個商品緩存位置*/
private volatile Goods goods;
/*定義一個對象作為鎖,不使用goods作為鎖是因為生產者每次會產生一個新的對象*/
private Object obj = new Object();
/*isFull == true 生產者線程休息,消費者線程消費
*isFull == false 消費者線程休息,生產者線程生產*/
private volatile boolean isFull = false;
/*商品的id編號,生產者制造的每個商品的id都不一樣,每生產一個id自增1*/
private int id = 1;
/*隨機產生一個sleep時間*/
private Random rnd = new Random();
/*=================定義消費者線程==================*/
public class ComsumeThread implements Runnable{
@Override
public void run(){
try{
while(true){
/*獲取obj對象的鎖, id 和 isFull 的操作都在同步代碼塊中*/
synchronized(obj){
if(!isFull){
/*wait方法使當前線程阻塞,並釋放鎖*/
obj.wait();
}
/*隨機延時一段時間*/
Thread.sleep(rnd.nextInt(250));
/*模擬消費商品*/
System.out.println(goods);
/*隨機延時一段時間*/
Thread.sleep(rnd.nextInt(250));
isFull = false;
/*喚醒阻塞obj上的生產者線程*/
obj.notify();
}
/*隨機延時一段時間*/
Thread.sleep(rnd.nextInt(250));
}
}catch (InterruptedException e){
/*什么都不做*/
}
}
}
/*=================定義生產者線程==================*/
public class ProductThread implements Runnable{
@Override
public void run(){
try {
while(true){
synchronized(obj){
if(isFull){
obj.wait();
}
Thread.sleep(rnd.nextInt(500));
/*如果id為偶數,生產價格為2的產品A
*如果id為奇數,生產價格為1的產品B*/
if(id % 2 == 0){
goods = new Goods("A", 2, id);
}else{
goods = new Goods("B", 1, id);
}
Thread.sleep(rnd.nextInt(250));
id++;
isFull = true;
/*喚醒阻塞的消費者線程*/
obj.notify();
}
}
} catch (InterruptedException e) {
/*什么都不做*/
}
}
}
public static void main(String[] args) throws InterruptedException{
ProducterComsumerDemo1 pcd = new ProducterComsumerDemo1();
Runnable c = pcd.new ComsumeThread();
Runnable p = pcd.new ProductThread();
new Thread(p).start();
new Thread(c).start();
}
}
運行結果
name: B, price:1, id: 1 name: A, price:2, id: 2 name: B, price:1, id: 3 name: A, price:2, id: 4 name: B, price:1, id: 5 name: A, price:2, id: 6 name: B, price:1, id: 7 name: A, price:2, id: 8 name: B, price:1, id: 9 name: A, price:2, id: 10 name: B, price:1, id: 11 name: A, price:2, id: 12 name: B, price:1, id: 13 ……
從結果看出,商品類型交替生產,每個商品的id都不相同,且不會漏過任何一個id,生產者沒有重復生產,消費者沒有重復消費,結果完全正確。
1.2. 使用線程對象,多個生產者線程,多個消費者線程,1個緩存位置
1.2.1一個經典的bug
對於多生產者,多消費者這個問題,看起來我們似乎不用修改代碼,只需在main方法中多添加幾個線程就好。假設我們需要三個消費者,一個生產者,那么我們只需要在main方法中再添加兩個消費者線程。
public static void main(String[] args) throws InterruptedException{
ProducterComsumerDemo1 pcd = new ProducterComsumerDemo1();
Runnable c = pcd.new ComsumeThread();
Runnable p = pcd.new ProductThread();
new Thread(c).start();
new Thread(p).start();
new Thread(c).start();
new Thread(c).start();
}
運行結果
name: B, price:1, id: 1 name: A, price:2, id: 2 name: A, price:2, id: 2 name: B, price:1, id: 3 name: B, price:1, id: 3 name: A, price:2, id: 4 name: A, price:2, id: 4 name: B, price:1, id: 5 name: B, price:1, id: 5 name: A, price:2, id: 6 ……
從結果中,我們發現消費者重復消費了商品,所以這樣做顯然是錯誤的。這里我們定義多個消費者,一個生產者,所以遇到了重復消費的問題,如果定義成一個消費者,多個生產者就會遇到id覆蓋的問題。如果我們定義多個消費者,多個生產者,那么即會遇到重復消費,也會遇到id覆蓋的問題。注意,上面的代碼使用的notifyAll喚醒方法,如果使用notify方法喚醒bug仍然可能發生。
現在我們來分析一下原因。當生產者生產好了商品,會喚醒因沒有商品而阻塞消費者線程,假設喚醒的消費者線程超過兩個,這兩個線程會競爭獲取鎖,獲取到鎖的線程就會從obj.wait()方法中返回,然后消費商品,並把isFull置為false,然后釋放鎖。當被喚醒的另一個線程競爭獲取到鎖了以后也會從obj.wait()方法中返回。會再次消費同一個商品。顯然,每一個被喚醒的線程應該再次檢查isFull這個條件。所以無論是消費者,還是生產者,isFull的判斷必須改成while循環,這樣才能得到正確的結果而不受生產者的線程數和消費者的線程數的影響。
而對於只有一個生產者線程,一個消費者線程,用if判斷是沒有問題的,但是仍然強烈建議改成while語句進行判斷。
1.2.2正確的姿勢
package demo;
import java.util.Random;
/*使用線程對象,一個緩存位置,一個生產者,一個消費者,無限生產商品消費商品*/
public class ProducterComsumerDemo1 {
/*定義一個商品緩存位置*/
private volatile Goods goods;
/*定義一個對象作為鎖,不使用goods作為鎖是因為生產者每次會產生一個新的對象*/
private Object obj = new Object();
/*isFull == true 生產者線程休息,消費者線程消費
*isFull == false 消費者線程消費,生產者線程生產*/
private volatile boolean isFull = false;
/*商品的id編號,生產者制造的每個商品的id都不一樣,每生產一個id自增1*/
private int id = 1;
/*隨機產生一個sleep時間*/
private Random rnd = new Random();
/*=================定義消費者線程==================*/
public class ComsumeThread implements Runnable{
@Override
public void run(){
try{
while(true){
/*獲取obj對象的鎖, id 和 isFull 的操作都在同步代碼塊中*/
synchronized(obj){
while(!isFull){
/*wait方法使當前線程阻塞,並釋放鎖*/
obj.wait();
}
/*隨機延時一段時間*/
Thread.sleep(rnd.nextInt(250));
/*模擬消費商品*/
System.out.println(goods);
/*隨機延時一段時間*/
Thread.sleep(rnd.nextInt(250));
isFull = false;
/*喚醒阻塞obj上的生產者線程*/
obj.notifyAll();
}
/*隨機延時一段時間*/
Thread.sleep(rnd.nextInt(250));
}
}catch (InterruptedException e){
/*我就是任性,這里什么都不做*/
}
}
}
/*=================定義生產者線程==================*/
public class ProductThread implements Runnable{
@Override
public void run(){
try {
while(true){
synchronized(obj){
while(isFull){
obj.wait();
}
Thread.sleep(rnd.nextInt(500));
/*如果id為偶數,生產價格為2的產品A
如果id為奇數,生產價格為1的產品B*/
if(id % 2 == 0){
goods = new Goods("A", 2, id);
}else{
goods = new Goods("B", 1, id);
}
Thread.sleep(rnd.nextInt(250));
id++;
isFull = true;
/*喚醒阻塞的消費者線程*/
obj.notifyAll();
}
}
} catch (InterruptedException e) {
/*我就是任性,這里什么都不做*/
}
}
}
public static void main(String[] args) throws InterruptedException{
ProducterComsumerDemo1 pcd = new ProducterComsumerDemo1();
Runnable c = pcd.new ComsumeThread();
Runnable p = pcd.new ProductThread();
new Thread(p).start();
new Thread(p).start();
new Thread(p).start();
new Thread(c).start();
new Thread(c).start();
new Thread(c).start();
}
}
1.3 使用線程對象,多個緩存位置(有界),多生產者,多消費者
1)當緩存位置滿時,我們應該阻塞生產者線程
2)當緩存位置空時,我們應該阻塞消費者線程
下面的代碼我沒有用java對象內置的鎖,而是用了ReentrantLock對象。是因為普通對象的鎖只有一個阻塞隊列,如果使用notify方式,無法保證喚醒的就是特定類型的線程(消費者線程或生產者線程),而notifyAll方法會喚醒所有的線程,當剩余的緩存商品的數量小於生產者線程數量或已緩存商品的數量小於消費者線程時效率就比較低。所以這里我們通過ReentrantLock對象構造兩個阻塞隊列提高效率。
1.3.1 普通方式
package demo;
import java.util.LinkedList;
import java.util.Random;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/*使用線程對象,多個緩存位置(有界),多生產者,多消費者,無限循環模式*/
public class ProducterComsumerDemo2 {
/*最大緩存商品數*/
private final int MAX_SLOT = 2;
/*定義緩存商品的容器*/
private LinkedList<Goods> queue = new LinkedList<Goods>();
/*定義線程鎖和鎖對應的阻塞隊列*/
private Lock lock = new ReentrantLock();
private Condition full = lock.newCondition();
private Condition empty = lock.newCondition();
/*商品的id編號,生產者制造的每個商品的id都不一樣,每生產一個id自增1*/
private int id = 1;
/*隨機產生一個sleep時間*/
private Random rnd = new Random();
/*=================定義消費者線程==================*/
public class ComsumeThread implements Runnable{
@Override
public void run(){
while(true){
/*加鎖,queue的出列操作都在同步代碼塊中*/
lock.lock();
try {
while(queue.isEmpty()){
System.out.println("queue is empty");
empty.await();
}
/*隨機延時一段時間*/
Thread.sleep(rnd.nextInt(200));
/*模擬消費商品*/
Goods goods = queue.remove();
System.out.println(goods);
/*隨機延時一段時間*/
Thread.sleep(rnd.nextInt(200));
/*喚醒阻塞的生產者線程*/
full.signal();
} catch (InterruptedException e) {
/*什么都不做*/
}finally{
lock.unlock();
}
/*釋放鎖后隨機延時一段時間*/
try {
Thread.sleep(rnd.nextInt(200));
} catch (InterruptedException e) {
/*什么都不做*/
}
}
}
}
/*=================定義生產者線程==================*/
public class ProductThread implements Runnable{
@Override
public void run(){
while(true){
/*加鎖,queue的入列操作,id操作都在同步代碼塊中*/
lock.lock();
try{
while(queue.size() == MAX_SLOT){
System.out.println("queue is full");
full.await();
}
Thread.sleep(rnd.nextInt(200));
Goods goods = null;
/*根據序號產生不同的商品*/
switch(id%3){
case 0 : goods = new Goods("A", 1, id);
break;
case 1 : goods = new Goods("B", 2, id);
break;
case 2 : goods = new Goods("C", 3, id);
break;
}
Thread.sleep(rnd.nextInt(200));
queue.add(goods);
id++;
/*喚醒阻塞的消費者線程*/
empty.signal();
}catch(InterruptedException e){
/*什么都不做*/
}finally{
lock.unlock();
}
/*釋放鎖后隨機延時一段時間*/
try {
Thread.sleep(rnd.nextInt(100));
} catch (InterruptedException e) {
/*什么都不做*/
}
}
}
}
/*=================main==================*/
public static void main(String[] args) throws InterruptedException{
ProducterComsumerDemo2 pcd = new ProducterComsumerDemo2();
Runnable c = pcd.new ComsumeThread();
Runnable p = pcd.new ProductThread();
/*兩個生產者線程,兩個消費者線程*/
new Thread(p).start();
new Thread(p).start();
new Thread(c).start();
new Thread(c).start();
}
}
運行結果
queue is empty queue is empty name: B, price:2, id: 1 name: C, price:3, id: 2 name: A, price:1, id: 3 queue is full name: B, price:2, id: 4 name: C, price:3, id: 5 queue is full name: A, price:1, id: 6 name: B, price:2, id: 7 name: C, price:3, id: 8 name: A, price:1, id: 9 name: B, price:2, id: 10 name: C, price:3, id: 11 name: A, price:1, id: 12 name: B, price:2, id: 13 name: C, price:3, id: 14 ……
1.3.2 更優雅的實現方式
下面使用線程池(ThreadPool)和阻塞隊列(LinkedBlockingQueue)原子類(AtomicInteger)以更加優雅的方式實現上述功能。LinkedBlockingQueue阻塞隊列僅在take和put方法上鎖,所以id必須定義為原子類。
package demo;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
/*使用線程對象,多個緩存位置(有界),多生產者,多消費者,無限循環模式*/
public class ProducterComsumerDemo4 {
/*最大緩存商品數*/
private final int MAX_SLOT = 3;
/*定義緩存商品的容器*/
private LinkedBlockingQueue<Goods> queue = new LinkedBlockingQueue<Goods>(MAX_SLOT);
/*商品的id編號,生產者制造的每個商品的id都不一樣,每生產一個id自增1*/
private AtomicInteger id = new AtomicInteger(1);
/*隨機產生一個sleep時間*/
private Random rnd = new Random();
/*=================定義消費者線程==================*/
public class ComsumeThread implements Runnable{
@Override
public void run(){
while(true){
try {
/*隨機延時一段時間*/
Thread.sleep(rnd.nextInt(200));
/*模擬消費商品*/
Goods goods = queue.take();
System.out.println(goods);
/*隨機延時一段時間*/
Thread.sleep(rnd.nextInt(200));
} catch (InterruptedException e) {
/*什么都不做*/
}
}
}
}
/*=================定義生產者線程==================*/
public class ProductThread implements Runnable{
@Override
public void run(){
while(true){
try{
int x = id.getAndIncrement();
Goods goods = null;
Thread.sleep(rnd.nextInt(200));
/*根據序號產生不同的商品*/
switch(x%3){
case 0 : goods = new Goods("A", 1, x);
break;
case 1 : goods = new Goods("B", 2, x);
break;
case 2 : goods = new Goods("C", 3, x);
break;
}
Thread.sleep(rnd.nextInt(200));
queue.put(goods);
Thread.sleep(rnd.nextInt(100));
}catch(InterruptedException e){
/*什么都不做*/
}
}
}
}
/*=================main==================*/
public static void main(String[] args) throws InterruptedException{
ProducterComsumerDemo4 pcd = new ProducterComsumerDemo4();
Runnable c = pcd.new ComsumeThread();
Runnable p = pcd.new ProductThread();
/*定義線程池*/
ExecutorService es = Executors.newCachedThreadPool();
/*三個生產者線程,兩個消費者線程*/
es.execute(p);
es.execute(p);
es.execute(p);
es.execute(c);
es.execute(c);
es.shutdown();
}
}
2. 有限商品個數
這個問題顯然比上面的問題要復雜不少,原因在於要保證緩存區的商品要全部消費掉,沒有重復消費商品,沒有覆蓋商品,同時還要保證所有線程能夠正常結束,防止存在一直阻塞的線程。
2.1 使用線程對象,多個緩存位置(有界),多生產者,多消費者
思路 定義一下三個變量
/*需要生產的總商品數*/ private final int TOTAL_NUM = 30; /*已產生的數量*/ private volatile int productNum = 0; /*已消耗的商品數*/ private volatile int comsumedNum = 0;
每生產一個商品 productNum 自增1,直到TOTAL_NUM為止,如果不滿足條件 productNum < TOTAL_NUM 則結束進程,自增操作必須在full.await()方法調用之前,防止生產者線程無法喚醒。
同理,每消費一個商品 comsumedNum 自增1,直到TOTAL_NUM為止,如果不滿足條件 comsumedNum < TOTAL_NUM 則結束進程,自增操作必須在empty.await()方法調用之前,防止消費者線程無法喚醒。
comsumedNum和productNum相當於計划經濟時代的糧票一樣,有了它能夠保證生產者線程在喚醒后一定需要生產一個商品,消費者線程在喚醒以后一定能夠消費一個商品
package demo;
import java.util.LinkedList;
import java.util.Random;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/*使用線程對象,多個緩存位置(有界),多生產者,多消費者, 有限商品個數*/
public class ProducterComsumerDemo3 {
/*需要生產的總商品數*/
private final int TOTAL_NUM = 30;
/*已產生的數量*/
private volatile int productNum = 0;
/*已消耗的商品數*/
private volatile int comsumedNum = 0;
/*最大緩存商品數*/
private final int MAX_SLOT = 2;
/*定義線程公用的鎖和條件*/
private Lock lock = new ReentrantLock();
private Condition full = lock.newCondition();
private Condition empty = lock.newCondition();
/*定義緩存商品的容器*/
private LinkedList<Goods> queue = new LinkedList<Goods>();
/*商品的id編號,生產者制造的每個商品的id都不一樣,每生產一個id自增1*/
private int id = 1;
/*隨機產生一個sleep時間*/
private Random rnd = new Random();
/*=================定義消費者線程==================*/
public class ComsumeThread implements Runnable{
@Override
public void run(){
while(true){
/*加鎖, id、comsumedNum 操作都在同步代碼塊中*/
lock.lock();
try {
/*隨機延時一段時間*/
Thread.sleep(rnd.nextInt(250));
if(comsumedNum < TOTAL_NUM){
comsumedNum++;
}else{
/*這里會自動執行finally的語句,釋放鎖*/
break;
}
while(queue.isEmpty()){
System.out.println("queue is empty");
empty.await();
}
/*隨機延時一段時間*/
Thread.sleep(rnd.nextInt(250));
/*模擬消費商品*/
Goods goods = queue.remove();
System.out.println(goods);
/*隨機延時一段時間*/
Thread.sleep(rnd.nextInt(250));
/*喚醒阻塞的生產者線程*/
full.signal();
} catch (InterruptedException e) {
}finally{
lock.unlock();
}
/*釋放鎖后,隨機延時一段時間*/
try {
Thread.sleep(rnd.nextInt(250));
} catch (InterruptedException e) {
}
}
System.out.println(
"customer "
+ Thread.currentThread().getName()
+ " is over");
}
}
/*=================定義生產者線程==================*/
public class ProductThread implements Runnable{
@Override
public void run(){
while(true){
lock.lock();
try{
/*隨機延時一段時間*/
Thread.sleep(rnd.nextInt(250));
if(productNum < TOTAL_NUM){
productNum++;
}else{
/*這里會自動執行finally的語句,釋放鎖*/
break;
}
Thread.sleep(rnd.nextInt(250));
while(queue.size() == MAX_SLOT){
System.out.println("queue is full");
full.await();
}
Thread.sleep(rnd.nextInt(250));
Goods goods = null;
/*根據序號產生不同的商品*/
switch(id%3){
case 0 : goods = new Goods("A", 1, id);
break;
case 1 : goods = new Goods("B", 2, id);
break;
case 2 : goods = new Goods("C", 3, id);
break;
}
queue.add(goods);
id++;
/*喚醒阻塞的消費者線程*/
empty.signal();
}catch(InterruptedException e){
}finally{
lock.unlock();
}
/*釋放鎖后,隨機延時一段時間*/
try {
Thread.sleep(rnd.nextInt(250));
} catch (InterruptedException e) {
/*什么都不做*/
}
}
System.out.println(
"producter "
+ Thread.currentThread().getName()
+ " is over");
}
}
/*=================main==================*/
public static void main(String[] args) throws InterruptedException{
ProducterComsumerDemo3 pcd = new ProducterComsumerDemo3();
ComsumeThread c = pcd.new ComsumeThread();
ProductThread p = pcd.new ProductThread();
new Thread(p).start();
new Thread(p).start();
new Thread(p).start();
new Thread(c).start();
new Thread(c).start();
new Thread(c).start();
System.out.println("main Thread is over");
}
}
2.2利用線程池,原子類,阻塞隊列,以更優雅的方式實現
LinkedBlockingQueue阻塞隊列僅在take和put方法上鎖,所以productNum和comsumedNum必須定義為原子類。
package demo;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
/*使用線程池,多個緩存位置(有界),多生產者,多消費者, 有限商品個數*/
public class LinkedBlockingQueueDemo {
/*需要生產的總商品數*/
private final int TOTAL_NUM = 20;
/*已產生商品的數量*/
volatile AtomicInteger productNum = new AtomicInteger(0);
/*已消耗的商品數*/
volatile AtomicInteger comsumedNum = new AtomicInteger(0);
/*最大緩存商品數*/
private final int MAX_SLOT = 5;
/*同步阻塞隊列,隊列容量為MAX_SLOT*/
private LinkedBlockingQueue<Goods> lbq = new LinkedBlockingQueue<Goods>(MAX_SLOT);
/*隨機數*/
private Random rnd = new Random();
/*pn表示產品的編號,產品編號從1開始*/
private volatile AtomicInteger pn = new AtomicInteger(1);
/*=================定義消費者線程==================*/
public class CustomerThread implements Runnable{
@Override
public void run(){
while(comsumedNum.getAndIncrement() < TOTAL_NUM){
try{
/*隨機延時一段時間*/
Thread.sleep(rnd.nextInt(500));
/*從隊列中取出商品,隊列空時發生阻塞*/
Goods goods = lbq.take();
/*隨機延時一段時間*/
Thread.sleep(rnd.nextInt(500));
/*模擬消耗商品*/
System.out.println(goods);
/*隨機延時一段時間*/
Thread.sleep(rnd.nextInt(500));
}catch(InterruptedException e){
}
}
System.out.println(
"customer "
+ Thread.currentThread().getName()
+ " is over");
}
}
/*=================定義生產者線程==================*/
public class ProducerThread implements Runnable{
@Override
public void run(){
while(productNum.getAndIncrement() < TOTAL_NUM){
try {
int x = pn.getAndIncrement();
Goods goods = null;
/*根據序號產生不同的商品*/
switch(x%3){
case 0 : goods = new Goods("A", 1, x);
break;
case 1 : goods = new Goods("B", 2, x);
break;
case 2 : goods = new Goods("C", 3, x);
break;
}
/*隨機延時一段時間*/
Thread.sleep(rnd.nextInt(500));
/*產生的新產品入列,隊列滿時發生阻塞*/
lbq.put(goods);
/*隨機延時一段時間*/
Thread.sleep(rnd.nextInt(500));
} catch (InterruptedException e1) {
/*什么都不做*/
}
}
System.out.println(
"producter "
+ Thread.currentThread().getName()
+ " is over ");
}
}
/*=================main==================*/
public static void main(String[] args){
LinkedBlockingQueueDemo lbqd = new LinkedBlockingQueueDemo();
Runnable c = lbqd.new CustomerThread();
Runnable p = lbqd.new ProducerThread();
ExecutorService es = Executors.newCachedThreadPool();
es.execute(c);
es.execute(c);
es.execute(c);
es.execute(p);
es.execute(p);
es.execute(p);
es.shutdown();
System.out.println("main Thread is over");
}
}
