1. Thread pool manager class:
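import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Singleton that owns one thread pool per log type.
public class ThreadPoolManager {
    private static final ThreadPoolManager instance = new ThreadPoolManager();
    private ExecutorService secLogThreadPool;
    private ExecutorService sysLogThreadPool;

    public ExecutorService getSysLogThreadPool() {
        return sysLogThreadPool;
    }
    public void setSysLogThreadPool(ExecutorService sysLogThreadPool) {
        this.sysLogThreadPool = sysLogThreadPool;
    }
    public ExecutorService getSecLogThreadPool() {
        return secLogThreadPool;
    }
    public void setSecLogThreadPool(ExecutorService secLogThreadPool) {
        this.secLogThreadPool = secLogThreadPool;
    }
    public static ThreadPoolManager getInstance() {
        return instance;
    }

    private ThreadPoolManager() {
        // 1 core thread, at most 3 threads, bounded queue of 2000 tasks.
        // When the pool and the queue are both full, the submitting thread runs the task itself.
        secLogThreadPool = new ThreadPoolExecutor(1, 3, 0, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(2000), new ThreadPoolExecutor.CallerRunsPolicy());
        sysLogThreadPool = Executors.newFixedThreadPool(3);
    }
}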
Note:
The thread pool class is java.util.concurrent.ThreadPoolExecutor. A commonly used constructor is:
ThreadPoolExecutor(int corePoolSize,
                   int maximumPoolSize,
                   long keepAliveTime,
                   TimeUnit unit,
                   BlockingQueue<Runnable> workQueue,
                   RejectedExecutionHandler handler)
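The parameters mean the following:
corePoolSize: the number of threads the pool keeps alive, by default even when they are idle
maximumPoolSize: the maximum number of threads the pool may create; threads beyond corePoolSize are started only when workQueue is full
keepAliveTime: how long a thread beyond corePoolSize may stay idle before it is terminated
unit: the time unit of keepAliveTime
workQueue: the queue that holds submitted tasks until a thread is free to run them
handler: the policy applied to a task that is rejected because the pool and the queue are both full, or because the pool has been shut down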
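How corePoolSize, maximumPoolSize, workQueue and handler interact is easier to see in a small run. The sketch below is not part of the original classes: PoolGrowthDemo and its fields are hypothetical names, and the queue capacity is reduced to 2, an assumption made only so that the queue fills quickly. It submits five tasks that block on a latch, records getPoolSize() after each submission, then submits a sixth task to see where CallerRunsPolicy runs it.

```java
import java.util.Arrays;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the original code.
class PoolGrowthDemo {
    final int[] poolSizes = new int[5]; // pool size observed after each of the first 5 submissions
    boolean sixthRanInCaller;           // true if the 6th task ran on the submitting thread

    static PoolGrowthDemo run() {
        PoolGrowthDemo d = new PoolGrowthDemo();
        // Same shape as secLogThreadPool, but with a queue capacity of 2 (assumption for the demo).
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 3, 0, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(2),
                new ThreadPoolExecutor.CallerRunsPolicy());
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await(); // keep the worker busy until the demo releases it
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        for (int i = 0; i < 5; i++) {
            pool.execute(blocker);
            d.poolSizes[i] = pool.getPoolSize();
        }
        // All 3 threads are busy and the queue holds 2 tasks, so this task is rejected
        // and CallerRunsPolicy runs it on the thread that called execute().
        Thread caller = Thread.currentThread();
        pool.execute(() -> d.sixthRanInCaller = (Thread.currentThread() == caller));

        release.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return d;
    }

    public static void main(String[] args) {
        PoolGrowthDemo d = run();
        System.out.println(Arrays.toString(d.poolSizes)); // [1, 1, 1, 2, 3]
        System.out.println(d.sixthRanInCaller);           // true
    }
}
```

The pool stays at one thread while the queue still has room and grows only once the queue is full. With the capacity of 2000 used in ThreadPoolManager, secLogThreadPool therefore keeps a single thread until 2000 tasks are waiting, so a task that never returns can hold up everything queued behind it.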
2. Producer class:
// Producer task: puts one log entry into the shared storage.
public class SecLogProduceThread implements Runnable {
    private final SecLogEntity entity;

    public SecLogProduceThread(SecLogEntity entity) {
        this.entity = entity;
    }

    @Override
    public void run() {
        SecLogStorage.getInstance().produce(entity);
    }
}
3. Consumer class:
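// Consumer task: takes log entries from the shared storage in an endless loop.
public class SecLogConsumeThread implements Runnable {
    @Override
    public void run() {
        // The loop never exits, so this task occupies one pool thread permanently.
        while (true) {
            // TODO do something here
            // consume() blocks while the storage is empty.
            SecLogStorage.getInstance().consume();
        }
    }
}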
4. Log storage class (BlockingQueue approach):
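import java.util.concurrent.LinkedBlockingDeque;
import net.sf.json.JSONObject; // assumption: JSONObject comes from json-lib

// Singleton bounded buffer shared by producers and consumers.
public class SecLogStorage {
    private static final int MAX_SIZE = 100;
    private final LinkedBlockingDeque<SecLogEntity> list = new LinkedBlockingDeque<SecLogEntity>(MAX_SIZE);
    private static final SecLogStorage instance = new SecLogStorage();

    private SecLogStorage() {
    }

    public static SecLogStorage getInstance() {
        return instance;
    }

    public void produce(SecLogEntity seclog) {
        // Informational only: another thread may change the size right after this check.
        if (list.size() == MAX_SIZE) {
            // Message: "seclog stock is at MAX_SIZE, cannot produce any more"
            System.out.println("seclog庫存量為" + MAX_SIZE + ",不能再繼續生產!");
        }
        try {
            list.put(seclog); // blocks while the queue is full
            // Message: "produced SecLog"
            System.out.println("生產SecLog:" + JSONObject.fromObject(seclog));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public SecLogEntity consume() {
        SecLogEntity entity = null;
        // Informational only, for the same reason as in produce().
        if (list.isEmpty()) {
            // Message: "seclog stock is 0, cannot consume any more"
            System.out.println("seclog庫存量為0,不能再繼續消費!");
        }
        try {
            entity = list.take(); // blocks while the queue is empty
            // Message: "consumed SecLog"
            System.out.println("消費SecLog:" + JSONObject.fromObject(entity));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return entity; // null if the thread was interrupted while waiting
    }
}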
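The storage class relies on put() and take(), which wait indefinitely when the queue is full or empty. As a minimal sketch of the alternative, the same LinkedBlockingDeque also offers timed variants, offer(e, timeout, unit) and poll(timeout, unit), which give up after the timeout instead of blocking forever. BoundedQueueDemo, the capacity of 2 and the 50 ms timeout are hypothetical choices for illustration only.

```java
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the original code.
class BoundedQueueDemo {
    static String run() {
        // Capacity 2 (assumption) so the "full" case is reached after two inserts.
        LinkedBlockingDeque<String> queue = new LinkedBlockingDeque<>(2);
        StringBuilder log = new StringBuilder();
        try {
            log.append(queue.offer("a", 50, TimeUnit.MILLISECONDS));
            log.append(',').append(queue.offer("b", 50, TimeUnit.MILLISECONDS));
            // Queue is full: offer waits up to 50 ms, then returns false (put would keep waiting).
            log.append(',').append(queue.offer("c", 50, TimeUnit.MILLISECONDS));
            // Elements come out in insertion order.
            log.append(',').append(queue.poll(50, TimeUnit.MILLISECONDS));
            log.append(',').append(queue.poll(50, TimeUnit.MILLISECONDS));
            // Queue is empty: poll waits up to 50 ms, then returns null (take would keep waiting).
            log.append(',').append(queue.poll(50, TimeUnit.MILLISECONDS));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        }
        return log.toString();
    }

    public static void main(String[] args) {
        System.out.println(run()); // true,true,false,a,b,null
    }
}
```

Whether to block or to time out is a design choice: blocking keeps every log entry at the cost of stalling the producer, while a timed offer lets the caller decide what to do with an entry that does not fit.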
5. Log bean:
public class SecLogEntity {
    private String logName;
    private String logSrc;

    public String getLogName() {
        return logName;
    }
    public void setLogName(String logName) {
        this.logName = logName;
    }
    public String getLogSrc() {
        return logSrc;
    }
    public void setLogSrc(String logSrc) {
        this.logSrc = logSrc;
    }
}
6. Test class:
import java.util.concurrent.ExecutorService;

public class ThreadPoolTest {
    public static void main(String[] args) {
        SecLogEntity log1 = new SecLogEntity();
        log1.setLogName("test1");
        log1.setLogSrc("seclog1");
        SecLogEntity log2 = new SecLogEntity();
        log2.setLogName("test2");
        log2.setLogSrc("seclog2");
        // SysLogEntity, SysLogProduceThread and SysLogConsumeThread are not listed here;
        // they are assumed to mirror their SecLog counterparts.
        SysLogEntity log3 = new SysLogEntity();
        log3.setLogName("test3");
        log3.setLogSrc("syslog1");
        SysLogEntity log4 = new SysLogEntity();
        log4.setLogName("test4");
        log4.setLogSrc("syslog2");

        ExecutorService secLogPool = ThreadPoolManager.getInstance().getSecLogThreadPool();
        secLogPool.execute(new SecLogProduceThread(log1));
        secLogPool.execute(new SecLogProduceThread(log2));
        secLogPool.execute(new SecLogConsumeThread());

        ExecutorService sysLogPool = ThreadPoolManager.getInstance().getSysLogThreadPool();
        sysLogPool.execute(new SysLogProduceThread(log3));
        sysLogPool.execute(new SysLogProduceThread(log4));
        sysLogPool.execute(new SysLogConsumeThread());
        // The consumer tasks loop forever, so the JVM keeps running until it is stopped externally.
    }
}
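7. Test output (thread scheduling decides the line order, so it can differ between runs; 生產 = produced, 消費 = consumed, and the 庫存量為0 lines report an empty queue):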
生產SecLog:{"logName":"test1","logSrc":"seclog1"}
生產syslog:{"logName":"test3","logSrc":"syslog1"}
消費syslog: {"logName":"test3","logSrc":"syslog1"}
生產SecLog:{"logName":"test2","logSrc":"seclog2"}
消費syslog: {"logName":"test4","logSrc":"syslog2"}
syslog庫存量為0,無法再消費syslog!
生產syslog:{"logName":"test4","logSrc":"syslog2"}
消費SecLog:{"logName":"test1","logSrc":"seclog1"}
消費SecLog:{"logName":"test2","logSrc":"seclog2"}
seclog庫存量為0,不能再繼續消費!
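The run above never terminates on its own, because the consumer tasks loop forever on non-daemon pool threads. One possible way to stop such a consumer, sketched here under assumptions, is to let InterruptedException end the loop and to call shutdownNow() on the pool, which interrupts its worker threads. GracefulShutdownDemo, the String payloads and the latch used to wait for both entries are hypothetical and stand in for the SecLog classes.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of the original code.
class GracefulShutdownDemo {
    /** Returns the number of consumed entries, or -1 if the pool did not terminate in time. */
    static int run() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(100);
        AtomicInteger consumed = new AtomicInteger();
        CountDownLatch bothConsumed = new CountDownLatch(2);
        ExecutorService pool = Executors.newFixedThreadPool(2);

        // Consumer: loops until it is interrupted while waiting in take().
        pool.execute(() -> {
            try {
                while (true) {
                    queue.take();
                    consumed.incrementAndGet();
                    bothConsumed.countDown();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // leave the loop and let the task finish
            }
        });

        // Producer: puts two entries into the queue.
        pool.execute(() -> {
            try {
                queue.put("test1");
                queue.put("test2");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        try {
            bothConsumed.await(5, TimeUnit.SECONDS); // wait until both entries were consumed
            pool.shutdownNow();                      // interrupts the consumer blocked in take()
            boolean terminated = pool.awaitTermination(5, TimeUnit.SECONDS);
            return terminated ? consumed.get() : -1;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            pool.shutdownNow();
            return -1;
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // 2
    }
}
```

Applying the same idea to SecLogConsumeThread would also require SecLogStorage.consume() to propagate the interruption instead of only printing the stack trace; otherwise the loop would simply call take() again.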
