Microsoft Orleans 之 開發人員指南


開發一個Grain 

在開發Grain之前請先閱讀Grains 這篇文章

Grain 接口

Grains通過調用各自定義在接口中的方法相互作用,一個grain實現了事先定義好的一個或多個接口,grain接口中的方法必須返回Task(如果沒有返回值) 或者Task<T>(如果有類型返回值)

如下事例樣本:

//an example of a Grain Interface public interface IPlayerGrain : IGrainWithGuidKey { Task<IGameGrain> GetCurrentGame(); Task JoinGame(IGameGrain game); Task LeaveGame(IGameGrain game); } //an example of a Grain class implementing a Grain Interface public class PlayerGrain : Grain, IPlayerGrain { private IGameGrain currentGame; // Game the player is currently in. May be null. public Task<IGameGrain> GetCurrentGame() { return Task.FromResult(currentGame); } // Game grain calls this method to notify that the player has joined the game. public Task JoinGame(IGameGrain game) { currentGame = game; Console.WriteLine("Player {0} joined game {1}", this.GetPrimaryKey(), game.GetPrimaryKey()); return TaskDone.Done; } // Game grain calls this method to notify that the player has left the game. public Task LeaveGame(IGameGrain game) { currentGame = null; Console.WriteLine("Player {0} left game {1}", this.GetPrimaryKey(), game.GetPrimaryKey()); return TaskDone.Done; } }

Grain 引用
一個grain引用是一個代理對象,實現了相同grain接口由相應的grain類實現。使用異步消息,grains之間實現全雙工通訊,以及和客戶端,grain引用是通過grain的identity並通過調用方法GrainFactory.GetGrain<T>()
來創建,開發人員使用它就像使用普通.net 對象一樣,它可以作為一個方法的返回值傳遞一個方法,如下:
在orleans 客戶端代碼:
  //construct the grain reference of a specific player IPlayerGrain player = GrainClient.GrainFactory.GetGrain<IPlayerGrain>(playerId);
在另外一個grain中使用:
 //construct the grain reference of a specific player IPlayerGrain player = GrainFactory.GetGrain<IPlayerGrain>(playerId);

Grain方法調用

Orleans編程模型是一組異步編程模型,可以使用async and await,更詳細的文章請看這里
使用前面的例子來使用這個對象,代碼如下:
//Invoking a grain method asynchronously Task joinGameTask = player.JoinGame(this); //The `await` keyword effectively turns the remainder of the method into a closure that will asynchronously execute upon completion of the Task being awaited without blocking the executing thread. await joinGameTask; //The next lines will be turned into a closure by the C# compiler. players.Add(playerId);

也可以連接兩個或者兩個以上的Task 然后組合一個Task來完成任務,這是一種比較有用的方式,等待所有的任務完成然后在統計,如下實例:
List<Task> tasks = new List<Task>(); ChirperMessage chirp = CreateNewChirpMessage(text); foreach (IChirperSubscriber subscriber in Followers.Values) { tasks.Add(subscriber.NewChirpAsync(chirp)); } Task joinedTask = Task.WhenAll(tasks); await joinedTask;



TaskDone.Done Utility Property

有沒有一個標准的方式返回一個void的返回值呢,答案是肯定的,Orleans 為我們定義一個助手類:TaskDone.Done

 

客戶端開發

一旦我們定義好一個grain的接口並且有相應的實現類,我們就可以開始編寫我們的客戶端代碼,引入相應的dll

  • Orleans.dll
  • OrleansRuntimeInterfaces.dll

幾乎任何一個客戶端都會涉及到grain 工廠方法的使用,使用這個方法通過一個特殊的Id來引用,正如已經提到的grain不能被顯式創建或者銷毀。

GrainClient.Initialize(); // Hardcoded player ID Guid playerId = new Guid("{2349992C-860A-4EDA-9590-000000000006}"); IPlayerGrain player = GrainClient.GrainFactory.GetGrain<IPlayerGrain>(playerId); IGameGrain game = player.CurrentGame.Result; var watcher = new GameObserver(); var observer = GrainClient.GrainFactory.CreateObjectReference<IGameObserver>(watcher); await game.SubscribeForGameUpdates();


如果這個應用程序代碼放在控制台應用程序的主線程中的話,你就需要調用wait()方法。
查看更多細節的關鍵概念部分關於使用任務執行調度和異常流的詳細信息。

查找或者創建一個Grains
通過調用GrainClient.Initialize()建立連接之后,在泛型工廠類中的靜態方法可以用於獲取一個grain的引用,例如:GrainClient.GrainFactory.GetGrain<IPlayerGrain>()
Grain接口作為GrainFactory.GetGrain<T>()的一個參數來傳遞。
給Grains發送消息
在orleans 的編程模型中客戶端與服務端的通訊類似於grain之間的通訊,區別是client的調用是可能是多線程調用,而grain之間被約束為單線程,客戶端庫使用TPL的線程池管理的延續和回調,
所以它是由客戶端來管理自己的並發,使用任何同步構建適合其環境–鎖、事件、TPL 任務,等等。
獲取通知
有一種情況,其中一個簡單的消息/響應模式是不夠的,客戶端需要接收異步通知。例如,一個用戶可能希望被通知當一個新的消息已發表的人。
觀察者是一個單向異步接口繼承自IGrainObserver,它的方法必須返回void,觀察者調用一個grain的接口方法就是發出一個消息通知,除了沒有返回值外,grain不依賴於任何結果,
Orleans將確保消息單向傳遞的,grain提供了相應訂閱/發布通知的API
訂閱通知必須首先創建一個實現了觀察者接口的對象,然后在grain factory里調用CreateObjectReference方法,轉向grain對象的引用,然后可以將其傳遞給通知grain的訂閱方法。
該模型也可以被其他grain用於接收異步通知。不像在客戶端訂閱的情況,訂閱grain只是實現Observer接口作為一個面,並通過對自身的引用 (e.g. this.AsReference<IChirperViewer>).
Example
這里是一個完整代碼的例子
namespace PlayerWatcher { class Program { /// <summary> /// Simulates a companion application that connects to the game /// that a particular player is currently part of, and subscribes /// to receive live notifications about its progress. /// </summary> static void Main(string[] args) { try { GrainClient.Initialize(); // Hardcoded player ID Guid playerId = new Guid("{2349992C-860A-4EDA-9590-000000000006}"); IPlayerGrain player = GrainClient.GrainFactory.GetGrain<IPlayerGrain>(playerId); IGameGrain game = null; while (game == null) { Console.WriteLine("Getting current game for player {0}...", playerId); try { game = player.CurrentGame.Result; if (game == null) // Wait until the player joins a game Thread.Sleep(5000); } catch (Exception exc) { Console.WriteLine("Exception: ", exc.GetBaseException()); } } Console.WriteLine("Subscribing to updates for game {0}...", game.GetPrimaryKey()); // Subscribe for updates var watcher = new GameObserver(); game.SubscribeForGameUpdates(GrainClient.GrainFactory.CreateObjectReference<IGameObserver>(watcher)).Wait(); // .Wait will block main thread so that the process doesn't exit. // Updates arrive on thread pool threads. Console.WriteLine("Subscribed successfully. Press <Enter> to stop."); Console.ReadLine(); } catch (Exception exc) { Console.WriteLine("Unexpected Error: {0}", exc.GetBaseException()); } } /// <summary> /// Observer class that implements the observer interface. /// Need to pass a grain reference to an instance of this class to subscribe for updates. /// </summary> private class GameObserver : IGameObserver { // Receive updates public void UpdateGameScore(string score) { Console.WriteLine("New game score: {0}", score); } } } }
 
運行應用程序
配置連接到Orleans
允許應用程序與外界交互的orleans grains,框架包括客戶端庫。此客戶端程序庫可能由桌面或移動應用程序使用,或由呈現交互式網頁或公開Web服務接口的前端服務器使用。
客戶端庫提供了一個API編寫異步客戶端於orleans交互。一旦客戶端庫連接到orleans的網關,客戶端可以發送郵件到grains,接收響應,通過觀察者得到grain異步通知。


連接網關
建立一個連接,客戶端通過調用GrainClient.Initialize(),他將連接到silo在配置文件(ClientConfiguration.xml )中指定的ip和端口,這個文件必須和
Orleans.dll放在同一個目錄下,作為一種替代,你也可以通過編程的方式加載一個配置文件然后通過GrainClient.Initialize() 初始化來使用。
配置客戶端
在客戶端配置文件中(ClientConfiguration.xml),指定網關端點的ip和端口,它需要去匹配silo中配置文件(OrleansConfiguration.xml )中指定的網關
<ClientConfiguration xmlns="urn:orleans"> <Gateway Address="<IP address or host name of silo>" Port="30000" /> </ClientConfiguration>

如果Orleans運行在Windows Azure,客戶端就會自動發現網關信息,而就不需要靜態配置了,可以參考這個實例 Azure application sample 

配置silos
在OrleansConfiguration.xml配置文件中,ProxyingGateway代理網關元素指定了silo的endpoint網關(gateway),inter-silo的地址定義是通過Networking元素來定義的,必須區分開
且要跟它指定不同的端口。
<?xml version="1.0" encoding="utf-8"?> <OrleansConfiguration xmlns="urn:orleans"> <Defaults> <Networking Address="" Port="11111" /> <ProxyingGateway Address="" Port="30000" /> </Defaults> </OrleansConfiguration>
grain 持久化
grain持久化目標
1.允許不同的grain類型使用不同的存儲類型providers(一個用Azure 表,一個用SQL Azure )相同類型的存儲提供程序,但具有不同的配置(兩個使用Azure表,但一個使用存儲帳戶)。
2.允許存儲提供程序實例配置可交換(例如,開發-測試-產品)只是配置文件的變化。
3.允許額外的存儲供應商被寫入后提供了一個框架,由orleans團隊或其他人。
4.提供一個最小的生產級存儲提供程序。
5.存儲提供程序完全控制了在持久化支持存儲中的數據狀態數據的存儲方式。
推論:Orleans沒有提供一個全面的ORM的存儲解決方案,但允在必要的時候許自定義存儲提供程序支持指定一個ORM的要求。
grain 持久化API
Grain類型可以用兩種方式中的一種聲明:
1.拓展Grain 如果他們沒有任何持久的狀態,或如果他們將處理所有自己的持久狀態,或者
2.Extend Grain<T> 如果他們有一些持久態,他們希望Orleans良運行時處理。
另外,通過拓展Grain<T> grain類型自動選擇Orleans框架的系統管理的持久化。

對於本節的其他部分,我們只會考慮選擇 #2 / Grain<T> ,因為條件限制。

Grain 狀態存儲
從Grain<T>繼承的grain類(其中T是從GrainState自動導出的存儲數據類型)將從指定的存儲中自動加載它們的狀態。

Grain將標有[StorageProvider]特性指定用於讀/寫Grain狀態的數據存儲提供程序實例的命名。
[StorageProvider(ProviderName="store1")] public class MyGrain<MyGrainState> ... { ... }

Orleans Provider 管理框架提供了一種機制,以指定和注冊silo配置文件不同的存儲提供和存儲操作。
<OrleansConfiguration xmlns="urn:orleans"> <Globals> <StorageProviders> <Provider Type="Orleans.Storage.MemoryStorage" Name="DevStore" /> <Provider Type="Orleans.Storage.AzureTableStorage" Name="store1" DataConnectionString="DefaultEndpointsProtocol=https;AccountName=data1;AccountKey=SOMETHING1" /> <Provider Type="Orleans.Storage.AzureBlobStorage" Name="store2" DataConnectionString="DefaultEndpointsProtocol=https;AccountName=data2;AccountKey=SOMETHING2" /> </StorageProviders>


配置存儲providers
AzureTableStorage
<Provider Type="Orleans.Storage.AzureTableStorage" Name="TableStore" DataConnectionString="UseDevelopmentStorage=true" />


以下屬性可以添加到<Provider / >元素來配置提供者:
  • DataConnectionString="..." (強制) - Azure 存儲連接字符串。
  • TableName="OrleansGrainState" (可選) - 表存儲中使用的表名稱,默認OrleansGrainState
  • DeleteStateOnClear="false" (可選) - 如果是true,當清除grain時記錄被刪除,否則將寫入一個空記錄,默認為false
  • UseJsonFormat="false" (可選) - 如果為true, 使用json序列化, 使用Orleans的二進制序列化, 默認 false
  • UseFullAssemblyNames="false" (可選) - (如果 UseJsonFormat="true") 如果為true則使用程序集全名,否則使用簡單名, 默認為 false
  • IndentJSON="false" (可選) - (如果 UseJsonFormat="true") 如果為true壓縮json序列化, 默認false

 

注:表存儲限制,state不應該超過64kb

 

AzureBlobStorage
<Provider Type="Orleans.Storage.AzureTableStorage" Name="BlobStore" DataConnectionString="UseDevelopmentStorage=true" />


以下屬性可以添加到<Provider/>元素來配置提供者:
  • DataConnectionString="..." (強制) - Azure 存儲連接字符串。
  • ContainerName="grainstate" (可選) - 使用blob 存儲容器, 默認 grainstate
  • UseFullAssemblyNames="false" (可選) - (如果 UseJsonFormat="true") 如果為true則使用程序集全名,否則使用簡單名, 默認為 false
  • IndentJSON="false" (可選) - 如果為true壓縮json序列化, 默認false

MemoryStorage

<Provider Type="Orleans.Storage.MemoryStorage" Name="MemoryStorage" />

注:provider 持久狀態保存到內存中,如果silo關閉就不復存在,所以僅工測試開發時使用。
  • NumStorageGrains="10" (可選) -用於存儲”狀態“的grain數, 默認 10

ShardedStorageProvider

<Provider Type="Orleans.Storage.ShardedStorageProvider" Name="ShardedStorage"> <Provider /> <Provider /> <Provider /> </Provider>


用於寫入在多個其他存儲提供程序中共享的Grain狀態數據的簡單存儲提供程序。

一致的散列函數(默認Jenkins Hash)來決定哪些碎片(按它們在配置文件中定義的順序)是負責為指定的grain存儲狀態數據,然后通過適當的底層provider執行 讀/寫/清除請求
注:Jenkins Hash算法 是一種算法,如果想要了解更多去網上搜索。

存儲provider注解

如果繼承了Grain<T>的grain的類沒有指定[StorageProvider] 特性,那么就搜索默認的provider,如果沒有找到,那么這將被視為一個丟棄的存儲provider。
如果在silo 配置文件中只有一個provider,它將被視為這個silo的默認provider。
一個了使用不存在但是在silo配置中定義的存儲provider的grain,當silo加載器加載失敗時,silo的其他部分仍然會加載,那么以后當調用這個grain時,將會拋出Orleans.Storage.BadProviderConfigException異常,指定這個grain無法加載。
存儲provider實例使用一個給定的grain類型是由在grain類型的[ StorageProvider]特性定義的存儲提供程序名稱的組合決定的,再加上provider的類型和在silo配置中確定確定的provider配置選項。
不同的grain類型可以使用不同的配置存儲provider,即使兩個都是同一類型的:例如:兩種不同的Azure 表存儲提provider的實例,連接到不同的Azure存儲賬號(請參見上面的配置文件示例)。

存儲provider的所有配置細節都是靜態定義的,在silo啟動時讀取到silo結構中,在這個時候沒有什么機制可以動態地更新或改變一個silo使用的存儲provider的列表。
然而,這是一個優先次序/工作負載的約束,而不是一個基本的設計約束。

State 存儲 APIS

grain 有兩個主要的部分 state / persistence APIs: Grain-to-Runtime and Runtime-to-Storage-Provider.

Grain State存儲API
在Orleans運行狀態存儲功能將為grain提供grain的讀寫操作和自動填充/保存 grain的GrainState數據對象。
在后台,通過適當的持久化provider配置將為grain提供這些功能連接(由Orleans客戶創工具在生成的代碼)。

Grain 狀態的讀寫功能
當Grain激活時,它的狀態將自動讀取,但是,grain是負責明確地觸發寫入任何改變的晶粒狀態,在必要時。
參見下面的錯誤處理機制的詳細信息的故障模式部分(Failure Modes)。
在OnActivateAsync()(等同於base.ReadStateAsync()的調用)方法的調用時將激活GrainState 的自動讀取,在任何調用grain的方法之前GrainState 的狀態是不會刷新,除非是,grain 激活后的這個調用。
任何grain的方法調用時,可以要求Orleans運行時寫當前grain的狀態數據,激活指定的存儲provider通過調用writestateasync()。當他們對狀態數據進行重大更新時,grain負責顯式地執行寫操作。
通常,grain的方法將返回base.writestateasync() Task 從grain方法返回結果Task 作為最終的結果,但它並不需要遵循這個模式。在任何grain方法后,運行時不會自動更新儲存的grain的狀態。
在grain的任何一種方法或計時器回調處理程序中,grain可以request奧爾良運行時重新讀取當前grain 狀態數據,通過調用base.readstateasync()激活從指定的存儲provider。
這將完全覆蓋當前存儲在當前狀態對象中的任何當前狀態數據,這些數據是由持久存儲的最新值讀取的。
(這句沒法翻譯了,直接上原文)
An opaque provider-specific Etag value (stringmay be set by a storage provider as part of the grain state metadata populated when state was read. Some providers may choose to leave this as null if they do not useEtags.

從概念上講,
任何寫操作期間Orleans運行時將以grain狀態數據對象的深層副本供自己使用。在重寫的情況下,運行時可以使用優化規則和啟發式,以避免在某些情況下,在某些情況下執行一些或所有的深拷貝,提供預期的邏輯隔離語義被保留。

Grain狀態的讀寫操作的示例代碼
為了加入Orleans的grain狀態持久性機制,必須拓展Grain類。
上述定義中的T將被此grain的特定應用程序特定的grain狀態類所取代;見下面的示例。
grain類也應該有一個[ storageprovider ]特性告訴運行時存儲provider(實例)使用這種類型的grain。
public interface MyGrainState : GrainState { public int Field1 { get; set; } public string Field2 { get; set; } } [StorageProvider(ProviderName="store1")] public class MyPersistenceGrain : Grain<MyGrainState>, IMyPersistenceGrain { ... }


Grain State Read

在grain的OnActivateAsync()方法調用之前,grain 的讀取狀態將會自動發生,沒有應用程序代碼使使這種情況發生是必需的。要查看這個狀態,可以通過grain 類的Grain<T>.State屬性。

Grain State Write

在對grain的內存狀態作出適當的改變,grain應該調用base.writestateasync()方法將更改寫入持久存儲,通過定義在存儲provider的grain。此方法是異步的,並返回一個Task。

public Task DoWrite(int val) { State.Field1 = val; return base.WriteStateAsync(); }


Grain State Refresh

如果一個grain希望顯式地重新從provider讀取這個grain的最新狀態,grain 應該調用base.ReadStateAsync() 方法,這將重新加載grain的狀態從持久存儲里,通過定義的存儲provider的grain類型,在grain狀態存儲器復制以往任何將被覆蓋和取代base.readstateasync()任務完成時。

 

public async Task<int> DoRead() { await base.ReadStateAsync(); return State.Field1; }


grain狀態持久化操作的失效模式
grain狀態讀取操作的失效模式

在初始讀取該特定grain的狀態數據時,存儲提供程序返回的故障將導致該grain的激活操作失敗;既然這樣,沒有必要再去調用grain的OnActivateAsync()在生命周期內,在grain激活過程中,由於任何其他故障,將原始請求原路返回。
常見的存儲provider為一個特定的grain讀取狀態數據會導致ReadStateAsync() Task 調用失敗。grain可以選擇處理或忽略,失敗的Task,在Orleans就像任何其他的Task。
任何試圖在加載silo期間沒有正確加載provider的框架內的grain發送消息,都將會收到一個異常Orleans.BadProviderConfigException

grain狀態寫入操作的故障模式
Failures encountered by the storage provider to write state data for a particular grain will result in theWriteStateAsync() Task to be faulted. Usually,
this will mean the grain call will be faulted back to the client caller provided the WriteStateAsync() Task is correctly chained in to the final return Task for this grain method. However,
it will be possible for certain advanced scenarios to write grain code to specifically handle such write errors,
just like they can handle any other faulted Task.

Grains that execute error-handling / recovery code must catch exceptions / faulted WriteStateAsync()Tasks and not re-throw to
signify that they have successfully handled the write error.

注意:此段對於程序中的細節處理非常重要


Storage Provider Framework

有編寫額外的持久化provider的服務提供程序接口 – IStorageProvider。持久性提供程序API包括讀和寫操作grainstate數據。

public interface IStorageProvider { Logger Log { get; } Task Init(); Task Close(); Task ReadStateAsync(string grainType, GrainId grainId, GrainState grainState); Task WriteStateAsync(string grainType, GrainId grainId, GrainState grainState); }


Storage Provider Semantics

任何試圖執行寫操作時,存儲provider檢測ETag違反約束導致寫的Task被終端,就使用Orleans.InconsistentStateException包裝底層存儲異常。

public class InconsistentStateException : AggregateException { /// <summary>The Etag value currently held in persistent storage.</summary> public string StoredEtag { get; private set; } /// <summary>The Etag value currently held in memory, and attempting to be updated.</summary> public string CurrentEtag { get; private set; } public InconsistentStateException( string errorMsg, string storedEtag, string currentEtag, Exception storageException ) : base(errorMsg, storageException) { this.StoredEtag = storedEtag; this.CurrentEtag = currentEtag; } public InconsistentStateException(string storedEtag, string currentEtag, Exception storageException) : this(storageException.Message, storedEtag, currentEtag, storageException) { } }


Data Mapping

個人存儲provider必須決定如何最好地儲存grain的狀態–BLOB(各種格式/序列化格式)或列每場是顯而易見的選擇。

對於table的基本存儲provider編碼狀態數據字段到使用orlenas二進制序列化單表列。



Application Bootstrapping within a Silo

當silos上線在應用程序需要運行一些“自動執行”功能的幾種情況。

我們現在已經增加了支持這種自動運行功能通過配置“orleans silo provider”。例如:

<OrleansConfiguration xmlns="urn:orleans"> <Globals> <BootstrapProviders> <Provider Type="My.App.BootstrapClass1" Name="bootstrap1" /> <Provider Type="My.App.BootstrapClass2" Name="bootstrap2" /> </BootstrapProviders> </Globals> </OrleansConfiguration>


它也可以注冊programaticaly引導供provider,通過調用:
public void RegisterBootstrapProvider(string providerTypeFullName, string providerName, IDictionary<string, string> properties = null) public void RegisterBootstrapProvider<T>(string providerName, IDictionary<string, string> properties = null) where T : IBootstrapProvider 

Orleans.Runtime.Configuration.GlobalConfiguration class.

這個啟動項provider 是實現了Orleans.Providers.IBootstrapProvider接口
當silo 啟動時,Orleans 運行時就是實例化bootstrap 類列表,在適當的運行上下文執行時調用他們的init方法,作為一個客戶端發送消息到grains
Task Init( string name, IProviderRuntime providerRuntime, IProviderConfiguration config)


例外情況是,當silo啟動調用silo拋出異常時,這個silo就會停止。

This fail-fast approach is the standard way that Orleans handles silo start-up issues,
and is intended to allow any problems with silo configuration and/or bootstrap logic to be easily
detected during testing phases rather than being silently ignored and causing unexpected problems later in the silo lifecycle.


定時器和提醒(Timers and reminders)

Orleans 運行時提供了兩個機制:定時器和提醒的功能,允許grain周期性的執行某一個行為。

Timers

描述:定時器是用來創建周期性grain的行為,不需要跨多個激活的grain實例。它本質上是一組.NET System.Threading.Timer 類,另外它是受單線程執行保護的grain激活。

每個激活可能有與它相關聯的零或更多的計時器,運行時在激活的運行時上下文中執行每個定時程序。

用法:開始一個定時器,要使用 Grain.RegisterTimer 方法, 返回一個實現 IDisposable 引用的實例

protected IDisposable RegisterTimer(Func<object, Task> asyncCallback, object state, TimeSpan dueTime, TimeSpan period)

asyncCallback 當定時器到執行時間的時候調用的一個函數或者功能。
state 當執行asyncCallback 時傳入的一個對象
dueTime 指定定時器開始執行時第一次執行的等待時間。
period 指定定時器執行的時間間隔


取消定時器的處理:當激活被deactivated 或當一個故障發生時,一個定時器將停止觸發,並且它的silo崩潰。

重要的考慮因素:
  • When activation collection is enabled, the execution of a timer callback does not change the activation’s state from idle to in use. This means that a timer cannot be used to postpone deactivation of otherwise idle activations.
  • The period passed to Grain.RegisterTimer is the amount of time that passes from the moment the Task returned by asyncCallback is resolved to the moment that the next invocation of asyncCallback should occur. This not only makes it impossible for successive calls to asyncCallback to overlap but also makes it so that the length of time asyncCallback takes to complete affects the frequency at whichasyncCallback is invoked. This is an important deviation from the semantics of System.Threading.Timer.
  • Each invocation of asyncCallback is delivered to an activation on a separate turn and will never run concurrently with other turns on the same activation. Note however, asyncCallback invocations are not delivered as messages and are thus not subject to message interleaving semantics. This means that invocations of asyncCallback should be considered to behave as if running on a reentrant grain with respect to other messages to that grain.


Reminders

提醒類似於定時器,但是有幾個重要的區別

描述:

使用:

  • 提醒持續存在,並將繼續觸發在所有情況下(包括部分或全部集群重新啟動)除非明確取消。
  • 提醒與一個grain,沒有任何特定的激活.
  • If a grain has no activation associated with it and a reminder ticks, one will be created. e.g.: If an activation becomes idle and is deactivated, a reminder associated with the same grain will reactivate the grain when it ticks next.
  • Reminders are delivered by message and are subject to the same interleaving semantics as all other grain methods.
  • 提醒不應該被用於高頻定時器,它們的周期應該在幾分鍾,幾小時或幾天內測量。

 

Configuration

 提醒,是持久的,依賴於存儲到功能。您必須指定在提醒子系統功能之前使用的存儲支持。提醒功能是通過在服務器端配置的systemstore元控制。它與Azure Table 或 SQL Server 存儲一起協作。
<SystemStore SystemStoreType="AzureTable" /> OR <SystemStore SystemStoreType="SqlServer" />



如果你只是想提醒占個位置來運作,而不需要建立一個Azure帳戶或SQL數據庫,那么添加此元素的配置文件(“Globals”)會給你一個發展的唯一實現的提醒系統:
<ReminderService ReminderServiceType="ReminderTableGrain"/>

使用:如果一個grain要使用提醒功能就必須實現接口IRemindable內的方法RecieveReminder 
Task IRemindable.ReceiveReminder(string reminderName, TickStatus status) { Console.WriteLine("Thanks for reminding me-- I almost forgot!"); return TaskDone.Done; }

去啟動一個提醒功能,使用Grain.RegisterOrUpdateReminder方法,然后他會返回IOrleansReminder的一個對象
protected Task<IOrleansReminder> RegisterOrUpdateReminder(string reminderName, TimeSpan dueTime, TimeSpan period)
  • reminderName 是一個唯一的標識,在grain上下文范圍內。
  • dueTime 指定第一次提醒多久開始執行。
  • period 指定執行間隔。

由於激活的生命周期就是grain唯一的生命周期,所以你必須明確的取消,使用Grain.UnregisterReminder:

protected Task UnregisterReminder(IOrleansReminder reminder)


 Grain.RegisterOrUpdateReminder.方法的調用會返回提醒操作handler

如果你想在一個持續的方式識別一個提醒,使用一個包含提醒的名稱的字符串的IOrleansReminder 實例不能保證有效期超出一個激活的壽命。
如果你只是想通過提醒的name來的到一個提醒實例的話,調用Grain.GetReminder方法。
protected Task<IOrleansReminder> GetReminder(string reminderName)

我們該用那個一個功能呢?
1.在下面幾種情況使用定時器
  • It doesn’t matter (or is desirable) that the timer ceases to function if the activation is deactivated or failures occur.
  • If the resolution of the timer is small (e.g. reasonably expressible in seconds or minutes).
  • The timer callback can be started from Grain.OnActivateAsync or when a grain method is invoked.
2.這幾種情況下使用提醒功能
  • When the periodic behavior needs to survive the activation and any failures.
  • To perform infrequent tasks (e.g. reasonably expressible in minutes, hours, or days).
組合提醒和定時器
You might consider using a combination of reminders and timers to accomplish your goal.
For example, if you need a timer with a small resolution that needs to survive across activations,
you can use a reminder that runs every five minutes, whose purpose is to wake up
a grain that restarts a local timer that may have been lost due to a deactivation.



 

Orleans Streams

Orleans v.1.0.0添加流擴展的編程模型支持,流擴展提供了一組抽象接口api,使它的流更簡單和更強大的,流擴展允許開發人員以結構化的方式寫操作在一系列事件上的應用程序。

流provider的可擴展性模型使得在廣泛的現有的隊列技術的編程模型兼容和方便,例如: Event Hubs,ServiceBusAzure Queues, 和 Apache Kafka. 而不必要寫特殊代碼或運行專用程序與這樣的隊列進行交互。

我為什么要關心?

如果你已經了解了一些關於流處理的技術如:Event HubsKafka,Azure Stream AnalyticsApache StormApache Spark Streaming, 和Reactive Extensions (Rx) 

你可能會問你為什么要關心,為什么我們需要另一個流處理系統和Actor是如何相關的流?“Why Orleans Streams?”可以回答你的問題

編程模型

下面是一些Orleans 流編程模型原則:

  1. 在Orleans 的體系里 Orleans virtual actors, Orleans 流是虛擬的,也就是說,一個流總是存在的。它不是顯式創建或銷毀,它永遠不會失敗。
  2. 流的身份ID識別,這只是邏輯名稱由GUID字符串表示。
  3. Orleans 流數據的生成將允許從時間和空間上減弱它們的依賴性。這意味着流的生產者和流的消費者可能會在不同的服務器上,在不同的時間,並將承受失敗。
  4. Orleans 的流是輕量級和動態的,Orleans 流運行時的設計是處理高速大數據。
  5. Orleans 流是動態綁定的,Orleans 流運行時的設計處理的情況下,grain連接和斷開流在一個較高的速度。
  6. Orleans 流運行時透明地管理流消費的生命周期。當應用程序訂閱一個流,然后它將接收流的事件,即使是在出現故障。
  7. Orleans 流均勻分布在grain和 工作的客戶端。

編程APIs

應用程序流訪問APIs類似於眾所周知的 Reactive Extensions (Rx) in .NET,通過使用 Orleans.Streams.IAsyncStream<T> 實現了Orleans.Streams.IAsyncObserver<T> 和 Orleans.Streams.IAsyncObservable<T> 的接口。
在下面的一個典型例子產生的一些數據,這是一個HTTP請求的服務在雲中運行的請求。Orleans客戶端運行在前端服務器接收HTTP調用和發布數據流匹配裝置:
public async Task OnHttpCall(DeviceEvent deviceEvent) { // Post data directly into device's stream. IStreamProvider streamProvider = GrainClient.GetStreamProvider("myStreamProvider"); IAsyncStream<DeviceEventData> deviceStream = streamProvider.GetStream<DeviceEventData>(deviceEvent.DeviceId); await chatStream.OnNextAsync(deviceEvent.Data); }



另一個例子是在聊天的用戶(如Orleans 的grain)實施加入聊天室,得到一個處理流的聊天信息在這房間里的所有其他用戶產生和訂閱它。注意,聊天用戶既不需要知道聊天室的grain本身(可能不會有這樣的grain在我們的系統中),也不是在該組的其他用戶產生消息。不用說,對產生的聊天流,用戶不需要知道誰是目前訂閱的流。這演示了如何聊天用戶可以完全解耦的時間和空間。

public class ChatUser: Grain { public async Task JoinChat(string chatGroupName) { IStreamProvider streamProvider = base.GetStreamProvider("myStreamProvider"); IAsyncStream<string> chatStream = streamProvider.GetStream<string>(chatGroupName); await chatStream.SubscribeAsync((string chatEvent) => Console.Out.Write(chatEvent)); } }



快速入門示例

Quick Start Sample (這里是一個演示的例子),Streams Programming APIs (流編程模型APIs)

Stream Providers

流可以通過各種形狀和形式的物理信道,可以有不同的語義。

Orleans 流的設計是支持多種Stream Providers的概念,這是系統中的一個可拓展點,Orleans 目前提供兩種Stream providers。

基本的Tcp 通信Simple Message Stream Provider和雲隊列Azure Queue Stream Provider,你可以在這里(Stream Providers)找到更消息的介紹

Stream 意義

Stream Subsription Semantics: Orleans Streams guarantee Sequential Consistency for Stream Subsription operations. Specificaly, when consumer subscribes to a stream, once the Task representing the subsription operation was successfuly resolved, the consumer will see all events that were generated after it has subscribed. In addition, Rewindable streams allow to subscribe from an arbitrary point in time in the past by using StreamSequenceToken (more details can be found here).

Individual Stream Events Delivery Guarantees: Individual event delivery guarantees depend on individual stream providers. Some provide only best-effort at-most-once delivery (such as Simple Message Streams), while others provide at-least-once delivery (such as Azure Queue Streams). It is even possible to build a stream provider that will guarantee exactly-once delivery (we don’t have such a provider yet, but it is possible to build one with the extensability model).

Events Delivery Order: Event order also depends on a particular stream provider. In SMS streams, the producer explicitelly controls the order of events seen by the consumer by controlling the way it publishes them. Azure Queue streams do not guarantee FIFO order, since the underlaying Azure Queues do not guarantee order in failure cases. Applications can also control their own stream delivery ordering, by usingStreamSequenceToken.

 

流實施

 Orleans Streams Implementation提供了一個高層次的內部實施概述。

Streams 的可拓展性

Orleans Streams Extensibility 描述如何用新的功能擴展流。

Code Samples

更多的例子:here.
更多的資料:

 

調試符號

在開發期間Orleans調試比較簡單,直接附加進程,但是對於在生產環境來說,就無法斷點調試了,采用跟蹤是最好的辦法。

標記(Symbols):

Symbols for Orleans binaries are published to https://nuget.smbsrc.net symbols server. Add it to the list of symbols server in the Visual Studio options under Debugging/Symbols for debugging Orleans code. Make sure there is traling slash in the URL. Visual Studio 2015 has a bug with parsing it.

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM