前言背景
在做新項目,作為中間件的項目,主要做數據服務。這次想把項目做的簡潔一些,之前用的什么ActiveMq等中間件產品,這次全部不用,能自己實現就自己實現。自己用BlockingQueue阻塞隊列,按照自己的數據量,1G內存也能存上兩千多萬數據。設計上,需要一個線程去阻塞隊列中拿數據,必須是系統啟動的時候就去取。沒有則阻塞,直到有數據來。
首先一個問題是,在spring項目中,自定義的New對象和線程,是不受spring管理的。所以在以前的處理中,經常是寫一個單例,獲取ApplicationContext,這種方式是可行的。但是在用注解的時候,使用這種方式,看上去會比較丑陋。所以,我們還是希望,在使用線程的時候,可以 像以前那樣,可以自動的注入我們需要的bean。
開始動手
還是用實際的項目來舉個栗子。
作為數據服務,入口是一個webservices的接口,這里因為歷史原因,還是采用soap方式。
入口如下:
@WebMethod
public FeedResult send(NocPacket nocPacket) {
logger.info("Receive data");
QueueManager.getInstance().put(nocPacket,Const.nocQueue);
return "successful";
}
這里采用生產者和消費者的模式,其中的Const.nocQueue,是我們的BlockingQueue,作為數據緩沖區。
既然如此,還應該有一個消費者。
之前說過,我們要在系統啟動的時候,就去queue中取我們的數據,有則拿出來,作為業務處理,沒有則阻塞,等待數據到來。同時,拿到數據后,將這些數據入庫。
這里需要兩個類,一個是隨系統一起啟動的,暫且叫守護程序吧。另一個是做主業務的。
守護程序代碼:
/**
* Created by Wl on 15-6-17.
* 業務消費者
*/
@Component
public class Business {
private Logger logger = Logger.getLogger(Business.class);
@Autowired
private FlowDataService<FlowData> flowDataService;
public void doBusi() {
logger.info("業務線程開始");
ExecutorService executor = Executors.newCachedThreadPool();
BusiTask task = new BusiTask(Const.nocQueue,flowDataService);
executor.submit(task);
FlowData data = flowDataService.queryById(1);
logger.info("Get Data :"+data.getPlateNo());
}
}
處理業務的類:
/**
* Created by Wl on 15-6-17.
* 業務處理主方法
*/
public class BusiTask implements Callable<BlockingQueue<NocPacket>> {
private Logger logger = Logger.getLogger(BusiTask.class);
private BlockingQueue<NocPacket> queue;
private FlowDataService<FlowData> flowDataService;
public BusiTask(BlockingQueue<NocPacket> queue,FlowDataService<FlowData> flowDataService) {
this.queue = queue;
this.flowDataService = flowDataService;
}
public BlockingQueue<NocPacket> call() throws Exception {
NocPacket nocPacket = queue.take();
logger.info("Data:"+nocPacket.getGWName());
Payload sc = NocUtil.getPayload(nocPacket);
FlowData data = new FlowData();
flowDataService.add(data);
logger.info("成功插入數據");
return null;
}
}
此處,守護程序是隨系統一起啟動的,這一點可以用spring配置。這個守護程序不能有特殊性,它不能是我們自建的一個線程,也不能new一個方法,再去調用doBusi,因為這樣就不受spring管理了。它的第一個目的是拿到注入的service。
這兩個類,采用Future和Callable的方式進行異步線程處理的。如果直接去處理BusiTask中的任務,就會阻塞到queue的take上。同時,通過守護程序,BusiTask也可以拿到services,進行入庫等操作。
結尾
spring是可以進行多線程開發的,目前沒使用過,以后有時間好好研究下。