C#版企業總線(ESB)


      C#版企業總線(ESB)設計說明書

   從開源的ESB項目,基本上都是java版本,c#的esb開源項目太少,這對搞c#來說是一大遺憾,所以本人很想對c#開源世界做一份小小的貢獻,因些開始寫一版真正開源的ESB企業開發框架。園子里有興趣的園友也可以一起來做。

 1,ESB架構圖

 

 

2,ESB系統架構要求

Windows Server 2003 R2

Windows Server 2008

 .Net 4.0 Framework

 處理器要求:  最少1 GHz or faster

 內存要求: 最少2 GB RAM 

 

 服務器需要打開的端口:

 Redis服務端口: 6379~6384

 JS, WP7, Java API 端口: 8078

 router and gateway service 端口:8000, 8085, and 8084
websocket 端口:8081 

 

3,ESB配置

  負載配置說明(LoadBalance)

  

4,ESB功能說明

  (1)實現集成權限認證提供安全管理

  (2)支持數據交換方式有:

    WCF(NetTcp),MSSQL,WP7,MSMQ,RabbitMQ,Redis,TCP,WebService,Http,Email 等 

  (3)可插拔的接口

  (4)提供簡單API訪問ESB

  (5)開發者可以通過Javascript APIESB交互

  (6)提供REST 服務與ESB進行交互

 

5,ESB測試結果

 

測試機器
Intel Core 2 Quad Q9000 @2.00GHZ 6GB RAM x64bit

壓力測試結果: StressRequest - Result

Published Message to Bus in 0.2240128 Second(s)
Finished processing(publishing/receiving) 1000 in 0 second(s)

Published Message to Bus in 2.0591178 Second(s)
Finished processing(publishing/receiving) 10000 in 2.4851422 second(s)

Published Message to Bus in 22.8293057 Second(s)
Finished processing(publishing/receiving) 100000 in 23.4173394 second(s)

Published Message to Bus in 236.3805202 Second(s)
Finished processing(publishing/receiving) 1000000 in 237.2985727 second(s)

Published Message to Bus in 2440.2385737 Second(s)
Finished processing(publishing/receiving) 10000000 in 2440.4655866 second(s)

 

6,序列化方案

MessageShark(Open Source Binary Serializer to replace .Net Binary Formatter)

 

7,操作說明

1,首先配置ESB指向地址總線

(1),配置負載地址

//Configure router fail-over addresses in cases where multiple router services exists

ESB.AddRouterAddress("tcp://127.0.0.1:8001");

ESB.AddRouterAddress("tcp://127.0.0.1:8002");

(2),配置默認ESB路由結點:localhost:8000/ESBGateway

 

//Configure default ESB router endpoint to localhost:8000/ESBGateway

ESB.ConfigWithAddress("tcp://127.0.0.1:8000")

 

2,設置服務認證安全與權限 

為了控制誰連接或者調用到ESB服務,你需要實現兩個接口:IAuthenticationProvider和 IAuthorizationProvider 

ESB.Authenticate("username", "password");

下面是實現了這兩個接口的例子,這兩個引用需要引用ServiceBus.Core.dll 

Public class DefaultAuthenticationProvider:IAuthenticationProvider

{

     Public bool Authenticate(string username,string password)

     {

          //添加你的驗證用戶的業務邏輯:例如連接數據庫或者其它 

     }

}

public class DefaultAuthorizationProvider : IAuthorizationProvider 

{

public bool CanEditSubscriber(string username, string subscriberName)

{

     //Do something to verify that user can edit 

     specified subscriber       information

}

}

 

3,注冊一個新的話題(topic)

  什么是Topic(話題)

  一個主題包括名稱,描述和參數。主題名稱的方法.

 Contract Topic :需要嚴格參數要求

 Open Topic: 發布者可以發布任意參數

Dynamic Topic Creation (動態話題創建)

Topic.New("MyTopic")

.Description("My Topic Description")

.AddParameter("Param1", "Param1 Description")

.AddParameter("Param2", "Param2 Description")

.Register();

Class Based Topic Creation (靜態話題創建)

[Description("My topic Description")]

public class MyTopic {

    [Description("Param1 Description")]

public string Param1 { get; set; }

    [Description("Param2 Description")]

public string Param2 {get; set;}

}

Topic.Register<MyTopic>();

4,發布一個消息給一個話題(Topic)

 給一個話題發布消息有兩個方式:你可以發布一個單個的消息,或者你可以發布一個消息集合,你要發布一個消息,你要定義這個消息你想發布到哪個話題,你可以通過選擇是通過名稱或者類型()來查找話題。
Retrieve Topic object by using the type name as the topic name

var topic = Topic.Select<MyTopic>() 

//Retrieve Topic object by using a string based name

var topic = Topic.Select("MyTopic")

發布一個單個消息

There are fours ways to go about publishing a single message.

//Explicitly add the parameter for topic message

topic.SetParameter("Param1", "Value1")

.SetParameter("Param2", "Value2")

.Publish()

//Map an anonymous object or any instance of a class that matches the topic contract

//This option allow you to build up the parameter that would be published from one or more object as needed

topic.Message(new {Param1 = "Value1", Param2 = "Value2"})

.Publish()

//Map with instance of a class

topic.Message(new MyTopic(){ Param1 = "Value1", Param2 = "Value2" })

.Publish();

//Publish using anonymous object

topic.Publish(new {Param1 = "Value1", Param2 = "Value2"})

//Publish using instance of a class

topic.Publish(new MyTopic(){ Param1 = "Value1", Param2 = "Value2" })

//Publish message and infer the specific topic to publish to by using the class name

//This approach does not need the instance of topic object since it is inferring the topic name

Topic.PublishMessage(new MyTopic(){ Param1 = "Value1", Param2 = "Value2"})



給話題發布多個消息體

There are three ways to publish messages in batches

//Publish using a collection of anonymous objects

topic.PublishMany(new List<object>(){ 

new {Param1 = "Value1", Param2 = "Value2"},

new {Param1 = "Value1", Param2 = "Value2"}

} );

//Publish using a collection of instance of a class

topic.PublishMany(new List<MyTopic>(){

new MyTopic(){ Param1 = "Value1", Param2 = "Value2"},

new MyTopic(){ Param1 = "Value1", Param2 = "Value2"}

});

//Publish by infer the topic name from the type constrain of the collection

Topic.PublishMessages(new List<MyTopic>(){

new MyTopic(){ Param1 = "Value1", Param2 = "Value2"},

new MyTopic(){ Param1 = "Value1", Param2 = "Value2"}

});

6,獲取所有的話題

 如果你想知道目前ESB中有多少話題(topics),可以通過下面的方法來獲取,ESB會反回一個集合,包括話題的名稱,描述,參數。

ESB.GetTopics()

7,對一個話題進行消費(subscription)

 要開始接收發布給話題的消息,你要配置參數(filters, transports,topic)

var topicName = "ChatTopic";

var subscriberName = "MySubscriber";

Subscriber.New(subscriberName).Durable(false)

.SubscribeTo(Topic.Select<ChatTopic>().NotEqual("UserName", _userName))

.AddTransport("Tcp", Transport.New<TcpTransport>(

transport => { 

transport.Format = TransportFormat.Json;

transport.IPAddress = "127.0.0.1"; 

transport.Port = port; 

}), topicName)

.Save()


以下解釋一個每個方法的意思:

Create a new instance of subscriber with the name supplied in the New method

Subscriber.New(subscriberName)


如果消息發送失敗,esb會把失敗的消息進行保存,默認情況下,每個消息保存30天不過期

subscriber.SubscribeTo(Topic.Select<ChatTopic>().NotEqual("UserName", _userName))

subscriber.AddTransport("Tcp", Transport.New<TcpTransport>(

transport => { 

transport.Format = TransportFormat.Json;

transport.IPAddress = "127.0.0.1"; 

transport.Port = port; 

}), topicName)

 

subscriber.Save()

為一個消費者(subscriber)在傳輸協議上添加消息監聽

subscriber.OnMessageReceived<ChatTopic>("Tcp", msg => HandleMessage(msg),

            interval: TimeSpan.FromMilliseconds(1),

            errorAction: h => {

                h.Continue = true;

                Console.WriteLine(h.Error.ToString());

            });

方法解釋說明:

8,把消息存儲服務更新到數據庫中


<add name="default" type="ServiceBus.MessageStores.DefaultMessageStore" 

address="Server=.\SQLExpress;Database=servicebus;Trusted_Connection=yes;MultipleActiveResultSets=True"/>

 

創建數據庫,執行下面的SQL語句
Src\ServiceBus.Data.Entities\Scripts\SentMessageStorage.sql

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM