Apache Camel的認識和應用詳解


一,什么是apache camel?

apache camel官網:https://camel.apache.org/

apache camel 是輕量級ESB框架(什么是ESB框架?

它有幾個比較重要的概念就是:

  1. endpoint,所謂的endpoint,就是一種可以接收或發送數據的組件。可以支持多種協議,如jms,http,file等。
  2. processor,它是用來處理具體業務邏輯的組件。
  3. route,用來路由,指示數據從哪里來到哪里去,中間用哪個processor處理。

而processor之間用exchange對象來傳送數據,有點像jms,通俗一點就像上學時傳的小紙條,

架構圖如下

camel就是企業信息集成框架,它提供了很多簡單好用而又強大的組件,用戶可以根據場景來選擇不同的EIP(企業集成模式)來實現自己的需求,以響應快速變化的業務。可以把它當成企業信息總線(ESB)的輕量級實現。

引用別人的文章:https://www.jianshu.com/p/1bdc821423ee

camel是一款基於規則快速實現消息流轉的開發組件,集成該組件后,你的程序可以編寫最少的代碼實現復雜的消息在不同的協議規則間流轉。

例如:

程序實現從Ftp獲得.xml文件,然后將收到的文件內容值轉換后,發送到Jms Queue中,並且將Request寫入到數據庫log表。

Ftp組件->Jms組件->Db組件

只需要短短的幾行代碼就可以實現這樣一個功能,但是如果用其他框架一個個功能的寫,將會有非常多的代碼量並且可能會出現一些紕漏,而camel已經將這些功能都封裝在camel組件中了,節省開發成本。

from("ftp://xxxxxxxxxxxxx").bean("bean:JmsQueueCovertBean?method=convert").to("jms://xxxxxxxxxxx")..setBody(simple("insert into xxxxxxxxxxx")).to("jdbc:testdb");

轉載灰信網:https://www.freesion.com/article/1267830277/

1,我們要解決的問題以及解決方案

我們要解決的問題

  1. 完成組織內外的各種異構系統、應用、數據源之間共享和交換信息。
  2. 優化現有結構,使整個系統易於拓展和維護。
  3. 保證多個系統各自獨立互不干擾。

總結發現我們實際要解決的問題是:企業應用集成(Enterprise Application Integration,EAI) 是完成在組織內、外的各種異構系統,應用和數據源之間共享和交換信息和協作的途徑,方法學,標准和技術。

2,EAI的常用解決方案

SOA(Service Oriented Architecture) 中文釋義為 “面向服務的架構”它是一種設計理念,其中包含多個服務, 服務之間通過相互依賴最終提供一系列完整的功能。各個服務通常以獨立的形式部署運行,服務之間通過網絡進行調用。要求各個服務遵循統一的規范和契約。

ESB企業服務總線

ESB(Enterprise Service Bus,即企業服務總線) 就是一根管道,用來連接各個服務節點。ESB的存在是為了集成基於不同協議的不同服務,ESB 做了消息的轉化、解釋以及路由的工作,以此來讓不同的服務互聯互通

3,如何實現ESB

當前實現ESB比較成熟的模型為EIP(Enterprise Integration Patterns)1。他的包含如下規范:

  • 集成方式(Integration Styles):EIP規定所有集成模式要基於消息傳送模式。
  • 通道模式(Channel Patterns):消息通過通道進行傳遞。
  • 消息體模式(Message Construction Patterns):描述了在消息系統中交互的消息的規范。
  • 路由模式(Routing Patterns):消息如何從發送者分發到正確的接收者,中間一般不進行修改。
  • 轉換模式(Transformation Patterns):將消息體的內容修改為接收者可以理解的結構,中間可能要對數據進行修改或者篩選。
  • 終端模式(Endpoint Patterns):生成或者接收消息的客戶。
  • 系統管理模式(System Management Patterns):提供監控整個系統狀態的工具,包括錯誤處理,壓力測試或者監控系統變化。

4,為什么使用CAMEL

當前熱門的EIP集成框架分別有:Spring Integration、Mule ESB、Apache Camel。接下來分別對三個框架進行分析(打分為博主的評估,僅供參考):

Spring Integration只提供了非常基礎的支持,如文件,FTP,JMS,TCP,HTTP或Web服務。集成是通過編寫大量的XML代碼(沒有一個真正的DSL)實現的。使用它,寓意着大量的XML編寫工作

Mule ESB不是僅僅一個集成框架,而是一個包括一些額外功能的完整ESB,比Spring集成它更像是一個DSL。
因為是一個完整的ESB,所以集成邏輯會比較復雜

Apache Camel實現了你能想到的幾乎每一個技術,提供很多組件,同時你可以很容易的自定義組件。而且Camel和Spring的集成很完善。Camel可以實現用到才依賴,不用不依賴。

二,CAMEL可以做什么

1,APACHE CAMEL簡介

Apache camel 是一個基於EIP的開源框架。實現了EIP定義的一些不同應用系統之間的消息傳輸模型,包括常見的Point2Point、Pub/Sub模型。

Camel的消息傳遞系統(Message System)2:

  • 終端(Message Endpoint):可以是異構的業務系統,都需要提供Endpoint實現集成。
  • 通道(Message Channel):兩個應用之間進行信息通訊的通道。
  • 消息(Message):Endpoint之間交互的標准化單位。
  • 路由(Message Router):根據一定的條件,將消息傳遞給不同的過濾器以實現對單個處理步驟的解耦。
  • 轉換器(Message Translator):消息在傳輸過程中的轉換和數據映射,包括報文格式轉換和內容轉換映射。
  • 管道和過濾器(Pipes & Filters):在保持獨立性和靈活性的基礎上,對復雜的消息進行處理。

2,CAMEL的應用場景

  • 消息匯聚:比如將來自不同服務器的數據,有ActiveMQ、RabbitMQ、WebService等的數據合成報表。
  • 消息分發:將消息從消息生產者轉發給消息接收者,分發方式分為兩種:順序分發&並行分發。
from("amqp:queue:order")
.to("uri:validateBean", "uri:handleBean", "uri:emailBean");

from("amqp:queue:order")
.multicast()
.to("uri:validateBean", "uri:handleBean", "uri:emailBean");
  • 消息轉換:將消息內容進行轉換,比如xml轉為json格式。
from("amqp:queue:order")
.process(new XmlToJsonProcessor())
.to("bean:orderHandler");
  • 規則引擎:可以使用Spring XML配置或DSL來定義route。同時camel提供了大量內置Processor,用於邏輯運算、過濾等,這樣更容易靈活的管理route。
<route>
    <from uri="amqp:queue:order"/>
    <multicast>
        <to uri="uri:validateBean"/>
        <to uri="uri:handleBean"/>
        <to uri="uri:emailBean"/>
    </multicast>
</route>
from("amqp:queue:order")
.filter(header("foo")
.isEqualTo("bar"))
.choice()
.when(xpath("/person/city = &#39;London&#39;"))
  .to("file:target/messages/uk")
.otherwise()
  .to("file:target/messages/others");

3,CAMEL的核心要素

Camel有以下五要素:

  • Endpoint:用於收發消息。
  • Exchange:消息本體。
  • Processor:消息處理器。
  • Routing:路由規則。
  • Service:Camel基礎概念。

4,ENDPOINT

  • Endpoint是Camel與其他系統進行通信的設定點。
  • Camel自身提供了廣泛的通信協議支持,例如:RPC協議、HTTP協議、FTP協議……
  • Camel中的Endpoint使用URI描述對目標系統的通信。
  • 對Endpoint實例的創建通過對Camel中org.apche.camel.Component接口的實現來實現的。
  • Camel通過Plug方式提供對各種協議的Endpoint支持,如果需要使用某種Endpoint,需要引入響應的plug。例如要使用Camel對Netty4-Endpoint的支持,要引入camel-netty4的依賴包。

5,EXCHANGE

  • Properties:Exchange對象貫穿整個路由執行過程中的控制端點、處理器甚至還有表達式、路由條件判斷。為了讓這些元素能夠共享一些開發人員自定義的參數配置信息,Exchange以K-V結構提供了這樣的參數配置信息存儲方式。
  • Patterns:Exchange中的pattern屬性非常重要,它的全稱是:ExchangePattern(交換器工作模式)。其實現是一個枚舉類型:org.apache.camel.ExchangePattern。可以使用的值包括:InOnly, RobustInOnly, InOut, InOptionalOut, OutOnly, RobustOutOnly, OutIn, OutOptionalIn。從Camel官方已公布的文檔來看,這個屬性描述了Exchange中消息的傳播方式。
  • Message IN/OUT:當Endpoint和Processor、Processor和Processor間的Message在Exchange中傳遞時,Exchange會自動將上一個元素的輸出作為這個元素的輸入使用。

6,PROCESSOR

Processor用於接受從Endpoint、Routing或者另一個Processor的Exchange中傳來的消息,並進行處理。
Camel核心包和各個Plugin組件都提供了很多Processor的實現,開發人員也可以通過實現org.apache.camel.Processor接口自定義Processor。

// 一個自定義處理器的實現
public class OtherProcessor implements Processor {
    @Override
    public void process(Exchange exchange) throws Exception {
        Message message = exchange.getIn();
        String body = message.getBody().toString();
        //===============
        // 您可以在這里進行數據格式轉換
        // 並且將結果存儲到out message中
        //===============
        // 存入到exchange的out區域
        if(exchange.getPattern() == ExchangePattern.InOut) {
            Message outMessage = exchange.getOut();
            outMessage.setBody(body + " || other out");
        }
    }
}

7,ROUTING

Routing用於處理Endpoint和Processor之間、Processor和Processor之間的路由跳轉。
Camel中支持的路由規則非常豐富,包括基於內容、接收者列表、循環動態路由等。

8,SERVICE

在Apache Camel中有一個比Endpoint、Component、CamelContext等元素更基礎的概念元素:Service。
包括Endpoint、Component、CamelContext等元素在內的大多數工作在Camel中的元素,都是一個一個的Service。
Camel應用程序中的每一個Service都是獨立運行的,各個Service的關聯銜接通過CamelContext上下文對象完成。每一個Service通過調用start()方法被**並參與到Camel應用程序的工作中,直到它的stop()方法被調用。也就是說,每個Service都有獨立的生命周期。

9,CAMELCONTEXT上下文

CamelContext橫跨了Camel服務的整個生命周期,並且為Camel服務的工作環境提供支撐。

三,代碼實踐

1,引入相關Jar包

 		<!-- apache camel -->
        <dependency>
            <groupId>org.apache.camel</groupId>
            <artifactId>camel-core</artifactId>
            <version>2.24.2</version>
        </dependency>
        <!-- apache camel 集成 activemq中間件 -->
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-camel</artifactId>
            <version>5.15.4</version>
        </dependency>

2,傳輸文件到消息件

public static void main(String[] args) throws Exception {
        DefaultCamelContext context = new DefaultCamelContext();
        ActiveMQConnectionFactory connectionFactory =
                new ActiveMQConnectionFactory("admin","admin","http://172.16.2.221:8161");
        context.addComponent("activemq", JmsComponent.jmsComponentAutoAcknowledge(connectionFactory));
        context.addRoutes(new RouteBuilder() {
            @Override
            public void configure() throws Exception {
                from("file:input_box?noop=true")
                        .to("activemq:queue:my_queue");
            }
        });
        context.start();
}

3,傳輸對象到消息中間件

public static void main(String[] args) throws Exception {
        DefaultCamelContext context = new DefaultCamelContext();
        ActiveMQConnectionFactory connectionFactory =
                new ActiveMQConnectionFactory("admin","admin","tcp://172.16.2.221:61616");
        context.addComponent("activemq", JmsComponent.jmsComponentAutoAcknowledge(connectionFactory));
        context.addRoutes(new RouteBuilder() {
            @Override
            public void configure() throws Exception {
                from("direct:start")
                        .to("activemq:queue:my_queue");
            }
        });
        context.start();
        ProducerTemplate producerTemplate = context.createProducerTemplate();
        producerTemplate.sendBody("direct:start","測試消息");
    }

4,生產者和消費者示例

process是一個處理器,可以處理消費者消費消息之前的消息。

public static void main(String[] args) throws Exception {
        DefaultCamelContext context = new DefaultCamelContext();
        context.addRoutes(new RouteBuilder() {
            @Override
            public void configure() throws Exception {
                from("direct:start")
                        .process(new Processor() {
                            @Override
                            public void process(Exchange exchange) throws Exception {
                                System.out.println("進入消息處理器...");
                                //提供者發送的消息
                                String msg = exchange.getIn().getBody(String.class);
                                msg = msg + "-By FanJiangFeng";
                                System.out.println("消息被我修改成:"+msg);
                                //重新發送
                                exchange.getOut().setBody(msg);
                            }
                        })
                        .to("seda:end");
            }
        });

        context.start();
        //提供者
        ProducerTemplate producerTemplate = context.createProducerTemplate();
        producerTemplate.sendBody("direct:start","Hello Everyone");

        //消費者
        ConsumerTemplate consumerTemplate = context.createConsumerTemplate();
        String message = consumerTemplate.receiveBody("seda:end", String.class);

        System.out.println("消費者取出的消息:"+message);

    }

5,消息生產者生產的數據為數據庫中sql查詢到的數據

需要Jar包

 		<!-- apache camel 集成 數據庫 -->
        <dependency>
            <groupId>org.apache.camel</groupId>
            <artifactId>camel-jdbc</artifactId>
            <version>2.22.1</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.47</version>
        </dependency>

測試類

public static void main(String[] args) throws Exception {
        MysqlDataSource dataSource = new MysqlDataSource();
        dataSource.setURL("jdbc:mysql://localhost:3306/test");
        dataSource.setUser("root");
        dataSource.setPassword("1234");

        SimpleRegistry simpleRegistry = new SimpleRegistry();
        simpleRegistry.put("myDataSource",dataSource);

        DefaultCamelContext context = new DefaultCamelContext(simpleRegistry);
        context.addRoutes(new RouteBuilder() {
            @Override
            public void configure() throws Exception {
                from("direct:start")
                        .to("jdbc:myDataSource")
                        .bean(new ResultHandlerTest(),"printResult");
            }
        });

        context.start();
        //生產者
        ProducerTemplate producerTemplate = context.createProducerTemplate();
        producerTemplate.sendBody("direct:start","select * from user");
    }

    public static class ResultHandlerTest{
        private void printResult(List list){
            for(int i=0;i<list.size();i++){
                System.out.println(list.get(i));
            }
        }
    }

6,消息生產者發送消息,由某個類的某個方法進行消費消息(第一種方式)

消息生產者發送消息,由某個類的某個方法進行消費消息(此方法必須為public方法,否則報方法找不到的異常)

public class CallMethodUsingClassComponent {

    public void consumerMethod(String message){
        System.out.println("消費者方法接收消息:"+ message);
    }

    public static void main(String[] args) throws Exception{
        DefaultCamelContext context = new DefaultCamelContext();
        context.addRoutes(new RouteBuilder() {
            @Override
            public void configure() throws Exception {
                from("direct:start")
                        .to("class:com.ftx.camel.test.CallMethodUsingClassComponent?method=consumerMethod");
            }
        });
        context.start();
        //創建消息生產者
        ProducerTemplate producerTemplate = context.createProducerTemplate();
        producerTemplate.sendBody("direct:start","測試消息");
    }
}

7,消息生產者發送消息,由某個類的某個方法進行消費消息(第二種方式)

此方法必須為public方法,否則報方法找不到的異常

public class CallMethodUsingClassComponent2 {

    public void consumerMethod(String message){
        System.out.println("消費者方法接收消息:"+ message);
    }

    public static void main(String[] args) throws Exception{

        CallMethodUsingClassComponent2 callMethodUsingClassComponent = new CallMethodUsingClassComponent2();
        SimpleRegistry registry = new SimpleRegistry();
        registry.put("myService",callMethodUsingClassComponent);

        DefaultCamelContext context = new DefaultCamelContext(registry);
        context.addRoutes(new RouteBuilder() {
            @Override
            public void configure() throws Exception {
                from("direct:start")
                        .to("bean:myService?method=consumerMethod");
            }
        });
        context.start();
        //創建消息生產者
        ProducerTemplate producerTemplate = context.createProducerTemplate();
        producerTemplate.sendBody("direct:start","測試消息");
    }
}

8,消費者從activeMQ中消費消息

public class ActiveMQConsumer {
    public static void main(String[] args) throws Exception {
        DefaultCamelContext context = new DefaultCamelContext();
        ActiveMQConnectionFactory connectionFactory =
                new ActiveMQConnectionFactory("admin","admin","tcp://172.16.2.221:61616");
        context.addComponent("activemq", JmsComponent.jmsComponentAutoAcknowledge(connectionFactory));
        context.addRoutes(new RouteBuilder() {
            @Override
            public void configure() throws Exception {
                from("activemq:queue:my_queue").to("seda:end");
            }
        });
        context.start();
        ConsumerTemplate consumerTemplate = context.createConsumerTemplate();
        String message = consumerTemplate.receiveBody("seda:end", String.class);
        System.out.println(message);
    }
}

如有后續,則會及時更新!


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM