利用多線程可以通過並行執行任務來提升效率,但是很多場景下,不是所有的任務都是可以一起執行的,現實情況是有的任務必須要等到之前那些可並行的任務都執行后才可以繼續執行的。考慮如下任務場景:
任務一和任務二可並行執行,但是任務三必須等到一、二執行完后才能執行,任務四執行完后任務五、六、七才能再並行,一般我們的實現方式是按階段順序處理,在可並行處利用多線程手段,但是這種方式往往依賴任務的特點,下面我提供可串並新的框架,根據任務的配置靈活實現並行處理及串行保證,此框架我將會利用jdk7引入的fork-join及akka並行處理框架來分別實現。
實現思路:
將要執行的任務抽象一下,引入一個標記參數isAsyn來表示是否可異步執行,如果true,表示當前任務可以與之前環節的任務並行執行,如果為false,表示當前任務為串行任務,必須等到之前的任務直接完成后才可以執行。引入此標記后,上述任務場景參數分別為:
任務一(isAsyn=true),
任務二(isAsyn=true),
任務三(isAsyn=false),
任務四(isAsyn=false),
任務五(isAsyn=true),
任務六(isAsyn=true),
任務七(isAsyn=true)。此場景的任務按先后組合在一起即為整個任務鏈,並行處理框架得到此場景的任務鏈后,從該場景中解析分辨出可並行和需要串行的任務子鏈,對可並行的任務鏈利用多線程執行,對需串行的任務鏈需要在執行前進行一下判斷,即如果有它之前的任務鏈正在處理,則等待其執行完成,否則執行此串行鏈。
實現方式一:ForkJoin實現
- 將任務放入到待處理鏈中,所謂的鏈其實是一個List:
-
-
List<AbstractProcessor> processorList; //初始化processorList后調用addProcessor放入任務 publicvoid addProcessor(AbstractProcessor processor)throwsException{ this.addProcessor(this.processorList.size(), processor); }
- 初始化ForkJoinPool,並創建一個繼承自 RecursiveAction 的類,以便ForkJoinPool調用:
privatestaticfinalForkJoinPool forkJoinPool =newForkJoinPool(); // forkjoin框架執行類 classProcessorActionextendsRecursiveAction{ privatestaticfinallong serialVersionUID =-176563833918642543L; privatefinalProcessorChain pChain; //參數即待處理的任務鏈 ProcessorAction(ProcessorChain pChain){ this.pChain = pChain; } @Override protectedvoid compute(){ // 見下文 } }
-
- 調用 forkJoinPool.invoke(new ProcessorAction(processorChain)); 進行處理,此方法會自動調用 ProcessorAction 類的compute方法,串並行的關鍵在於此方法的內部實現。
- 以下是compute方法的實現:
-
@Override protected void compute() { AbstractProcessor processor = null; if (pChain.size() == 1) { try { //如果此子鏈中只有一個任務了,開始執行此任務 processor = pChain.nextProcessor(); // 取出任務 processor.process(processContext); } catch (Exception e) { this.completeExceptionally(e); } } else { // 否則,說明此鏈包含多個任務,則需要判斷哪些是可並行的,哪些是只能串行的 List<AbstractProcessor> asynProcessorList = new ArrayList<>(); while (null != (processor = pChain.nextProcessor())) { if (processor.isASyn()) { // 循環遍歷鏈中的任務,如果是可並行的,放到並行任務列表中 asynProcessorList.add(processor); } else { // 如果這個任務是串行的,則需要分情況進行處理 if (!asynProcessorList.isEmpty()) { // 情況一是此任務之前有待並行的任務,則需要將那些任務先執行 invokeAllProcessor(asynProcessorList); ForkJoinTask.helpQuiesce(); // 等待之前的任務執行完成 } // 處理完之前的任務后開始處理自己 ProcessorAction synProcessorAction = new ProcessorAction(pChain.buildNewProcessorChain(processor)); synProcessorAction.invoke(); // 等待自己處理完成 // 此並行任務處理完后,將之后的任務當成一個新的任務鏈,重復上述過程 ProcessorAction newProcessorAction = new ProcessorAction(pChain.subConfigProcessorChain(pChain.getIndex(processor)+1, pChain.size())); newProcessorAction.fork(); ForkJoinTask.helpQuiesce();// 等待新的任務鏈執行完成 asynProcessorList.clear(); // 情況異步任務列表 break; } } if (!asynProcessorList.isEmpty()) { // 如果沒有遇到可串行任務,說明當前任務鏈都是可並行的,則全部並行 invokeAllProcessor(asynProcessorList); } } }
private void invokeAllProcessor(List<AbstractProcessor> asynProcessorList) { List<ProcessorAction> taskList = new ArrayList<>(); for(AbstractProcessor processor : asynProcessorList){ taskList.add(new ProcessorAction(pChain.buildNewProcessorChain(processor))); } invokeAll(taskList); }
-
實現方式二:AKKA實現
AKKA的實現原理與forkjoin類似,只不過akka利用消息機制,在各處理線程間進行通信,其效率更高,通過實際測試也可以看到,利用akka的實現比forkjoin實現速度更快。建議有類似並行控制處理需求的優先選擇akka,值得注意的是akka是scala實現的,調試起來有一點小麻煩。以下給出主要代碼及注釋:
- 創建執行system及master:
-
privatestaticActorSystem execSystem =ActorSystem.create("processorHandleSystem"); masterRef = execSystem.actorOf(Props.create(ProcessMaster.class,newCreator<ProcessMaster>(){ privatestaticfinallong serialVersionUID =-7303628261382312794L; @Override publicProcessMaster create()throwsException{ returnnewProcessMaster(processorChain, processContext, processInfo); } }),"master"+ UUID.randomUUID());
-
- ProcessMaster是任務分發類,也是整個任務鏈執行的開始:
private final static class ProcessMaster extends UntypedActor { private ProcessorChain pChain; private Set<ActorRef> actorRefSet; private ActorRef startSender ; //保存啟動方法的sender ProcessMaster(ProcessorChain processChain) { this.pChain = processChain; this.actorRefSet = new HashSet<>(); } @Override public void onReceive(Object message) throws Exception { if (message instanceof String) { if ("doProcessor".equals(message)) { // 如果是master自己告訴的消息,說明要開始干活了 tellWorkerDoProcessor(actorRefSet); startSender = getSender(); } else if ("workDone".equals(message)) { // worker執行完后發送workDone消息 if (null == this.pChain.getNextProcessor()) { // 如果鏈里面沒任務了,告訴master任務做完了 startSender.tell("finished", getSelf()); this.getContext().stop(getSelf()); } else { // 如果還有任務,則說明還有其他並行的任務還在做,則在set里面先把自己清除掉 actorRefSet.remove(this.getSender()); if (actorRefSet.isEmpty()) { // 此set為空,表示所以並行的任務做完了,可以到下一階段繼續做任務了 tellWorkerDoProcessor(actorRefSet); } } } } else if (message instanceof Exception) { startSender.tell(message, getSelf()); getContext().stop(getSelf()); } else { unhandled(message); } }
- tellWorkerDoProcessor即是告訴任務開始執行的方法:
private void tellWorkerDoProcessor(Set<ActorRef> actorRefSet) { System.out.println(AbstractExecutor.getPChainContent(pChain)); AbstractProcessor abstractProcessor = pChain.getNextProcessor(); do { if (null == abstractProcessor) { // 保險起見,可以不加 break; } if (!abstractProcessor.isASyn() && !actorRefSet.isEmpty()) { // 如果當前需要同步,但是set不為空,說明之前的異步任務沒做完,需要返回等到那些任務做完 break; } abstractProcessor = pChain.nextProcessor(); // 准備好一個帶處理的任務 ActorRef workActorRef = this.getContext().actorOf(Props.create(ProcessWorker.class, new Creator<ProcessWorker>() { private static final long serialVersionUID = -8655672550330372007L; @Override public ProcessWorker create() throws Exception { return new ProcessWorker(); } })); workActorRef.tell(abstractProcessor, this.getSelf()); // 告訴此任務開始執行了 actorRefSet.add(workActorRef); if (!abstractProcessor.isASyn()) { // 如果當前的任務是需要同步的,則調處while循環,沒必要找下一個了,這個同步的必須先執行完 break; } } while (null != (abstractProcessor = pChain.getNextProcessor())); }
- ProcessWorker是真正任務執行的地方:
// 處理Processor的類 private final static class ProcessWorker extends UntypedActor { public ProcessWorker() { } @Override public void onReceive(Object message) throws Exception { if (message instanceof AbstractProcessor) { AbstractProcessor processor = (AbstractProcessor) message; System.out.println("開始處理processor:" + processor); try { processor.process(); this.getSender().tell("workDone", this.getSelf()); } catch (Exception e) { this.getSender().tell(e, this.getSelf()); } } } }
以上即是兩種方式的不同實現,從效率上看,基於消息機制的akka比基於線程間通信的forkjoin效率更好,在並行及串行任務混雜的場景里,可以考慮利用akka來實現,提升效率。
題外話:
自從來商城后有三個多月沒有文章了,打破一個習慣比養成一個習慣容易得多了,很多時候文章不是為了給大家看,而是對自己的一個總結和反省,畢竟接觸的東西多了,容易雜而不精,只有通過不同的總結提煉,將他們概括成通用的可記憶的知識,才能融會貫通,更好內容關注后續吧。