EventNext
是.net core
下的一個事件驅動的應用框架,通過它代理創建的接口行為都是通過事件驅動的模式進行調用.由於EventNext
的所有調用都是基於事件隊列來進行,所以在資源控制上非常方便;它可以進行多樣性的線程分配,其中Actor
應用就是它的一種基礎實現;在新的版中EventNext
增加了一個新的特性就是線程容器,通過線程容器可以讓N個類的行為在指定線程資源下運行。接來詳細分析這個功能的應用。
線程容器的便利性
平時在多線程應用的時候一般都用線程池或自己定義線程處理,但這些都需要手動去處理和調用。實際有些情況是希望不同實例的所有方法運行在指定線程資源下,但又不想入侵式修改代碼,這樣處理起來就比較復雜了。但EventNext
新版本的線程容器就能解決這種事情,它可以創建一個線程容器並指定那些對象的所有方法都在這個容器里執行,而容器的線程則可以根據實際情況來指定,這種完全透明的線程控制使用起來就非常方便(不過這版本的Actor暫不支持在線程容器中創建)。
接口定義准則
EventNext的所有任務都在由隊列接管,但它實現的隊列和平常的隊列有所不同;EventNext的事件驅動隊要求所有任務都用異步描述,它的特點是由上一個任務的完成來驅動下一下任務,不像同步處理隊列在處理異步的時候不得不做線程等待。為了達到異步任務的要求接口的所有方法必須定義Task作為返回值(暫不支持屬性處理)。
創建線程容器
組件提供一個EventNext.GetThreadContainer
方法來獲取一個線程容器,方法有兩個參數,一個是容器的名稱和對應容器的線程數量;當容器被創建后內部的線程數量不變,具體代碼如下:
var tc = EventCenter.GetThreadContainer("t1");
以上是獲取一個名稱為t1
的線程容器,在不指定線程數的情況下為一個線程處理。接下來就可以通過容器創建相應的接口實例:
var account = tc.Create<IAccount>();
以上是創建一個IAccount
的實例,這個實例不管在多少個線程下調用最終都由t1
這個線程容器去執行處理;每個容器可以創建多個接實例。
IAcouunt的實現
[Service(typeof(IAccount))] public class AccountImpl : IAccount { static AccountImpl() { Redis.Default.Host.AddWriteHost("192.168.2.19"); } public async Task Income(string name, int value) { await Redis.Default.Incrby(name, value); } public async Task<string> Value(string name) { return await Redis.Default.Get<string>(name); } }
以上是針對一個Redis
操作的實現,主要存在IO操作更容易測試到容器在不同線程下的差異。
測試代碼
為了測試容器的效果,使用了不同的方式進行測試,分別是:不用線程容器和使用不同線程數的容器,測試代碼如下:
-
非線程容器
static async Task None(int count) { var account = EventCenter.Create<IAccount>(); List<Task> tasks = new List<Task>(); var now = BeetleX.TimeWatch.GetElapsedMilliseconds(); for(int i=0;i<20;i++) { var t = Task.Run( async ()=> { for (int k = 0; k < count; k++) await account.Income("ken",k); }); tasks.Add(t); } await Task.WhenAll(tasks); var value = await account.Value("ken"); Console.WriteLine($"none use time:{BeetleX.TimeWatch.GetElapsedMilliseconds()-now} ms"); }
-
線程容器
static async Task Threads(int threads, int count) { var tc = EventCenter.GetThreadContainer($"t{threads}",threads); var account = tc.Create<IAccount>(); List<Task> tasks = new List<Task>(); var now = BeetleX.TimeWatch.GetElapsedMilliseconds(); for (int i = 0; i < 20; i++) { var t = Task.Run(async () => { for (int k = 0; k < count; k++) await account.Income("ken", k); }); tasks.Add(t); } await Task.WhenAll(tasks); var value = await account.Value("ken"); Console.WriteLine($"{threads} thread use time:{BeetleX.TimeWatch.GetElapsedMilliseconds() - now} ms"); }
運行代碼
static async void Test() { Console.WriteLine("Warm-Up..."); await None(10); await Threads(1, 10); await Threads(2, 10); await Threads(4, 10); for (int i = 0; i < 4; i++) { Console.WriteLine("Test..."); await None(2000); await Threads(1, 2000); await Threads(2, 2000); await Threads(4, 2000); } }
測試結果
Test...
none use time:1231 ms
1 thread use time:3479 ms
2 thread use time:2025 ms
4 thread use time:1208 ms
Test...
none use time:1246 ms
1 thread use time:3358 ms
2 thread use time:2025 ms
4 thread use time:1179 ms
Test...
none use time:1344 ms
1 thread use time:3385 ms
2 thread use time:1954 ms
4 thread use time:1187 ms
Test...
none use time:1210 ms
1 thread use time:3338 ms
2 thread use time:1933 ms
4 thread use time:1181 ms
默認情況下組件會針對請調用進行一個線程分配,這個分配機制依據隊列的負載情況進行分配調用;線程容器則會根據當前容器的線程數來進行一個平均分配處理,從測試結果可以看到不同線程數下完成所需要的時間。
示例代碼
https://github.com/IKende/BeetleX-Samples