asp.net core 和consul


consul集群搭建

Consul是HashiCorp公司推出的使用go語言開發的開源工具,用於實現分布式系統的服務發現與配置,內置了服務注冊與發現框架、分布一致性協議實現、健康檢查、Key/Value存儲、多數據中心方案,使用起來較為簡單。使用docker命令創建注冊中心比較麻煩,並且不好維護,這里使用docker-compose來實現。registrator保證了,如果服務已停止,則從注冊中心中移除。docker-compose.yaml如下

version: "3.0"
 
services:
    # consul server,對外暴露的ui接口為8500,只有在2台consul服務器的情況下集群才起作用
    consulserver:
        image: progrium/consul:latest
        hostname: consulserver
        ports:
            - "8300"
            - "8400"
            - "8500:8500"
            - "53"
        command: -server -ui-dir /ui -data-dir /tmp/consul --bootstrap-expect=3
 
    # consul server1在consul server服務起來后,加入集群中
    consulserver1:
        image: progrium/consul:latest
        hostname: consulserver1
        depends_on:
            - "consulserver"
        ports:
            - "8300"
            - "8400"
            - "8500"
            - "53"
        command: -server -data-dir /tmp/consul -join consulserver
 
    # consul server2在consul server服務起來后,加入集群中
    consulserver2:
        image: progrium/consul:latest
        hostname: consulserver2
        depends_on:
            - "consulserver"
        ports:
            - "8300"
            - "8400"
            - "8500"
            - "53"
        command: -server -data-dir /tmp/consul -join consulserver
    registrator:
        image: gliderlabs/registrator:master
        hostname: registrator
        depends_on:
            - "consulserver"
        volumes:
            - "/var/run/docker.sock:/tmp/docker.sock"
        command: -internal consul://consulserver:8500

然后運行docker-compose up -d 

ASP.NET

注冊服務

創建一個ServiceA(asp.net core 2.2) 項目,需要安裝Consul,Consul包中提供了一個IConsulClient類,我們可以通過它來調用Consul進行服務的注冊,以及發現等。我們需要在服務啟動的時候,將自身的地址等信息注冊到Consul中,並在服務關閉的時候從Consul撤銷。這種行為就非常適合使用 IHostedService 來實現。這里要注意的是,我們需要保證_serviceId對於同一個實例的唯一,避免重復性的注冊。關閉時撤銷服務:ConsulHostedService.cs

namespace ServiceA
{
    using Consul;
    using Microsoft.AspNetCore.Hosting.Server;
    using Microsoft.AspNetCore.Hosting.Server.Features;
    using Microsoft.Extensions.Hosting;
    using Microsoft.Extensions.Logging;
    using System;
    using System.Linq;
    using System.Net;
    using System.Threading;
    using System.Threading.Tasks;
 
    public class ConsulHostedService : IHostedService
    {
        private readonly IConsulClient _consulClient;
        private readonly ILogger _logger;
        private readonly IServer _server;
 
        public ConsulHostedService(IConsulClient consulClient, ILogger<ConsulHostedService> logger, IServer server)
        {
            _consulClient = consulClient;
            _logger = logger;
            _server = server;
        }
 
        private CancellationTokenSource _cts;
        private string _serviceId;
 
        public async Task StartAsync(CancellationToken cancellationToken)
        {
            // Create a linked token so we can trigger cancellation outside of this token's cancellation
            _cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
 
            var features = _server.Features;
            var address = features.Get<IServerAddressesFeature>().Addresses.First();
            var uri = new Uri(address);
 
            _serviceId = "Service-v1-" + Dns.GetHostName() + "-" + uri.Authority;
 
            var registration = new AgentServiceRegistration()
            {
                ID = _serviceId,
                Name = "Service",
                Address = uri.Host,
                Port = uri.Port,
                Tags = new[] { "api" },
                Check = new AgentServiceCheck()
                {
                    // HTTP = $"{uri.Scheme}://{uri.Host}:{uri.Port}/api/Health/Status",
                    HTTP = $"{uri.Scheme}://{uri.Host}:{uri.Port}/healthz",
                    Timeout = TimeSpan.FromSeconds(2),
                    Interval = TimeSpan.FromSeconds(10)
                }
            };
 
            _logger.LogInformation("Registering in Consul");
 
            // 首先移除服務,避免重復注冊
            await _consulClient.Agent.ServiceDeregister(registration.ID, _cts.Token);
            await _consulClient.Agent.ServiceRegister(registration, _cts.Token);
        }
 
        public async Task StopAsync(CancellationToken cancellationToken)
        {
            _cts.Cancel();
            _logger.LogInformation("Deregistering from Consul");
            try
            {
                await _consulClient.Agent.ServiceDeregister(_serviceId, cancellationToken);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, $"Deregisteration failed");
            }
        }
    }
}

StartupConfigureServices方法中來配置IConsulClient到ASP.NET Core的依賴注入系統中,healthz地址,我使用了ASP.NET Core 2.2中自帶的健康檢查,它需要在Startup中添加如下配置

namespace ServiceA
{
    using System;
    using Consul;
    using Microsoft.AspNetCore.Builder;
    using Microsoft.AspNetCore.Mvc;
    using Microsoft.Extensions.Configuration;
    using Microsoft.Extensions.DependencyInjection;
    using Microsoft.Extensions.Hosting;
    public class Startup
    {
        public Startup(IConfiguration configuration)
        {
            Configuration = configuration;
        }
 
        public IConfiguration Configuration { get; }
        public void ConfigureServices(IServiceCollection services)
        {
            //配置IConsulClient到ASP.NET Core的依賴注入系統中
            string consulAddress = "http://192.168.100.5:8500";
            services.AddSingleton<IConsulClient, ConsulClient>(p => new ConsulClient(consulConfig =>
            {
                consulConfig.Address = new Uri(consulAddress);
            }));
            services.AddSingleton<IHostedService, ConsulHostedService>();
 
            services.AddHealthChecks();//自帶的健康檢查
 
            services.AddMvc().SetCompatibilityVersion(CompatibilityVersion.Version_2_2);
        }
        public void Configure(IApplicationBuilder app, IHostingEnvironment env)
        {
            if (env.IsDevelopment())
            {
                app.UseDeveloperExceptionPage();
            }
           app.UseHealthChecks("/healthz");
            app.UseMvc();
        }
    }
}

當然也可以自己寫一個HealthController:

using Microsoft.AspNetCore.Mvc;
 
namespace ServiceA.Controllers
{
    [Route("api/[controller]")]
    [Produces("application/json")]
    [ApiController]
    public class HealthController : Controller
    {
        [HttpGet("status")]
        public IActionResult Status() => Ok();
    }
}
using Microsoft.AspNetCore.Mvc;
 
namespace ServiceA.Controllers
{
    [Route("api/[controller]")]
    [ApiController]
    public class ValuesController : ControllerBase
    {
        // GET api/values
        [HttpGet]
        public ActionResult<string> Get()
        {
            return "value1AAA";
        }
    }
}

可以在Program.cs指定端口:

namespace ServiceA
{
    using Microsoft.AspNetCore;
    using Microsoft.AspNetCore.Hosting;
    public class Program
    {
        public static void Main(string[] args)
        {
            CreateWebHostBuilder(args).Build().Run();
        }
 
        public static IWebHostBuilder CreateWebHostBuilder(string[] args) =>
            WebHost.CreateDefaultBuilder(args).UseUrls("http://192.168.100.2:6002")
                .UseStartup<Startup>();
    }
}

這里簡要說明一下我的環境, 代碼在win10物理機上,consul集群是win10虛擬機上ubuntu18的docker 環境,所以指定ip便於docker里面訪問,還有就是win10的防火牆要關閉。

把新建ServiceB和ServiceA一樣 只是修改一個端口然后用 dotnet run 運行如下:

把ServiceB關閉后


發現服務

現在來看看服務消費者如何從Consul來獲取可用的服務列表。

我們創建一個ConsoleApp,做為服務的調用端,添加ConsulNuget包,然后,我們創建一個ConsulClient實例,直接調用consuleClient.Health.Service就可以獲取到可用的服務列表了,然后使用HttpClient就可以發起對服務的調用。

但我們需要思考一個問題,我們什么時候從Consul獲取服務呢?最為簡單的便是在每次調用服務時,都先從Consul來獲取一下服務列表,這樣做的好處是我們得到的服務列表是最新的,能及時獲取到新注冊的服務以及過濾掉掛掉的服務。但是這樣每次請求都增加了一次對Consul的調用,對性能有稍微的損耗,不過我們可以在每個調用端的機器上都部署一個Consul Agent,這樣對性能的影響就微乎其微了。另外一種方式,可以在調用端做服務列表的本地緩存,並定時與Consul同步。其實現也非常簡單,通過一個Timer來定時從Consul拉取最新的服務列表,創建一個ConsulServiceProvider.cs類,實現如下:

namespace ConsoleApp
{
    using System;
    using System.Collections.Generic;
    using System.Threading;
    using System.Threading.Tasks;
    using Consul;
    public interface IServiceDiscoveryProvider
    {
        Task<List<string>> GetServicesAsync();
    }
    public class ConsulServiceProvider : IServiceDiscoveryProvider
    {
        private string consulAddres;
        public ConsulServiceProvider(string url) {
            consulAddres = url;
        }
        public async Task<List<string>> GetServicesAsync()
        {
            var consuleClient = new ConsulClient(consulConfig =>
            {
                consulConfig.Address = new Uri(consulAddres);
            });
 
            var queryResult = await consuleClient.Health.Service("Service", string.Empty, true);
 
            while (queryResult.Response.Length == 0)
            {
                Console.WriteLine("No services found, wait 1s....");
                await Task.Delay(1000);
                queryResult = await consuleClient.Health.Service("Service", string.Empty, true);
            }
 
            var result = new List<string>();
            foreach (var serviceEntry in queryResult.Response)
            {
                result.Add(serviceEntry.Service.Address + ":" + serviceEntry.Service.Port);
            }
            return result;
        }
    }
 
    public class PollingConsulServiceProvider : IServiceDiscoveryProvider
    {
        private List<string> _services = new List<string>();
        private bool _polling;
        private string consulAddres;
        public PollingConsulServiceProvider(string url)
        {
            consulAddres = url;
            var _timer = new Timer(async _ =>
            {
                if (_polling)
                {
                    return;
                }
 
                _polling = true;
                await Poll();
                _polling = false;
 
            }, null, 0, 1000);
        }
     
        public async Task<List<string>> GetServicesAsync()
        {
            if (_services.Count == 0) await Poll();
            return _services;
        }
 
        private async Task Poll()
        {
            _services = await new ConsulServiceProvider(consulAddres).GetServicesAsync();
        }
    }
}

負載均衡

如何將不同的用戶的流量分發到不同的服務器上面呢,早期的方法是使用DNS做負載,通過給客戶端解析不同的IP地址,讓客戶端的流量直接到達各個服務器。但是這種方法有一個很大的缺點就是延時性問題,在做出調度策略改變以后,由於DNS各級節點的緩存並不會及時的在客戶端生效,而且DNS負載的調度策略比較簡單,無法滿足業務需求,因此就出現了負載均衡器。

常見的負載均衡算法有如下幾種:

  • 隨機算法:每次從服務列表中隨機選取一個服務器。

  • 輪詢及加權輪詢:按順序依次調用服務列表中的服務器,也可以指定一個加權值,來增加某個服務器的調用次數。

  • 最小連接:記錄每個服務器的連接數,每次選取連接數最少的服務器。

  • 哈希算法:分為普通哈希與一致性哈希等。

  • IP地址散列:通過調用端Ip地址的散列,將來自同一調用端的分組統一轉發到相同服務器的算法。

  • URL散列:通過管理調用端請求URL信息的散列,將發送至相同URL的請求轉發至同一服務器的算法。

本文中簡單模擬前兩種來介紹一下。

隨機均衡是最為簡單粗暴的方式,我們只需根據服務器數量生成一個隨機數即可

最簡單的輪詢實現 使用lock控制並發,每次請求,移動一下服務索引。

RandomLoadBalancer.cs

namespace ConsoleApp
{
    using System;
    using System.Threading.Tasks;
    public interface ILoadBalancer
    {
        Task<string> GetServiceAsync();
    }
    public class RandomLoadBalancer : ILoadBalancer
    {
        private readonly IServiceDiscoveryProvider _sdProvider;
 
        public RandomLoadBalancer(IServiceDiscoveryProvider sdProvider)
        {
            _sdProvider = sdProvider;
        }
 
        private Random _random = new Random();
 
        public async Task<string> GetServiceAsync()
        {
            var services = await _sdProvider.GetServicesAsync();
            return services[_random.Next(services.Count)];
        }
    }
    public class RoundRobinLoadBalancer : ILoadBalancer
    {
        private readonly IServiceDiscoveryProvider _sdProvider;
 
        public RoundRobinLoadBalancer(IServiceDiscoveryProvider sdProvider)
        {
            _sdProvider = sdProvider;
        }
 
        private readonly object _lock = new object();
        private int _index = 0;
 
        public async Task<string> GetServiceAsync()
        {
            var services = await _sdProvider.GetServicesAsync();
            lock (_lock)
            {
                if (_index >= services.Count)
                {
                    _index = 0;
                }
                return services[_index++];
            }
        }
    }
}

便可以直接使用HttpClient來完成服務的調用了

 
namespace ConsoleApp
{
    using System;
    using System.Net.Http;
    using System.Threading.Tasks;
    class Program
    {
        static void  Main(string[] args)
        {
            TestConsul().ConfigureAwait(false);
            Console.ReadKey();
        }
        static async Task TestConsul() {
            string url = "http://192.168.100.5:8500";
            ILoadBalancer balancer = new RoundRobinLoadBalancer(new PollingConsulServiceProvider(url));
            var client = new HttpClient();
 
            Console.WriteLine("Request by RoundRobinLoadBalancer....");
            for (int i = 0; i < 10; i++)
            {
                var service = await balancer.GetServiceAsync();
 
                Console.WriteLine(DateTime.Now.ToString() + "-RoundRobin:" +
                    await client.GetStringAsync("http://" + service + "/api/values") + " --> " + "Request from " + service);
            }
 
            Console.WriteLine("Request by RandomLoadBalancer....");
            balancer = new RandomLoadBalancer(new PollingConsulServiceProvider(url));
            for (int i = 0; i < 10; i++)
            {
                var service = await balancer.GetServiceAsync();
 
                Console.WriteLine(DateTime.Now.ToString() + "-Random:" +
                    await client.GetStringAsync("http://" + service + "/api/values") + " --> " + "Request from " + service);
            }
        }
    }
}


代碼下載

參考:

consul+docker實現服務注冊

RainingNight/AspNetCoreSample

微服務(入門二):netcore通過consul注冊服務


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM