[轉] Spring Integration 系統集成


【From】 http://blog.csdn.net/w_x_z_/article/details/53316618

 

pring Ingegration 提供了基於Spring的EIP(Enterprise Integration Patterns,企業集成模式)的實現。Spring Integration 主要解決的問題是不同系統之間交互的問題,通過異步消息驅動來達到系統交互時系統之間的松耦合。

Spring Integration 主要有Message、Channel、Message EndPoint組成。

 

Message

Message是用來在不同部分之間傳遞的數據。Message有兩部分組成:消息體(payload)與消息頭(header)。消息體可以是任何數據類型;消息頭表示的元數據就是解釋消息體的內容。

/**
 * A generic message representation with headers and body.
 *
 * @author Mark Fisher
 * @author Arjen Poutsma
 * @since 4.0
 * @see org.springframework.messaging.support.MessageBuilder
 */
public interface Message<T> {

    /**
     * Return the message payload.
     */
    T getPayload();

    /**
     * Return message headers for the message (never {@code null} but may be empty).
     */
    MessageHeaders getHeaders();

}

 

Channel

在消息系統中,消息發送者發送消息到通道(Channel),消息接受者從通道(Channel)接收消息。

 

1、頂級接口

(1) MessageChannel

MessageChannel 是Spring Integration消息通道的頂級接口:

public interface MessageChannel {

    /**
     * Constant for sending a message without a prescribed timeout.
     */
    long INDEFINITE_TIMEOUT = -1;


    /**
     * Send a {@link Message} to this channel. If the message is sent successfully,
     * the method returns {@code true}. If the message cannot be sent due to a
     * non-fatal reason, the method returns {@code false}. The method may also
     * throw a RuntimeException in case of non-recoverable errors.
     * <p>This method may block indefinitely, depending on the implementation.
     * To provide a maximum wait time, use {@link #send(Message, long)}.
     * @param message the message to send
     * @return whether or not the message was sent
     */
    boolean send(Message<?> message);

    /**
     * Send a message, blocking until either the message is accepted or the
     * specified timeout period elapses.
     * @param message the message to send
     * @param timeout the timeout in milliseconds or {@link #INDEFINITE_TIMEOUT}
     * @return {@code true} if the message is sent, {@code false} if not
     * including a timeout of an interrupt of the send
     */
    boolean send(Message<?> message, long timeout);

}

 

MessageChannel 有兩大子接口,分別是PollableChannel (可輪詢)和SubscribableChannel(可訂閱)。我們所有的消息通道類都是現實這兩個接口。

 

(2) PollableChannel

PollableChannel 具備輪詢獲得消息的能力。

public interface PollableChannel extends MessageChannel {

    /**
     * Receive a message from this channel, blocking indefinitely if necessary.
     * @return the next available {@link Message} or {@code null} if interrupted
     */
    Message<?> receive();

    /**
     * Receive a message from this channel, blocking until either a message is available
     * or the specified timeout period elapses.
     * @param timeout the timeout in milliseconds or {@link MessageChannel#INDEFINITE_TIMEOUT}.
     * @return the next available {@link Message} or {@code null} if the specified timeout
     * period elapses or the message reception is interrupted
     */
    Message<?> receive(long timeout);

}

 

(3) SubscribableChannel

SubscribableChannel 發送消息給訂閱了MessageHanlder的訂閱者

public interface SubscribableChannel extends MessageChannel {

    /**
     * Register a message handler.
     * @return {@code true} if the handler was subscribed or {@code false} if it
     * was already subscribed.
     */
    boolean subscribe(MessageHandler handler);

    /**
     * Un-register a message handler.
     * @return {@code true} if the handler was un-registered, or {@code false}
     * if was not registered.
     */
    boolean unsubscribe(MessageHandler handler);

}

 

 2、常用消息通道

(1)、PublishSubscribeChannel

PublishSubscribeChannel允許廣播消息給所有訂閱者,配置方式如下:

    /**
     * 允許廣播消息給所有訂閱者,當前消息通道的id為publishSubscribeChannel
     * @return
     */
    @Bean
    public PublishSubscribeChannel publishSubscribeChannel(){
        PublishSubscribeChannel channel = new PublishSubscribeChannel();
        return channel;
    }

 其中,當前消息通道的id為publishSubscribeChannel。

 

(2)、QueueChannel

QueueChannel允許消息接收者輪詢獲得消息,用一個隊列(queue)接收消息,隊列的容量大小可配置,配置方式如下:

    @Bean
    public QueueChannel queueChannel(){
        QueueChannel channel = new QueueChannel(10);
        return channel;
    }

 其中,QueueChannel構造參數10即為隊列的容量。

 

(3)、PriorityChannel

PriorityChannel可按照優先級將數據存儲到隊列,它依據於消息的消息頭priority屬性,配置方式如下:

    @Bean
    public PriorityChannel priorityChannel(){
        PriorityChannel channel = new PriorityChannel(10);
        return channel;
    }

 

(4)、RendezvousChannel

RendezvousChannel確保每一個接收者都接收到消息后再發送消息,配置方式如下:

    @Bean
    public RendezvousChannel rendezvousChannel(){
        RendezvousChannel channel = new RendezvousChannel();
        return channel;
    }

 

(5) DirectChannel

DirectChannel是Spring Integration默認的消息通道,它允許將消息發送給為一個訂閱者,然后阻礙發送直到消息被接收,配置方式如下:

    @Bean
    public DirectChannel directChannel(){
        DirectChannel channel = new DirectChannel();
        return channel;
    }

 

(6)、ExecutorChannel

ExecutorChannel可綁定一個多線程的task executor,配置方式如下:

    @Bean
    public ExecutorChannel executorChannel(){
        ExecutorChannel channel = new ExecutorChannel(executor());
        return channel;
    }

    @Bean
    public Executor executor(){
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setCorePoolSize(5);
        taskExecutor.setMaxPoolSize(10);
        taskExecutor.setQueueCapacity(25);
        taskExecutor.initialize();
        return taskExecutor;
    }

 

3、通道攔截器

Spring Integration給消息通道提供了通道攔截器(ChannelInterceptor),用來攔截發送和接收消息的操作.

ChannelInterceptor接口定義如下,我們只需要實現這個接口即可:

 public interface ChannelInterceptor {

        Message<?> preSend(Message<?> message, MessageChannel channel);

        void postSend(Message<?> message, MessageChannel channel, boolean sent);

        void afterSendCompletion(Message<?> message, MessageChannel channel, boolean sent, Exception ex);

        boolean preReceive(MessageChannel channel);

        Message<?> postReceive(Message<?> message, MessageChannel channel);

        void afterReceiveCompletion(Message<?> message, MessageChannel channel, Exception ex);

    }

 通過如下代碼給所有的channel增加攔截器

channel.addInterceptor(someInterceptor);

 

Message EndPoint

消息端點(Message EndPoint)是真正處理消息的(Message)組件,它還可以控制通道的路由。我們可用的消息端點包含如下:

 

(1) Channel Adapter

通道適配器(Channel Adapter)是一種連接外部系統或傳輸協議的端點(EndPoint),可以分為入站(inbound)和出站(outbound)。 
通道適配器是單向的,入站通道適配器只支持接收消息,出站通道適配器只支持輸出消息。

Spring Integration內置了如下的適配器:

RabbitMQ、Feed、File、FTP/SFTP、Gemfire、HTTP、TCP/UDP、JDBC、JPA、JMS、Mail、MongoDB、Redis、RMI
Twitter、XMPP、WebServices(SOAP、REST)、WebSocket

 

(2) Gateway

消息網關(Gateway)類似於Adapter,但是提供了雙向的請求/返回集成方式,也分為入站(inbound)和出站(outbound)。 
Spring Integration 對響應的Adapter都提供了Gateway。

 

(3) Service Activator

Service Activator 可調用Spring的Bean來處理消息,並將處理后的結果輸出到指定的消息通道。

 

(4) Router

路由(Router) 可根據消息體內容(Payload Type Router)、消息頭的值(Header Value Router) 以及定義好的接收表(Recipient List Router) 作為條件,來決定消息傳遞到的通道。

 

(5) Filter

過濾器(Filter) 類似於路由(Router),不同的是過濾器不決定消息路由到哪里,而是決定消息是否可以傳遞給消息通道。

 

(6) Splitter

拆分器(Splitter)將消息拆分為幾個部分單獨處理,拆分器處理的返回值是一個集合或者數組。

 

(7) Aggregator

聚合器(Aggregator)與拆分器相反,它接收一個java.util.List作為參數,將多個消息合並為一個消息。

 

(8) Enricher

當我們從外部獲得消息后,需要增加額外的消息到已有的消息中,這時就需要使用消息增強器(Enricher)。消息增強器主要有消息體 
增強器(Payload Enricher)和消息頭增強器(Header Enricher)兩種。

 

(9) Transformer

轉換器(Transformer)是對獲得的消息進行一定的轉換處理(如數據格式轉換).

 

(10) Bridge

使用連接橋(Bridge)可以簡單的將兩個消息通道連接起來。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM