Enterprise Integration Pattern - 組成簡介


  近些年來,越來越多的Web應用正在逐漸向大型化的方向發展。它們通常都會包含一系列相互協作的子服務。在開發過程中,如何讓這些子服務協同工作常常是軟件開發人員所最為頭疼的問題,如各個子服務之間的數據表示不一致,處理並發的能力不同,進行溝通的網絡不穩定等。為了解決這些問題,世界各地的優秀程序員提出了一系列解決方案,並最終形成了一整套用來完成各個子服務之間溝通及集成所使用的解決方案。這些最佳實踐最終由Gregor Hohpe以及Bobby Woolf整理成為《Enterprise Integration Pattern》一書。

  從我個人的角度來講,我還是希望您能夠細細地品味這本書所給您帶來的各種數據流組織方式以及它在講解中所考慮的各種情況。當然,您也可以通過本文快速了解Enterprise Integration Pattern到底是什么。而在我的日程中,本文則是Cloud Orchestration的理論基礎,並進而逐漸地將這些知識點鋪開並最終向大家介紹什么是Application as a Service。

 

基於消息的集成

  閑話少說,讓我們直接介紹Enterprise Integration Pattern的最核心思想,那就是基於消息的集成。在這種方式下,各個組件之間的交互將不再使用遠程調用等同步方式,而是通過向目標子系統發送一個消息來令該子系統執行某個功能。而在消息成功發送之后,調用方即可以開始對其它任務進行處理,而不再是同步調用過程中的等待。在使用這種處理方式時,一個系統的吞吐量可以大大增加:

  在通過消息進行集成時,組成服務的各個子服務則可以通過消息完成彼此的隔離。這種隔離所帶來的好處很多。首先,只要子服務之間能夠在消息傳遞方式以及格式方面上達成一致,其它一系列有關實現的細節,如到底使用什么語言,什么類庫編寫服務並不會影響到相互通訊的另一方。這使得軟件開發人員可以根據需求選擇最合適的開發技術,進而降低開發成本,提高代碼質量。同時只要相互溝通的各個子服務之間進行通訊的方式不變,這種隔離還能夠讓相互通訊的各個子服務獨立地測試和升級。如果在服務運行過程中一個子系統實例產生了異常,那么該異常將只會影響到該子系統實例,而不會傳播到其它與該子系統進行溝通的各個子系統中。與此同時,消息的發送方還可以將消息暫時緩存在本地,並可以在接收方重新恢復到正常運行狀態之后再將該消息發送到接收方。

  反過來,在子系統中傳遞消息不僅僅是一對一傳遞這么簡單。例如在相互通訊的兩類服務A和B之間,服務A的並發處理能力較服務B的並發處理能力強很多,那么我們就需要在消息傳遞給服務B之前對消息進行分發,以通過多個服務B的實例並行處理服務A所發出的消息。除此之外,我們還常常需要將一個消息拆分成為多個不同的消息,並由多種不同的服務來處理它們。甚至有時候,我們需要將一個消息廣播到所有的目標類型服務實例上。因此在使用消息進行集成的時候,我們首先要理解的,就是對消息進行處理的多種不同方式。

  當然,這些消息處理方式並不需要我們自己寫代碼來完成。很多消息系統(我這里不用名詞消息中間件,即MOM,Message-oriented Middleware,因為有歧義)已經為我們提供了這些消息處理方式的通用實現。在使用時,我們只需要通過一系列配置文件來控制消息系統的運行方式即可。除了提供這些消息處理方式的通用實現之外,消息系統還為我們解決了一系列和網絡傳輸相關的問題:在兩個子服務之間發送消息時,發送方或接收方都有可能沒有准備完畢,從而導致消息發送失敗;抑或是在兩個子服務都已經准備完畢的情況下,子服務之間的網絡連接出現了問題,進而導致消息無法發送到目標子服務。在消息系統的幫助下,軟件開發人員不再需要關心如何處理這一系列復雜的網絡問題,而只需要在消息系統的幫助下將精力集中在業務邏輯的實現上即可。因此在以Enterprise Integration Pattern作為指導思想組織服務時,我們常常會使用一些較為成熟的消息系統來輔助集成。而在集成過程中,軟件開發人員需要對消息系統以及業務邏輯的邊界有一個清晰地認識:在消息處理的整個過程中,任何與消息分發傳遞相關的工作都需要由消息系統來完成,而對消息所包含信息的處理則是連接到消息系統上的各個子服務的責任。

  也正是因為如此,一個消息常常分為兩個部分:消息頭和負載。消息頭用來記錄消息的元數據,以用來控制消息被如何處理,是需要由消息系統來使用的數據,與參與集成的各個子服務無關。而負載則用來記錄消息的實際內容,是由參與集成的各個子服務所需要處理的數據,而與消息系統無關。

  在一個子服務希望與其它子服務進行溝通時,其常常只需關心需要發送出去的消息的內容,而基本不需要關心如何生成這些消息的元數據:

  從上圖中可以看到,一個子服務在需要向另外一個子服務發送消息時,其常常只需要提供消息的負載,甚至都不需要考慮為消息系統提供該消息應該如何被處理的元數據。而在整個消息進行路由的過程中,消息系統則可以根據消息元數據決定如何路由這些消息。

  在傳遞消息時,消息系統並不是將這些消息置於一個消息池中供各個子服務挑選,而是在各個需要溝通的子服務之間建立獨立的通道。消息的生產者只需要將消息放入到特定的通道中即可返回,而並不會知道到底是由哪個消息的消費者來對其進行處理的。它只能肯定的一點是:消息肯定會准確地送達通道的另一端,進而被某些消息的消費者處理。而通道的內部實現則負責緩存該消息,並向通道的另一端發送。

  所以就一個子服務的開發者而言,他所需要關心的僅僅是如何恰當地組織該子服務的業務邏輯,而不需要考慮如何對消息進行路由。而對於整個分布式服務的設計者而言,如何通過這些消息系統組織整個應用的數據流則會變得非常重要:通過消息路由的拓撲結構,我們需要防止服務的單點失效,同時也需要防止單個子服務出現過載等一系列問題。剩下的有關消息傳遞的細節,如如何對保證消息的可靠傳遞,如何保證消息的安全性等都留給消息系統來處理。

  而Enterprise Integration Pattern所提出的一系列模式就可以幫助您解決在設計整個系統數據流時所可能遇到的一系列問題。和四人幫所提出的23種經典模式一樣,它們並不能為您的問題提供一個整體的解決方案,但是通過靈活地使用並組合它們,您常常可以創建一個非常精巧穩定的數據流拓撲,從而使得您的服務在可用性,性能等眾多非功能需求上表現得較為優異。

 

消息系統的組成

  在了解消息系統對消息進行處理的各種模式之前,我們首先需要了解消息系統中的Pipes and Filters模型。就如其名字所描述的那樣,該模型主要由兩部分組成:用來傳遞消息的通道(Pipe)以及用來對消息進行處理的過濾器(Filter)。這些消息通道將過濾器串聯起來,而消息自身則會沿着這些通道流動:

  上圖中所顯示的就是對一個消息進行處理的最典型方式。在一個消息被發送到一個管道中后,過濾器將會在具有消息處理能力時從輸入管道中接收該消息並開始處理。一旦該消息處理完畢,那么過濾器將會把處理結果放到輸出管道中,並由與該管道相連接的各個過濾器完成消息的后續處理。一個過濾器可能對消息進行處理,也可能僅僅是對消息進行轉發。

  由於在經典的Pipes and Filters模型中,過濾器的輸入及輸出只能有一個,因此其所包含的各個過濾器並不可以被多條處理邏輯重用。因此Enterprise Integration Pattern放松了該約束,使得一個過濾器可以從多個管道接受消息,並向多個管道發送相同或不同的消息。在放松了該約束的情況下,我們才能真正地通過消息來組織我們的應用:您需要將業務邏輯分割為一系列順序執行的彼此獨立的步驟,然后才能通過Pipes and Filters模型對它們進行組織。對於一條獨立的業務邏輯,您只需要將他們組織成為線形結構即可。而對於整個應用所包含的多種業務邏輯而言,這些分割出的步驟將可能被多個業務邏輯重用,進而組成一種較為復雜的拓撲結構:

  上圖展示了消息是如何在一個由管道和過濾器所組成的拓撲結構中進行傳輸及轉發的。在一個消息到達該拓撲結構之后,第一個過濾器會根據自身的消息派發邏輯來選擇到底是由右上的過濾器繼續處理,還是由右下的過濾器來執行另一種業務邏輯。無論第一個過濾器選擇的是哪個過濾器,其都將會繼續產出消息並由后續的過濾器進行處理。同時您會發現,在該拓撲中心的兩個過濾器則是可以由右上及右下兩條處理邏輯所公用的。

  這種放松的約束將使得對消息的處理方式變得更為多樣,也更為靈活。在Pipes and Filters模型中,對消息的處理主要分為三步:消息從輸入管道進入到過濾器中,由過濾器處理,再由輸出管道流出。一個過濾器可能同時偵聽多個輸入管道所到來的消息。而對消息的繼續派發也可能從多個輸出管道中選擇,或者是向輸出管道進行廣播。而在經過過濾器處理后,輸出消息可能與輸入消息相同,也可能與輸入消息不再相同。

  從多個輸出管道中選擇消息的派發目標通常是通過路由器(Router)來完成的。一個路由器可以連接多個輸出管道,並根據正在處理的消息中所包含的信息來選擇消息所需要派發到的管道:

  輸出消息和輸入消息不同的情況則主要分為兩種:業務邏輯處理之后所產生的輸出,以及為了能夠讓前一個過濾器的輸出可以被后一個過濾器所接受進行的轉化。第一種實際上就是在執行業務邏輯,因此其輸入和輸出不同非常容易理解。而后一種情況則在不同應用進行集成時非常有用:兩個能夠相互集成的、原本獨立的應用常常會對同一種事物建立模型,但是這些模型的定義則常常擁有不匹配的地方。而在集成時,我們就需要在這兩種數據模型之間轉化。

  現在就剩下了最后一個問題,那就是應用是如何將消息放到管道中以及如何從管道中取出消息的呢?其實很簡單:消息系統常常提供了一系列客戶端,以允許應用通過該客戶端向管道中插入消息或取出消息。由於很多消息系統都作為一個或多個獨立的服務在運行,因此向管道插入消息的操作也就是向消息系統服務發送請求的過程。該請求的物理地址是目標消息服務所在的物理地址,而邏輯地址則是該服務中所包含的各個管道。因此剛剛所展示的拓撲結構也即是該應用的邏輯拓撲結構,而物理結構則常常是以消息服務為中心的星型結構,或者是基於星型結構的拓撲:

  而邏輯拓撲中的各個管道則運行在消息服務內部。由此可見,消息傳遞的時間主要消耗在各個子服務與消息服務通訊的過程中:

  因此在基於Enterprise Integration Pattern設計一個系統的時候,您對性能的考量不僅僅需要從邏輯拓撲上進行,還需要從系統的物理拓撲結構來考慮。

 

管道

  OK。由於過濾器包含了很多種類型,並且需要覆蓋對消息進行處理的絕大多數情況,因此我們從相對獨立的管道開始說起。

  首先要強調的一點就是,管道是有向的。在上面的講解中就已經提到過,管道實際上是消息進行流通的路徑。在需要從發送者傳遞到接收者時,一個消息就將會從管道的一個方向發送到另一個方向。由此可以看出,每個管道實際上都是一個消息池,而管道的使用者也就有了生產者和消費者之分。對於一個管道而言,如果與其連接的一個過濾器是生產者,那么它將只能向管道中放入消息,而不能從管道中取得消息,否則由該生產者所放入的消息將可能再次被自己所接收到。類似地,如果一個過濾器是消費者,那么它也只能從管道中取得消息,而不能向該管道發送消息。也就是說,管道是有向的。因此如果您希望兩個過濾器之間能夠雙向通訊,那么您就需要在這兩個過濾器之間創建兩個單向的管道。

  那到底一個管道可以擁有多少個生產者和消費者呢?答案是一個或者多個。通常情況下,管道的實現常常不會對生產者和消費者的個數進行限制。在一個具有較大吞吐量的網站中,常常有多個生產者和消費者在使用同一個管道:

  而在管道中存在的消息則對於連接在同一個管道上的所有消費者是等效的。也就是說,在消息系統這個范疇之內,如果管道上的一個消費者可以接收一個消息,那么另一個消費者也應該可以接收該消息。

  在管道的一個生產者生產出了一個消息之后,與該管道相連的接收者便可以從管道中取出消息並對該消息進行處理。那么到底有哪些消費者可以接收到這個消息呢?這取決於管道。一個管道可以有兩種不同的消息分發方式:Point-to-Point以及Publish-Subscribe。在一個消息置於Point-to-Point管道中后,將只有一個消費者能夠接收到該消息。而如果一個消息被放入Publish-Subscribe管道,那么所有的消費者都會接收到該消息:

  當一個消息被置於管道中時,消息系統將會根據消息中所包含的元數據來決定如何對消息進行處理。而在過濾器接收到一個消息時,其需要能夠知道消息負載所具有的格式以及如何對該負載進行解析。因此Enterprise Integration Pattern提供了Datatype Channel,以允許用戶顯式地標明在該管道中所傳輸的數據的類型。

  如果需要在兩個過濾器之間傳輸的數據類型分為很多種,那么我們是否需要在它們之間創建多個管道呢?答案是,我們可以這么做。只是由於每個管道都需要使用一個緩存來記錄送入的各個消息,因此這會對消息系統造成較大的壓力。另一種辦法則是通過Selective Consumer來有選擇地從一個管道上讀取數據。有關Selective Consumer的使用我們會在后面的章節中講解。

  那么對消息進行處理的過程中出現了問題該怎么辦?我們前面已經提到過,消息系統主要通過消息的元數據來決定如何對消息進行分發,而對消息負載的處理則由各個過濾器自身來完成。而這兩部分都有可能發生一系列異常:消息可能由於網絡的原因無法發送到接收方,一個消息被放到了錯誤的管道中,消息負載所包含的數據非法,進而無法通過驗證等。

  我們顯然不能對這些異常情況置之不理。Enterprise Integration Pattern提供了兩種用來處理異常的管道:Dead Letter Channel以及Invalid Message Channel。當消息系統無法根據當前消息中所包含的信息將消息發送到一個過濾器的情況下,其會嘗試着將該消息放入Dead Letter Channel中。而如果一個消息能夠正常地發送到過濾器中,但是過濾器無法對該消息進行處理,那么它就需要將該消息置於Invalid Message管道中。

  通常情況下,一個消息系統會提供一個預設的Dead Letter Channel並提供對這些消息的默認處理。但對Invalid Message的處理則常常需要軟件開發人員自行編寫這些消息的處理邏輯。最為常見的一種處理方式便是在Invalid Message Channel的另一邊放置一個用來接收這些非法消息的過濾器並在非法消息到達時向系統添加一條日志,同時在該條日志中記錄有關消息的詳細信息。

  Dead Letter和Invalid Message之間的不同主要就在於其是否可以成功地發送到過濾器中。如果一個消息無法成功地發送到一個過濾器中,那么其將會被置於Dead Letter Channel中。而如果一個消息能夠被成功發送,卻無法被過濾器正確地處理,例如消息負載的格式不對,或者消息中缺少了必要的頭,那么該消息就將被置於Invalid Message Channel中。這里需要注意的一點就是,如果錯誤出現在與業務邏輯相關的部分,如消息中沒有給出足夠的信息等,那么它應該歸類為應用業務邏輯方面的錯誤。因此對該錯誤的處理應該由應用的業務邏輯完成,而不是置於消息系統的Invalid Message Channel中。

               

消息

  看到這里,相信您的感覺一定和我一樣:事情越來越復雜了。一個程序中的數據流動常常伴隨着數據的分發,轉換,合並,而且程序在執行過程中常常有不同的運行方式,如函數調用,事件廣播等。為了能夠支持這些功能,消息系統中的消息遠比我們想象的復雜。

  首先讓我們從函數調用說起。一個普通應用中的函數調用常常具有如下形式:

1 result = function(param1, param2);

  但是在一個基於消息的系統中執行跨子服務調用則不是那么容易的事。一個函數調用常常是同步的:調用方需要等到被調用方處理完畢並返回后才繼續執行。而基於消息的調用則是異步的:如果調用消息發出后調用方即阻塞,那么調用方的吞吐量將受到很大的影響。因此在一個基於消息的系統中,跨子服務調用需要消息系統中的消息支持如下的功能:

  1. 能夠通過消息來發送一個請求,以完成對另一個子系統中的功能的調用。該請求能夠包含功能調用所需要的各個參數。而在請求發送完畢以后,發送方需要能夠繼續處理其它事務。
  2. 被調用的子系統能夠通過消息將功能執行的結果返回給調用方。此時被調用的子系統需要知道如何將消息發送到調用方。

  當然,如果一個跨子服務調用並沒有任何返回值,那么消息系統只需要保證調用消息被發送到被調用方即可,而被調用的子系統就不再需要通過消息將執行結果返回給調用方。但是在很多時候,我們需要從子服務中得到執行結果。在這種情況下,我們不能簡單地將包含結果的消息直接放到調用消息所發來的那個管道中。這是因為管道中消息的流動是有向的。為了返回子服務的執行結果,我們需要使用另外一個管道來傳遞結果消息:

  是的,從圖中您就可以看到,該消息調用的組織方式被稱為Request-Reply。而實際情況則常常更為復雜。在一個具有較高吞吐量的系統中,發送方和接收方常常不止一個:

  在被調用方需要將響應消息發送給對應的接收方時,它該如何知道傳送響應消息所需要使用的那個管道呢?答案就是在請求消息中添加一個標示返回管道的信息(Return Address)。此時被調用方只需要查找請求消息中所記錄的返回管道信息並使用相應的管道傳輸響應消息即可。

  由於基於消息的調用是異步的,因此在接收到響應消息之前,調用方可能已經發送了許多請求。為了處理這種情況,消息的發送方需要知道當前到達的消息到底是哪個請求的響應。完成這個功能的消息組成就是Correlation ID。在調用方發送一個請求之前,其將為該請求添加一個ID。而在為該請求生成相應的響應消息時,被調用方會將請求的ID標示為響應的Correlation ID的值。這樣在響應到達調用方的時候,調用方就可以通過查看響應中的Correlation ID來知曉該響應到底對應着哪個請求。當然,我們常常不需要擔心如何設置及管理Correlation ID。消息系統常常已經幫我們處理了大部分工作。

  另一個與消息的異步特性有關的事情就是消息的先后順序。如果一個消息較為龐大,那么消息系統常常會將它分割為多個消息再進行傳送。相應地,接收方則需要將這些消息組合到一起才能得到響應消息的全部內容。由於這一系列消息在傳輸過程中不可避免地存在着到達的先后順序,因此接收方需要有一種方式知曉它們的先后順序。消息系統常常提供了一種叫做Message Sequence的組成:在需要發送的數據較為龐大時,消息系統會將它切割成一系列較小的子消息,並在這些消息中記錄這些消息的先后順序以及總的子消息數量。在接收方接收到這些消息后,其可以通過其內部記錄的先后順序以及總的子消息數量判斷到底是否所有的子消息都已經到達接收方,並在子消息全部到達后將它們組合起來。

  除此之外,軟件開發人員還可以標示消息的過期時間。由於該概念較容易理解,因此在這里不再贅述。

 

消息的路由

  在前面一節對消息的講解中,我們已經介紹了消息所包含的用來為各種消息處理方式進行支持所添加的各個域。在這些域的幫助下,消息系統能夠在各個過濾器之間完成各種消息的傳遞,並將消息處理后的結果返回。但是這並不能滿足一個復雜的基於消息的服務對於消息分發的需求:一個消息可能有多個可選的目標過濾器,而其需要被發送到一個或多個目標過濾器中;或者是一個消息需要分割為多個彼此獨立執行的消息以交由不同的過濾器進行處理等。而在為消息設計它們如何在系統中路由時,您需要考慮子系統的性能,擴展性,高可用性等一系列需求,因此如何設計消息在系統中的路由常常是設計基於消息的服務中的重中之重。

  在編寫代碼時,我們常常需要根據計算所得到的數據來決定到底執行哪一段邏輯:

1 if (condition) {
2     // Logic 1
3 } else {
4     // Logic 2
5 }

  而在基於消息的系統中,我們也會遇到同樣的問題。在前面對管道進行介紹的過程中我們提到過,由於管道需要保證在其中傳輸的消息能夠到達接受方,因此其常常使用一個緩存來記錄當前需要發送的消息。但是在一個系統中,需要傳遞的消息類型有很多種,甚至每種類型的消息所記錄的內容差異非常大,因而需要由不同的子系統來進行后續處理。由此我們可以看出,為每種需要處理的情況創建一個獨立的管道是一個不現實的設計,因為這種設計方式需要創建太多的緩存,並維護太多的管道,進而增加了消息系統中管道出現問題的概率。同時消息的發送方還需要知道消息的路由邏輯,而這本應該是消息系統的責任。一個較為貼近現實的情況則是:一個管道常常用來傳遞一類具有類似意義的消息,而由具有路由功能的過濾器來決定到底哪些過濾器可以用來接收並處理這些消息。這樣消息的發送方就不需要知道消息的目標地址,從而使得消息的路由邏輯與業務邏輯分離。而在需要修改消息的路由方式時,我們也不必再在業務邏輯代碼中尋找向管道發送消息的代碼了。

  通常情況下,消息系統都會提供一系列用來進行路由的過濾器實現。這些過濾器通常從一個管道中讀取一個消息,並根據其所知的路由條件來決定到底向哪些管道中輸出消息。

  最常見的用於路由的過濾器就是基於內容的路由器:Content-Based Router。其有一個輸入管道及多個輸出管道。當一個消息從輸入管道到達該路由器時,其會根據消息所包含的內容決定將這些消息派發到哪個管道中,從而由管道另一端的過濾器對消息進行處理:

  而該路由器的一個缺點就是:其無法應付接收端所發生的各種變化。當我們向系統中添加、刪除或修改一個接收端時,我們也同時需要修改Content-Based Router所包含的路由邏輯。這在一個接收端經常發生變動的系統中是一個非常顯著的缺點。因此使用Content-Based Router的前提就是接收端不經常發生變化。

  如果接收端經常發生變化怎么辦?Enterprise Integration Pattern中提到的叫做Message Filter的過濾器或許能解決我們的問題。該類型的過濾器擁有一個輸入管道和一個輸出管道。當輸入管道中傳來的消息滿足Message Filter中所標明的條件時,其將被傳入輸出管道,否則該消息將被直接丟棄:

  如果將Message Filter與Publish-Subscribe管道結合,我們就可以處理接收端經常發生變化的問題了。在該方案中,所有到來的消息將被發送到一個Publish-Subscribe管道中,而每個接收端之前都會使用一個Message Filter連接到該管道上。當一個消息到達Publish-Subscribe管道時,所有的Message Filter都將接收到這個消息,而其所包含的篩選邏輯將會用來辨別到底哪些消息可以被Message Filter之后的接收端處理。一旦某個消息通過了該篩選,那么該消息將被送到Message Filter的輸出管道,進而傳入其后的接收端。當我們需要添加或刪除一個接收端的時候,只需要添加或者刪除相應的Message Filter即可。而如果一旦接收端可以接收的消息條件發生了變化,那么我們只需要修改相應的Message Filter即可:

  但是相較於Content-Based Router,基於Message Filter的路由邏輯也有自己的問題。那就是可能有消息被多個Message Filter所通過,而另一些消息則不會被任何Message Filter通過。也就是說,該消息有可能被直接丟失或者重復處理。而這在某些情形下則是不被允許的。

  一個解決該問題的方法就是讓各個參與消息處理的接收端將自身的處理能力注冊到一個路由器中,而當一個消息到達的時候,路由器會根據自身所記錄的處理能力來對消息進行派發:

  在系統啟動時,各個消息處理端會通過Control Channel注冊自身以及其可以處理的消息。一旦有消息到達,Dynamic Router就會根據其所記錄的處理端的信息對消息進行派發。在這種運行模式下,Dynamic Router可以保證每個消息將只有一個接收端被處理,而且在沒有接收端可以處理到來的消息時能夠執行相應的運行邏輯,如在系統中添加一條記錄等。而在系統運行過程中,接收端也可以通過Control Channel來通知Dynamic Router其自身的加入、離開以及偵聽條件的修改。這樣我們就可以在一個接收端失效后為系統添加該接收端的備份,從而保持整個系統的高可用性。

  但是Dynamic Router並沒有完全解決基於Message Filter的路由模式所導致的問題。從上面的敘述中可以看到,Dynamic Router,甚至是前面所講到的Content-Based Router實際上都是將一個消息傳遞給一個接收端。而Message Filter的一個好處則是可以將消息發送給多個接收端。為此Enterprise Integration Pattern還包含了另外一種模式:Recipient List。

  該模式會為每個接收端定義個管道。在一個消息到達之后,其將會根據各接收條件將消息放入符合接受條件的多個管道中:

  而與Content-Based Router需要考慮接收端的添加和刪除一樣,我們也需要考慮如何處理Recipient List中接收端的添加和刪除。解決方案也和Dynamic Router一樣。那就是通過一個Control Channel來允許接收端的加入、離開以及偵聽條件的修改:

  當然,對消息進行處理的方式不僅僅局限於消息的分發。一個消息可能包含了多種組成的實例,而對單個消息組成的處理都可能會消耗大量的時間。如果我們能讓這些組成被並行地處理,那么整個系統對單個消息的響應速度將會顯著增加。舉例來說,在一個監控軟件中,我們常常需要將多個攝像頭所記錄的信息傳遞給圖像分析系統。由於我們常常需要按照不同的方式來分析圖像中所包含的信息,因此整個圖像分析功能常常包含了不同的子分析系統。由於每種圖像分析本身也是一個較為耗時的操作,因此每個子分析系統也常常會包含多個實例。在包含這些圖像的消息到達圖像分析系統之后,我們就需要將這些圖像分發到這些子系統實例中,由這些子系統實例分別對這些圖像進行處理。

  因此在一個基於消息的系統中,我們常常需要將一個消息分割為多個獨立的消息並由不同的過濾器實例進行處理。為此Enterprise Integration Pattern中提出了Splitter模式。該模式擁有一個輸入管道和一個輸出管道。如果一個消息從輸入管道進入到Splitter,那么其將會根據Splitter的內部邏輯分割為多個子消息,並被置於輸出管道中:

  而在Splitter之后,我們常常需要通過一種叫Aggregator的過濾器將這些子消息的處理結果合並起來。其會在所有子消息處理結果到達之后才開始對它們進行處理,並生成輸出消息。而對這些子消息進行處理的方式也分為很多種:Wait for All,即等待所有的子消息都到達;Time Out,即等待一段時間並根據這段時間之內到達的子消息響應生成最終的消息處理結果;First Best,即在第一個子消息的響應到達以后就忽略其它的子消息響應;Time Out with Override,即等待一段時間,並在某個滿足要求的子消息響應到達后就生成最終的響應;External Event,即由一個外部消息來結束子消息的收集並由此開始生成最終的響應。

  有時候,我們還希望某些消息能按照某種特征進行排列。此時我們就需要使用Resequencer。其通過輸入管道接收到了一系列消息,並在其內部根據自身所設置的算法對它們進行排序,並將排好序的各個消息依次輸出。

  當然,一個基於消息的系統對消息進行分發的方式常常復雜得多。此時我們常常通過將這些簡單模式組合在一起來完成對消息的處理。例如Enterprise Integration Pattern一書中提到了一系列常見的將他們組合在一起的方法:Composed Message Processor,Scatter-Gather,Routing Slip,Process Manager以及Message Broker等。

  Composed Message Processor包含一個輸入管道和一個輸出管道。在一個復雜的消息到達輸入管道后,其內部所包含的Splitter將會把該復雜消息切分為一系列粒度較小的子消息,並通過一個Router(Content-Based Router,Dynamic Router,Message Filter等)將消息分發到不同的子系統中。而在所有的子系統將這些子消息處理完畢以后,Composed Message Processor將通過Aggregator再將這些響應組織在一起:

  一個與Composed Message Processor模式類似的模式則是Scatter-Gather模式。它們兩者之間最主要的不同就是Scatter-Gather模式是將消息廣播到一系列接受方,然后再由Aggregator進行匯總。也就是說,其同樣只有一個輸入管道和一個輸出管道。當一個消息通過輸入管道傳遞到Scatter-Gather管道之后,其內部將會把該消息分發到多個接受方。而后其會使用Aggregator將眾多消息處理結果歸結在一起:

  上圖中所列出的一個Scatter-Gather的實現就是通過一個Publish-Subscribe管道來完成的。在這種情況下,所有連接在該管道上的各個過濾器都將會接收到該消息。如果需要控制哪些過濾器會接收到該消息,那么我們就需要使用Recipient List來對消息進行分發:

  另一種組合模式就是Routing Slip。當一個消息到達該模式實現時,其將首先決定該消息應該如何在各個過濾器之間路由,並將各路由路徑附加到該消息上。接下來,該消息就會依次通過該路由信息所記錄的各個過濾器:

  該模式中存在着兩個假設:消息的路由路徑是固定的,而且需要被這些過濾器線性地處理。但是事情往往並不是如此美好:對一個消息的處理結果常常會決定消息下一步應該如何路由,同時一個消息常常需要被拆分為多個子消息並被並行處理。在這種情況下,Routing Slip模式並不能滿足需求。因此Enterprise Integration Pattern又介紹了另外一種模式:Process Manager。在一個消息到達時,其將會根據消息中所包含的內容以及被各個過濾器處理的結果決定該消息應該如何被分發:

  而其與Message Broker非常類似。只不過Message Broker則是從更高層次上來討論消息到底應該如何組織的。

 

消息的轉化

  基於消息的應用常常需要處理的另外一個問題就是各個子系統之間所需要的數據並不一致。這里的不一致主要表現在幾個方面:數據表現方式不一致,數據組成不一致,甚至有時還需要數據的加密和解密。

  我們在前面已經提到過,一個消息主要分為消息頭以及消息內容兩部分。消息頭主要由消息系統使用,而消息的內容則在各個系統之間交換數據。但是參與集成的系統則常常不知道消息系統所需要使用的各個消息頭,以及消息內容的格式等信息。

  解決這個問題的方法就是使用一個Envelope Wrapper。該組成會根據消息系統的要求對數據進行處理(如加密),並添加消息系統所需要的各個消息頭。接下來,消息系統將根據消息頭中所包含的信息對消息進行傳遞。而在到達接收端之后,這些消息頭將被解析,接收端也會執行對消息內容的逆向操作,從而完成對消息的傳遞:

  而另一種系統間無法順利進行通訊的方式則是信息的缺失。例如在一個股票查詢系統中,我們常常可以通過股票名稱,股票名稱縮寫以及股票的代碼對股票信息進行查詢。但是由於股票的名稱可能會根據主營業務的更改以及售殼等多種商業行為而發生改變,因此股票系統的內部實現常常只能使用股票的代碼對相關信息進行查詢。因此如果一個查詢消息中只有股票名稱或其縮寫,卻沒有股票的代碼,那么消息系統就需要通過一個外部系統將該查詢消息中的股票代碼部分補全。這種通過外部數據源將必要數據補全或執行數據轉換的組成被稱為Content Enricher:

  反過來,我們也常常需要從一個消息中屏蔽一些信息。這些信息可能是一些機密信息,而不應該對后續的過濾器可見,或是一些數據量非常大卻對后續的處理無用的信息等。在這些情況下,我們就需要創建一個新的消息。該新消息將只包含對后續處理有用的信息,卻將那些不該被暴露的機密信息移除,進而減少單個消息所需要占用的帶寬,提高安全性。如果原有的消息表現形式較為復雜,我們還可以借此機會來生成一個具有簡化的表現形式的消息。而這個功能就是由Content Filter來完成的:

  那如果我們只是在一段時間內不使用某些數據,而是在一些后續的環節使用它們,我們是不是就需要一直傳輸這些數據呢?答案是否定的。一個解決方案就是將這些數據暫時存儲起來,並在消息中添加一個對應的Key。而在再次需要這些數據的時候通過該Key將它們取出:

  從上圖中可以看出,Claim Check的運行主要分為幾個步驟:首先在消息到達Claim Check的時候由Check Luggage將特定數據提取並保存起來,並賦予該塊數據一個特定的Key。該Key將被用來替換這些被提取出的數據,進而減小了需要傳輸的數據量。而在需要這些數據之前,我們則使用Data Enricher組成(實際上就是一個Content Enricher)將這些數據取回,以供后面的各個過濾器使用。

  還有一個和消息轉化相關的問題就是多個子系統對同一事物使用了不同的表現形式。在這種情況下,我們就需要通過Normalizer對消息進行轉化。其內部使用了一個Router將傳入的各個消息根據它們的類型轉發給不同的轉換器,並由這些轉換器將消息中所包含的信息轉化為具有同一表現形式的消息。其工作原理如下圖所示:

  但是如果一個系統需要集成太多的應用,那么Normalizer可能就已經不那么適用了。試想一下,如果一個系統包含了八個獨立的子系統,我們就需要編寫八個Normalizer,而每個Normalizer都需要能夠轉化其它七個子系統所傳來的消息:

  萬一再增加一個新的子系統呢?在這種情況下,我們還需要編寫一個Normalizer,而且對其它幾個Normalizer進行更改,以能夠接受這個新引入系統所使用的數據類型。可以看出,這種維護實際上是一個非常繁瑣的過程。而該問題的解決方案就是為這些數據類型添加一個公用表示,即是Canonical Data Model:

  Canonical Data Model為所有需要集成的各個應用定義了一種通用的數據表示。當需要與其它應用通信時,其將首先把自身的數據表示轉化為Canonical Data Model,然后再將轉化后的數據傳遞給其它應用。如果需要再向系統中添加一個子系統,我們只需要為其添加一個Transformer以能夠在子系統所使用類型和Canonical Data Model之間轉化即可。

 

Endpoints

  接下來,我們要討論的就是Enterprise Integration Pattern中的Endpoint。其用來定義一個應用應該如何與消息系統交互。之所以使用Endpoint這個英文單詞,是因為我一直覺得中文中沒有一個准確的詞能夠清晰地表示其所對應的概念。

  首先我們要介紹的是與數據處理相關的各個Endpoint。這其中最常用的就是Messaging Gateway。其主要功能就是將參與系統的各個子系統轉化包裝成為對消息系統友好的組成,從而彌補兩者之間的差距。試想一下,一個消息系統所接受的接入形式常常是:“向XX發送一個消息”,而一個子系統自身所提供的API則常常是一些具有明確意義的API,如getPrice()。而且由於基於消息的系統是一個異步的系統,而參與該系統的各個子應用常常是按照同步調用設計的。甚至有時子系統所提供的API的力度也會過細。因此通過為該子系統添加一個包裝層來使得該它能夠提供一個粒度合適,完全支持消息系統的API。而這就是Messaging Gateway要做的事情:

  在Messaging Gateway的幫助下,子系統將不再擁有和消息系統相關的信息,從而完成了子系統和消息系統之間的解耦。

  另一個擁有與之類似功能的Endpoint則是Messaging Mapper。其用來完成對數據的轉換。我們在集成一個子系統時常常遇到這樣的情況,那就是該子系統中所使用的數據並不符合要求。在前面對“消息的轉化”一節中我們已經介紹過各個可以用來完成數據轉換的各個組成。與它們不同的是,在使用消息轉化器對消息內容進行轉化時,應用需要向該消息轉化器發送消息,而在轉化完成后再由消息轉化器向外發送消息。也就是說,該過程包含了兩次消息的傳遞:

  但是如果我們可以直接在子系統外圍包裝一層並完成對消息的轉化,那我們不就減少了一次消息的傳輸,提高了效率么:

  當然,這只是Messaging Mapper的一個作用。除此之外,我們還可以在Messaging Mapper中完成一系列其它工作,如解析數據中所包含的引用等。畢竟,在一個子系統中,其內部的表現形式可能並不適合使用消息進行傳輸。因此,將子系統的內部表現形式轉化為適合消息傳遞的形式也是Messaging Mapper的一部分職責。

  有時候,我們需要通過消息系統傳遞一系列相互關聯的消息。由於任何消息的缺失都會導致這組消息失去意義,因此我們需要通過事務保證它們的完整傳遞。而這就是Transactional Client的職責。

  在講解了與數據處理有關的各個Endpoint之后,我們接下來要講解的就是與消息發送/接收相關的各個Endpoint。

  在一個基於消息的系統中,各個子系統在運行時將得到需要處理的消息,對消息進行處理,並在消息處理完畢之后將處理結果通過管道傳遞給其它需要的子系統。那么這些子系統是如何得到消息的呢?其中的一種方法就是通過一次調用從管道中主動讀取數據。這被稱為是Polling Consumer。如果管道中沒有消息,那么從管道中讀取數據的線程將被阻塞,直到新的消息到達。而另一種方法則是由消息的生成方將消息發送到子系統中。而這種方式被稱為Event-Driven Consumer。

  但是有時候,單個的子系統並不能及時處理所有的信息,因此我們就需要多個子系統協同工作來完成這些信息的處理。在這種情況下,我們需要令多個子系統連接到一個Publish-Subscribe管道上,並在消息到來時通過競爭決定需要處理該消息的子系統。這種處理消息的方式被稱為是Competing Consumers:

  然而此時這些子系統都需要添加一個額外的組成,以在一個消息到達管道時決定到底由誰來對該消息進行處理。而另外一種方式則是由一個組成對該消息進行分發,即是Message Dispatcher:

  如果我們僅僅需要處理某個管道上的特定信息,那么我們就需要通過Selective Consumer來有選擇地讀取一些信息:

  在一個Publish-Subscribe管道上,如果一個子系統由於某種原因暫時無法接收消息,那么該消息就會丟失。為了解決這個問題,Enterprise Integration Pattern提出了Durable Subscriber。其會在子系統無法接收消息時將這些消息保存起來,從而允許在該子系統恢復時重新讀取這些消息。一個相反的問題則是對多次接收同一個消息的處理,Enterprise Integration Pattern則提出了Idempotent Receiver。該組成將可以保證在接收到重復消息的時候,這些冗余的消息將不被處理。

  如果一個原本並不支持消息調用方式的服務既要求能夠以消息的方式被調用,那么我們就需要使用一個Service Activator。在一個消息到達時,其內部將會轉化為對該服務的調用。

  好了。到此為止,我們已經將所有Enterprise Integration Pattern中所介紹的消息系統基本組成介紹完畢了。而在下一篇文章中,我們將對這些組成之間易發生混淆的各個組成加以比較,並介紹在消息系統中常見的一些解決方案。

 

轉載請注明原文地址並標明轉載:http://www.cnblogs.com/loveis715/p/5185332.html

商業轉載請事先與我聯系:silverfox715@sina.com

公眾號一定幫忙別標成原創,因為協調起來太麻煩了。。。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM