Spring Integration Summary


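1. Concept

  Spring Integration can be thought of as a queue: the logic attached to each node along the queue is applied to a message as it passes through, which gives lightweight message passing.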
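
The queue-with-nodes idea can be sketched in plain Java. The sketch below is only an analogy and does not use Spring Integration; the class name PipelineSketch and its methods are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

// Plain-Java model of the idea only; it does not use Spring Integration.
class PipelineSketch {

    // Each stage stands in for the logic attached to one node of the queue.
    private final List<UnaryOperator<String>> stages = new ArrayList<>();

    PipelineSketch addStage(UnaryOperator<String> stage) {
        stages.add(stage);
        return this;
    }

    // Pass the message through every stage, in the order the stages were added.
    String send(String message) {
        String current = message;
        for (UnaryOperator<String> stage : stages) {
            current = stage.apply(current);
        }
        return current;
    }

    public static void main(String[] args) {
        PipelineSketch pipeline = new PipelineSketch()
                .addStage(String::trim)
                .addStage(String::toUpperCase)
                .addStage(s -> "[" + s + "]");
        System.out.println(pipeline.send("  monday ")); // prints [MONDAY]
    }
}
```

In Spring Integration the stages are not chained directly like this: they are connected through named channels, so each stage knows only the channel it reads from and the channel it writes to.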

2. Structure and flow

3. Code structure:

  3.1 Configuration file:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:integration="http://www.springframework.org/schema/integration"
       xmlns:beans="http://www.springframework.org/schema/beans"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd">

    <context:annotation-config />

    <context:component-scan base-package="ch.javaee.integration.example.helloWorld"/>

    <!-- The application sends its message to this channel -->
    <integration:channel id="inputChannel"/>

    <integration:channel id="middleChannel"/>

    <!-- This channel receives the modified message -->
    <integration:channel id="outputChannel"/>

    <integration:channel id="finalChannel"/>

    <!-- This service handles the message from the input-channel and sends the result to the output-channel -->
    <!-- The service method to call is named explicitly by the method attribute -->
    <integration:service-activator input-channel="inputChannel" ref="myTestProducerService" method="poll" output-channel="middleChannel"/>

    <!-- This service receives a message from middleChannel and sends its result to outputChannel -->
    <!-- The method that consumes the message is chosen implicitly: it is either marked with the @ServiceActivator
    annotation or it is the only method in the class -->
    <integration:service-activator input-channel="middleChannel" ref="myTestConsumerService" output-channel="outputChannel"/>

    <integration:service-activator input-channel="outputChannel" ref="myTestFinalConsumerService" output-channel="finalChannel"/>

</beans>

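 Items 1, 2, 3 and 4 are the channel definitions (inputChannel, middleChannel, outputChannel and finalChannel).

 Items 5, 6 and 7 (the three service-activator elements) process the data in each channel: ref is the name of the bean that handles the input-channel data, method is the method of that bean to invoke, and output-channel receives the processed result.

 Service Activator: invokes a Spring bean to process a message and sends the processed result to the specified message channel. The @ServiceActivator annotation serves much the same purpose as method="****" in the configuration file.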
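
 The wiring described above can be modelled in plain Java to show how a message moves from one channel to the next. This is a rough sketch, not Spring code: the Channel class, the activate helper and the two lambda "services" are hypothetical stand-ins. A null result sends nothing, which mirrors the usual behaviour of a service method that returns null.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Plain-Java model only: Channel and activate are hypothetical, not Spring types.
class ServiceActivatorSketch {

    // Hands every message directly to its subscribers on the sender's thread.
    static class Channel {
        private final List<Consumer<Object>> subscribers = new ArrayList<>();

        void subscribe(Consumer<Object> subscriber) {
            subscribers.add(subscriber);
        }

        void send(Object payload) {
            for (Consumer<Object> subscriber : subscribers) {
                subscriber.accept(payload);
            }
        }
    }

    // Models one service-activator element: read from input, call the method,
    // and forward a non-null result to output.
    static void activate(Channel input, Function<Object, Object> method, Channel output) {
        input.subscribe(payload -> {
            Object result = method.apply(payload);
            if (result != null) {
                output.send(result);
            }
        });
    }

    // Wires inputChannel -> middleChannel -> outputChannel and returns what reaches the end.
    static List<Object> run(Object payload) {
        Channel inputChannel = new Channel();
        Channel middleChannel = new Channel();
        Channel outputChannel = new Channel();
        List<Object> received = new ArrayList<>();

        activate(inputChannel, p -> p.toString().toUpperCase(), middleChannel);
        activate(middleChannel, p -> p + "!", outputChannel);
        outputChannel.subscribe(received::add);

        inputChannel.send(payload);
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run("monday")); // prints [MONDAY!]
    }
}
```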

3.2 Channel data-processing logic

Taking item 5 as an example:
<integration:service-activator input-channel="inputChannel" ref="myTestProducerService" method="poll" output-channel="middleChannel"/>

   3.2.1 Data sent into the input-channel:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
// The two package names below assume Spring Integration 4.x or later
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.MessageChannel;

public class MyTestDemo {

    public static void main(String[] args) {
        // Queue three strings for MyTestProducerService.poll() to pick up
        List<String> stringList = new ArrayList<String>();
        stringList.add("Monday");
        stringList.add("Tuesday");
        stringList.add("Wednesday");
        for (String day : stringList) {
            // putMessage is static, so call it on the class rather than on an instance
            MyTestProducerService.putMessage(day);
        }
        // Send a message to inputChannel, which invokes the service activator
        buildMessageAndSend("notification", "shutdown");
    }

    private static Map<String, String> buildMessageMap(String type, String msg) {
        Map<String, String> msgMap = new HashMap<String, String>();
        msgMap.put("type", type);
        msgMap.put("message", msg);
        msgMap.put("context_id", "123");
        return msgMap;
    }

    private static void buildMessageAndSend(String type, String msg) {
        Map<String, String> notifyMap = buildMessageMap(type, msg);
        // load the Spring context
        ApplicationContext context = new ClassPathXmlApplicationContext("my-spring-config.xml");
        // get the reference to the message channel
        MessageChannel channel = context.getBean("inputChannel", MessageChannel.class);
        try {
            // send the map as the message payload; send returns false on failure
            if (!channel.send(MessageBuilder.withPayload(notifyMap).build())) {
                throw new Exception("Unable to publish the shutdown notification.");
            }
        } catch (Exception e) {
            System.out.println(e);
        }
    }
}

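   In the code above:

      1: load the configuration file (new ClassPathXmlApplicationContext("my-spring-config.xml"))

      2: look up the input channel by its bean name (context.getBean("inputChannel", MessageChannel.class))

      3: send the data to the input-channel (channel.send(...))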

 3.2.2 Processing the data in the input-channel

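import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// The two package names below assume Spring Integration 4.x or later
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Service;

@Service
public class MyTestProducerService {
    /** The message queue; static, so it is shared with callers outside the Spring context. */
    private static final BlockingQueue<Object> messageQueue = new ArrayBlockingQueue<Object>(
            100, true);

    /** Adds an element to the queue; throws IllegalStateException if the queue is full. */
    public static void putMessage(Object obj) {
        messageQueue.add(obj);
    }

    /** Blocks until an element is available; returns null if the element is not a String. */
    public final Message<Object> poll() throws InterruptedException {
        Object obj = messageQueue.take();
        Message<Object> message = null;
        if (obj instanceof String) {
            message = MessageBuilder.withPayload(obj).build();
        }
        return message;
    }
}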

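   In the code above:

    1: the bean named by ref in item 5 of section 3.1 (MyTestProducerService)

    2: the method named by method in item 5 of section 3.1 (poll)

    3: the data returned to the output-channel of item 5 of section 3.1 (the Message returned by poll)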
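
   One thing to keep in mind about poll(): BlockingQueue.take() blocks the calling thread for as long as the queue is empty. A possible alternative, sketched below, is to wait for a bounded time instead. The class TimedPollSketch and its method names are hypothetical, and the sketch leaves Spring out so that it runs on its own.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical variant for illustration; not the author's MyTestProducerService.
class TimedPollSketch {

    private final BlockingQueue<Object> queue = new ArrayBlockingQueue<>(100, true);

    // offer() reports a full queue by returning false, where add() would throw.
    boolean putMessage(Object obj) {
        return queue.offer(obj);
    }

    // Waits at most timeoutMillis for an element; returns null on timeout,
    // on interruption, or when the element is not a String.
    String pollString(long timeoutMillis) {
        try {
            Object obj = queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
            return (obj instanceof String) ? (String) obj : null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return null;
        }
    }

    public static void main(String[] args) {
        TimedPollSketch sketch = new TimedPollSketch();
        sketch.putMessage("Monday");
        System.out.println(sketch.pollString(50)); // prints Monday
        System.out.println(sketch.pollString(50)); // prints null
    }
}
```

   Whether a timeout is appropriate depends on the flow: with take(), a message arriving on the input-channel while the queue is empty holds its sender until something is queued.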

 

3.3 The other channels are processed in much the same way as in section 3.2.

