1. 引言
是的,Orleans v3.0.0 已經發布了,並已經完全支持 .NET Core 3.0。
所以,Orleans 系列是時候繼續了,抱歉,讓大家久等了。
萬丈高樓平地起,這一節我們就先來了解下Orleans的基本使用。
2. 模板項目講解
在上一篇文章中,我們了解到Orleans 作為.NET 分布式框架,其主要包括三個部分:Client、Grains、Silo Host(Server)。因此,為了方便講解,創建如下的項目結構進行演示:
這里有幾點需要說明:
- Orleans.Grains: 類庫項目,用於定義Grain的接口以及實現,需要引用
Microsoft.Orleans.CodeGenerator.MSBuild
和Microsoft.Orleans.Core.Abstractions
NuGet包。 - Orleans.Server:控制台項目,為 Silo 宿主提供宿主環境,需要引用
Microsoft.Orleans.Server
和Microsoft.Extensions.Hosting
NuGet包,以及Orleans.Grains
項目。 - Orleans.Client:控制台項目,用於演示如何借助Orleans Client建立與Orleans Server的連接,需要引用
Microsoft.Orleans.Client
和Microsoft.Extensions.Hosting
NuGet包,同時添加Orleans.Grains
項目引用。
3. 第一個Grain
Grain作為Orleans的第一公民,以及Virtual Actor的實際代言人,想吃透Orleans,那Grain就是第一道坎。
先看一個簡單的Demo,我們來模擬統計網站的實時在線用戶。
在Orlean s.Grains
添加ISessionControl
接口,主要用戶登錄狀態的管理。
public interface ISessionControlGrain : IGrainWithStringKey
{
Task Login(string userId);
Task Logout(string userId);
Task<int> GetActiveUserCount();
}
可以看見Grain的定義很簡單,只需要指定繼承自IGrain的接口就好。這里面繼承自IGrainWithStringKey
,說明該Grain 的Identity Key(身份標識)為string
類型。同時需要注意的是
Grain 的方法申明,返回值必須是: Task、Task
緊接着定義SessionControlGrain
來實現ISessionControlGrain
接口。
public class SessionControlGrain : Grain, ISessionControlGrain
{
private List<string> LoginUsers { get; set; } = new List<string>();
public Task Login(string userId)
{
//獲取當前Grain的身份標識(因為ISessionControlGrain身份標識為string類型,GetPrimaryKeyString());
var appName = this.GetPrimaryKeyString();
LoginUsers.Add(userId);
Console.WriteLine($"Current active users count of {appName} is {LoginUsers.Count}");
return Task.CompletedTask;
}
public Task Logout(string userId)
{
//獲取當前Grain的身份標識
var appName = this.GetPrimaryKey();
LoginUsers.Remove(userId);
Console.WriteLine($"Current active users count of {appName} is {LoginUsers.Count}");
return Task.CompletedTask;
}
public Task<int> GetActiveUserCount()
{
return Task.FromResult(LoginUsers.Count);
}
}
實現也很簡單,Grain的實現要繼承自Grain
基類。代碼中我們定義了一個List<string>
集合用於保存登錄用戶。
4. 第一個Silo Host(Server)
定義一個Silo用於暴露Grain提供的服務,在Orleans.Server.Program
中添加以下代碼用於啟動Silo Host。
static Task Main(string[] args)
{
Console.Title = typeof(Program).Namespace;
// define the cluster configuration
return Host.CreateDefaultBuilder()
.UseOrleans((builder) =>
{
builder.UseLocalhostClustering()
.AddMemoryGrainStorageAsDefault()
.Configure<ClusterOptions>(options =>
{
options.ClusterId = "Hello.Orleans";
options.ServiceId = "Hello.Orleans";
})
.Configure<EndpointOptions>(options => options.AdvertisedIPAddress = IPAddress.Loopback)
.ConfigureApplicationParts(parts =>
parts.AddApplicationPart(typeof(ISessionControlGrain).Assembly).WithReferences());
}
)
.ConfigureServices(services =>
{
services.Configure<ConsoleLifetimeOptions>(options =>
{
options.SuppressStatusMessages = true;
});
})
.ConfigureLogging(builder => { builder.AddConsole(); })
.RunConsoleAsync();
}
Host.CreateDefaultBuilder()
:創建泛型主機提供宿主環境。UseOrleans
:用來配置Oleans。UseLocalhostClustering()
:用於在開發環境下指定連接到本地集群。Configure<ClusterOptions>
:用於指定連接到那個集群。Configure<EndpointOptions>
:用於配置silo與silo、silo與client之間的通信端點。開發環境下可僅指定回環地址作為集群間通信的IP地址。ConfigureApplicationParts()
:用於指定暴露哪些Grain服務。
以上就是開發環境下,Orleans Server的基本配置。對於詳細的配置也可以先參考Orleans Server Configuration。后續也會有專門的一篇文章來詳解。
5. 第一個Client
客戶端的定義也很簡單,主要是創建IClusterClient
對象建立於Orleans Server的連接。因為IClusterClient
最好能在程序啟動之時就建立連接,所以可以通過繼承IHostedService
來實現。
在Orleans.Client
中定義ClusterClientHostedService
繼承自IHostedService
。
public class ClusterClientHostedService : IHostedService
{
public IClusterClient Client { get; }
private readonly ILogger<ClusterClientHostedService> _logger;
public ClusterClientHostedService(ILogger<ClusterClientHostedService> logger, ILoggerProvider loggerProvider)
{
_logger = logger;
Client = new ClientBuilder()
.UseLocalhostClustering()
.Configure<ClusterOptions>(options =>
{
options.ClusterId = "Hello.Orleans";
options.ServiceId = "Hello.Orleans";
})
.ConfigureLogging(builder => builder.AddProvider(loggerProvider))
.Build();
}
public Task StartAsync(CancellationToken cancellationToken)
{
var attempt = 0;
var maxAttempts = 100;
var delay = TimeSpan.FromSeconds(1);
return Client.Connect(async error =>
{
if (cancellationToken.IsCancellationRequested)
{
return false;
}
if (++attempt < maxAttempts)
{
_logger.LogWarning(error,
"Failed to connect to Orleans cluster on attempt {@Attempt} of {@MaxAttempts}.",
attempt, maxAttempts);
try
{
await Task.Delay(delay, cancellationToken);
}
catch (OperationCanceledException)
{
return false;
}
return true;
}
else
{
_logger.LogError(error,
"Failed to connect to Orleans cluster on attempt {@Attempt} of {@MaxAttempts}.",
attempt, maxAttempts);
return false;
}
});
}
public async Task StopAsync(CancellationToken cancellationToken)
{
try
{
await Client.Close();
}
catch (OrleansException error)
{
_logger.LogWarning(error, "Error while gracefully disconnecting from Orleans cluster. Will ignore and continue to shutdown.");
}
}
}
代碼講解:
- 構造函數中通過借助
ClientBuilder()
來初始化IClusterClient
。其中UseLocalhostClustering()
用於連接到開發環境中的localhost 集群。並通過Configure<ClusterOptions>
指定連接到哪個集群。(需要注意的是,這里的ClusterId必須與Orleans.Server中配置的保持一致。
Client = new ClientBuilder()
.UseLocalhostClustering()
.Configure<ClusterOptions>(options =>
{
options.ClusterId = "Hello.Orleans";
options.ServiceId = "Hello.Orleans";
})
.ConfigureLogging(builder => builder.AddProvider(loggerProvider))
.Build();
- 在
StartAsync
方法中通過調用Client.Connect
建立與Orleans Server的連接。同時定義了一個重試機制。
緊接着我們需要將ClusterClientHostedService
添加到Ioc容器,添加以下代碼到Orleans.Client.Program
中:
static Task Main(string[] args)
{
Console.Title = typeof(Program).Namespace;
return Host.CreateDefaultBuilder()
.ConfigureServices(services =>
{
services.AddSingleton<ClusterClientHostedService>();
services.AddSingleton<IHostedService>(_ => _.GetService<ClusterClientHostedService>());
services.AddSingleton(_ => _.GetService<ClusterClientHostedService>().Client);
services.AddHostedService<HelloOrleansClientHostedService>();
services.Configure<ConsoleLifetimeOptions>(options =>
{
options.SuppressStatusMessages = true;
});
})
.ConfigureLogging(builder =>
{
builder.AddConsole();
})
.RunConsoleAsync();
}
對於ClusterClientHostedService
,並沒有選擇直接通過services.AddHostedService<T>
的方式注入,是因為我們需要注入該服務中提供的IClusterClient
(單例),以供其他類去消費。
緊接着,定義一個HelloOrleansClientHostedService
用來消費定義的ISessionControlGrain
。
public class HelloOrleansClientHostedService : IHostedService
{
private readonly IClusterClient _client;
private readonly ILogger<HelloOrleansClientHostedService> _logger;
public HelloOrleansClientHostedService(IClusterClient client, ILogger<HelloOrleansClientHostedService> logger)
{
_client = client;
_logger = logger;
}
public async Task StartAsync(CancellationToken cancellationToken)
{
// 模擬控制台終端用戶登錄
await MockLogin("Hello.Orleans.Console");
// 模擬網頁終端用戶登錄
await MockLogin("Hello.Orleans.Web");
}
/// <summary>
/// 模擬指定應用的登錄
/// </summary>
/// <param name="appName"></param>
/// <returns></returns>
public async Task MockLogin(string appName)
{
//假設我們需要支持不同端登錄用戶,則只需要將項目名稱作為身份標識。
//即可獲取一個代表用來維護當前項目登錄狀態的的單例Grain。
var sessionControl = _client.GetGrain<ISessionControlGrain>(appName);
ParallelLoopResult result = Parallel.For(0, 10000, (index) =>
{
var userId = $"User-{index}";
sessionControl.Login(userId);
});
if (result.IsCompleted)
{
//ParallelLoopResult.IsCompleted 只是返回所有循環創建完畢,並不保證循環的內部任務創建並執行完畢
//所以,此處手動延遲5秒后再去讀取活動用戶數。
await Task.Delay(TimeSpan.FromSeconds(5));
var activeUserCount = await sessionControl.GetActiveUserCount();
_logger.LogInformation($"The Active Users Count of {appName} is {activeUserCount}");
}
}
public Task StopAsync(CancellationToken cancellationToken)
{
_logger.LogInformation("Closed!");
return Task.CompletedTask; ;
}
}
代碼講解:
這里定義了一個MockLogin
用於模擬不同終端10000個用戶的並發登錄。
- 通過構造函數注入需要的
IClusterClient
。 - 通過指定Grain接口以及身份標識,就可以通過Client 獲取對應的Grain,進而消費Grain中暴露的方法。
var sessionControl = _client.GetGrain<ISessionControlGrain>(appName);
這里需要注意的是,指定的身份標識為終端應用的名稱,那么在整個應用生命周期內,將有且僅有一個代表這個終端應用的Grain。 - 使用
Parallel.For
模擬並發 ParallelLoopResult.IsCompleted
只是返回所有循環任務創建完畢,並不代表循環的內部任務執行完畢。
6. 啟動第一個 Orleans 應用
先啟動Orleans.Server
:
再啟動Orleans.Client
:
從上面的運行結果來看,模擬兩個終端10000個用戶的並發登錄,最終輸出的活動用戶數量均為10000個。
回顧整個實現,並沒有用到諸如鎖、並發集合等避免並發導致的線程安全問題,但卻輸出正確的期望結果,這就正好說明了Orleans強大的並發控制特性。
public class SessionControlGrain : Grain, ISessionControlGrain
{
// 未使用並發集合
private List<string> LoginUsers { get; set; } = new List<string>();
public Task Login(string userId)
{
//獲取當前Grain的身份標識(因為ISessionControlGrain身份標識為string類型,GetPrimaryKeyString());
var appName = this.GetPrimaryKeyString();
LoginUsers.Add(userId);//未加鎖
Console.WriteLine($"Current active users count of {appName} is {LoginUsers.Count}");
return Task.CompletedTask;
}
....
}
7. 小結
通過簡單的演示,想必你對Orleans的編程實現有了基本的認知,並體會到其並發控制的強大之處。
這只是簡單的入門演練,Orleans很多強大的特性,后續再結合具體場景進行詳細闡述。
源碼已上傳至GitHub:Hello.Orleans