一、前言
在我們日常的開發中,經常用到數據同步的更新,這時我們采用的是spring的定時任務和java的多線程進行數據的更新,進行時實的服務調用。
二.實現思路
1.創建線程類
2.創建ExecutorService線程連接池
3.調用線程池操作
4.spring的Scheduled(定時任務)配置
三.創建線程類
創建的線程類,我們采用的是實現Runnable接口,使用該類可以共享,在線程中要么繼承Thread,或者實現Runnable接口,其中Thread是實現了Runnable的接口,實現Runnable接口可以更好的擴展和共享類,因為java是單繼承多實現的程序。
public class ArchiveTask implements Runnable { /** * log. */ private static Logger logger = LoggerFactory.getLogger(ArchiveTask.class); //注:由於spring事務配置采用的注解的方式來配置傳播性配置,此類沒有在spring傳播配置里面,所以不能用自動裝配的方式來配置 private static FaxRecvArchiveCtrl faxRecvArchiveCtrl = AppContextAware.getBean(FaxRecvArchiveCtrl.class); private FaxRecvArchiveDTO faxRecvArchiveDTO; /** * CountDownLatch是一個同步輔助類,在一組線程在執行操作之前,允許一個或多線程處於等待 * 主要方法 * public CountDownLatch(int count); * public void countDown(); * public void await() throws InterruptedException * * 構造方法參數指定了計數的次數 * countDown方法,當前線程調用此方法,則計數減一 * awaint方法,調用此方法會一直阻塞當前線程,直到計時器的值為0 * */ private CountDownLatch latch; public ArchiveTask(FaxRecvArchiveDTO faxRecvArchiveDTO, CountDownLatch latch) { this.faxRecvArchiveDTO = faxRecvArchiveDTO; this.latch = latch; } @Override public void run() { try { faxRecvArchiveCtrl.updateArchiveFileFlag(ServiceFactory.getCurrentSessionId(), faxRecvArchiveDTO); } catch (Exception ex) { logger.debug("["+this.faxRecvArchiveDTO.getFaxKey()+"]線程執行異常,錯誤信息為:"+ex.getMessage()); } finally{ logger.debug("線程執行完:"+this.faxRecvArchiveDTO.getFaxKey()); latch.countDown(); } } }
四、調用線程池操作
public class EasyFaxArchiveFileMgr { /** * 日志 */ Logger logger = LoggerFactory.getLogger(EasyFaxArchiveFileMgr.class); /** * 線程池數量 */ private int ThreadNum = 5; private static FaxRecvArchiveCtrl faxRecvArchiveCtrl = AppContextAware.getBean(FaxRecvArchiveCtrl.class); private static List<FaxRecvArchiveDTO> noArchiveList = new ArrayList<FaxRecvArchiveDTO>(); <br> public void archiveFile() throws ShineException{ noArchiveList = faxRecvArchiveCtrl.getFaxRecvNoArchiveFile(); ExecutorService executorService = Executors.newFixedThreadPool(ThreadNum); List<ArchiveTask> taskList = new ArrayList<ArchiveTask>(); synchronized (noArchiveList) { while(CollectionUtils.isEmpty(noArchiveList) == false) { try { //定義計數器 int countLatch = (noArchiveList.size() > ThreadNum) ? ThreadNum :noArchiveList.size(); CountDownLatch latch = new CountDownLatch(countLatch); //組織任務 for(int i = 0; i < ThreadNum; i++){ if(CollectionUtils.isEmpty(noArchiveList) == false) { taskList.add(new ArchiveTask(noArchiveList.remove(0),latch)); }else{ break; } } //提交任務 if (CollectionUtils.isEmpty(taskList) == false) { for (int i = 0, size = taskList.size(); i < size; i++) { executorService.submit(taskList.get(i)); } }//等待所有線程都執行完 latch.await(); } catch(Exception ex) { logger.error("線程池處理出錯,原因:"+ex.getMessage()); } finally { taskList.clear(); }<br> //關閉線程連接池 executorService.shutdown(); } } } }
五.spring配置
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-2.5.xsd"> <bean id="timerFactory" class="org.springframework.scheduling.timer.TimerFactoryBean"> <property name="scheduledTimerTasks"> <list> <ref bean="archiveFileScheduler"/> </list> </property> </bean><br> <bean id="archiveFileScheduler" class="org.springframework.scheduling.timer.ScheduledTimerTask"> <property name="delay"> <!-- 啟動30秒后開始運行 --> <value>30000</value> </property> <property name="period"> <!-- 每10秒運行 --> <value>10000</value> </property> <property name="timerTask"> <ref bean="archiveFileHandleMethodInvoker" /> </property> </bean> <bean id="archiveFileHandleMethodInvoker" class="org.springframework.scheduling.timer.MethodInvokingTimerTaskFactoryBean"> <property name="targetObject"> <ref bean="com.shine.emofs.thirdparty.easyfax.ctrl.EasyFaxArchiveFileMgr" /> </property> <property name="targetMethod"> <value>archiveFile</value> </property> </bean> </beans>
六、參考:
ExecutorService線程連接池:http://www.itzhai.com/the-executorservice-common-method-newfixedthreadpool-of-create-fixed-size-thread-pool.html#read-more