1,Grain持久化目標
①允許不同類型的存儲提供者使用不同類型的存儲提供者(例如,一個使用Azure表,一個使用ADO.NET表),或者使用不同類型的存儲提供者,但具有不同的配置(例如,兩者都使用Azure表, 存儲帳戶#1和一個使用存儲帳戶#2)
②允許配置存儲提供程序實例(例如Dev-Test-Prod),只更改配置文件,不需要更改代碼。
③提供一個框架,以便稍后由Orleans團隊或其他人編寫其他存儲提供程序。
④提供最少量的生產級存儲提供商
⑤存儲提供者可以完全控制他們如何在持久性后台存儲中存儲Grain狀態數據。 推論:Orleans沒有提供全面的ORM存儲解決方案,但允許定制存儲提供商在需要時支持特定的ORM要求。
2,Grain持久化Api
Grain類型可以通過以下兩種方式之一進行聲明:
- 如果它們沒有任何持久狀態,或者它們將自己處理所有的持續狀態,則擴展Grain
- 如果他們有一些他們想要Orleans運行時處理的持續狀態,請擴展Grain <T>。 換句話說,通過擴展Grain <T>,grain類型自動加入到Orleans系統管理的持久化框架中。
對於本節的其余部分,我們將只考慮選項#2 / Grain <T>,因為選項1的Grain將繼續像現在一樣運行而不會有任何行為改變。
3,Grain狀態存儲
從Grain <T>繼承的Grain類(其中T是需要持久化的特定於應用程序的狀態數據類型)將從指定的存儲區自動加載它們的狀態。
Grain將被標記為[StorageProvider]屬性,該屬性指定用於讀取/寫入谷物的狀態數據的存儲提供者的命名實例。
[StorageProvider(ProviderName="store1")] public class MyGrain<MyGrainState> ... { ... }
Orleans提供者管理框架提供了一種機制,指定和注冊倉儲配置文件不同的存儲供應商和存儲選項。
<OrleansConfiguration xmlns="urn:orleans"> <Globals> <StorageProviders> <Provider Type="Orleans.Storage.MemoryStorage" Name="DevStore" /> <Provider Type="Orleans.Storage.AzureTableStorage" Name="store1" DataConnectionString="DefaultEndpointsProtocol=https;AccountName=data1;AccountKey=SOMETHING1" /> <Provider Type="Orleans.Storage.AzureBlobStorage" Name="store2" DataConnectionString="DefaultEndpointsProtocol=https;AccountName=data2;AccountKey=SOMETHING2" /> </StorageProviders>
4,配置存儲提供程序
①AzureTableStorage
<Provider Type="Orleans.Storage.AzureTableStorage" Name="TableStore" DataConnectionString="UseDevelopmentStorage=true" />
以下屬性可以添加到<Provider />元素來配置提供程序:
DataConnectionString="..."(必需) - 要使用的Azure存儲連接字符串
TableName="OrleansGrainState"
(可選) - 表格存儲中使用的表格名稱,默認為OrleansGrainStateDeleteStateOnClear="false"
(可選) - 如果為true,則在grain狀態被清除時記錄將被刪除,否則將寫入空記錄,默認為falseUseJsonFormat="false"
(可選) - 如果為true,則使用json序列化程序,否則將使用Orleans二進制序列化程序,默認為falseUseFullAssemblyNames="false"(可選) - (如果UseJsonFormat =“true”)序列化具有完整程序集名稱(true)或簡單(false)的類型,默認為false
IndentJSON="false"(可選) - (如果UseJsonFormat =“true”)縮進序列化的json,默認為false
注意:狀態不應超過64KB,由表存儲限制。
②AzureBlobStorage
<Provider Type="Orleans.Storage.AzureTableStorage" Name="BlobStore" DataConnectionString="UseDevelopmentStorage=true" />
以下屬性可以添加到<Provider />元素來配置提供程序:
DataConnectionString="..."
(必需) - 要使用的Azure存儲連接字符串ContainerName="grainstate"
(可選) - 要使用的Blob存儲容器,默認為grainstateUseFullAssemblyNames="false"
(可選) - 使用完整程序集名稱(true)或簡單(false)序列化類型,默認為falseIndentJSON="false"
(可選) - 縮進序列化的json,默認為false
③DynamoDBStorageProvider
<Provider Type="Orleans.Storage.DynamoDBStorageProvider" Name="DDBStore" DataConnectionString="Service=us-wes-1;AccessKey=MY_ACCESS_KEY;SecretKey=MY_SECRET_KEY;" />
DataConnectionString="..."(必需) - 要使用的DynamoDB存儲連接字符串。 您可以在其中設置Service,AccessKey,SecretKey,ReadCapacityUnits和WriteCapacityUnits。
TableName="OrleansGrainState"
(可選) - 表格存儲中使用的表格名稱,默認為OrleansGrainStateDeleteStateOnClear="false"
(可選) - 如果為true,則在grain狀態被清除時記錄將被刪除,否則將寫入空記錄,默認為falseUseJsonFormat="false"
(可選) - 如果為true,則使用json序列化程序,否則將使用Orleans二進制序列化程序,默認為falseUseFullAssemblyNames="false"
(可選) - (如果UseJsonFormat =“true”)序列化具有完整程序集名稱(true)或簡單(false)的類型,默認為falseIndentJSON="false"
(可選) - (如果UseJsonFormat =“true”)縮進序列化的json,默認為false
④ADO.NET Storage Provider (SQL Storage Provider)
ADO .NET存儲提供程序允許您在關系數據庫中存儲grain狀態。 目前支持以下數據庫:
- SQL Server
- MySQL/MariaDB
- PostgreSQL
- Oracle
首先,安裝基礎包: Install-Package Microsoft.Orleans.OrleansSqlUtils
在與您的項目一起安裝軟件包的文件夾下,可以找到支持數據庫供應商的不同SQL腳本。 你也可以從OrleansSQLUtils倉庫獲取它們。 創建一個數據庫,然后運行適當的腳本來創建表。
接下來的步驟是安裝第二個NuGet軟件包(請參閱下表),並根據需要安裝數據庫供應商,並以編程方式或通過XML配置來配置存儲提供程序。
Database | Script | NuGet Package | AdoInvariant | Remarks |
---|---|---|---|---|
SQL Server | CreateOrleansTables_SQLServer.sql | System.Data.SqlClient | System.Data.SqlClient | |
MySQL / MariaDB | CreateOrleansTables_MySQL.sql | MySql.Data | MySql.Data.MySqlClient | |
PostgreSQL | CreateOrleansTables_PostgreSQL.sql | Npgsql | Npgsql | |
Oracle | CreateOrleansTables_Oracle.sql | ODP.net | Oracle.DataAccess.Client | No .net Core support |
以下是如何使用XML配置來配置ADO .NET存儲提供程序的示例:
<OrleansConfiguration xmlns="urn:orleans"> <Globals> <StorageProviders> <Provider Type="Orleans.Storage.AdoNetStorageProvider" Name="OrleansStorage" AdoInvariant="<AdoInvariant>" DataConnectionString="<ConnectionString>" UseJsonFormat="true" /> </StorageProviders> </Globals> </OrleansConfiguration>
在代碼中,你需要像下面這樣的東西:
var properties = new Dictionary<string, string>() { ["AdoInvariant"] = "<AdoInvariant>", ["DataConnectionString"] = "<ConnectionString>", ["UseJsonFormat"] = "true" }; config.Globals.RegisterStorageProvider<AdoNetStorageProvider>("OrleansStorage", properties);
本質上,您只需要設置數據庫供應商特定的連接字符串和標識供應商的AdoInvariant(參見上表)。 您也可以選擇保存數據的格式,可以是二進制(默認),JSON或XML。 雖然二進制是最緊湊的選項,但它是不透明的,你將無法讀取或處理數據。 JSON是推薦的選項。
您可以設置以下屬性:
Name | Type | Description |
---|---|---|
Name | String | 持久性Grain將用於引用這個存儲提供程序的任意名稱 |
Type | String | 設置為 Orleans.Storage.AdoNetStorageProvider |
AdoInvariant | String | 標識數據庫供應商(請參閱上表中的值;默認是System.Data.SqlClient) |
DataConnectionString | String | 供應商特定的數據庫連接字符串(必需) |
UseJsonFormat | Boolean | 使用JSON格式(推薦) |
UseXmlFormat | Boolean | 使用XML格式 |
UseBinaryFormat | Boolean | 使用緊湊的二進制格式(默認) |
StorageProviders示例提供了一些代碼,您可以使用它們來快速測試以上內容,並展示一些自定義存儲提供程序。 在軟件包管理器控制台中使用以下命令將所有的Orleans軟件包更新到最新版本:
Get-Package | where Id -like 'Microsoft.Orleans.*' | foreach { update-package $_.Id }
ADO.NET持久性具有對數據進行版本化的功能,並可以使用任意應用程序規則和流來定義任意(de)序列化程序,但目前沒有辦法將它們公開給應用程序代碼。
⑤MemoryStorage
MemoryStorage是一個簡單的存儲提供者,它並不真正使用下面的持久數據存儲。 了解如何快速與存儲提供商合作很方便,但不打算在實際情況下使用。
注意:這個提供者將狀態持久化到不穩定的內存中,該內存在倉儲關閉時被刪除。只使用進行測試。
使用XML配置設置內存存儲提供程序:
<?xml version="1.0" encoding="utf-8"?> <OrleansConfiguration xmlns="urn:orleans"> <Globals> <StorageProviders> <Provider Type="Orleans.Storage.MemoryStorage" Name="OrleansStorage" NumStorageGrains="10" /> </StorageProviders> </Globals> </OrleansConfiguration>
要在代碼中設置它:
siloHost.Config.Globals.RegisterStorageProvider<MemoryStorage>("OrleansStorage");
您可以設置以下屬性:
Name | Type | Description |
---|---|---|
Name | String | 持久性Grain將用於引用這個存儲提供程序的任意名稱 |
Type | String | 設置為 Orleans.Storage.MemoryStorage |
NumStorageGrains | Integer | 用來存儲狀態的Grain數量,默認為10 |
⑥ShardedStorageProvider
<Provider Type="Orleans.Storage.ShardedStorageProvider" Name="ShardedStorage"> <Provider /> <Provider /> <Provider /> </Provider>
簡單的存儲提供程序,用於編寫多個其他存儲提供程序共享的Grain狀態數據。
一致的散列函數(默認是Jenkins Hash)用於決定哪個碎片(按照它們在配置文件中定義的順序)負責存儲指定Grain的狀態數據,然后將讀/寫/清除請求橋接 到合適的底層提供者執行。
⑦存儲提供者的注意事項
如果沒有為Grain <T> grain類指定[StorageProvider]屬性,則會搜索名為Default的提供程序。 如果沒有找到,則將其視為缺少的存儲提供者。
如果在倉儲配置文件中只有一個提供者,它將被視為這個倉儲的默認提供者。
使用存儲提供程序的Grain(Grain裝載時不存在並定義在Grain配置中)將無法加載,但是倉儲里的其他Grain仍然可以裝載和運行。Orleans之后的任何一種Grain類型的調用都將失敗。指定未加載Grain類型的Storage.BadProviderConfigException錯誤。
用於給定Grain類型的存儲提供程序實例由該Grain類型的[StorageProvider]屬性中定義的存儲提供程序名稱,加上倉儲配置中定義的提供者的提供者類型和配置選項。
不同的Grain類型可以使用不同的配置存儲提供程序,即使它們是相同的類型:例如,兩個不同的Azure表存儲提供程序實例連接到不同的Azure存儲帳戶(請參閱上面的配置文件示例)。
存儲提供程序的所有配置詳細信息是在倉儲啟動時讀取的倉儲配置中靜態定義的。 目前沒有提供機制來動態更新或更改倉儲所使用的存儲提供商列表。 但是,這是一個優先/工作量約束,而不是一個基本的設計約束。
5,狀態存儲API
對於Grain 狀態/持久性api有兩個主要部分:grainto - runtime和runtimeto - storage - provider。
6,Grain狀態存儲API
Orleans Runtime中的Grain狀態存儲功能將提供讀寫操作,以自動填充/保存該Grain的GrainState數據對象。 在內部,這些功能將被連接到通過配置為該Grain適當持久性提供(由Orleans 客戶端根工具生成的代碼中)。
7,Grain狀態讀/寫功能
當Grain被激活時,Grain狀態將自動被讀取,但是Grain負責明確地觸發任何改變的Grain狀態的寫入。 有關錯誤處理機制的詳細信息,請參見下面的失敗模式部分。
在為該激活調用OnActivateAsync()方法之前,將自動讀取GrainState(使用base.ReadStateAsync()的等效項)。 在任何方法調用Grain之前,Grain狀態不會被刷新,除非這個Grain被激活了。
在任何grain方法調用期間,grain可以請求Orleans運行時通過調用base.WriteStateAsync()將該激活的當前grain狀態數據寫入指定的存儲提供者。 grain是負責執行明確寫操作時,他們做出顯著更新他們的狀態數據。 最常見的是,grain方法將返回base.WriteStateAsync()任務作為從grain方法返回的最終結果Task,但不要求遵循此模式。 在任何grain方法之后,運行時不會自動更新存儲的糧食狀態。
在grain中的grain方法或定時器回調處理程序期間,grain可以通過調用base.ReadStateAsync()來請求Orleans運行時從指定的存儲提供程序重新讀取當前的grain狀態數據。 這將使用從持久性存儲中讀取的最新值完全覆蓋當前存儲在Grain狀態對象中的當前狀態數據。
不透明的特定於提供者的Etag值(字符串)可能由存儲提供程序設置,作為在讀取狀態時填充的Grain狀態元數據的一部分。如果不使用Etags,一些提供者可能會選擇將其保留為null。
從概念上講,在任何寫操作中,奧爾良運行時都將對grain狀態數據對象進行深入的復制。在覆蓋范圍內,運行時可以使用優化規則和啟發式來避免在某些情況下執行某些或全部的深度拷貝,前提是保留預期的邏輯隔離語義。
8,Grain狀態讀取/寫入操作的示例代碼
Grain必須擴展Grain< T >類,以參與Orleans Grain狀態的持久性機制。以上定義中的T將被一種特定於應用的Grain狀態類所取代;看下面的例子。
grain類還應該用一個[StorageProvider]屬性進行注釋,該屬性告訴運行時哪個存儲提供者(實例)與這種類型的Grain一起使用。
public class MyGrainState { public int Field1 { get; set; } public string Field2 { get; set; } } [StorageProvider(ProviderName="store1")] public class MyPersistenceGrain : Grain<MyGrainState>, IMyPersistenceGrain { ... }
9,Grain狀態讀取
在Grain的OnActivateAsync()方法被調用之前,Grain狀態的初始讀取將由Orleans運行時自動發生;不需要應用程序代碼來實現這一點,從那時起,Grain的狀態將通過Grain<T>.State屬性獲取
10,Grain狀態寫入
在對grain的內存狀態進行任何適當的更改之后,grain應該調用base.WriteStateAsync()方法通過定義的存儲提供程序將這些更改寫入到持久存儲中。 此方法是異步的,並返回一個通常由grain方法作為其自己的完成任務返回的Task。
public Task DoWrite(int val) { State.Field1 = val; return base.WriteStateAsync(); }
11, Grain狀態刷新
如果Grain希望明確地從后備存儲中重新讀取這個Grain的最新狀態,Grain應該調用base.ReadStateAsync()方法。這將從持久性存儲中重新加載Grain狀態,通過為這種Grain類型定義的存儲提供程序,並且在ReadStateAsync()任務完成時,將覆蓋任何先前的Grain狀態的內存副本。
public async Task<int> DoRead() { await base.ReadStateAsync(); return State.Field1; }
12,Grain狀態持久性操作的失敗模式
①Grain狀態讀取操作的失敗模式
存儲提供者在初始讀取該特定Grain的狀態數據期間返回的故障將導致該Grain的激活操作失敗;在這種情況下,將不會有任何的調用
OnActivateAsync()生命周期回調方法。對引起激活的Grain的原始請求將會被錯誤地反饋給調用者,就像在Grain激活過程中其他的故障一樣。存儲提供程序遇到的錯誤讀取特定Grain的狀態數據將導致ReadStateAsync()任務出錯。就像Orleans的其他任務一樣,Grain可以選擇處理或忽略這一斷層任務
任何試圖發送一個消息到未能在筒倉中加載的Grain會拋Orleans.BadProviderConfigException錯誤。
②Grain狀態寫入操作的失敗模式
存儲提供程序遇到寫入特定Grain的狀態數據時遇到的故障將導致WriteStateAsync()任務出現故障。 通常情況下,如果WriteStateAsync()任務被正確鏈接到這個grain方法的最終返回Task中,這將意味着grain調用會返回給客戶端調用者。 但是,某些高級方案可能會編寫grain代碼來專門處理這種寫入錯誤,就像他們可以處理任何其他故障的Task一樣。
執行錯誤處理/恢復代碼的Grain必須捕獲異常/故障的WriteStateAsync()任務,而不是重新拋出以表示他們已成功處理寫入錯誤。
13,存儲提供商框架
有一個服務提供者API用於編寫額外的持久性提供者 - IStorageProvider。
持久性提供程序API涵蓋GrainState數據的讀取和寫入操作。
public interface IStorageProvider { Logger Log { get; } Task Init(); Task Close(); Task ReadStateAsync(string grainType, GrainReference grainReference, IGrainState grainState); Task WriteStateAsync(string grainType, GrainReference grainReference, IGrainState grainState); }
14,存儲提供程序語義
當存儲提供程序檢測到Etag約束違規時,任何嘗試執行寫入操作都應該導致寫入任務出現瞬態錯誤Orleans.InconsistentStateException,並封裝基礎存儲異常。
public class InconsistentStateException : AggregateException { /// <summary>Etag值當前持久存儲。</summary> public string StoredEtag { get; private set; } /// <summary>Etag值目前保存在內存中,並試圖更新。</summary> public string CurrentEtag { get; private set; } public InconsistentStateException( string errorMsg, string storedEtag, string currentEtag, Exception storageException ) : base(errorMsg, storageException) { this.StoredEtag = storedEtag; this.CurrentEtag = currentEtag; } public InconsistentStateException(string storedEtag, string currentEtag, Exception storageException) : this(storageException.Message, storedEtag, currentEtag, storageException) { } }
來自寫入操作的任何其他故障情況都應該導致寫入任務被破壞,並且包含基礎存儲異常的異常。
15,數據映射
單獨的存儲提供商應該決定如何最好地存儲Grain狀態blob(各種格式/序列化的形式)或列每場是明顯的選擇。
Azure Table的基本存儲提供程序使用Orleans二進制序列化將狀態數據字段編碼為單個表列。
16,ADO.NET持久性原理
ADO.NET支持的持久性存儲的原則是:
- 在數據,數據和代碼格式不斷發展的同時,保持業務關鍵型數據的安全。
- 利用供應商和存儲特定的功能。
實際上,這意味着堅持ADO.NET implementation goals,並在ADO.NET特定的存儲提供程序中添加一些實現邏輯,允許演變存儲中的數據形狀。
除了通常的存儲提供者功能外,ADO.NET提供者還具有內置的功能
- 在往返狀態時將存儲數據格式從一種格式更改為另一種格式(例如,從JSON格式轉換為二進制格式)。
- 以任意方式對存儲的類型進行整形或從存儲中讀取。 這有助於演變版本狀態。
- 將數據流出數據庫。
兩個1和2。可用於任意選擇參數,如Grain ID、Grain Type、payload data等。
發生這種情況以便人們選擇一種格式,例如 簡單的二進制編碼(SBE)並實現IStorageDeserializer和IStorageSerializer。 內置的(de)序列化器是使用這種方法構建的。 OrleansStorageDefault(De)序列化程序可以用作如何實現其他格式的示例。
當(de)序列化器已經實現時,他們需要將ba添加到AdoNetStorageProvider的StorageSerializationPicker屬性中。 這是IStorageSerializationPicker的一個實現。 默認情況下將使用StorageSerializationPicker。 在RelationalStorageTests中可以看到更改數據存儲格式或使用(de)序列化器的示例。
目前沒有任何方法可以將這個信息暴露給Orleans應用程序,因為沒有方法可以訪問創建的AdoNetStorageProvider實例的框架。
Orleans運行時提供了兩種稱為定時器和提醒的機制,使開發人員能夠指定谷物的周期性行為。
1,定時器
①描述
定時器用於創建不需要跨越多個激活(Grain實例化)的周期性Grain行為。 它與標准的.NET System.Threading.Timer類基本相同。 另外,它在運行的Grain激活內受單線程執行保證。
每個激活可能有零個或多個與之關聯的定時器。 運行時在與其關聯的激活的運行時上下文中執行每個定時器例程。
②用法
要啟動計時器,請使用Grain.RegisterTimer方法,該方法返回一個IDisposable引用:
protected IDisposable RegisterTimer(Func<object, Task> asyncCallback, object state, TimeSpan dueTime, TimeSpan period)
- asyncCallback是當計時器計時時調用的函數。
- state是一個對象,當計時器計時時,它將被傳遞給asyncCallback。
- dueTime指定在發出第一個計時器之前等待的時間量。
-
period 指定計時器的周期。
取消定時器。
如果激活被停用或發生故障或發生故障,計時器將停止觸發。
重要的注意事項 -
啟用激活集合時,計時器回調的執行不會將激活狀態從空閑狀態更改為使用。這意味着計時器不能用於延遲其他空閑激活的停用。
- 傳遞給Grain.RegisterTimer的時間是,從asyncCallback返回的任務到下一次調用asyncCallback時所經過的時間。這不僅使連續調用asyncCallback無法重疊,還使得異步回調的時間長度影響到異步調用的頻率。這與system.thread.timer的語義有很大的偏差。
- 每次調用asyncCallback都會在一個單獨的回合中傳遞給一個激活,並且永遠不會與其他回合在同一個激活中同時運行。 但是,請注意,asyncCallback調用不作為消息傳遞,因此不受消息交錯語義限制。 這意味着asyncCallback的調用應該被認為是像對其他消息運行在一個可重入的Grain上一樣。
2,提醒
①描述
提醒與定時器類似,但有一些重要的區別:
- 除非明確取消,否則提醒將保持不變,並會在所有情況下繼續觸發(包括部分或全部群集重新啟動)。
- 提醒與Grain有關,而不是任何特定的激活。
- 如果Grain有沒有與之相關的激活和提醒蜱,一個將被創建。例如:如果激活變為空閑並且被停用,則與相同Grain相關聯的提醒將在接下來ticks時重新激活Grain。
- 提醒是通過消息傳遞的,並且與所有其他的grain方法一樣,都受到相同的交錯語義的影響。
- 提醒不應該用於高頻計時器——它們的周期應該以分鍾、小時或天數來衡量。
②配置
提醒,持久,依賴存儲功能。在提醒子系統將運行之前,您必須指定使用哪種存儲支持。提醒功能由服務器端配置中的SystemStore元素控制。它使用Azure表或SQL Server作為存儲。
<SystemStore SystemStoreType="AzureTable" /> OR <SystemStore SystemStoreType="SqlServer" />
如果您只是想要一個占位符實現的提醒,而不需要設置一個Azure帳戶或SQL數據庫,然后將這個元素添加到配置文件(在“Globals”下)將會給你一個提醒系統的開發實現:
<ReminderService ReminderServiceType="ReminderTableGrain"/>
③使用
使用提醒Grain必須實現IRemindable.RecieveReminder方法。
Task IRemindable.ReceiveReminder(string reminderName, TickStatus status) { Console.WriteLine("感謝提醒我 - 我差點忘了!"); return TaskDone.Done; }
要開始提醒,請使用Grain.RegisterOrUpdateReminder方法,該方法返回一個IOrleansReminder對象:
protected Task<IOrleansReminder> RegisterOrUpdateReminder(string reminderName, TimeSpan dueTime, TimeSpan period)
- reminderName是一個字符串,必須在上下文的范圍內唯一標識提醒。
- dueTime指定在發出第一個計時器tick之前等待的時間量。
- period指定定時器的周期。
由於提醒在任何一次激活的生命周期中都能夠存活,因此必須明確取消(而不是被處置)。 您可以通過調用Grain.UnregisterReminder來取消提醒:
protected Task UnregisterReminder(IOrleansReminder reminder)
提醒是由Grain.RegisterOrUpdateReminder返回的處理對象。
IOrleansReminder的實例不能保證超過激活的有效期。 如果您希望以某種方式識別提醒,請使用包含提醒名稱的字符串。
如果您只有提醒的名稱並需要IOrleansReminder的相應實例,請調用Grain.GetReminder方法:
protected Task<IOrleansReminder> GetReminder(string reminderName)
3,我應該使用哪個?
我們建議您在以下情況下使用計時器:
- 如果激活被停用或發生故障,則定時器停止工作並不重要(或不期望)。
- 如果計時器的頻率很小(例如幾秒鍾或幾分鍾)
- 定時器回調可以從Grain.OnActivateAsync啟動,或者在調用grain方法時啟動。
我們建議您在以下情況下使用提醒:
- 當周期性行為需要經受激活和任何失敗。
- 執行不頻繁的任務(例如在幾分鍾,幾小時或幾天內合理表達)。
結合計時器和提醒您可以考慮使用提醒和定時器的組合來實現您的目標。 例如,如果您需要一個需要在激活狀態下保留的小頻率的計時器,則可以使用每5分鍾運行一次的提醒,其目的是喚醒一個可重新啟動本地計時器的計時器,該計時器可能由於 停用。
1,什么是依賴注入
依賴注入(DI)是一種軟件設計模式,它實現了解決依賴關系的控制反轉。
Orleans正在使用由ASP.NET Core開發人員編寫的抽象概念。
2,Orleans中的DI
目前僅在Orleans的服務器端支持依賴注入。
Orleans可以將依賴關系注入到Grains應用程序中。
然而Orleans支持每個容器依賴的注入機制,其中最常用的方法是構造器注入。
理論上,任何類型都可以在Silo啟動期間注冊在IServiceCollection中。
注:由於新Orleans是不斷發展的,因為目前的計划,將有可能利用其他應用類的依賴注入,以及像StreamProviders。
3,配置DI
DI配置是一個全局配置值,必須在此配置。
Orleans使用與ASP.NET Core類似的方法來配置DI。 您的應用程序中必須包含一個啟動類,其中必須包含ConfigureServices方法。 它必須返回一個類型為IServiceProvider的對象實例。
通過下面介紹的方法之一來指定啟動類的類型來完成配置。
注意:以前的DI配置是在集群節點級別指定的,在最近的版本中進行了更改。
4,從代碼配置
可以通過基於代碼的配置告訴Orleans您喜歡使用什么Startup類型。。 在ClusterConfiguration類中有一個名為UseStartup的擴展方法,您可以使用它進行此操作。
var configuration = new ClusterConfiguration(); configuration.UseStartupType<MyApplication.Configuration.MyStartup>();
5,通過XML配置
要使用Orleans注冊Startup類,必須將Startup元素添加到Defaults部分,並在Type屬性中指定該類型的程序集限定名稱。
<?xml version="1.0" encoding="utf-8" ?> <tns:OrleansConfiguration xmlns:tns="urn:orleans"> <tns:Defaults> <tns:Startup Type="MyApplication.Configuration.Startup,MyApplication" /> </tns:Defaults> </tns:OrleansConfiguration>
6,例子
這是一個完整的Startup類示例:
namespace MyApplication.Configuration { public class MyStartup { public IServiceProvider ConfigureServices(IServiceCollection services) { services.AddSingleton<IInjectedService, InjectedService>(); return services.BuildServiceProvider(); } } }
這個例子顯示了Grain如何通過構造函數注入來利用IInjectedService,以及注入服務的完整聲明和實現:
public interface ISimpleDIGrain : IGrainWithIntegerKey { Task<long> GetTicksFromService(); } public class SimpleDIGrain : Grain, ISimpleDIGrain { private readonly IInjectedService injectedService; public SimpleDIGrain(IInjectedService injectedService) { this.injectedService = injectedService; } public Task<long> GetTicksFromService() { return injectedService.GetTicks(); } } public interface IInjectedService { Task<long> GetTicks(); } public class InjectedService : IInjectedService { public Task<long> GetTicks() { return Task.FromResult(DateTime.UtcNow.Ticks); } }
7,測試框架集成
與真正的測試框架結合在一起驗證代碼的正確性。
你需要做兩件事,設置DI和測試。首先,您需要實現服務的模擬。這是在我們的例子中使用Moq來完成的,Moq是.net的一個流行的mocking框架。這里有一個對服務進行mocking的例子。
public class MockServices { public IServiceProvider ConfigureServices(IServiceCollection services) { var mockInjectedService = new Mock<IInjectedService>(); mockInjectedService.Setup(t => t.GetTicks()).Returns(knownDateTime); services.AddSingleton<IInjectedService>(mockInjectedService.Object); return services.BuildServiceProvider(); } }
要將這些服務包含在您的測試倉庫中,您需要指定MockServices作為倉儲啟動類。 這是做這個的一個例子。
[TestClass] public class IInjectedServiceTests: TestingSiloHost { private static TestingSiloHost host; [TestInitialize] public void Setup() { if (host == null) { host = new TestingSiloHost( new TestingSiloOptions { StartSecondary = false, AdjustConfig = clusterConfig => { clusterConfig.UseStartupType<MockServices>(); } }); } } }
有些情況下,簡單的消息/響應模式不夠,客戶端需要接收異步通知。 例如,用戶可能希望在朋友發布新的即時消息時得到通知。
客戶端觀察者是一種允許異步通知客戶端的機制。 觀察者是從IGrainObserver繼承的單向異步接口,它的所有方法必須是無效的。 Grain通過調用Grain接口方法來發送通知給觀察者,除了它沒有返回值,Grain不需要依賴結果。 Orleans運行時將確保通知的單向傳遞。 發布此類通知的Grain應提供API以添加或刪除觀察者。 另外,公開允許取消現有訂閱的方法通常是方便的。 Grain開發人員可以使用 Orleans ObserverSubscriptionManager <T>泛型類來簡化觀察到的Grain類型的開發。
要訂閱通知,客戶端必須首先創建一個實現觀察者接口的本地C#對象。 然后調用觀察者工廠的一個靜態方法CreateObjectReference(),將C#對象變成一個Grain引用,然后可以將它傳遞給通知Grain上的訂閱方法。
其他Grain也可以使用此模型來接收異步通知。 與客戶端訂閱的情況不同,訂閱的grain只是將觀察者接口作為一個方面來實現,並傳入一個對自身的引用(例如this.AsReference <IMyGrainObserverInterface>)。
1,代碼示例
我們假設我們有一個周期性地向客戶發送消息的Grain。 為了簡單起見,我們示例中的消息將是一個字符串。 我們首先定義將接收消息的客戶端上的接口。
界面將如下所示:
public interface IChat : IGrainObserver { void ReceiveMessage(string message); }
唯一特別的是,該接口應該從IGrainObserver繼承。 現在任何想要觀察這些消息的客戶端都應該實現一個實現IChat的類。
最簡單的情況是這樣的:
public class Chat : IChat { public void ReceiveMessage(string message) { Console.WriteLine(message); } }
現在在服務器上,我們應該有一個Grain發送這些聊天消息到客戶端。 Grain也應該有一個機制,客戶訂閱和退訂自己接收通知。 對於訂閱,Grain可以使用實用工具類ObserverSubscriptionManager:
class HelloGrain : Grain, IHello { private ObserverSubscriptionManager<IChat> _subsManager; public override async Task OnActivateAsync() { // 我們在激活時創建了這個工具。 _subsManager = new ObserverSubscriptionManager<IChat>(); await base.OnActivateAsync(); } //客戶端調用此訂閱 public Task Subscribe(IChat observer) { _subsManager.Subscribe(observer); return TaskDone.Done; } //此外,客戶端使用這個取消訂閱自己不再接收消息。 public Task UnSubscribe(IChat observer) { _subsManager.Unsubscribe(observer); return TaskDone.Done; } }
要將消息發送到客戶端,可以使用ObserverSubscriptionManager <IChat>實例的通知方法。 該方法采用Action <T>方法或lambda表達式(其中T是IChat類型)。 您可以調用接口上的任何方法將其發送給客戶端。 在我們的例子中,我們只有一個方法ReceiveMessage,我們在服務器上的發送代碼如下所示:
public Task SendUpdateMessage(string message) { _subsManager.Notify(s => s.ReceiveMessage(message)); return TaskDone.Done; }
現在我們的服務器有一個向觀察者客戶端發送消息的方法,訂閱/取消訂閱的方法有兩種,客戶端實現了一個類來觀察這些消息。 最后一步是使用我們以前實現的Chat類在客戶端上創建一個觀察者引用,並在訂閱它之后讓它接收消息。
代碼看起來像這樣:
//首先創建Grain引用 var friend = GrainClient.GrainFactory.GetGrain<IHello>(0); Chat c = new Chat(); //創建可用於訂閱observable grain的引用。 var obj = await GrainClient.GrainFactory.CreateObjectReference<IChat>(c); //訂閱該實例以接收消息。 await friend.Subscribe(obj);
現在,當服務器上的Grain調用SendUpdateMessage方法時,所有訂閱的客戶端都將收到消息。 在我們的客戶端代碼中,變量c中的Chat實例將接收消息並將其輸出到控制台。
注意:傳遞給CreateObjectReference的對象通過WeakReference <T>被保存,因此如果不存在其他引用,將被垃圾回收。 用戶應該為每個觀察者保留一個他們不想被收集的參考。
注意:觀察者本質上是不可靠的,因為您沒有得到任何回應,知道是否由於分布式系統中可能出現的任何情況而收到並處理了消息或者僅僅是失敗了。 因為你的觀察者應該定期輪詢Grain,或者使用其他機制來確保他們收到了他們應該收到的所有信息。在某些情況下,你可以損失一些信息,你不需要任何額外的機制,但是如果你需要確保所有的觀察者都接收到這些信息並且接收所有的信息,定期的重新訂閱和輪詢觀察Grain,可以幫助確保最終處理所有消息。
默認情況下,Orleans運行時只會在集群內創建一個Grain的激活。 這是虛擬主角模型的最直觀的表達方式,每個Grain對應於具有唯一類型/標識的實體。 但是,也有一些情況是應用程序需要執行功能無狀態的操作,而這些操作不會綁定到系統中的特定實體。 例如,如果客戶端發送帶有壓縮有效載荷的請求,並在它們能夠被路由到目標Grain進行處理之前需要被解壓縮,則這樣的解壓縮/路由邏輯不綁定到應用中的特定實體,並且可以容易地向外擴展。
當[StatelessWorker]特性應用於Grain類時,它向Orleans運行時指示該類的Grain應被視為無狀態工作者Grain。 無狀態工作者Grain具有以下特性,使其執行與普通Grain類別的執行有很大不同。
- Orleans運行時可以並且將在集群的不同倉儲上創建無狀態工作者Grain的多個激活。
- 對無狀態工作者Grain的請求總是在當地執行,也就是在請求發起的同一個倉儲里,要么由Grain運行,要么由倉儲的客戶端網關接收。 因此,從其他Grain或客戶網關調用無狀態工作者Grain從不會產生遠程信息。
- Orleans運行時自動創建一個無狀態工作者Grain額外的激活,如果現有的忙。 除非可選的maxLocalWorkers參數明確指定,否則運行時創建的無狀態工作器的最大激活次數默認受機器上CPU內核數量的限制。
- 由於2和3,無狀態工作者Grain激活並不是單個可尋址的。對無狀態工作者Grain的兩個后續請求可以通過不同的激活來處理。
無狀態工作者Grain提供了一個直接的方式,創建一個自動管理的Grain激活池,根據實際負載自動擴展和縮減。運行時總是以相同的順序掃描可用的無狀態工作者Grain激活。因此,它總是將請求發送到它可以找到的第一個空閑本地激活,並且如果以前的所有激活都忙,則只能到最后一個激活。如果所有的激活都很忙,並且沒有達到激活限制,它會在列表的末尾再創建一個激活,並將請求發送給它。這意味着,當對無狀態工作者Grain需求量增加,而且現有的激活當前都很忙時,運行時將其激活池擴大到極限。相反,當負載下降,並且可以通過少量無狀態工作者Grain的激活來處理時,在列表尾部的激活將不會被發送到他們的請求。他們將變得閑置,並最終被標准的激活收集過程停用。因此,激活池將最終縮小以匹配負載。
以下示例使用默認的最大激活數限制定義無狀態工作者谷物類MyStatelessWorkerGrain。
[StatelessWorker] public class MyStatelessWorkerGrain : Grain, IMyStatelessWorkerGrain { ... }
調用無狀態工作者Grain和其他Grain一樣。 唯一的區別是,在大多數情況下,使用一個單一的GrainID,0或Guid.Empty。 具有多個無狀態工作者Grain池時,可以使用多個GrainID,每個ID需要一個Grain池。
var worker = GrainFactory.GetGrain<IMyStatelessWorkerGrain>(0); await worker.Process(args);
這個定義了一個無狀態的工作者Grain類每個倉儲不超過一個Grain激活。
[StatelessWorker(1)] // max 1 activation per silo public class MyLonelyWorkerGrain : ILonelyWorkerGrain { ... }
請注意,[StatelessWorker]屬性不會改變目標Grain類的重入性。 就像任何其他Grain一樣,無狀態工作者Grain默認情況下是不可重入的。 可以通過向Grain類添加一個[Reentrant]屬性來明確地重新定位它們。
可重入(reentrant)函數可以由多於一個任務並發使用,而不必擔心數據錯誤。相反,不可重入(non-reentrant)函數不能由超過一個任務所共享
1,State
“Stateless”部分的“Stateless Worker”部分並不意味着無狀態的工作者不能擁有狀態,並且僅限於執行功能操作。和其他Grain一樣,無狀態的工人Grain可以裝載並保存它需要的任何狀態。這只是因為在同一個和不同的集群上創建了一個無狀態的工作者Grain的多個激活,沒有簡單的機制來協調不同激活狀態所持有的狀態。
涉及Stateless Worker 有幾種有用的模式。
①向外擴展熱緩存項
對於經歷高吞吐量的熱緩存項目,將每個這樣的項目保持在無狀態工作者Grain中使得a)在倉儲中並在群集中的所有倉儲中自動擴展; b)通過客戶端網關在收到客戶端請求的倉儲上使數據始終本地可用,這樣就可以在不需要額外網絡跳轉到另一個倉儲的情況下應答請求。
②減少樣式聚合
在某些場景中,應用程序需要計算集群中特定類型的所有Grains的特定指標, 並定期報告聚集體。 舉例來說,每個游戲地圖的玩家數量,VoIP呼叫的平均持續時間, 等等,如果成千上萬的Grain中的每一個都將它們的指標報告給一個單一的全局聚合器,那么聚合器就會立即過載,無法處理大量的報告。另一種方法是將此任務轉換成一個2(或更多)步驟,減少樣式聚合。第一層的聚合是通過向無狀態的工作者預聚集的Gtrain發送他們的指標。Orleans運行時將自動為每個倉儲創建無狀態工作者Grain的多個激活。由於所有這些調用都將在本地處理,不需要遠程調用或序列化消息,因此此類聚合的成本將顯著低於遠程情況。現在,每個預聚集無狀態的工作者Grain激活,獨立地或與其他本地激活進行協調,可以在不超載的情況下將聚合報告發送到全局最終聚合器(或在必要時再進行另一個還原層)。
1,介紹
1)Orleans Streams
Orleansv.1.0.0增加了對編程模型的流式擴展的支持。 流式擴展提供了一系列抽象和API,使工作流更簡單、更健壯。 流式擴展允許開發人員編寫響應式應用程序,以結構化的方式對一系列事件進行操作。 流提供程序的可擴展性模型使得編程模型與大量現有排隊技術(如Event Hubs, ServiceBus, Azure Queues和Apache Kafka.)兼容並可移植。 不需要編寫特殊的代碼或運行專門的進程來與這樣的隊列進行交互。
2)我為什么要關心?
如果你已經知道了 Stream Processing 和熟悉各種技術 Event Hubs, Kafka, Azure Stream Analytics, Apache Storm, Apache Spark Streaming, 和Reactive Extensions (Rx) in .NET, 你可能會問,你為什么要關心這個。為什么我們需要另一個流處理系統,以及Actors如何與流相關? "Why Orleans Streams?" 是用來回答這個問題的。
3)編程模型
Orleans流編程模型背后有許多原理。
- 遵循Orleans的虛擬Actors,Orleans流是虛擬的。也就是說,流總是存在。它沒有被顯式地創建或銷毀,它永遠不會失敗。
- 流是由流id識別的,它們只是由GUIDs和字符串組成的邏輯名稱。
- Orleans Streams允許從時間和空間的處理中分離數據的生成。 這意味着,流生產者和流消費者可能在不同的服務器上,在不同的時間,並會承受失敗。
- Orleans流是輕量級和動態的。 Orleans Streaming Runtime旨在處理大量來來往往的高速流。
- Orleans流綁定是動態的。 Orleans Streaming Runtime旨在處理grain以高速連接和斷開流的情況。
- Orleans Streaming Runtime透明地管理流消耗的生命周期。 應用程序訂閱一個流之后,即使在出現故障的情況下,它也會收到流的事件。
- Orleans流在grain和Orleans的客戶中工作一致。
4)編程API
應用程序通過與.NET中眾所周知的Reactive Extensions(Rx)非常相似的API與流進行交互,通過使用Orleans.Streams.IAsyncStream <T>實現
Orleans.Streams.IAsyncObserver <T>和Orleans.Streams.IAsyncObservable <T>接口。
在下面的典型示例中,設備會生成一些數據,這些數據會作為HTTP請求發送到雲中運行的服務。 在前端服務器上運行的Orleans客戶端接收到這個HTTP調用並將數據發布到匹配的設備流中:
public async Task OnHttpCall(DeviceEvent deviceEvent) { // 將數據直接發布到設備的流中。 IStreamProvider streamProvider = GrainClient.GetStreamProvider("myStreamProvider"); IAsyncStream<DeviceEventData> deviceStream = streamProvider.GetStream<DeviceEventData>(deviceEvent.DeviceId); await deviceStream.OnNextAsync(deviceEvent.Data); }
在下面的另一個例子中,聊天用戶(實現為Orleans Grain)加入聊天室,獲取由該房間中所有其他用戶生成的聊天消息流,並訂閱該消息。 請注意,聊天用戶既不需要知道聊天室Grain本身(我們的系統中可能沒有這樣的Grain),也不需要知道該群中產生消息的其他用戶。 不用說,為了產生聊天流,用戶不需要知道誰當前訂閱了流。 這說明聊天用戶如何在時間和空間上完全分離。
public class ChatUser: Grain { public async Task JoinChat(string chatGroupName) { IStreamProvider streamProvider = base.GetStreamProvider("myStreamProvider"); IAsyncStream<string> chatStream = streamProvider.GetStream<string>(chatGroupName); await chatStream.SubscribeAsync((string chatEvent) => Console.Out.Write(chatEvent)); } }
5)快速啟動示例
快速入門示例是在應用程序中使用流的總體工作流程的快速概覽。 閱讀之后,您應該閱讀Streams Programming API,以深入了解這些概念。
6)流編程API
流編程API提供了編程API的詳細描述。
7)流提供者
流可以通過各種形狀和形式的物理通道來實現,並且可以具有不同的語義。 Orleans Streaming旨在通過流提供程序的概念來支持這種多樣性,這是系統中的一個可擴展點。 Orleans目前有兩個流提供程序的實現:基於TCP的簡單消息流提供程序和基於Azure隊列的Azure隊列流提供程序。 有關Steam提供商的更多詳細信息可以在Stream Providers上找到。
8)流語義
流Subsription語義:Orleans流保證Stream Subsription操作的順序一致性。 具體說就是,當消費者訂閱一個流時,一旦代表該subsription操作的Task被成功解決,消費者就會看到訂閱之后生成的所有事件。 另外,可重放的流允許通過使用StreamSequenceToken從任意時間點訂閱。
單個流事件傳送保證:單個事件傳送保證取決於各個流提供者。 一些服務器只提供一次交付(例如簡單消息流),而另一些則至少提供一次交付(例如Azure隊列流)。 甚至有可能建立一個能夠保證一次交付的流提供者(我們還沒有這樣的提供者,但是可以用可擴展性模型來構建一個提供者)。
事件傳遞順序:事件順序還取決於特定的流提供者。 在SMS流中,制作者通過控制發布它們的方式來控制消費者看到的事件的順序。 Azure隊列流不保證FIFO順序,因為下層的Azure隊列不能保證順序失敗。 應用程序還可以通過使用StreamSequenceToken來控制自己的流傳送順序。
10)流實現
Orleans Streams Implementation提供了內部實現的高層次概述。
11)流擴展性
Orleans Streams Extensibility介紹了如何使用新功能擴展流。
12)代碼示例
在這里可以找到更多關於如何在谷物中使用流API的例子。 我們計划在未來創造更多樣本。
2,流,快速啟動
本指南將向您展示安裝和使用Orleans Streams的快速方法。 要詳細了解流式傳輸功能的詳細信息,請閱讀本文檔的其他部分。
1)所需的配置
在本指南中,我們將使用基於簡單消息的流,它使用谷物消息向訂戶發送流數據。 我們將使用內存存儲提供商存儲的訂閱列表,所以它不是真正的生產應用明智的最佳的選擇。
<Globals> <StorageProviders> <Provider Type="Orleans.Storage.MemoryStorage" Name="Default" /> <Provider Type="Orleans.Storage.MemoryStorage" Name="PubSubStore" /> </StorageProviders> <StreamProviders> <Provider Type="Orleans.Providers.Streams.SimpleMessageStream.SimpleMessageStreamProvider" Name="SMSProvider"/> </StreamProviders>
現在我們可以創建流,使用它們作為生產者發送數據,也可以作為訂戶接收數據。
2)生產事件
為流生成事件相對容易。 您應該首先訪問您在上面的配置(SMSProvider)中定義的流提供程序,然后選擇一個流並將數據推送到它。
//選擇一個聊天室grain和聊天室流guid var guid = some guid identifying the chat room //獲取我們在配置中定義的提供者之一 var streamProvider = GetStreamProvider("SMSProvider"); //獲取流的引用 var stream = streamProvider.GetStream<int>(guid, "RANDOMDATA");
正如你可以看到我們的流有一個GUID和一個命名空間。 這將使識別獨特的流變得容易。 例如,在聊天室命名空間中,“Rooms”和GUID可以是擁有RoomGrain的GUID。
這里我們使用一些已知的聊天室的GUID。 現在使用流的OnNext方法,我們可以將數據推送到它。 讓我們在一個計時器內使用隨機數字。 您也可以使用任何其他數據類型的流。
RegisterTimer(s => { return stream.OnNextAsync(new System.Random().Next()); }, null, TimeSpan.FromMilliseconds(1000), TimeSpan.FromMilliseconds(1000));
3)訂閱和接收流數據
為了接收數據,我們可以使用隱式/顯式訂閱,在手冊的其他頁面中對這些訂閱進行了全面描述。 在這里,我們使用的是更容易隱士訂閱。 當grain類型想要隱式地訂閱一個流時,它使用ImplicitStreamSubscription (namespace)]。
對於我們的情況,我們將像這樣定義一個ReceiverGrain:
[ImplicitStreamSubscription("RANDOMDATA")] public class ReceiverGrain : Grain, IRandomReceiver
現在,無論何時將某些數據推送到名稱空間RANDOMDATA的流中(如定時器中所示),具有相同流的GUID的ReceiverGrain類型的Grain將收到該消息。 即使目前不存在Grain的激活,運行時也會自動創建一個新消息並將消息發送給它。
為了使這個工作,我們需要通過設置我們的OnNext方法接收數據來完成訂閱過程。 所以我們的ReceiverGrain應該調用OnActivateAsync這樣的東西
//Create a GUID based on our GUID as a grain var guid = this.GetPrimaryKey(); //獲取我們在配置中定義的一個提供者 var streamProvider = GetStreamProvider("SMSProvider"); //獲取對流的引用 var stream = streamProvider.GetStream<int>(guid, "RANDOMDATA"); //將我們的OnNext方法設置為lambda,它只輸出數據,這不會產生新的訂閱 await stream.SubscribeAsync<int>(async (data, token) => Console.WriteLine(data));
我們現在都准備好了 唯一的要求就是觸發我們的生產者Grain的創建,然后它將注冊計時器,並開始發送隨機整數給所有訂閱的各方。
再次,這個指南跳過了很多細節,只是為了展示大局。 閱讀本手冊的其他部分以及RX上的其他資源,以便了解可用的內容和方式。
反應式編程是解決許多問題的一個非常有效的方法。 你可以例如在用戶中使用LINQ來過濾數字,並做各種有趣的東西。
3,為什么選擇流
1)為什么選擇Orleans Streams?
已經有很多技術可以讓你建立流處理系統。 這些系統包括持久存儲流數據的系統(例如,事件中心和Kafka)以及用於在流數據上表達計算操作的系統(例如,Azure流分析,Apache風暴和Apache Spark流)。 這些都是非常棒的系統,可以讓您構建高效的數據流處理管道。
2)現有系統的局限性
然而,這些系統不適合fine-grained free-form compute over stream data。他的流計算系統首先提到,允許您指定一個統一的數據流圖,以相同的方式應用於所有的流項目。當數據是一致的時候,這是一個強大的模型,並且您想要對這些數據表達相同的轉換、過濾或聚合操作。但是也有其他的用例,您需要在不同的數據項上表達完全不同的操作。在其中一些過程中,作為處理的一部分,您偶爾需要進行外部調用,例如調用一些任意REST API。統一的數據流處理引擎要么不支持這些場景,要么以有限的、受限的方式支持它們,或者在支持它們方面效率低下。這是因為它們天生就針對大量類似的項目進行優化,並且通常在表達性和處理方面都很有限。Orleans流的目標是那些其他的情形。
3)動機
這一切都是從Orleans用戶的請求開始的,他們支持從一個Grain方法調用返回一個項目序列。你可以想象,這只是冰山一角。實際上他們需要的遠不止這些。
Orleans Streams的一個典型場景是當你有每個用戶流,並且你想在每個用戶的上下文中為每個用戶執行不同的處理。 我們可能有數百萬用戶,但其中一些對天氣感興趣,可以訂閱特定位置的天氣警報,而有些則對體育賽事感興趣; 有人正在跟蹤某個航班的狀態。 處理這些事件需要不同的邏輯,但是您不希望運行兩個獨立的流處理實例。 一些用戶只對特定的庫存感興趣,並且只有在某些外部條件適用的情況下,條件可能不一定是流數據的一部分(因此需要在運行時作為處理的一部分在動態時檢查)。
用戶一直在改變他們的興趣,因此他們對特定事件流的訂閱動態地來來回回,因此流動拓撲結構動態而迅速地變化。 此外,根據用戶狀態和外部事件,每個用戶的處理邏輯也會動態變化和變化。 外部事件可能會修改特定用戶的處理邏輯。 例如,在游戲作弊檢測系統中,當發現新的作弊方式時,處理邏輯需要根據新的規則進行更新,以檢測出新的違規行為。 這當然需要在不中斷正在進行的處理流程的情況下完成。 批量數據流流處理引擎不是為了支持這種情況而構建的。
毋庸置疑,這樣的系統必須在多個聯網的機器上運行,而不是在單個節點上運行。 因此,處理邏輯必須以可擴展和彈性的方式分布在一組服務器上。
4)新的要求
我們確定了我們的流處理系統的4個基本要求,這將允許它針對上述情況。
- 靈活的流處理邏輯
- 支持動態拓撲
- 細粒度的流粒度
- 分布
5)靈活的流處理邏輯
我們希望系統支持表達流處理邏輯的不同方式。 我們上面提到的現有系統要求開發人員編寫一個聲明式的數據流計算圖,通常是遵循函數式編程風格。 這限制了處理邏輯的表達性和靈活性。 Orleans流對表達處理邏輯的方式漠不關心。 它可以表示為數據流(例如,通過在.NET中使用Reactive Extensions(Rx)); 作為功能程序; 作為聲明性查詢; 或在一般的命令邏輯。 邏輯可以是有狀態或無狀態的,可能有也可能不會有副作用,並且可以觸發外部行為。 所有權力都交給開發者。
6)支持動態拓撲
我們希望系統允許動態演進的拓撲結構。 我們上面提到的現有系統通常僅限於在部署時固定並且不能在運行時發展的靜態拓撲。 在下面的數據流表達式例子中,一切都很好,很簡單,直到你需要改變它。
Stream.GroupBy(x=> x.key).Extract(x=>x.field).Select(x=>x+2).AverageWindow(x, 5sec).Where(x=>x > 0.8) *
在Where過濾器中更改閾值條件,添加額外的Select語句或在數據流圖中添加另一個分支並生成新的輸出流。 在現有的系統中,如果不拆除整個拓撲並重新啟動數據流,這是不可能的。 實際上,這些系統將檢查現有的計算,並能夠從最新的檢查點重新啟動。 但是,這樣的重啟對於實時產生結果的在線服務是破壞性的並且是昂貴的。 當我們談論大量這樣的以相似但不同的(每用戶,每個設計等等)參數執行並且保持不斷變化的表達式時,這樣的重新啟動變得特別不切實際。
我們希望系統允許在運行時演進流處理圖,通過向計算圖添加新的鏈接或節點,或通過改變計算節點內的處理邏輯。
7)細粒度的流粒度
在現有的系統中,抽象的最小單位通常是整個流程(拓撲)。但是,我們的許多目標場景要求拓撲中的單個節點/鏈路本身是一個邏輯實體。這樣每個實體都可以獨立管理。例如,在由多個鏈路組成的大流量拓撲中,不同鏈路可以具有不同的特性,並且可以在不同的物理傳輸上實現。一些鏈接可以通過TCP套接字,而另一些則通過可靠的隊列。不同的鏈接可以有不同的交付保證。不同的節點可以有不同的檢查點策略,其處理邏輯可以用不同的模型甚至不同的語言來表示。現有系統通常不具有這種靈活性。
抽象單位和靈活性的論點類似於SoA(面向服務的體系結構)與參與者的比較。演員系統允許更多的靈活性,因為每個人本質上是一個獨立管理的“小服務”。同樣,我們希望系統允許這樣一個細粒度的控制。
8)分配
當然,我們的系統應該具有“良好的分布式系統”的所有特性。 包括:
- 可伸縮性 - 支持大量的流和計算元素。
- 彈性 - 允許添加/刪除資源以根據負載進行增長/收縮。
- 可靠性 - 對故障具有恢復能力
- 效率 - 高效使用底層資源
- 響應能力 - 啟用接近實時的情況。
說明:Orleans目前不直接支持編寫聲明式數據流表達式,如上例所示。 目前的Orleans流媒體API是更低層次的構建塊,如下所述。 提供聲明性數據流表達式是我們未來的目標。
4,流編程API
1)Orleans Streams編程API
應用程序通過與.NET中眾所周知的反應式擴展(Rx)非常相似的API與流進行交互。 主要區別在於Orleans流擴展是異步的,以便在Orleans的分布式和可伸縮計算結構中提高處理效率。
2)異步流
應用程序啟動使用流提供者得到一個處理流。 您可以在這里閱讀關於流提供者的更多信息,但現在您可以將其視為流工廠,允許實現者自定義流行為和語義:
IStreamProvider streamProvider = base.GetStreamProvider("SimpleStreamProvider"); IAsyncStream<T> stream = streamProvider.GetStream<T>(Guid, "MyStreamNamespace");
應用程序可以通過在Grain類中調用Grain類的GetStreamProvider方法,或者在客戶端上調用GrainClient.GetStreamProvider()方法來獲得對流提供者的引用。
Orleans.Streams.IAsyncStream <T>是虛擬流的邏輯強類型句柄。 它與Orleans Grain引用的相似。 調用GetStreamProvider和GetStream純粹是本地的。 GetStream的參數是一個GUID和一個額外的字符串,我們稱之為流名稱空間(可以為空)。 GUID和名稱空間字符串一起組成流標識(類似於GrainFactory.GetGrain的參數)。 GUID和名稱空間字符串的組合為確定流標識提供了額外的靈活性。 就像Grain類型PlayerGrain中可能存在Grain 7,並且Grain類型ChatRoomGrain內可能存在不同Grain 7,則流123可以與流名稱空間PlayerEventsStream一起存在,並且流名稱空間ChatRoomMessagesStream內可以存在不同的流123。
3)生產和消費
IAsyncStream <T>實現了Orleans.Streams.IAsyncObserver <T>和Orleans.Streams.IAsyncObservable <T>接口。 這樣,應用程序就可以使用這個流來使用Orleans.Streams.IAsyncObserver <T>來產生新的事件到流中,或者通過使用Orleans.Streams.IAsyncObservable <T>來訂閱和使用來自流的事件。
public interface IAsyncObserver<in T> { Task OnNextAsync(T item, StreamSequenceToken token = null); Task OnCompletedAsync(); Task OnErrorAsync(Exception ex); } public interface IAsyncObservable<T> { Task<StreamSubscriptionHandle<T>> SubscribeAsync(IAsyncObserver<T> observer); }
為了在流中生成事件,應用程序只需要調用
await stream.OnNextAsync<T>(event)
要訂閱一個流,應用程序調用
StreamSubscriptionHandle<T> subscriptionHandle = await stream.SubscribeAsync(IAsyncObserver)
SubscribeAsync的參數既可以是實現IAsyncObserver接口的對象,也可以是用於處理傳入事件的lambda函數的組合。 SubscribeAsync的更多選項可通過AsyncObservableExtensions類獲得。 SubscribeAsync返回一個StreamSubscriptionHandle <T>,它是一個不透明的Handle,可用於取消訂閱流(類似於IDisposable的異步版本)。
await subscriptionHandle.UnsubscribeAsync()
需要注意的是訂閱是一個Grain,而不是一個激活。 一旦Grain代碼訂閱了流,這個訂閱超過了這種激活的生命,並保持永久持續,直到Grain代碼(可能在不同的激活)明確退訂。 這是虛擬流抽象的核心:不僅所有的流都是邏輯地存在,而且流訂閱也是持久的,並且超出了發布此訂閱的特定物理激活。
4)多重性
Orleans流可能有多個生產者和多個消費者。 生產者發布的消息將被傳遞給消息發布之前訂閱了流的所有消費者。
另外,消費者可以多次訂閱相同的流。 每次訂閱時,都會返回一個唯一的StreamSubscriptionHandle <T>。 如果Grain(或客戶端)被X次訂閱到同一個流,它將接收相同的事件X次,每次訂閱一次。 消費者還可以通過以下方式退訂個人訂閱或查找其當前所有訂閱:
IList<StreamSubscriptionHandle<T>> allMyHandles = await IAsyncStream<T>.GetAllSubscriptionHandles()
5)從故障中恢復
如果一個流的生產者掛了(或者它的Grain被停用),那么它就沒有必要去做。 下一次這個Grain想要產生更多的事件,它可以重新獲得流處理,並以相同的方式產生新的事件。
消費者邏輯涉及更多一點。 正如我們之前所說,一旦消費者Grain訂閱了一個流,這個訂閱是有效的,直到它明確退訂。 如果流的消費者死亡(或其Grain被停用),並且在該流上產生新的事件,則消費者Grain將被自動地重新激活(就像任何常規的Orleans Grain在向其發送消息時自動激活)。 grain代碼現在唯一需要做的就是提供一個IAsyncObserver <T>來處理數據。 消費者基本上需要重新附加處理邏輯作為OnActivateAsync方法的一部分。 要做到這一點,可以調用:
StreamSubscriptionHandle<int> newHandle = await subscriptionHandle.ResumeAsync(IAsyncObserver)
消費者使用它在第一次訂購時得到的上一個句柄,以便“恢復處理”。 請注意,ResumeAsync僅使用IAsyncObserver邏輯的新實例更新現有訂閱,並不會更改此消費者已訂閱此流的事實。
消費者如何擁有一個舊的訂閱句柄?有兩個選項。消費者可能已經持久化了從原來的SubscribeAsync操作中返回的句柄,現在可以使用它了。或者,如果消費者沒有這個句柄,它可以通過調用來要求IAsyncStream 的所有主動訂閱句柄:
IList<StreamSubscriptionHandle<T>> allMyHandles = await IAsyncStream<T>.GetAllSubscriptionHandles()
消費者現在可以恢復所有這些,或者如果他願意的話退訂。
注釋:如果用戶grain直接實現了IAsyncObserver接口(公共類MyGrain <T>:Grain,IAsyncObserver <T>),理論上不需要重新連接IAsyncObserver,因此不需要調用ResumeAsync。 流式運行時應該能夠自動確定grain已經實現了IAsyncObserver,並且只會調用那些IAsyncObserver方法。 然而,流式運行環境目前不支持這個,即使糧食直接實現了IAsyncObserver,糧食代碼仍然需要顯式調用ResumeAsync。 支持這是在我們的TODO名單上。
6)顯式和隱式訂閱
默認情況下,流消費者必須顯式訂閱流。 這種訂閱通常會由Grain(或客戶端)收到的一些外部消息觸發,指示他們訂閱。 例如,在聊天服務中,當用戶加入聊天室時,他的Grain會收到帶有聊天名稱的JoinChatGroup消息,並且會導致用戶Grain訂閱這個聊天流。
此外,Orleans Streams也支持“隱式訂閱”。在這個模型中,Grain並不明確訂閱流。這個Grain是自動訂閱的,隱式的,只是基於其Grain身份和一個ImplicitStreamSubscription屬性。隱式訂閱的主要價值是允許流活動觸發Grain激活(因此觸發訂閱)自動。 例如,使用SMS流,如果一個Grain想要產生一個流,而另一個Grain處理這個流,那么生產者就需要知道消費者Grain的身份,並且要求Grain調用它來訂購這個流。 只有在此之后,才能開始發送事件。 只有在此之后,才能開始發送事件。 相反,使用隱式訂閱,生產者可以開始將事件生成到流上,並且消費者Grain將自動被激活並訂閱流。 在這種情況下,制片人完全不在乎誰在閱讀這些事件
類型為MyGrainType的Grain實現類可以聲明一個屬性[ImplicitStreamSubscription(“MyStreamNamespace”)]。 這將告訴流式運行時,如果在標識為GUID XXX和“MyStreamNamespace”命名空間的流上生成事件,則應該將其傳遞給標識為XXX類型為MyGrainType的grain。 也就是說,運行時將流<XXX,MyStreamNamespace>映射到消費者Grain<XXX,MyGrainType>。
ImplicitStreamSubscription的存在使得流式運行時自動將這個Grain訂閱到一個流,並將流事件傳遞給它。 然而,grain代碼仍然需要告訴運行時如何處理事件。 本質上,它需要附加IAsyncObserver。 因此,在激活Grain時,OnActivateAsync中的Grain代碼需要調用:
IStreamProvider streamProvider = base.GetStreamProvider("SimpleStreamProvider"); IAsyncStream<T> stream = streamProvider.GetStream<T>(this.GetPrimaryKey(), "MyStreamNamespace"); StreamSubscriptionHandle<T> subscription = await stream.SubscribeAsync(IAsyncObserver<T>);
7)編寫訂閱邏輯
以下是關於如何為各種情況編寫訂閱邏輯的准則:顯式和隱式訂閱,可回放和不可回放的流。 顯式和隱式訂閱的主要區別在於,對於隱式的grain,每個流名稱空間總是只有一個隱式訂閱,沒有辦法創建多個訂閱(沒有訂閱多重性),沒有辦法退訂,而 grain邏輯總是只需要附加處理邏輯。 這也意味着,對於隱式訂閱,從不需要恢復訂閱。 另一方面,對於明確的訂閱,需要恢復訂閱,否則如果再次訂閱,將會導致訂閱多次。
①隱含訂閱:
對於隱式訂閱,Grain需要訂閱附加處理邏輯。 這應該在Grain的OnActivateAsync方法中完成。 Grain應該簡單地執行在其OnActivateAsync方法中等待stream.SubscribeAsync(OnNext ...)。 這將導致這個特定的激活附加OnNext函數來處理該流。 grain可以選擇指定StreamSequenceToken作為SubscribeAsync的參數,這將導致這個隱式訂閱從該標記開始消耗。 從不需要隱式訂閱來調用ResumeAsync。
public async override Task OnActivateAsync() { var streamProvider = GetStreamProvider(PROVIDER_NAME); var stream = streamProvider.GetStream<string>(this.GetPrimaryKey(), "MyStreamNamespace"); await stream.SubscribeAsync(OnNextAsync) }
②顯式訂閱:
對於顯式訂閱,grain必須調用SubscribeAsync來訂閱流。 這創建了一個訂閱,以及附加的處理邏輯。 顯式訂閱將存在,直到Grain退訂,所以如果Grain被取消激活,Grain仍然顯式訂閱,但不附加處理邏輯。 在這種情況下,Grain需要重新連接處理邏輯。 要做到這一點,在OnActivateAsync中,Grain首先需要通過調用stream.GetAllSubscriptionHandles()來找出它的訂閱。 grain必須在每個希望繼續處理的handle上執行ResumeAsync,或者在完成的任何handle上執行UnsubscribeAsync。 Grain還可以選擇指定StreamSequenceToken作為ResumeAsync調用的參數,這將導致顯式訂閱從該令牌開始消耗。
public async override Task OnActivateAsync() { var streamProvider = GetStreamProvider(PROVIDER_NAME); var stream = streamProvider.GetStream<string>(this.GetPrimaryKey(), "MyStreamNamespace"); var subscriptionHandles = await stream.GetAllSubscriptionHandles(); if (!subscriptionHandles.IsNullOrEmpty()) subscriptionHandles.ForEach(async x => await x.ResumeAsync(OnNextAsync)); }
8)流順序和序列令牌
個體生產者和個人消費者之間交付事件的順序取決於流提供者。
通過SMS,生產者通過控制他發布的方式來明確地控制消費者看到的事件的順序。 默認情況下(如果SMS提供程序的FireAndForget選項設置為false),並且生產者等待每個OnNextAsync調用,則事件按FIFO順序到達。 在SMS中,由生產者決定如何處理由OnNextAsync調用返回的破壞的Task所指示的交付失敗。
Azure隊列流不保證FIFO順序,因為底層的Azure隊列不能保證在失敗情況下的順序(它們確保在無故障執行中的FIFO順序)。 當生產者將事件生成到Azure隊列中時,如果排隊操作失敗,則由生產者嘗試另一個排隊,然后再處理潛在的重復消息。 在交付方面,Orleans Streaming運行時從Azure隊列中取出事件並嘗試將其交付給消費者進行處理。 Orleans Streaming運行時只有在成功處理后才會從隊列中刪除事件。 如果交付或處理失敗,則該事件不會從隊列中刪除,並會在稍后自動重新出現在隊列中。 Streaming運行時將嘗試再次傳送,因此可能會破壞FIFO的順序。 描述的行為符合Azure隊列的常規語義。
應用程序定義的順序:要處理上述順序問題,應用程序可以選擇指定自己的順序。 這是通過StreamSequenceToken的概念來實現的。 StreamSequenceToken是一個不透明的IComparable對象,可用於對事件進行排序。 生產者可以將可選的StreamSequenceToken傳遞給OnNext調用。 這個StreamSequenceToken將被一直傳遞給消費者,並將與該事件一起交付。 這樣,應用程序就可以獨立於流式運行時間來推理和重建它的順序。
9)可回放的流
一些數據流只允許應用程序在最近的時間點開始訂閱,而其他數據流允許“回溯”。 后者的能力取決於潛在的排隊技術和特定的流提供者。 例如,Azure隊列只允許使用最新的入隊事件,而EventHub允許從任意時間點(最多到某個過期時間)重放事件。 支持回溯的流被稱為可回溯流。
可重放流的使用者可以將StreamSequenceToken傳遞給SubscribeAsync調用,並且運行時將從該StreamSequenceToken(一個null標記表示消費者希望從最近開始接收事件)開始向其傳遞事件。
回放流的能力在恢復場景中非常有用。 例如,考慮訂閱流的grain,並定期檢查其狀態以及最新的序列標記。 從故障中恢復時,grain可以從最新的檢查點序列標記重新訂閱相同的流,從而進行恢復,而不會丟失自上一個檢查點以來生成的任何事件。
可重放流的當前狀態:SMS和Azure隊列提供程序都不可回滾,Orleans當前不包含可重放流的實現。 我們正在積極努力。
10)無狀態自動擴展處理
默認情況下,Orleans Streaming的目標是支持大量相對較小的流,每個流都由一個或多個完整的Grains進行處理。 所有的流加工在一起,在大量的正常(穩定的)Grain中被分割。 應用程序代碼通過分配流ID,grain ID和顯式訂閱來控制這個分片。 目標是分解狀態處理。
但是,自動擴展的無狀態處理也是一個有趣的場景。 在這種情況下應用程序有少量的流(甚至一個大的流),目標是無狀態處理。 例如,所有事件的所有消息的全局流以及涉及某種解碼/解密的處理,並可能將它們轉發到另一組流中進行進一步的有狀態處理。 Orleans通過StatelessWorker谷物支持無狀態的擴展流處理。
無狀態自動擴展處理的當前狀態:目前尚未實現(由於優先級限制)。 嘗試訂閱來自StatelessWorker grain的流將導致未定義的行為。 我們正在考慮支持這個選項。
11)Grain和Orleans客戶端
Orleans流在Grain和Orleans客戶端之間均勻流通。 也就是說,Grain和Orleans客戶端可以使用完全相同的API來生成和消費事件。 這極大地簡化了應用程序邏輯,使特殊的客戶端API(如Grain Observers)變得冗余。
12)完全管理和可靠的流媒體Pub-Sub
為了跟蹤流預訂,Orleans使用一個名為Streaming Pub-Sub的運行時組件,它作為流消費者和流生產者的集合點。 Pub Sub跟蹤所有的流訂閱,保持它們,並將流消費者與流生產者匹配。
應用程序可以選擇Pub-Sub數據的存儲位置和方式。 Pub-Sub組件本身被實現為Grain(稱為PubSubRendezvousGrain),它正在使用Orleans聲明式持久性來表示這些Grain。 PubSubRendezvousGrain使用名為PubSubStore的存儲提供程序。 與任何Grain一樣,您可以指定一個存儲提供者的實現。 對於Streaming Pub-Sub,您可以在配置文件中更改PubSubStore的實現:
<OrleansConfiguration xmlns="urn:orleans"> <Globals> <StorageProviders> <Provider Type="Orleans.Storage.AzureTableStorage" Name="PubSubStore" /> </StorageProviders> </Globals> </OrleansConfiguration>
這樣,Pub-Sub數據將持久存儲在Azure表中。 對於最初的開發,您也可以使用內存存儲。 除Pub-Sub之外,Orleans Streaming Runtime還將生產者的事件傳遞給消費者,管理分配給主動使用的流的所有運行時資源,並透明地垃圾從未使用的流中收集運行時資源。
13)配置
為了使用流,您需要通過配置啟用流提供程序。 你可以在這里閱讀更多關於流提供者。 示例流提供程序配置:
<OrleansConfiguration xmlns="urn:orleans"> <Globals> <StreamProviders> <Provider Type="Orleans.Providers.Streams.SimpleMessageStream.SimpleMessageStreamProvider" Name="SMSProvider"/> <Provider Type="Orleans.Providers.Streams.AzureQueue.AzureQueueStreamProvider" Name="AzureQueueProvider"/> </StreamProviders> </Globals> </OrleansConfiguration>
也可以通過調用Orleans.Runtime.Configuration.GlobalConfiguration或Orleans.Runtime.Configuration.ClientConfiguration類中的一個RegisterStreamProvider方法來以編程方式注冊流提供程序。
public void RegisterStreamProvider(string providerTypeFullName, string providerName, IDictionary<string, string> properties = null) public void RegisterStreamProvider<T>(string providerName, IDictionary<string, string> properties = null) where T : IStreamProvider
5,流提供者
流可以有不同的形狀和形式, 一些流可能通過直接的TCP鏈接傳遞事件,而另一些則通過持久隊列傳遞事件。不同的流類型可能使用不同的批處理策略、不同的緩存算法或不同的背壓過程。我們不希望將流應用程序限制在這些行為選擇的一小部分。 相反,Stream Providers是Orleans Streaming Runtime的擴展點,允許用戶實現任何類型的流。 這個可擴展性與Orleans存儲提供商的精神是相似的。 Orleans目前提供兩個默認流提供程序:簡單消息流提供程序和Azure隊列流提供程序。
1)簡單的消息流提供者
簡單的消息流提供商,也被稱為SMS提供商,通過利用常規的Orleans Grain消息傳遞在TCP上傳遞事件。 由於SMS中的事件是通過不可靠的TCP鏈接傳送的,因此SMS不保證可靠的事件傳送,也不會自動重新發送SMS流的失敗消息。 SMS流的生產者有一種方法來知道他的事件是否被成功地接收和處理:默認情況下,對stream.OnNextAsync的調用返回一個代表流消費者處理狀態的Task。 如果這個任務失敗了,生產者可以決定再次發送相同的事件,從而在應用層面上實現可靠性。 雖然個人流消息傳遞是盡力而為的,但SMS流本身是可靠的。 也就是說,Pub Sub執行的用戶到生產者的綁定是完全可靠的。
2)Azure隊列(AQ)流提供程序
Azure隊列(AQ)流提供程序通過Azure隊列傳遞事件。 在生產者方面,AQ Stream Provider將事件直接排入Azure隊列。 在消費者方面,AQ Stream Provider管理一組拉取代理,這些拉取代理從一組Azure隊列中提取事件,並將其傳遞給使用它們的應用程序代碼。 人們可以把提取代理想象成一個分布式的“微服務” - 一個分區的,高度可用的,有彈性的分布式組件。 拉出的代理程序運行在宿主應用程序Grains的同一倉儲內。 因此,不需要運行獨立的Azure角色來從隊列中拉出。 牽引代理的存在,其管理,后台管理,平衡他們之間的隊列以及將失敗代理中的隊列交給另一個代理完全由Orleans Streaming Runtime管理,並且對於使用流的應用程序代碼是透明的。
3)隊列適配器
通過持久隊列傳遞事件的不同流提供者表現出類似的行為,並受到類似的實現。 因此,我們提供了一個通用的可擴展PersistentStreamProvider,它允許開發人員從頭開始寫入不同類型的隊列,而無需從頭開始編寫全新的流提供程序。 PersistentStreamProvider用IQueueAdapter參數化,IQueueAdapter抽象出特定的隊列實現細節,並提供入隊和出隊事件的方法。 其余的由PersistentStreamProvider內部的邏輯處理。 上面提到的Azure隊列提供程序也是這樣實現的:它是具有AzureQueueAdapter的PersistentStreamProvider實例。
6,流實現
本節提供了Orleans Stream實現的高級概述。 它描述了在應用程序級別上不可見的概念和細節。 如果您只打算使用流,則不必閱讀本節。 但是,如果您打算擴展流,請在閱讀Streams Extensibility部分之前閱讀本節。
術語:
我們將“queue”這個詞引用到任何可以提取流事件的持久存儲技術,並允許提取事件或提供基於推的機制來消費事件。 通常,為了提供可伸縮性,這些技術提供分片/分區隊列。 例如,Azure隊列允許創建多個隊列,事件中心有多個中心,Kafka主題,...
1)持久化流
所有Orleans持久流提供者共享一個通用的實現PersistentStreamProvider。 這個通用的流提供者是通過一個技術特定的IQueueAdapter進行參數化的。
當流生成器生成一個新的流項並調用stream.OnNext()時,Orleans流運行時調用該流提供者的IQueueAdapter上的適當方法,將該項直接排入適當的隊列。
2)拉代理
持續流提供者的核心是拉動代理。 提取代理從一組持久隊列中提取事件,並將它們傳送到消耗它們的Grain中的應用程序代碼。 人們可以把提取代理想象成一個分布式的“微服務” - 一個分區的,高度可用的,有彈性的分布式組件。 牽引劑運行在托管Grain的同一個倉儲內,並由Orleans Streaming Runtime完全管理。
3)StreamQueueMapper和StreamQueueBalancer
Pulling代理使用IStreamQueueMapper和StreamQueueBalancerType進行參數化。IStreamQueueMapper提供了所有隊列的列表,並負責將流映射到隊列。 這樣,持久流提供者的生產者端知道將哪個隊列排入消息。StreamQueueBalancerType表示Orleans倉儲和代理之間的隊列平衡方式。 目標是以平衡的方式將隊列分配給代理,以防止瓶頸和支持彈性。 當新的倉儲被添加到Orleans集群時,隊列會自動在舊的和新的筒倉中重新平衡。 StreamQueueBalancer允許自定義該進程。 Orleans有一些內置的StreamQueueBalancers,用於支持不同的平衡方案(大小隊列數)和不同的環境(Azure,on prem,static)。
4)Pulling 協議
每個倉儲都運行一組拖放代理,每個代理都從一個隊列中拉出。提取代理本身由內部運行時組件(稱為SystemTarget)實現。系統目標本質上是運行時的Grain,受到單線程並發性的影響,可以使用常規的Grain消息傳遞,並且像Grain一樣輕。與Grain相反,系統目標不是虛擬的:它們是顯式創建的(由運行時創建的),也不是位置透明的。通過將拉代理實現為系統目標,對Orleans的流運行時可以依賴於許多內置的Orleans特性,並且可以擴展到大量的隊列,因為創建一個新的拉動代理就像創建一種新的谷Grain一樣便宜。
每個拉取代理都運行定期計時器,該計時器從隊列中拉出(通過調用IQueueAdapterReceiver)GetQueueMessagesAsync()方法。 返回的消息放在名為IQueueCache的內部每個代理程序數據結構中。 每條消息都被檢查以找出其目的地流。 代理程序使用Pub Sub來找出訂閱此流的流消費者的列表。 一旦消費者列表被檢索到,代理將其存儲在本地(在其pub-sub高速緩存中),因此不需要在每個消息上咨詢Pub Sub。 代理還與pub-sub訂閱,以接收任何訂閱該流的新消費者的通知。 代理與pub-sub之間的握手保證了強大的流式訂閱語義:一旦消費者訂閱了流,它就會看到訂閱之后生成的所有事件(另外,使用StreamSequenceToken允許在過去訂閱)。
5)隊列緩存
IQueueCache是一種內部的每個代理數據結構,允許將新事件從隊列中傳遞給消費者。 它也允許將交付分離到不同的流和不同的消費者。
想象一下,一個流有三個流消費者,其中一個流很慢。 如果不小心,緩慢的消費者有可能會影響代理商的進度,減緩其他消費者的消費,甚至有可能減緩其他消費者的排隊和交付。 為了防止這種情況,並允許代理中的最大並行性,我們使用IQueueCache。
IQueueCache緩存流事件,並為代理提供一種方式,以按照其速度向每個消費者傳遞事件。 每個消費者交付由稱為IQueueCacheCursor的內部組件實現,該組件跟蹤每個消費者的進度。 這樣,每個消費者都能按照自己的速度接收事件:快速消費者接收事件的速度就像從隊列中出列的速度一樣快,而慢速消費者接收事件的速度也是如此。 一旦消息被傳遞給所有消費者,它可以從緩存中刪除。
6)Backpressure
在Orleans,流運行時的Backpressure應用於兩個地方:將流事件從隊列中引入代理,並將事件從代理傳遞到流消費者。
后者由內置的Orleans消息傳遞機制提供。 每一個流事件都是通過標准的Orleans grain 消息從代理商傳遞給消費者,一次一個。 也就是說,代理向每個流消費者發送一個事件(或一個有限的大小的事件)並等待這個呼叫。 下一個事件將不會開始傳遞,直到上一個事件的任務已解決或中斷。 這樣,我們自然地將每個消費者的交付率限制在一個消息。
關於從排隊到代理商的流事件Orleans流媒體提供了一個新的特殊的Backpressure機制。由於代理將隊列中的事件從隊列中分離出來並交付給消費者,所以單個緩慢的消費者可能落后得太多以至於IQueueCache將被填滿。為了防止IQueueCache無限增長,我們限制它的大小(大小限制是可配置的)。然而,代理從來沒有拋出未交付的事件。相反,當緩存開始填滿時,代理會降低從隊列中取出事件的速率。那樣的話,我們可以通過調整我們從隊列中消耗的速度(“背壓”)來“調整”緩慢的交付周期,並在稍后恢復到快速消費速度。為了檢測“慢速遞送”谷,IQueueCache使用高速緩存桶的內部數據結構來追蹤事件傳遞給單個流消費者的進度。這導致了一個非常靈敏和自我調整的系統。
7,流擴展性
開發人員可以通過三種方式擴展當前已實現的Orleans流的行為:
- 利用或擴展流提供者配置。
- 編寫一個自定義隊列適配器。
- 寫入一個新的流提供程序
我們將在下面描述這些。請在閱讀本節之前閱讀新Orleans的Streams實現,以便對內部實現有一個高級的視圖。
1)流提供程序配置
目前實現的流提供程序支持一些配置選項。
簡單的消息流提供者配置。 SMS Stream Provider當前僅支持單個配置選項:
- FireAndForgetDelivery:這個選項指定SMS流生成器發送的消息是否被發送,忘記了是否被發送。 當FireAndForgetDelivery設置為false(消息發送不是FireAndForget)時,流生成器的調用stream.OnNext()將返回一個Task,它表示流消費者的處理狀態。 如果這個任務成功了,那么制作人就可以確定消息已經成功傳遞和處理了。 如果FireAndForgetDelivery設置為true,則返回的Task僅表示Orleans運行時已接受消息並將其排入隊列以供進一步傳送。 FireAndForgetDelivery的默認值為false。
持續流提供程序配置。 所有持久流提供程序都支持以下配置選項:
- GetQueueMessagesTimerPeriod - 在代理嘗試再次拉取之前,最后一次嘗試從隊列中拉出沒有返回任何項目的拉取代理等待的時間。 缺省值是100毫秒。
- InitQueueTimeout - 拉取代理程序等待適配器初始化與隊列的連接的時間。 默認是5秒。
- QueueBalancerType - 用於在隊列和代理之間平衡隊列的平衡算法的類型。 默認是ConsistentRingBalancer。
Azure隊列流提供程序配置。 Azure隊列流提供程序除持久化流提供程序支持的以外,還支持以下配置選項:
- DataConnectionString - Azure隊列存儲連接字符串。
- DeploymentId - 此Orleans集群的部署標識(通常類似於Azure部署標識)。
- CacheSize - 持久提供者緩存的大小,用於存儲流消息以進一步傳遞。 默認是4096。
這將是完全可能的,而且很多時候很容易提供額外的配置選項。 例如,在某些場景中,開發人員可能需要更多地控制隊列適配器使用的隊列名稱。 這是目前抽象與IStreamQueueMapper,但目前沒有辦法配置哪個IStreamQueueMapper使用,而無需編寫一個新的代碼。 如果需要的話,我們很樂意提供這樣的選擇。 所以在編寫一個全新的提供者之前,請考慮在現有的流提供者中添加更多的配置選
2)編寫自定義隊列適配器
如果您想使用不同的排隊技術,則需要編寫一個隊列適配器,將適配器的訪問權限抽象出來。 下面我們提供如何完成的細節。 有關示例,請參閱AzureQueueAdapterFactory。
-
首先定義一個實現IQueueAdapterFactory的MyQueueFactory類。 你需要:
a. 初始化工廠:讀取傳遞的配置值,如果需要,可能會分配一些數據結構等。
b. 實現一個返回IQueueAdapter的方法。
c.實現一個返回IQueueAdapterCache的方法。 理論上來說,你可以建立你自己的IQueueAdapterCache,但是你不需要。 分配並返回Orleans的SimpleQueueAdapterCache是個好主意。
d.實現一個返回IStreamQueueMapper的方法。 再一次,理論上可以建立你自己的IStreamQueueMapper,但是你不需要。 分配並返回一個Orleans HashRingBasedStreamQueueMapper是一個好主意。
-
實現實現IQueueAdapter接口的MyQueueAdapter類,該接口是管理對分片隊列的訪問的接口。 IQueueAdapter管理對一組隊列/隊列分區(這些是由IStreamQueueMapper返回的隊列)的訪問。 它提供了在指定的隊列中排隊消息的能力,並為特定的隊列創建IQueueAdapterReceiver。
-
實現實現IQueueAdapterReceiver的MyQueueAdapterReceiver類,它是管理對一個隊列(一個隊列分區)的訪問的接口。 除了初始化和關閉之外,它基本上提供了一種方法:從隊列中檢索最多maxCount消息。
-
聲明公共類MyQueueStreamProvider:PersistentStreamProvider <MyQueueFactory>。 這是您的新Stream Provider。
-
配置:為了加載和使用你新的流提供商,你需要通過倉儲配置文件來正確配置它。 如果您需要在客戶端上使用它,則需要在客戶端配置文件中添加一個類似的配置元素。 也可以以編程方式配置流提供程序。 以下是倉儲配置的一個例子:
<OrleansConfiguration xmlns="urn:orleans"> <Globals> <StreamProviders> <Provider Type="My.App.MyQueueStreamProvider" Name="MyStreamProvider" GetQueueMessagesTimerPeriod="100ms" AdditionalProperty="MyProperty"/> </StreamProviders> </Globals> </OrleansConfiguration>
3)編寫一個完全新的流提供程序也可以編寫一個全新的Stream Provider
在這種情況下,從Orleans 的角度來看,需要做的很少的整合。 您只需要實現IStreamProviderImpl接口,該接口是一個允許應用程序代碼獲取流的句柄的精簡接口。 除此之外,如何實施它完全取決於你。 實現一個全新的Stream Provider可能變成一個相當復雜的任務,因為您可能需要訪問各種內部運行時組件,其中一些可能具有內部訪問權限。
我們目前沒有預想到需要實現一個全新的Stream Provider並且不能通過上面提到的兩個選項來實現他的目標的場景:通過擴展配置或者通過編寫隊列適配器。 但是,如果您認為您有這樣的情況,我們希望聽到這個消息,並一起努力簡化編寫新的流提供程序。