奇思妙想-java實現另類的pipeline模式



磕叨

在公司做項目是見到前輩們寫的一段任務鏈的代碼,大概如下

Runnable task = new TaskA(new TaskB(new TaskC(new taskD())));
task.run();

taskA執行run調用並完成TaskA聲明的任務邏輯之后,內部會自動調用構造參數傳入的TaskB的run方法,過程類似TaskA,TaskB完成之后一樣會調用參數傳入的task,直到最后一個沒有帶下一個task類傳入的任務完成,即完成一個管道式調用。

愛思考的我在想,可用,不好用重用,於是動手改改。


准備

經過一段時間開發后,有了一個常用的工具類,方便快速開發,但是這里用到的東西很少,還是要說明一下,這里用到一個我稱作ecommon的包,當然我只用了兩個很基礎的額部分。這兩個部分完全可以用你自己的實現,是非常簡單的。

思路簡述

我們先明確,jdk8以下的情況不作考慮。

pipeline我更多的印象是來自終端上的應用

命令終端中使用管道

pipeline是單向的,上個task的輸出作為下個task的輸入,直到沒有下一個task,最后一個task的作用就應該是你期望的。且后續任務只關心前者的輸出結果,對於的他是誰,怎么做的,是不關心的。記為 Point1

這個特點是我視為管道與切面或職責鏈模式的區別所在。

首先,我們得有第一推動,讓管道流能有個開始,再就是有中間task,他必定是能接收到上一個任務的輸出的,並且,可能有自帶參數,並且有自己的輸出,最后,有latest的task 與中間task區別在於他不用返回了,latest一般是以副作用的形式實現我們的企圖的,如上圖的 wc -l 作為最后一個任務是直接把結果打印到屏幕上,而不是返回一個變量給我們讀取。根據java的強類型屬性,以及剛剛一段的分析,可以得知,有3種類型的任務,開始任務,中間任務,最后任務,並且中間任務的個數是不限的,所有任務至於相鄰的任務有一個關聯點,那就是 前者的輸出類型與后者的輸入類型一致 (網文中大部分說自己實現的pipeline的模式都是傳遞Object類型,到各個子任務中自己強轉到需要的類型的,不說好與不好,但我肯定不喜歡)。這個特性記為 Point2

而且,每個子任務,本身是可以帶參數的,這是一個需要支持的點。像上圖命令中的管道,每個子命令(除第一個)都是同時接受前一個命令的輸出作為輸入,且自帶參數的。但是java在這里其實並不靈活,因此我們約定 后續任務的第一個參數就是前一個任務的輸入 , 這個約定是直接影響到我們的代碼實現的。這個特性記為 Point3

另外,管道的入口唯一的,一定是從開始任務往后流的。如果入口不一樣,那么就是像個不同的管道,他們的意圖以及輸入輸出的期望都是不同的。這個特性記為 Point4

最后,在java中使用,我肯定不能像終端那種,錯了重敲命令就是了,所以需要異常控制以及做一些相鄰任務承上啟下的時刻做點什么,例如日子打印,斷言等。這個算附加題。

提起鍵盤擼

(因為我已經寫完並測試完了,所以我就反過來解析我是怎么想的了)

這里以Runnable接口作為基礎接口。給出其中一個測試的例子

這里初始任務是給出一個日期,中間任務是拼接成人類友好的1句話,最終任務是直接打印到屏幕。(現實中要實現這樣一句話,當然是直接擼啦。這里只是為了演示),看看Pipeline初始任務的定義

先不看其他屬性,看構造方法,傳入一個 IFunction<R> ,按照准備一節的定義,他是一個返回類型為聲明泛型R,且無參數輸入的閉包函數(或稱作lambda表達式)。對照上面PipelineTest中就是那個 () -> { return new Date(); } , (得益於jdk8的類型推斷,在 new Pipeline<> 構造時,不用再聲明其泛型,編譯器能根據閉包函數的return類型推斷出這里是個Date類型)。next, end 是指明管道的下接任務,這可以看出管道是極其類似於任務鏈/職責鏈的(需要注意nextend同時只能有個一個存在)。hook是異常管理以及任務間承接時做一個切面方法的,argCxt是記下傳遞參數,方便hook中的方法使用(這個是因為java需要的,跟管道模式並沒有關系)。

再看add方法的一個重載,添加並返回中間task

add方法傳入一個IFunction1<RT, R>的閉包(lambda),尾數為1,意味着接受一個 R 類型的輸入,並在方法升聲明了 RT ,以 RT 類型作為輸出。其中 R 的泛型聲明在類上,就是與構造方法的 R 是同一個類型。而 RT 的具體類型的推斷會根據具體的lambda的返回類型決定。這里add方法會返回剛剛構造出來的中間任務的聲明對象。add方法需要保證當前任務是沒被聲明過后續任務的。

再看MiddlePipeline類的定義

先看構造方法,他就是接受add方法傳入的閉包。他聲明了兩個泛型變量 分別是 <C, P>,其中 C代表他的輸入類型, P代表的他的產出類型。同初始任務一樣,他也有nextend 指明他的管道后接任務(next)。可以注意到這里的 next 和初始任務的屬性 next 的產出類型都是被放上了泛型通配符 ? ,是因為任務並沒辦法知道他的子任務的產出類型的(后面會再說一下這個問題)。

再看add方法的一個另一個重載,添加並返回最終task

類似返回中間態的task,只不過這里用了無返回的閉包。

再看EndPipeline類的定義

最終task的定義清爽很多,他只關心輸入,並執行。並且他沒有后續任務。

再來補充下AbstractPipeline的解析

這個寫法是為了實現Point4所描述的事的,只要是同一個pipeline上的task所有入口都是初始任務上的那個run方法。(為了省事實現,后續任務的基類和所有派生類都是初始任務的非靜態內部類)

再看看初版版本run方法

邏輯很簡單,執行初始任務,得到結果,然后找后續任務,把結果作為輸入來執行后續任務,(其中循環時滿足上一個輸出作為下一個輸入),直到有一個管道類的中間態任務為null,然后判斷最終任務是否為null ,非空則執行它。

需要說明一下這里用 @SuppressWarnings 壓制了警告,是因為確信java編譯器能確保連續兩個add進來的task之間的輸入輸出的類型關系是一致的(這一點,如果不一致,在編寫代碼時IDE就會報錯了)。

到此,一個簡單的java實現的pipeline模式基本可以用,跑最開始那個demoTest是沒有問題了。

再給一個樣例demo

管道中的3個方法的職責就如他的名字那樣(實現上我這里只是簡單的new一下),然后同過Pipeline類以及它的add方法串起來,執行結果如紅色部分。聰明的人肯定能想到,那么像那個java的stream的?嗯很像,stream是類似把元素放到單個跑到上,按照定義那樣的自己跑到終點(這也是使用方代碼方便地切換到並行流的原因,因為邏輯一致,當然,並發問題是另一個層面的問題)。

而pipeline則橫向的一階段一階段地執行,如果要增加吞吐量怎么搞?聰明的你肯定能想到分片了,這樣走下去就跟parallelstream的意圖不謀而合了。那么還有別的好處嗎?嗯,你想想Mock測試?職責上有沒有讓你更好切分了(正如這里命令的方法名那這樣)?


進一步完善

上面一節基本上能把 Point1Point2的一半 、 Point3Point4 實現了,剩下 Point2中說到的,除了接收前一個任務的輸入,還允許管道聲明時傳入參數的這個功能,以及那個附加題說到的java應用上的妥協。

pipeline聲明上附帶參數

這個時候就要好好用到 准備 一節中的那些 函數接口 了。說起來並不好解析,但是如果你了解過curry柯里化這個概念的話,那一看圖你就懂了,看圖。

就是把帶參數的lambda重新包裝一次為不帶參數的lambda表達式。后面middlePipeline的帶參數部分則是重新封裝為一個只接受一個參數且返回類型相同的lambda表達式,這是類似的。

來一個測試看看,並附上圖中說明

對java友好支持

附加題說的這個就跟簡單了,找個地方分別設置好兩個玩意,在對應的地方執行他們就是了

public class PipelineHook {
	
	private boolean preventThrow = false;

    // 異常發生時執行此表達式
	public final IVoidFunction3<PipelineHook, Exception, Object[]> exceptionHandler;
	
    // 調用后續任務錢執行此lambda表達式
	public final IVoidFunction1<Object> aspecter;

	public PipelineHook(IVoidFunction3<PipelineHook, Exception, Object[]> exceptionHandler, IVoidFunction1<Object> aspecter) {
		this.exceptionHandler = exceptionHandler;
		this.aspecter = aspecter;
	}

	public PipelineHook(IVoidFunction1<Object> aspecter) {
		this(null, aspecter);
	}

	public PipelineHook(IVoidFunction3<PipelineHook, Exception, Object[]> exceptionHandler) {
		this(exceptionHandler, null);
	}

    // 是否阻止異常拋出
	public boolean isPreventThrow() {
		return preventThrow;
	}

    // 設置標記阻止異常拋出
	public void setPreventThrow() {
		this.preventThrow = true;
	}
	
}

通過兩個lambda變量構造出hook對象,並通過初始任務的的 addPipelineHook 方法set進去,他們具體在 run 方法體中發揮作用,現在,run方法更新為

其中 getCxtInfo 方法會把當前子任務的參數轉化是字符串,讓異常信息能夠被人讀懂。


今天先到這里了,整體下來,覺得跟stream太像了,我發現用stream碼起來特爽,讀起來特慘(特別是讀別人的多重stream的時候),而這個pipeline正好相反耶。總的來說,就是個模式,需要提高吞吐量的話,使用分片配合線程池的話,吞吐量會得到巨量提升哦(把每個分配的大小設置為1不就是我們的parallelStream嗎?哈哈)。

issue在: https://github.com/kimffy24/EJoker/issues/30

初次提交: https://github.com/kimffy24/EJoker/commit/c71e5d76a0904249b7c1399bd8ba52ec72fe9a0e


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM