時代在變,技術也在更新迭代。從傳統的單體應用架構到現在的分布式集群架構,在技術的學習上真的是一點都不能松懈。
網上關於微服務與Consul的話題太多了,我在這里不做過多描述。
其實就是在微服務中我們可以利用Consul可以實現服務的發現、治理、健康檢查等...
用它先下載它:
我此番在windows下操作,打開下載的Consul所在文件夾,輸入 consul.exe agent -dev
Consul的默認啟動端口為8500,如果能正常顯示頁面則啟動成功。
新建解決方案,建立一個.Net Core MVC的項目和一個.Net Core WebApi的項目。 安裝NuGet包Consul
首先Api端服務實例啟動時需到Consul中進行服務注冊,Web Client直接與Consul進行連接,從Consul中拿到服務實例並配置策略及發送http請求等。
如圖所示:
Consul每隔一段時間就會調用一次注冊的服務實例進行健康檢查。
在Api項目中新建一個IConfiguration的擴展方法:
public static void ConsulExtend(this IConfiguration configuration) { ConsulClient client = new ConsulClient(m => { m.Address = new Uri("http://localhost:8500/"); m.Datacenter = "dc1"; }); //啟動的時候在consul中注冊實例服務 //在consul中注冊的ip,port string ip = configuration["ip"]; int port = int.Parse(configuration["port"]); int weight = string.IsNullOrWhiteSpace(configuration["weight"]) ? 1 : int.Parse(configuration["weight"]); client.Agent.ServiceRegister(new AgentServiceRegistration() { ID = "service" + Guid.NewGuid(),//唯一的 Name = "MicroserviceAttempt",//組(服務)名稱 Address = ip, Port = port,//不同的端口=>不同的實例 Tags = new string[] { weight.ToString() },//標簽 Check = new AgentServiceCheck()//服務健康檢查 { Interval = TimeSpan.FromSeconds(12),//間隔12s一次 檢查 HTTP = $"http://{ip}:{port}/Api/Health/Index", Timeout = TimeSpan.FromSeconds(5),//檢測等待時間 DeregisterCriticalServiceAfter = TimeSpan.FromSeconds(20)//失敗后多久移除 } }); Console.WriteLine($"{ip}:{port}--weight:{weight}"); }
心跳檢查的接口:
[ApiController] [Route("api/[controller]/[action]")] public class HealthController : Controller { readonly IConfiguration _configuration; public HealthController(IConfiguration configuration) { _configuration = configuration; } [HttpGet] public IActionResult Index() { //心跳,consul會每隔幾秒調一次 Console.WriteLine($"{ _configuration["port"]} Invoke"); return Ok(); } }
在Startup類中的Configure方法加入:
//啟動時注冊,且注冊一次 this.Configuration.ConsulExtend();
將api項目啟動(三個端口)
dotnet ServicesInstances.dll --urls="http://*:5726" --ip="127.0.0.1" --port=5726 dotnet ServicesInstances.dll --urls="http://*:5727" --ip="127.0.0.1" --port=5727 dotnet ServicesInstances.dll --urls="http://*:5728" --ip="127.0.0.1" --port=5728
接着是web端,新建控制器:
public class UserController : Controller {readonly HttpSender _httpSender; public UserController(HttpSender httpSender) { _httpSender = httpSender; } //暫不考慮線程安全 private static int index = 0; public async Task<IActionResult> Index() { #region nginx版 只知道nginx地址就行了 //var str = await _httpSender.InvokeApi("http://localhost:8088/api/User/GetCustomerUser"); #endregion #region consul //new一個consul實例 ConsulClient client = new ConsulClient(m => { new Uri("http://localhost:8500/"); m.Datacenter = "dc1"; }); //與consul進行通信(連接),得到consul中所有的服務實例 var response = client.Agent.Services().Result.Response; string url = "http://MicroserviceAttempt/api/User/GetCustomerUser"; Uri uri = new Uri(url); string groupName = uri.Host; AgentService agentService = null;//服務實例 var serviceDictionary = response.Where(m => m.Value.Service.Equals(groupName, StringComparison.OrdinalIgnoreCase)).ToArray();//找到的全部服務實例 //{ // agentService = serviceDictionary[0].Value; //} { //輪詢策略=>達到負載均衡的目的 agentService = serviceDictionary[index++ % 3].Value; } { //平均策略(隨機獲取索引--相對平均)=>達到負載均衡的目的 agentService = serviceDictionary[new Random(index++).Next(0, serviceDictionary.Length)].Value; } { //權重策略,給不同的實例分配不同的壓力,注冊時提供權重 List<KeyValuePair<string, AgentService>> keyValuePairs = new List<KeyValuePair<string, AgentService>>(); foreach (var item in keyValuePairs) { int count = int.Parse(item.Value.Tags?[0]);//在服務注冊的時候給定權重數量 for (int i = 0; i < count; i++) { keyValuePairs.Add(item); } } agentService = keyValuePairs.ToArray()[new Random(index++).Next(0, keyValuePairs.Count())].Value; } url = $"{uri.Scheme}://{agentService.Address}:{agentService.Port}{uri.PathAndQuery}"; string content = await _httpSender.InvokeApi(url); #endregion return View(JsonConvert.DeserializeObject<CustomerUser>(content)); } }
public class HttpSender { public async Task<string> InvokeApi(string url) { using (HttpClient client = new HttpClient()) { HttpRequestMessage message = new HttpRequestMessage(); message.Method = HttpMethod.Get; message.RequestUri = new Uri(url); var result = client.SendAsync(message).Result; string content = result.Content.ReadAsStringAsync().Result; return content; } } }
啟動web項目,訪問User-Index這個視圖,會輪詢不同的服務實例。
但是這樣做不好,客戶端都需要和Consul進行連接,拿到所有的服務實例,直接和服務實例進行交互,服務實例就暴露了--所以需要網關。
網關將服務實例與客戶端進行隔離,是所有Api請求的入口。因此可以統一鑒權。當然微服務網關的作用有很多,大家可自行百度了解。
新建一個網關的項目,請求先到達網關,再由網關分發請求到不同的實例。如圖:
Consul整合GeteWay.引入NuGet包:Ocelot、Ocelot.Provider.Consul
修改Startup類:
public class Startup { public Startup(IConfiguration configuration) { Configuration = configuration; } public IConfiguration Configuration { get; } // This method gets called by the runtime. Use this method to add services to the container. public void ConfigureServices(IServiceCollection services) { services.AddOcelot().AddConsul(); //services.AddControllers(); } // This method gets called by the runtime. Use this method to configure the HTTP request pipeline. public void Configure(IApplicationBuilder app, IWebHostEnvironment env) { //將默認的請求管道全部丟掉 app.UseOcelot(); //if (env.IsDevelopment()) //{ // app.UseDeveloperExceptionPage(); //} //app.UseHttpsRedirection(); //app.UseRouting(); //app.UseAuthorization(); //app.UseEndpoints(endpoints => //{ // endpoints.MapControllers(); //}); } }
修改Program類:
public class Program { public static void Main(string[] args) { CreateHostBuilder(args).Build().Run(); } public static IHostBuilder CreateHostBuilder(string[] args) => Host.CreateDefaultBuilder(args) .ConfigureAppConfiguration(conf => { conf.AddJsonFile("configuration.json", optional: false, reloadOnChange: true); }) .ConfigureWebHostDefaults(webBuilder => { webBuilder.UseStartup<Startup>(); }); }
新建一個configuration.json的文件,用於配置策略等...
Routes中路由規則可以配置多個
{//*************************單地址多實例負載均衡+Consul***************************** "Routes": [ { //GeteWay轉發=>Downstream "DownstreamPathTemplate": "/api/{url}", //服務地址--url變量 "DownstreamScheme": "http", //http://localhost:6299/T5/User/GetCustomerUser "UpstreamPathTemplate": "/T5/{url}", //網關地址--url變量 沖突的還可以加權重Priority "UpstreamHttpMethod": [ "Get", "Post" ], "UseServiceDiscovery": true, //使用服務發現 "ServiceName": "MicroserviceAttempt", //Consul服務名稱 "LoadBalancerOptions": { "Type": "RoundRobin" //輪詢 //"LeastConnection":最少連接數服務器 "NoloadBalance":不負載均衡 "CookieStickySession":會話粘滯 } } ], "GlobalConfiguration": { "BaseUrl": "http://127.0.0.1:6299", "ServiceDiscoveryProvider": { "Host": "localhost", "Port": 8500, "Type": "Consul"//由Consul提供服務發現,每次請求去Consul } //"ServiceDiscoveryProvider": { // "Host": "localhost", // "Port": 8500, // "Type": "PollConsul", //由Consul提供服務發現,每次請求去Consul // "PollingInterval": 1000//輪詢Consul,評率毫秒--down是不知道的 //} } //*************************單地址多實例負載均衡+Consul***************************** }
啟動網關(項目)服務:
dotnet GateWay-Ocelot.dll --urls="http://*:6299" --ip="127.0.0.1" --port=6299
調用服務接口:
每請求一次就輪詢不同的服務實例,達到負載均衡。
服務治理、熔斷、降級
服務治理-緩存
引入NuGet包:Ocelot.Cache.Cache
修改ConfigureServices方法:
public void ConfigureServices(IServiceCollection services) { services.AddOcelot().AddConsul() .AddCacheManager(m => { m.WithDictionaryHandle();//默認字典存儲 }); //services.AddControllers(); }
修改configuration.json文件
"Routes": [ { //GeteWay轉發=>Downstream "DownstreamPathTemplate": "/api/{url}", //服務地址--url變量 "DownstreamScheme": "http", //http://localhost:6299/T5/User/GetCustomerUser "UpstreamPathTemplate": "/T5/{url}", //網關地址--url變量 沖突的還可以加權重Priority "UpstreamHttpMethod": [ "Get", "Post" ], "UseServiceDiscovery": true, //使用服務發現 "ServiceName": "MicroserviceAttempt", //Consul服務名稱 "LoadBalancerOptions": { "Type": "RoundRobin" //輪詢 //"LeastConnection":最少連接數服務器 "NoloadBalance":不負載均衡 "CookieStickySession":會話粘滯 }, //使用緩存 "FileCacheOptions": { "TtlSeconds": 15,//過期時間 "Region": "UserCache" //可以調用Api清理 } } ]
再次調用會發現每隔15秒數據才會變.
自定義緩存
新建類CustomeCache
/// <summary> /// 自定義Cache /// </summary> public class CustomeCache : IOcelotCache<CachedResponse> { public class CacheDataModel { public CachedResponse CachedResponse { get; set; } public DateTime TimeOut { get; set; } public string Region { get; set; } } private static Dictionary<string, CacheDataModel> keyValuePairs = new Dictionary<string, CacheDataModel>(); /// <summary> /// 添加 /// </summary> /// <param name="key"></param> /// <param name="value"></param> /// <param name="ttl"></param> /// <param name="region"></param> public void Add(string key, CachedResponse value, TimeSpan ttl, string region) { Console.WriteLine($"調用了{nameof(CustomeCache)}--{nameof(Add)}"); keyValuePairs[key] = new CacheDataModel() { CachedResponse = value, Region = region, TimeOut = DateTime.Now.Add(ttl) }; } /// <summary> /// 覆蓋 /// </summary> /// <param name="key"></param> /// <param name="value"></param> /// <param name="ttl"></param> /// <param name="region"></param> public void AddAndDelete(string key, CachedResponse value, TimeSpan ttl, string region) { Console.WriteLine($"調用了{nameof(CustomeCache)}--{nameof(AddAndDelete)}"); keyValuePairs[key] = new CacheDataModel() { CachedResponse = value, Region = region, TimeOut = DateTime.Now.Add(ttl) }; } /// <summary> /// 清除 /// </summary> /// <param name="region"></param> public void ClearRegion(string region) { Console.WriteLine($"調用了{nameof(CustomeCache)}--{nameof(ClearRegion)}"); var keyList = keyValuePairs.Where(m => m.Value.Region.Equals(region)).Select(e => e.Key); foreach (var item in keyList) { keyValuePairs.Remove(item); } } /// <summary> /// 獲取 /// </summary> /// <param name="key"></param> /// <param name="region"></param> /// <returns></returns> public CachedResponse Get(string key, string region) { Console.WriteLine($"調用了{nameof(CustomeCache)}--{nameof(Get)}"); if (keyValuePairs.ContainsKey(key) && keyValuePairs[key] != null && keyValuePairs[key].TimeOut > DateTime.Now && keyValuePairs[key].Region.Equals(region)) return keyValuePairs[key].CachedResponse; else return null; } }
在ConfigureServices方法中加入:
public void ConfigureServices(IServiceCollection services) { services.AddOcelot().AddConsul() .AddCacheManager(m => { m.WithDictionaryHandle();//默認字典存儲 }); //services.AddControllers(); //這里的IOcelotCache<CachedResponse>是默認緩存的約束--准備替換成自定義的 services.AddSingleton<IOcelotCache<CachedResponse>, CustomeCache>(); }
調用,15秒刷新一次。
雪崩效應:微服務架構下,單個服務的故障而引發系列服務故障。
解決:1.超時:調用服務的操作可以配置為執行超時,如果服務未能在這個給定時間內響應,將回復一個失敗的消息。
2.熔斷:使用斷路器來檢測故障是否已得到解決,防止請求反復嘗試執行一個可能會失敗的操作,從而減少等待糾正故障的時間。
安裝NuGet包:Ocelot.Provider.Polly
修改ConfigureServices方法:
services.AddOcelot().AddConsul() .AddCacheManager(m => { m.WithDictionaryHandle();//默認字典存儲 }) .AddPolly(); //services.AddControllers(); //這里的IOcelotCache<CachedResponse>是默認緩存的約束--准備替換成自定義的 services.AddSingleton<IOcelotCache<CachedResponse>, CustomeCache>();
修改configuration.json文件
"Routes": [ { //GeteWay轉發=>Downstream "DownstreamPathTemplate": "/api/{url}", //服務地址--url變量 "DownstreamScheme": "http", //http://localhost:6299/T5/User/GetCustomerUser "UpstreamPathTemplate": "/T5/{url}", //網關地址--url變量 沖突的還可以加權重Priority "UpstreamHttpMethod": [ "Get", "Post" ], "UseServiceDiscovery": true, //使用服務發現 "ServiceName": "MicroserviceAttempt", //Consul服務名稱 "LoadBalancerOptions": { "Type": "RoundRobin" //輪詢 //"LeastConnection":最少連接數服務器 "NoloadBalance":不負載均衡 "CookieStickySession":會話粘滯 }, //使用緩存 "FileCacheOptions": { "TtlSeconds": 15, //過期時間 "Region": "UserCache" //可以調用Api清理 }, "QoSOptions": { "ExceptionsAllowedBeforeBreaking": 3, //熔斷之前允許多少個異常請求 "DurationOfBreak": 10000, //熔斷的時間,單位為ms.超過這個時間可再請求 "TimeoutValue": 4000 //如果下游請求的處理時間超過多少則將請求設置為超時 默認90秒 } } ],
故意設置接口休眠5秒鍾
調用:
限流:限制單位時間內的請求數(保護機制),超過就返回指定信息。
修改configuration.json文件
"Routes": [ { //GeteWay轉發=>Downstream "DownstreamPathTemplate": "/api/{url}", //服務地址--url變量 "DownstreamScheme": "http", //http://localhost:6299/T5/User/GetCustomerUser "UpstreamPathTemplate": "/T5/{url}", //網關地址--url變量 沖突的還可以加權重Priority "UpstreamHttpMethod": [ "Get", "Post" ], "UseServiceDiscovery": true, //使用服務發現 "ServiceName": "MicroserviceAttempt", //Consul服務名稱 "LoadBalancerOptions": { "Type": "RoundRobin" //輪詢 //"LeastConnection":最少連接數服務器 "NoloadBalance":不負載均衡 "CookieStickySession":會話粘滯 }, //使用緩存 "FileCacheOptions": { "TtlSeconds": 15, //過期時間 "Region": "UserCache" //可以調用Api清理 }, //限流 張隊長貢獻的 "RateLimitOptions": { "ClientWhitelist": ["Microservice","Attempt"],//白名單 ClientId區分大小寫 "EnableRateLimiting": true, "Period": "1s", //5m 1h 1d "PeriodTimespan": 30,//多少秒之后客戶端可以重試 "Limit": 5 //統計時間段內允許的最大請求數 }, "QoSOptions": { "ExceptionsAllowedBeforeBreaking": 3, //熔斷之前允許多少個異常請求 "DurationOfBreak": 10000, //熔斷的時間,單位為ms.超過這個時間可再請求 "TimeoutValue": 4000 //如果下游請求的處理時間超過多少則將請求設置為超時 默認90秒 } } ], "GlobalConfiguration": { "BaseUrl": "http://127.0.0.1:6299", "ServiceDiscoveryProvider": { "Host": "localhost", "Port": 8500, "Type": "Consul" //由Consul提供服務發現,每次請求去Consul }, "RateLimitOptions": { "QuotaExceededMessage": "Customize Tips!", //限流時返回的消息 "HttpStatusCode": 999 //限流時返回的code } //"ServiceDiscoveryProvider": { // "Host": "localhost", // "Port": 8500, // "Type": "PollConsul", //由Consul提供服務發現,每次請求去Consul // "PollingInterval": 1000//輪詢Consul,評率毫秒--down是不知道的 //} }
調用接口,超過五次就會限流:
當設置了白名單后,就對來訪的請求就不做限流機制
到此就結尾了。如有不當的地方,請諒解,希望能幫到大家。
代碼已上傳至我的github: