每次都是隔很長時間才在博客中寫點什么,說自己忙吧,這是給自己的一個借口,其實呢還是懶啊。哎。。。
最近項目中有個對比的需求,需要從日志文件中獲取到參數,然后調用不同的API,進行結果的對比。但是不知用什么方式比較好,於是查了下jdk的手冊,發現了BlockingQueue這個好東西。
關於BlockingQueue的介紹,大家有興趣的可以自己看下:http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/LinkedBlockingQueue.html
需求呢其實很簡單就是將參數放置到Queue中,然后交由下一個策略去消費。剛開始時是通過不同的線程往隊列中存放數據,然后返回給下個服務一個BlockingQueue的對象,下一個策略從隊列中消費,code如下:
@SuppressWarnings("rawtypes")
@Override
public BlockingQueue getTxtLogContent(String path) {
File file = new File(path);
BufferedReader reader = null;
String tempStr = null;
final BlockingQueue queue = new LinkedBlockingQueue();
try {
reader = new BufferedReader(new FileReader(file));
while ((tempStr = reader.readLine()) != null) {
final InputOutputPrameters parameter = new InputOutputPrameters();
String[] list = tempStr.split(";");
if (list != null && list.length > 0) {
parameter.setInputParamters(list[0]);
parameter.setOutputParameters(list[1]);
}
new Thread(){
@SuppressWarnings("unchecked")
public void run(){
try {
Thread.sleep((long)(Math.random()*100));
log.info("開始存入數據!");
queue.put(parameter);
log.info("已經存入數據,目前隊列中有 " + queue.size() +" 個隊列!輸入參數:"+ parameter.getInputParamters() + ";\n輸出參數:" + parameter.getOutputParameters());
} catch (Exception e) {
log.error("系統異常:" + e);
}
}
}.start();
}
reader.close();
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}finally {
if (reader != null) {
try {
reader.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
return queue;
}
可是在實際運行時,由於日志比較大,下一個策略可能要等1hour或更長的時間才能開始處理,這明顯是不符合要求的,於是又優化了下,將BlockingQueue改為全局static的,然后下一個策略可以直接監控這個隊列中是否有值,有值就消費,沒值就阻塞線程等待或者超時等其他處理。
改進后的code:
1、新建一個隊列類:
public class ParameterQueue extends LinkedBlockingQueue<InputOutputPrameters> {
/**
*@Fields serialVersionUID:
*/
private static final long serialVersionUID = 6032356446145302484L;
private static BlockingQueue<InputOutputPrameters> queue = new LinkedBlockingQueue<InputOutputPrameters>();
/**
* @Fields log: 日志記錄
*/
private static final Logger log = LoggerFactory
.getLogger(ParameterQueue.class);
/**
* 獲取隊列中的對象
* @Method:getParameter
* @Description: 獲取隊列中的對象
* @return 獲取到的對象信息
*/
public static InputOutputPrameters getParameter(){
InputOutputPrameters result = null;
try {
result = (InputOutputPrameters)queue.take();
} catch (Exception e) {
log.error("獲取隊列異常,異常信息:" + e);
}
return result;
}
/**
* 獲取隊列的數量
* @Method:getQueueSize
* @Description: 獲取隊列的數量
* @return 數量
*/
public static Integer getQueueSize() {
return queue.size();
}
/**
* 放置參數到隊列中
* @Method:putParameter
* @Description: 放置參數到隊列中
* @param parameter 要放置的對象
*/
public static void putParameter(InputOutputPrameters parameter) {
try {
queue.put(parameter);
} catch (Exception e) {
log.error("插入隊列異常,異常信息:" + e);
}
}
}
2、讀取文件時,直接操作該隊列,往隊列中put值,下一個策略從該隊列中get值,put的code如下:
public void getSource(String path) {
try {
File file = new File(path);
BufferedReader reader = null;
String tempStr = null;
try {
reader = new BufferedReader(new FileReader(file));
while ((tempStr = reader.readLine()) != null) {
final InputOutputPrameters parameter = new InputOutputPrameters();
String[] list = tempStr.split(";");
if (list != null && list.length > 0) {
parameter.setInputParamters(list[0]);
parameter.setOutputParameters(list[1]);
}
putInQueue(parameter);
}
reader.close();
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}finally {
if (reader != null) {
try {
reader.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
} catch (Exception e) {
log.error("系統異常: " + e);
}
}
/**
* 將參數存放至隊列中
* @Method:putInQueue
* @Description: 將參數存放至隊列中
* @param parameter 要存放的對象
*/
private void putInQueue(final InputOutputPrameters parameter) {
new Thread(){
public void run(){
try {
Thread.sleep((long)(Math.random()*100));
log.info("開始存入數據!");
ParameterQueue.putParameter(parameter);
log.info("已經存入數據,目前隊列中有 " + ParameterQueue.getQueueSize() +" 個隊列!輸入參數:"+ parameter.getInputParamters() + ";\n輸出參數:" + parameter.getOutputParameters());
} catch (Exception e) {
log.error("系統異常:" + e);
}
}
}.start();
}
於是這個要求就達到了。記錄下這個小需求,方便以后查閱。
簡要說下,BlockingQueue是線程安全的,常用的是ArrayBlockingQueue、LinkedBlockingQueue
ArrayBlockingQueue需要制定容量,而LinkedBlockingQueue不需要
同時在消費時,take()是會阻塞線程的,如果是單線程跑時,take()不到時整個線程就卡了
所以看具體環境需求,是用take還是其他的,我一般用poll,因為可以制定超時時間。
哎 不知道怎么寫了,就這樣吧。
