Spring企業集成流 Spring Integration


spring集成(Spring Integration)

 在本章中,將看到如何使用 Spring Integration 的通用集成模式。Spring Integration 是由 Gregor Hohpe 和 Bobby Woolf 在*《企業級集成模式》*一書中編目的許多集成模式的實現。每個模式都被實現為一個組件,消息通過該組件傳輸管道中的數據。使用 Spring 配置,可以將這些組件組裝到數據流經的管道中。讓我們從定義一個簡單的集成流開始,它引入了 Spring Integration 的許多特性。

Why??

那么,我們為什么用它呢?spring-integration的官網上,給出了以下說法

  • spring-integration的目標

    提供一個簡單的模型來實現復雜的企業集成解決方案為基於spring的應用添加異步的、消息驅動的行為讓更多的Spring用戶來使用他看這種解釋,我的直觀感覺是:啥玩意?不懂啊!接着看到spring-integration的原則組件之間應該是松散的,模塊性的易測的應用框架應該強迫分離業務邏輯和集成邏輯擴展節點應該有更好的抽象和可以再使用的能力

    感覺,這個應該說的是解耦吧。另外看了下其他人的理解,如果你的系統處在各個系統的中間,需要JMS交互,又需要Database/Redis/MongoDB,還需要監聽Tcp/UDP等,
    還有固定的文件轉移,分析。還面對着時不時的更改需求的風險。那么,它再適合不過了。

導入依賴

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-integration</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.integration</groupId>
			<artifactId>spring-integration-file</artifactId>
		</dependency>

第一項依賴是Spring Integration的spring Boot start。不管我們與哪種流進行交互,對於Spring Integration流的開發來講,這個依賴是必須的。第二個依賴是Spring integration的文件端點模塊。這個模塊是與外部系統集成的二十多個模塊之一。但是,就目前而言,要知道文件端點模塊提供了將文件從文件系統提取到集成流或將數據從流寫入文件系統的能力。

創建一個網關

//綁定要發送到的通道名稱
@MessagingGateway(defaultReplyChannel = "textInChannel")
public interface FileWriterGateway {
    void writeToFile(
        //Header添加頭信息 及文件名
            @Header(FileHeaders.FILENAME) String filename,
            String data
    );
    

}

盡管它是一個簡單的 Java 接口,但是關於 FileWriterGateway 還有很多要說的。首先會注意到它是由 @MessagingGateway 注解的。這個注解告訴 Spring Integration 在運行時生成這個接口的實現 —— 類似於 Spring Data 如何自動生成存儲庫接口的實現。當需要編寫文件時,代碼的其他部分將使用這個接口。

@MessagingGatewaydefaultRequestChannel 屬性表示,對接口方法的調用產生的任何消息都應該發送到給定的消息通道。在本例中,聲明從 writeToFile() 調用產生的任何消息都應該發送到名為 textInChannel 的通道。

對於 writeToFile() 方法,它接受一個文件名作為字符串,另一個字符串包含應該寫入文件的文本。關於此方法簽名,值得注意的是 filename 參數使用 @Header 進行了注解。在本例中,@Header 注解指示傳遞給 filename 的值應該放在消息頭中(指定為 FileHeaders),解析為 file_name 的文件名,而不是在消息有效負載中。另一方面,數據參數值則包含在消息有效負載中

配置集成流

xml方式配置

<?xml version="1.0" encoding="UTF-8"?>
<beans  xmlns:context="http://www.springframework.org/schema/context"
        xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:int-file="http://www.springframework.org/schema/integration/file"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/integration
           http://www.springframework.org/schema/integration/spring-integration.xsd
           http://www.springframework.org/schema/integration/file
           http://www.springframework.org/schema/integration/file/spring-integration-file.xsd">
    
    <int:channel id="textInChannel"/>
    <int:transformer input-channel="textInChannel"
                     output-channel="fileWriterChannel"
                     id="upperCase"
                     expression="payload.toUpperCase()"
    />
    <int:channel id="fileWriterChannel"/>
    <int-file:outbound-channel-adapter id="writer" channel="fileWriterChannel"
                                       directory="G:\2020\temp\files\" mode="APPEND"
                                       append-new-line="true"/>
</beans>
  • 配置了一個名為 textInChannel 的通道,這與為 FileWriterGateway 設置的請求通道是相同的。當在 FileWriterGateway 上調用 writeToFile() 方法時,結果消息被發布到這個通道。
  • 配置了一個轉換器來接收來自 textInChannel 的消息。它使用 Spring Expression Language(SpEL)表達式在消息有效負載上調用 toUpperCase()。然后將大寫操作的結果發布到 fileWriterChannel 中。
  • 配置了一個名為 fileWriterChannel 的通道,此通道用作連接轉換器和外部通道適配器的管道。
  • 最后,使用 int-file 命名空間配置了一個外部通道適配器。這個 XML 命名空間由 Spring Integration 的文件模塊提供,用於編寫文件。按照配置,它將接收來自 fileWriterChannel 的消息,並將消息有效負載寫到一個文件中,該文件的名稱在消息的 file_name 頭中指定,該文件位於 directory 屬性中指定的目錄中。如果文件已經存在,則將用換行來追加文件,而不是覆蓋它。

image-20210202222931640

如果希望在 Spring Boot 應用程序中使用 XML 配置,則需要將 XML 作為資源導入 Spring 應用程序。最簡單的方法是在應用程序的 Java 配置類上使用 Spring 的 @ImportResource 注解

@Configuration
@ImportResource("classpath:/filewriter-config.xml")
public class FileWriterIntegrationConfig { ... }

編寫測試類進行測試

@SpringBootTest(classes = SpringIntegrationApplication.class)
@RunWith(SpringJUnit4ClassRunner.class)
public class Test01 {
    @Autowired
    private FileWriterGateway fileWriterGateway;
    @Test
    public void test01(){
        fileWriterGateway.writeToFile("aaa","nihao");
    }
}
image-20210112164603094
  • 查看文件內容

image-20210112164754611

注意我們在xml配置中已經配置為將內容全部大寫

java配置

@Configuration
public class FileWriterIntegrationConfig {
    @Bean
    @Transformer(inputChannel = "textInChannel",outputChannel = "fileWriterChannel")
    public GenericTransformer<String,String> upperCaseTransformer(){
        return new GenericTransformer<String, String>() {
            @Override
            public String transform(String s) {
                return s.toUpperCase();
            }
        };
    }

    @Bean
    // 聲明服務激活器 處理消息
    @ServiceActivator(inputChannel = "fileWriterChannel")
    public FileWritingMessageHandler fileWriter(){
        FileWritingMessageHandler handler = new FileWritingMessageHandler(new File("G:\\2020\\temp\\files\\"));
        //設置沒有答復通道 沒有返回值
        handler.setExpectReply(false);
        //設置文件存在時的處理方式
        handler.setFileExistsMode(FileExistsMode.APPEND);
        //允許追加新行
        handler.setAppendNewLine(true);
        return handler;
    }
}

這里的通道如果沒有spring則會自動創建

也可通過注入bean的方式進行配置

@Bean
public MessageChannel textInChannel() {
    return new DirectChannel();
}
...
@Bean
public MessageChannel fileWriterChannel() {
    return new DirectChannel();
}

可以說,Java 配置選項更易於閱讀,也更簡潔,而且與我在本書中所追求的純 Java 配置完全一致。但是,通過 Spring Integration 的 Java DSL(領域特定語言)配置風格,它可以變得更加精簡。

使用java DSL配置

 @Bean
    public IntegrationFlow fileWriterFlow(){
        //入站通道
        return IntegrationFlows
                .from(MessageChannels.direct("textInChannel"))
                .channel(MessageChannels.direct("fileWriterChannel"))
                //聲明轉換器
                .<String,String>transform(t->t.toUpperCase())
                .handle(Files.outboundAdapter(new File("G:\\2020\\temp\\files\\"))
                        .fileExistsMode(FileExistsMode.APPEND)
                        .appendNewLine(true)
                ).get();
    }

這個新配置盡可能簡潔,用一個 bean 方法捕獲整個流。IntegrationFlows 類初始化了這個構建者 API,可以從該 API 聲明流。

注意,與 Java 配置示例一樣,不需要顯式地聲明通道 bean。雖然引用了 textInChannel,但它是由 Spring Integration 自動創建的,因為沒有使用該名稱的現有通道 bean。但是如果需要,可以顯式地聲明通道 bean。

Spring Integration 功能概覽

  • 通道 Channels —— 將信息從一個元素傳遞到另一個元素。
  • 過濾器 Filters —— 有條件地允許基於某些標准的消息通過流。
  • 轉換器 Transformers —— 更改消息值或將消息有效負載從一種類型轉換為另一種類型。
  • 路由器 Routers —— 直接將信息發送到幾個渠道之一,通常是基於消息頭。
  • 切分器Splitters —— 將收到的信息分成兩條或多條,每條都發送到不同的渠道。
  • 聚合器 Aggregators —— 與分離器相反,它將來自不同渠道的多條信息組合成一條信息。
  • 服務激活器 Service activators —— 將消息傳遞給某個 Java 方法進行處理,然后在輸出通道上發布返回值。
  • 通道適配器 Channel adapters —— 將通道連接到某些外部系統或傳輸。可以接受輸入,也可以向外部系統寫入。
  • 網關 Gateways —— 通過接口將數據傳遞到集成流。

消息通道

​ 消息通道意指消息移動的集成管道移動。它們是連接 Spring Integration 所有其他部分的管道。

Spring Integration 提供了多個管道的實現,包括以下這些:

  • PublishSubscribeChannel —— 消息被發布到 PublishSubscribeChannel 后又被傳遞給一個或多個消費者。如果有多個消費者,他們都將會收到消息。
  • QueueChannel —— 消息被發布到 QueueChannel 后被存儲到一個隊列中,直到消息被消費者以先進先出(FIFO)的方式拉取。如果有多個消費者,他們中只有一個能收到消息。
  • PriorityChannel —— 與 QueueChannel 類似,但是與 FIFO 方式不同,消息被冠以 priority 的消費者拉取。
  • RendezvousChannel —— 與 QueueChannel 期望發送者阻塞通道,直到消費者接收這個消息類似,這種方式有效的同步了發送者與消費者。
  • DirectChannel —— 與 PublishSubscribeChannel 類似,但是是通過在與發送方相同的線程中調用消費者來將消息發送給單個消費者,此通道類型允許事務跨越通道。
  • ExecutorChannel —— 與 DirectChannel 類似,但是消息分派是通過 TaskExecutor 進行的,在與發送方不同的線程中進行,此通道類型不支持事務跨通道。
  • FluxMessageChannel —— Reactive Streams Publisher 基於 Project Reactor Flux 的消息通道。(我們將會在第 10 章討論 Reactive Streams、Reactor 和 Flux)

在 Java 配置和 Java DSL 樣式中,輸入通道都是自動創建的,默認是 DirectChannel。但是,如果希望使用不同的通道實現,則需要顯式地將通道聲明為 bean 並在集成流中引用它。例如,要聲明 PublishSubscribeChannel,需要聲明以下 @Bean 方法:

 @Bean
    public MessageChannel orderChannel(){
        return new PublishSubscribeChannel();
    }

然后在集成流定義中通過名稱引用這個通道。例如,如果一個服務 activator bean 正在使用這個通道,那么可以在 @ServiceActivatorinputChannel 屬性中引用它:

@ServiceActovator(inputChannel="orderChannel")

或者,如果使用 Java DSL 配置方式,需要通過調用 channel() 方法引用它:

@Bean
public IntegrationFlow orderFlow() {
    return IntegrationFlows
        ...
        .channel("orderChannel")
        ...
        .get();
}

需要注意的是,如果使用 QueueChannel,則必須為使用者配置一個輪詢器。例如,假設已經聲明了一個這樣的 QueueChannel bean:

@Bean
public MessageChannel orderChannel() {
    return new QueueChannel();
}

需要確保將使用者配置為輪詢消息通道。在服務激活器的情況下,@ServiceActivator 注解可能是這樣的:

@ServiceActivator(inputChannel="orderChannel", poller=@Poller(fixedRate="1000"))

在本例中,服務激活器每秒(或 1,000 ms)從名為 orderChannel 的通道輪詢一次。

過濾器

過濾器置於集成管道之間,它能夠根據斷言允許或拒絕消息進入流程的下一步.

​ 例如,假設消息包含整型的值,它們要通過名為numberChannel的通道進行發布,但是我們只想讓偶數進入名為evenNumberChannel的通道。在這種情況下,我們可以使用@Filter注解定義一個過濾器:

 @Filter(inputChannel = "numberChannel",outputChannel = "evenNumberChannel")
    public boolean evenNumberFilter(Integer integer){
        return integer%2==0;
    }

過濾器可以放置在集成管道的中間,以允許或不允許消息進入流中的下一個步驟。

例如,假設包含整數值的消息通過名為 numberChannel 的通道發布,但是只希望偶數傳遞到名為 evenNumberChannel 的通道。在這種情況下,可以使用 @Filter 注解聲明一個過濾器,如下所示:

@Bean
public IntegrationFlow evenNumberFlow(AtomicInteger integerSource) {
    return IntegrationFlows
        ...
        .<Integer>filter((p) -> p % 2 == 0)
        ...
        .get();
}

轉換器

​ 轉換器會對消息執行一些操作,一般會形成不同的消息,有可能會產生不同的載荷類型

  • 這里將數字轉換為羅馬數
 @Bean
    @Transformer(inputChannel = "textInChannel",outputChannel = "fileWriterChannel")
    public GenericTransformer<Integer,String> upperCaseTransformer(){
        return RomanNumbers::toRoman;
    }

也可用java DSL配置

 @Bean
    public IntegrationFlow transformerFlow(){
        return IntegrationFlows
                .from(MessageChannels.direct("textInChannel"))
                .channel(MessageChannels.direct("fileWriterChannel"))
                //聲明轉換器
                .<Integer,String>transform(RomanNumbers::toRoman)
                .handle(Files.outboundAdapter(new File("G:\\2020\\temp\\files\\"))
                        .fileExistsMode(FileExistsMode.APPEND)
                        .appendNewLine(true)
                ).get();
    }

路由器

假設我們有一個名為numberChannel的通道,他會傳輸整型值。我們想要有偶數的消息定位到名為evenChannel通道,奇數消息定位到oddChannel通道。要在集成流中創建這樣一個路由器,我們可以聲明一個AbstractMessageRouter類型的bean,並為其添加@Router注解:

 @Bean
    @Router(inputChannel = "numberChannel")
    public AbstractMessageRouter evenOddRouter(){
        return new AbstractMessageRouter() {
            @Override
            protected Collection<MessageChannel> determineTargetChannels(Message<?> message) {
                Integer payload = (Integer) message.getPayload();
                if (payload%2==0){
                    return Collections.singleton(evenChannel());
                }
                return Collections.singleton(oddChannel());

            }
        };
    }

    @Bean
    public MessageChannel evenChannel(){
        return new DirectChannel();
    }

    @Bean
    public MessageChannel oddChannel(){
        return new DirectChannel();
    }

java DSL風格的配置

@Bean
public IntegrationFlow numberRoutingFlow(AtomicInteger source) {
    return IntegrationFlows
        ...
        .<Integer, String>route(n -> n%2==0 ? "EVEN":"ODD", mapping ->
            mapping.subFlowMapping("EVEN", sf -> 
               sf.<Integer, Integer>transform(n -> n * 10).handle((i,h) -> { ... }))
                 .subFlowMapping("ODD", sf -> 
                     sf.transform(RomanNumbers::toRoman).handle((i,h) -> { ... }))
            )
        .get();
}

雖然仍然可以聲明 AbstractMessageRouter 並將其傳遞給 route(),但是本例使用 lambda 表達式來確定消息有效負載是奇數還是偶數。

如果是偶數,則返回一個偶數的字符串值。如果是奇數,則返回奇數。然后使用這些值來確定哪個子映射將處理消息。

分割器

有時,在集成流中,將消息拆分為多個獨立處理的消息可能很有用。Splitter 將為分割並處理這些消息。

Splitter 在很多情況下都很有用,但是有兩個基本用例可以使用 Splitter:

  • 消息有效載荷,包含單個消息有效載荷相同類型的項的集合。例如,攜帶產品列表的消息可能被分成多個消息,每個消息的有效負載是一個產品。
  • 信息有效載荷,攜帶的信息雖然相關,但可以分為兩種或兩種以上不同類型的信息。例如,購買訂單可能包含交付、帳單和行項目信息。交付細節可能由一個子流程處理,賬單由另一個子流程處理,每一項則由另一個子流程處理。在這個用例中,Splitter 后面通常跟着一個路由器,它根據有效負載類型路由消息,以確保正確的子流處理數據。
image-20210114162359243

當將消息有效負載拆分為兩個或多個不同類型的消息時,通常只需定義一個 POJO 即可,該 POJO 提取傳入的有效負載的各個部分,並將它們作為集合的元素返回。

例如,假設希望將攜帶購買訂單的消息拆分為兩條消息:一條攜帶賬單信息,另一條攜帶項目列表。下面的 OrderSplitter 將完成這項工作:

public class OrderSplitter {
    public Collection<Object> splitOrderIntoParts(PurchaseOrder po){
        ArrayList<Object> list = new ArrayList<>();
        list.add(po.getBillingInfo());
        //lineItems是一個list集合
        list.add(po.getLineItems());
        return list;
    }
}

接下來,我們生明一個OrderSplitter bean,並通過@Splitter注解將其作為集成流的一部分

@Bean
    @Splitter(inputChannel = "poChannel",outputChannel = "splitOrderChannel")
    public OrderSplitter orderSplitter(){
        return new OrderSplitter();
    }

在這里聲明訂單會到達名為poChannel的通道,他們會將OrderSplitter切分。然后,返回集合中的每個條目將會作為集成流中獨立的消息,他們會發布到名為splitOrderChannel的通道上,聲明一個路由器分別路由到他們自己的子流上:

 @Bean
    @Router(inputChannel = "splitOrderChannel")
    public MessageRouter splitOrderRouter(){
        PayloadTypeRouter payloadTypeRouter = new PayloadTypeRouter();
        payloadTypeRouter.setChannelMapping(BillingInfo.class.getName(),"billingInfoChannel");
        payloadTypeRouter.setChannelMapping(List.class.getName(),"lineItemsChannel");
        return payloadTypeRouter;
    }

可以再對lineItems進行分割

@Bean
    @Splitter(inputChannel = "lineItemsChannel",outputChannel = "lineItemChannel")
    public OrderSplitter orderSplitter(){
        return new OrderSplitter();
    }

這樣lineItems集合中的每個元素都會發布到一個消息中,這些消息將會被發布到名為lineItemChannel的通道中。

使用java DSL方式實現同樣的splitter/router配置,那么我們可以通過調用split()和router()來實現

return IntegrationFlows
    ...
    .split(orderSplitter())
    .<Object, String> route(p -> {
        if (p.getClass().isAssignableFrom(BillingInfo.class)) {
            return "BILLING_INFO";
        } else {
            return "LINE_ITEMS";
        }
    }, mapping ->
           mapping.subFlowMapping("BILLING_INFO", sf -> 
                      sf.<BillingInfo> handle((billingInfo, h) -> { ... }))
                  .subFlowMapping("LINE_ITEMS", sf -> 
                       sf.split().<LineItem> handle((lineItem, h) -> { ... }))
    )
    .get();

服務激活器

服務激活器接受來自輸入通道的消息並將這些消息發送到一個MessageHandler的實現。

image-20210115193910725

Spring integration 提供了多個開箱即用的MessageHandler (payloadTypeRouter甚至就是messageHandler的實現),但是我們通常會需要提供一些自定義的實現作為服務激活器。作為樣例,如下代碼聲明了如何聲明MessageHandler bean並將其配置為服務激活器:

@Bean
    @ServiceActivator(inputChannel = "someChannel")
    public MessageHandler sysoutHandler(){
        return message -> System.out.println("Message payload:"+message.getPayload());

    }

​ 這個bean使用了@ServiceActivator注解,表明它會作為一個服務激活器處理來自someChannel通道的消息。

 @Bean
    @ServiceActivator(inputChannel = "someChannel")
    public GenericHandler<Order> sysoutHandler(){
        return ((payload, headers) -> orderRepo.save(payload));

    }

在本例中服務器會接受一個載荷類型為Order的消息。當訂單抵達時我們會通過一個repository將它保存起來,並返回保存一個order,這個order隨后被發送至名為completeChannel的輸出通道。

GenericHandler不僅可以獲取載荷,並且可以獲取消息頭

java DSL配置

public IntegrationFlow someFlow() {
    return IntegrationFlows
        ...
        .handle(msg -> {
            System.out.println("Message payload: " + msg.getPayload());
        })
        .get();
}
  • GenericHandler<Order>用java DSL配置
public IntegrationFlow orderFlow(OrderRepository orderRepo) {
    return IntegrationFlows
        ...
        .<Order>handle((payload, headers) -> {
            return orderRepo.save(payload);
        })
        ...
        .get();
}

當利用 GenericHandler 時,lambda 表達式或方法參考接受該消息的有效載荷和報頭作為參數。另外,如果選擇在一個流程的結束使用 GenericHandler,需要返回 null,否則會得到這表明有沒有指定輸出通道的錯誤。

網關

通過網關,應用可以提交數據到集成流中,並且能夠可選地接收流的結果作為響應。網關會聲明為接口。

image-20210115202519169
//FileWriterGateway就是一個網關
@Component
@MessagingGateway(defaultRequestChannel = "textInChannel",defaultReplyChannel="outChannel")
public interface FileWriterGateway {
    void writeToFile(
            @Header(FileHeaders.FILENAME) String filename,
            String data
    );

}

這個接口不需要實現Spring integration會自動在運行時提供一個實現,它會通過特定的通道發送和接受消息。

當接口中的方法被調用時,給定的string流會發送到集成流中,進入名為“inChannel”的通道。不管流是如何定義的或者它都干了些什么,當數據進入名為outChannel的通道時,他將會從方法返回。

return IntegrationFlows
                .from("inChannel")
                .<String,String>transform(s->s.toUpperCase())
                .channel("outChannel")
                .get();

​ 按照定義,這個流會從進入InChannel通道的數據開始,消息載荷會由轉換器進行處理,也就是執行大寫操作(通過lambda表達式進行)。結果形成的消息被發送到名為outChannel通道,也就是我們在UpperCaseGateWay中聲明的答復通道。

通道適配器

​ 通道適配器代表了集成流入口和出口。數據通過入站通道適配器(inbound channel adapter)進入一個集成流,通過出站通道適配器離開一個集成流,

image-20210117144715443

通道適配器代表集成信息流的入口點和出口點。數據通過入站信道適配器的方式進入到集成流中,通過出站信道適配器的方式離開集成流。

入站信道的適配器可以采取多種形式,這取決於它們引入到流的數據源。例如,聲明一個入站通道適配器,它采用從 AtomicInteger 到流遞增的數字。使用 Java 配置,它可能是這樣的:

@Bean
@InboundChannelAdapter(
    poller=@Poller(fixedRate="10"), channel="numberChannel")
public MessageSource<Integer> numberSource(AtomicInteger source) {
    return () -> {
        return new GenericMessage<>(source.getAndIncrement());
    };
}

測試這個適配器

要測試這個通道適配器還必須要聲明一個服務激活器 也就是出站通道適配器

@Bean
    @ServiceActivator(inputChannel = "numberChannel")
    public MessageHandler header(){
        return System.out::println;
    }

讓他直接打印結果

  • 聲明隊列
 @Bean
    public MessageChannel numberChannel() {
        return new QueueChannel();
    }

注意:還必須聲明一個全局的輪詢器

@Bean(name = PollerMetadata.DEFAULT_POLLER)
    public PollerMetadata defaultPoller() {

        PollerMetadata pollerMetadata = new PollerMetadata();
        pollerMetadata.setTrigger(new PeriodicTrigger(10));
        return pollerMetadata;
    }

也可已通過ServiceActivator注解配置poller屬性

xml中如下配置

<int:poller default="true" fixed-delay="50"/>

不寫的話會報錯 https://stackoverflow.com/questions/20799737/spring-integration-no-poller-has-been-defined-for-endpoint

java.lang.IllegalArgumentException: No poller has been defined for endpoint 'org.springframework.integration.config.ConsumerEndpointFactoryBean#2', and no default poller is available within the context.

編寫測試類

@Autowired
    private FileWriterGateway fileWriterGateway;
    @Autowired
    private AtomicInteger atomicInteger;
    @Test
    public void test01(){
        //調用網關接口
        fileWriterGateway.writeToFile(atomicInteger);
    }
  • 結果
image-20210117165009991

當使用 Java 配置時,@InboundChannelAdapter 意味着是一個入站通道適配器,from() 方法就是使用 Java DSL 來定義流的時候,表明它是怎么處理的。下面對於流定義的一個片段展示了在 Java DSL 配置中類似的輸入通道適配器:

@Bean
public IntegrationFlow someFlow(AtomicInteger integerSource) {
    return IntegrationFlows
        .from(integerSource, "getAndIncrement",
              c -> c.poller(Pollers.fixedRate(1000)))
        ...
        .get();
}

通常情況下,通道適配器通過的 Spring Integration 的多端點模塊之一進行提供。舉個例子,假設需要一個入站通道適配器,用它來監視指定的目錄,同時將任何寫入到那個目錄中的文件作為消息,提交到名為 file-channel 的通道中。下面的 Java 配置使用 FileReadingMessageSource 從 Spring Integration 的文件端點模塊來實現這一目標:

@Bean
@InboundChannelAdapter(channel="file-channel",
                       poller=@Poller(fixedDelay="1000"))
public MessageSource<File> fileReadingMessageSource() {
    FileReadingMessageSource sourceReader = new FileReadingMessageSource();
    sourceReader.setDirectory(new File(INPUT_DIR));
    sourceReader.setFilter(new SimplePatternFileListFilter(FILE_PATTERN));
    return sourceReader;
}

當在 Java DSL 中寫入同樣的 file-reading 入站通道適配器時,來自 Files 類的 inboundAdapter() 方法達到的同樣的目的。出站通道適配器位於集成信息流的最后位置,將最終消息扇出到應用程序或是其他系統中:

@Bean
public IntegrationFlow fileReaderFlow() {
    return IntegrationFlows
        .from(Files.inboundAdapter(new File(INPUT_DIR))
              .patternFilter(FILE_PATTERN))
        .get();
}

服務激活器(作為消息處理的實現)往往是為出站通道適配器而存在的,特別是當數據需要被扇出到應用程序本身的時候。

讀取信息實戰

https://blog.csdn.net/qq_40929047/article/details/89569887


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM