【.NET Core項目實戰-統一認證平台】第十五章 網關篇-使用二級緩存提升性能


【.NET Core項目實戰-統一認證平台】開篇及目錄索引

一、背景

首先說聲抱歉,可能是因為假期綜合症(其實就是因為懶哈)的原因,已經很長時間沒更新博客了,現在也調整的差不多了,准備還是以每周1-2篇的進度來更新博客,並完成本項目所有功能。

言歸正傳,本重構項目是在我根據實際需求重構,由於還未完全寫完,所以也沒進行壓測,在2月份時,張善友老師給我留言說經過壓測發現我重構的Ocelot網關功能性能較差,其中根本原因就是緩存模塊,由於重構項目的緩存強依賴Redis緩存,造成性能瓶頸,發現問題后,我也第一時間進行測試,性能影響很大,經過跟張老師請教,可以使用二級緩存來解決性能問題,首先感謝張老師關注並指點迷津,於是就有了這篇文章,如何把現有緩存改成二級緩存並使用。

二、改造思路

為了解決redis的強依賴性,首先需要把緩存數據存儲到本地,所有請求都優先從本地提取,如果提取不到再從redis提取,如果redis無數據,在從數據庫中提取。提取流程如下:

MemoryCache > Redis > db

此種方式減少提取緩存的網絡開銷,也合理利用了分布式緩存,並最終減少數據庫的訪問開銷。但是使用此種方案也面臨了一個問題是如何保證集群環境時每個機器本地緩存數據的一致性,這時我們會想到redis的發布、訂閱特性,在數據發生變動時更新redis數據並發布緩存更新通知,由每個集群機器訂閱變更事件,然后處理本地緩存記錄,最終達到集群緩存的緩存一致性。

但是此方式對於緩存變更非常頻繁的業務不適用,比如限流策略(准備還是使用分布式redis緩存實現),但是可以擴展配置單機限流時使用本地緩存實現,如果誰有更好的實現方式,也麻煩告知下集群環境下限流的實現,不勝感激。

三、改造代碼

首先需要分析下目前改造后的Ocelot網關在哪些業務中使用的緩存,然后把使用本地緩存的的業務重構,增加提取數據流程,最后提供網關外部緩存初始化接口,便於與業務系統進行集成。

1.重寫緩存方法

找到問題的原因后,就可以重寫緩存方法,增加二級緩存支持,默認使用本地的緩存,新建CzarMemoryCache類,來實現IOcelotCache<T>方法,實現代碼如下。

using Czar.Gateway.Configuration;
using Czar.Gateway.RateLimit;
using Microsoft.Extensions.Caching.Memory;
using Ocelot.Cache;
using System;

namespace Czar.Gateway.Cache
{
    /// <summary>
    /// 金焰的世界
    /// 2019-03-03
    /// 使用二級緩存解決集群環境問題
    /// </summary>
    public class CzarMemoryCache<T> : IOcelotCache<T>
    {
        private readonly CzarOcelotConfiguration _options;
        private readonly IMemoryCache _cache;
        public CzarMemoryCache(CzarOcelotConfiguration options,IMemoryCache cache)
        {
            _options = options;
            _cache = cache;
        }
        public void Add(string key, T value, TimeSpan ttl, string region)
        {
            key = CzarOcelotHelper.GetKey(_options.RedisOcelotKeyPrefix,region, key);
            if (_options.ClusterEnvironment)
            {
                var msg = value.ToJson();
                if (typeof(T) == typeof(CachedResponse))
                {//帶過期時間的緩存
                    _cache.Set(key, value, ttl); //添加本地緩存
                    RedisHelper.Set(key, msg); //加入redis緩存
                    RedisHelper.Publish(key, msg); //發布
                }
                else if (typeof(T) == typeof(CzarClientRateLimitCounter?))
                {//限流緩存,直接使用redis
                    RedisHelper.Set(key, value, (int)ttl.TotalSeconds);
                }
                else
                {//正常緩存,發布
                    _cache.Set(key, value, ttl); //添加本地緩存
                    RedisHelper.Set(key, msg); //加入redis緩存
                    RedisHelper.Publish(key, msg); //發布
                }
            }
            else
            {
                _cache.Set(key, value, ttl); //添加本地緩存
            }
        }

        public void AddAndDelete(string key, T value, TimeSpan ttl, string region)
        {
            Add(key, value, ttl, region);
        }

        public void ClearRegion(string region)
        {
            if (_options.ClusterEnvironment)
            {
                var keys = RedisHelper.Keys(region + "*");
                RedisHelper.Del(keys);
                foreach (var key in keys)
                {
                    RedisHelper.Publish(key, ""); //發布key值為空,處理時刪除即可。
                }
            }
            else
            {
                _cache.Remove(region);
            }
        }

        public T Get(string key, string region)
        {
            key = CzarOcelotHelper.GetKey(_options.RedisOcelotKeyPrefix, region, key);
            if(region== CzarCacheRegion.CzarClientRateLimitCounterRegion&& _options.ClusterEnvironment)
            {//限流且開啟了集群支持,默認從redis取
                return RedisHelper.Get<T>(key);
            }
            var result = _cache.Get<T>(key);
            if (result == null&& _options.ClusterEnvironment)
            {
                result= RedisHelper.Get<T>(key);
                if (result != null)
                {
                    if (typeof(T) == typeof(CachedResponse))
                    {//查看redis過期時間
                        var second = RedisHelper.Ttl(key);
                        if (second > 0)
                        {
                            _cache.Set(key, result, TimeSpan.FromSeconds(second));
                        }
                    }
                    else
                    {
                        _cache.Set(key, result, TimeSpan.FromSeconds(_options.CzarCacheTime));
                    }
                }
            }
            return result;
        }
    }
}

上面就段代碼實現了本地緩存和Redis緩存的支持,優先從本地提取,如果在集群環境使用,增加redis緩存支持,但是此種方式不適用緩存變更非常頻繁場景,比如客戶端限流的實現,所以在代碼中把客戶端限流的緩存直接使用redis緩存實現。

2.注入實現和訂閱

有了實現代碼后,發現還缺少添加緩存注入和配置信息修改。首先需要修改配置文件來滿足是否開啟集群判斷,然后需要實現redis的不同部署方式能夠通過配置文件配置進行管理,避免硬編碼導致的不可用問題。

配置文件CzarOcelotConfiguration.cs修改代碼如下:

namespace Czar.Gateway.Configuration
{
    /// <summary>
    /// 金焰的世界
    /// 2018-11-11
    /// 自定義配置信息
    /// </summary>
    public class CzarOcelotConfiguration
    {
        /// <summary>
        /// 數據庫連接字符串,使用不同數據庫時自行修改,默認實現了SQLSERVER
        /// </summary>
        public string DbConnectionStrings { get; set; }

        /// <summary>
        /// 金焰的世界
        /// 2018-11-12
        /// 是否啟用定時器,默認不啟動
        /// </summary>
        public bool EnableTimer { get; set; } = false;

        /// <summary>
        /// 金焰的世界
        /// 2018-11.12
        /// 定時器周期,單位(毫秒),默認30分總自動更新一次
        /// </summary>
        public int TimerDelay { get; set; } = 30 * 60 * 1000;

        /// <summary>
        /// 金焰的世界
        /// 2018-11-14
        /// Redis連接字符串
        /// </summary>
        public string RedisConnectionString { get; set; }

        /// <summary>
        /// 金焰的世界
        /// 2019-03-03
        /// 配置哨兵或分區時使用
        /// </summary>
        public string[] RedisSentinelOrPartitionConStr { get; set; }

        /// <summary>
        /// 金焰的世界
        /// 2019-03-03
        /// Redis部署方式,默認使用普通方式
        /// </summary>
        public RedisStoreMode RedisStoreMode { get; set; } = RedisStoreMode.Normal;

        /// <summary>
        /// 金焰的計界
        /// 2019-03-03
        /// 做集群緩存同步時使用,會訂閱所有正則匹配的事件
        /// </summary>
        public string RedisOcelotKeyPrefix { get; set; } = "CzarOcelot";

        /// <summary>
        /// 金焰的世界
        /// 2019-03-03
        /// 是否啟用集群環境,如果非集群環境直接本地緩存+數據庫即可
        /// </summary>
        public bool ClusterEnvironment { get; set; } = false;

        /// <summary>
        /// 金焰的世界
        /// 2018-11-15
        /// 是否啟用客戶端授權,默認不開啟
        /// </summary>
        public bool ClientAuthorization { get; set; } = false;

        /// <summary>
        /// 金焰的世界
        /// 2018-11-15
        /// 服務器緩存時間,默認30分鍾
        /// </summary>
        public int CzarCacheTime { get; set; } = 1800;
        /// <summary>
        /// 金焰的世界
        /// 2018-11-15
        /// 客戶端標識,默認 client_id
        /// </summary>
        public string ClientKey { get; set; } = "client_id";

        /// <summary>
        /// 金焰的世界
        /// 2018-11-18
        /// 是否開啟自定義限流,默認不開啟
        /// </summary>
        public bool ClientRateLimit { get; set; } = false;
    }
}

在配置文件中修改了redis相關配置,支持使用redis的普通模式、集群模式、哨兵模式、分區模式,配置方式可參考csrediscore開源項目。

然后修改ServiceCollectionExtensions.cs代碼,注入相關實現和redis客戶端。

			builder.Services.AddMemoryCache(); //添加本地緩存
            #region 啟動Redis緩存,並支持普通模式 官方集群模式  哨兵模式 分區模式
            if (options.ClusterEnvironment)
            {
                //默認使用普通模式
                var csredis = new CSRedis.CSRedisClient(options.RedisConnectionString);
                switch (options.RedisStoreMode)
                {
                    case RedisStoreMode.Partition:
                        var NodesIndex = options.RedisSentinelOrPartitionConStr;
                        Func<string, string> nodeRule = null;
                        csredis = new CSRedis.CSRedisClient(nodeRule, options.RedisSentinelOrPartitionConStr);
                        break;
                    case RedisStoreMode.Sentinel:
                        csredis = new CSRedis.CSRedisClient(options.RedisConnectionString, options.RedisSentinelOrPartitionConStr);
                        break;
                }
                //初始化 RedisHelper
                RedisHelper.Initialization(csredis);
            }
            #endregion
            builder.Services.AddSingleton<IOcelotCache<FileConfiguration>, CzarMemoryCache<FileConfiguration>>();
            builder.Services.AddSingleton<IOcelotCache<InternalConfiguration>, CzarMemoryCache<InternalConfiguration>>();
            builder.Services.AddSingleton<IOcelotCache<CachedResponse>, CzarMemoryCache<CachedResponse>>();
            builder.Services.AddSingleton<IInternalConfigurationRepository, RedisInternalConfigurationRepository>();
            builder.Services.AddSingleton<IOcelotCache<ClientRoleModel>, CzarMemoryCache<ClientRoleModel>>();
            builder.Services.AddSingleton<IOcelotCache<RateLimitRuleModel>, CzarMemoryCache<RateLimitRuleModel>>();
            builder.Services.AddSingleton<IOcelotCache<RemoteInvokeMessage>, CzarMemoryCache<RemoteInvokeMessage>>();
            builder.Services.AddSingleton<IOcelotCache<CzarClientRateLimitCounter?>, CzarMemoryCache<CzarClientRateLimitCounter?>>();

現在需要實現redis訂閱來更新本地的緩存信息,在項目啟動時判斷是否開啟集群模式,如果開啟就啟動訂閱,實現代碼如下:

public static async Task<IApplicationBuilder> UseCzarOcelot(this IApplicationBuilder builder, OcelotPipelineConfiguration pipelineConfiguration)
{
    //重寫創建配置方法
    var configuration = await CreateConfiguration(builder);
    ConfigureDiagnosticListener(builder);
    CacheChangeListener(builder);
    return CreateOcelotPipeline(builder, pipelineConfiguration);
}
/// <summary>
/// 金焰的世界
/// 2019-03-03
/// 添加緩存數據變更訂閱
/// </summary>
/// <param name="builder"></param>
/// <returns></returns>
private static void CacheChangeListener(IApplicationBuilder builder)
{
    var config= builder.ApplicationServices.GetService<CzarOcelotConfiguration>();
    var _cache= builder.ApplicationServices.GetService<IMemoryCache>();
    if (config.ClusterEnvironment)
    {
        //訂閱滿足條件的所有事件
        RedisHelper.PSubscribe(new[] { config.RedisOcelotKeyPrefix + "*" }, message =>
              {
                  var key = message.Channel;
                  _cache.Remove(key); //直接移除,如果有請求從redis里取
                  //或者直接判斷本地緩存是否存在,如果存在更新,可自行實現。
              });
    }
}

使用的是從配置文件提取的正則匹配的所有KEY都進行訂閱,由於本地緩存增加了定時過期策略,所以為了實現方便,當發現redis數據發生變化,所有訂閱端直接移除本地緩存即可,如果有新的請求直接從redis取,然后再次緩存,防止集群客戶端緩存信息不一致。

為了區分不同的緩存實體,便於在原始數據發送變更時進行更新,定義CzarCacheRegion類。

namespace Czar.Gateway.Configuration
{
    /// <summary>
    /// 緩存所屬區域
    /// </summary>
    public class CzarCacheRegion
    {
        /// <summary>
        /// 授權
        /// </summary>
        public const string AuthenticationRegion = "CacheClientAuthentication";

        /// <summary>
        /// 路由配置
        /// </summary>
        public const string FileConfigurationRegion = "CacheFileConfiguration";

        /// <summary>
        /// 內部配置
        /// </summary>
        public const string InternalConfigurationRegion = "CacheInternalConfiguration";

        /// <summary>
        /// 客戶端權限
        /// </summary>
        public const string ClientRoleModelRegion = "CacheClientRoleModel";

        /// <summary>
        /// 限流規則
        /// </summary>
        public const string RateLimitRuleModelRegion = "CacheRateLimitRuleModel";

        /// <summary>
        /// Rpc遠程調用
        /// </summary>
        public const string RemoteInvokeMessageRegion = "CacheRemoteInvokeMessage";

        /// <summary>
        /// 客戶端限流
        /// </summary>
        public const string CzarClientRateLimitCounterRegion = "CacheCzarClientRateLimitCounter";
    }
}

現在只需要修改緩存的region為定義的值即可,唯一需要改動的代碼就是把之前寫死的代碼改成如下代碼即可。

var enablePrefix = CzarCacheRegion.AuthenticationRegion;

3.開發緩存變更接口

現在整個二級緩存基本完成,但是還遇到一個問題就是外部如何根據數據庫變更數據時來修改緩存數據,這時就需要提供外部修改api來實現。

添加CzarCacheController.cs對外部提供緩存更新相關接口,詳細代碼如下:

using Czar.Gateway.Authentication;
using Czar.Gateway.Configuration;
using Czar.Gateway.RateLimit;
using Czar.Gateway.Rpc;
using Microsoft.AspNetCore.Authorization;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Caching.Memory;
using Ocelot.Configuration;
using Ocelot.Configuration.Creator;
using Ocelot.Configuration.Repository;
using System;
using System.Threading.Tasks;

namespace Czar.Gateway.Cache
{
    /// <summary>
    /// 提供外部緩存處理接口
    /// </summary>
    [Authorize]
    [Route("CzarCache")]
    public class CzarCacheController : Controller
    {
        private readonly CzarOcelotConfiguration _options;
        private readonly IClientAuthenticationRepository _clientAuthenticationRepository;
        private IFileConfigurationRepository _fileConfigurationRepository;
        private IInternalConfigurationCreator _internalConfigurationCreator;
        private readonly IClientRateLimitRepository _clientRateLimitRepository;
        private readonly IRpcRepository _rpcRepository;
        private readonly IMemoryCache _cache;
        public CzarCacheController(IClientAuthenticationRepository clientAuthenticationRepository, CzarOcelotConfiguration options,
          IFileConfigurationRepository fileConfigurationRepository,
          IInternalConfigurationCreator internalConfigurationCreator,
          IClientRateLimitRepository clientRateLimitRepository,
          IRpcRepository rpcRepository,
          IMemoryCache cache)
        {
            _clientAuthenticationRepository = clientAuthenticationRepository;
            _options = options;
            _fileConfigurationRepository = fileConfigurationRepository;
            _internalConfigurationCreator = internalConfigurationCreator;
            _clientRateLimitRepository = clientRateLimitRepository;
            _rpcRepository = rpcRepository;
            _cache = cache;
        }

        /// <summary>
        /// 更新客戶端地址訪問授權接口
        /// </summary>
        /// <param name="clientid">客戶端ID</param>
        /// <param name="path">請求模板</param>
        /// <returns></returns>
        [HttpPost]
        [Route("ClientRule")]
        public async Task UpdateClientRuleCache(string clientid, string path)
        {
            var region = CzarCacheRegion.AuthenticationRegion;
            var key = CzarOcelotHelper.ComputeCounterKey(region, clientid, "", path);
            key = CzarOcelotHelper.GetKey(_options.RedisOcelotKeyPrefix, region, key);
            var result = await _clientAuthenticationRepository.ClientAuthenticationAsync(clientid, path);
            var data = new ClientRoleModel() { CacheTime = DateTime.Now, Role = result };
            if (_options.ClusterEnvironment)
            {
                RedisHelper.Set(key, data); //加入redis緩存
                RedisHelper.Publish(key, data.ToJson()); //發布事件
            }
            else
            {
                _cache.Remove(key);
            }
        }

        /// <summary>
        /// 更新網關配置路由信息
        /// </summary>
        /// <returns></returns>
        [HttpPost]
        [Route("InternalConfiguration")]
        public async Task UpdateInternalConfigurationCache()
        {
            var key = CzarCacheRegion.InternalConfigurationRegion;
            key = CzarOcelotHelper.GetKey(_options.RedisOcelotKeyPrefix, "", key);
            var fileconfig = await _fileConfigurationRepository.Get();
            var internalConfig = await _internalConfigurationCreator.Create(fileconfig.Data);
            var config = (InternalConfiguration)internalConfig.Data;
            if (_options.ClusterEnvironment)
            {
                RedisHelper.Set(key, config); //加入redis緩存
                RedisHelper.Publish(key, config.ToJson()); //發布事件
            }
            else
            {
                _cache.Remove(key);
            }
        }

        /// <summary>
        /// 刪除路由配合的緩存信息
        /// </summary>
        /// <param name="region">區域</param>
        /// <param name="downurl">下端路由</param>
        /// <returns></returns>
        [HttpPost]
        [Route("Response")]
        public async Task DeleteResponseCache(string region,string downurl)
        {
            var key = CzarOcelotHelper.GetKey(_options.RedisOcelotKeyPrefix, region, downurl);
            if (_options.ClusterEnvironment)
            {
                await RedisHelper.DelAsync(key);
                RedisHelper.Publish(key, "");//發布時間
            }
            else
            {
                _cache.Remove(key);
            }
        }

        /// <summary>
        /// 更新客戶端限流規則緩存
        /// </summary>
        /// <param name="clientid">客戶端ID</param>
        /// <param name="path">路由模板</param>
        /// <returns></returns>
        [HttpPost]
        [Route("RateLimitRule")]
        public async Task UpdateRateLimitRuleCache(string clientid, string path)
        {
            var region = CzarCacheRegion.RateLimitRuleModelRegion;
            var key = clientid + path;
            key = CzarOcelotHelper.GetKey(_options.RedisOcelotKeyPrefix, region, key);
            var result = await _clientRateLimitRepository.CheckClientRateLimitAsync(clientid, path);
            var data = new RateLimitRuleModel() { RateLimit = result.RateLimit, rateLimitOptions = result.rateLimitOptions };
            if (_options.ClusterEnvironment)
            {
                RedisHelper.Set(key, data); //加入redis緩存
                RedisHelper.Publish(key, data.ToJson()); //發布事件
            }
            else
            {
                _cache.Remove(key);
            }
        }

        /// <summary>
        /// 更新客戶端是否開啟限流緩存
        /// </summary>
        /// <param name="path"></param>
        /// <returns></returns>
        [HttpPost]
        [Route("ClientRole")]
        public async Task UpdateClientRoleCache(string path)
        {
            var region = CzarCacheRegion.ClientRoleModelRegion;
            var key = path;
            key = CzarOcelotHelper.GetKey(_options.RedisOcelotKeyPrefix, region, key);
            var result = await _clientRateLimitRepository.CheckReRouteRuleAsync(path);
            var data = new ClientRoleModel() { CacheTime = DateTime.Now, Role = result };
            if (_options.ClusterEnvironment)
            {
                RedisHelper.Set(key, data); //加入redis緩存
                RedisHelper.Publish(key, data.ToJson()); //發布事件
            }
            else
            {
                _cache.Remove(key);
            }
        }

        /// <summary>
        /// 更新呢客戶端路由白名單緩存
        /// </summary>
        /// <param name="clientid"></param>
        /// <param name="path"></param>
        /// <returns></returns>
        [HttpPost]
        [Route("ClientReRouteWhiteList")]
        public async Task UpdateClientReRouteWhiteListCache(string clientid, string path)
        {
            var region = CzarCacheRegion.ClientReRouteWhiteListRegion;
            var key = clientid + path;
            key = CzarOcelotHelper.GetKey(_options.RedisOcelotKeyPrefix, region, key);
            var result = await _clientRateLimitRepository.CheckClientReRouteWhiteListAsync(clientid, path);
            var data = new ClientRoleModel() { CacheTime = DateTime.Now, Role = result };
            if (_options.ClusterEnvironment)
            {
                RedisHelper.Set(key, data); //加入redis緩存
                RedisHelper.Publish(key, data.ToJson()); //發布事件
            }
            else
            {
                _cache.Remove(key);
            }
        }

        [HttpPost]
        [Route("Rpc")]
        public async Task UpdateRpcCache(string UpUrl)
        {
            var region = CzarCacheRegion.RemoteInvokeMessageRegion;
            var key = UpUrl;
            key = CzarOcelotHelper.GetKey(_options.RedisOcelotKeyPrefix, region, key);
            var result = await _rpcRepository.GetRemoteMethodAsync(UpUrl);
            if (_options.ClusterEnvironment)
            {
                RedisHelper.Set(key, result); //加入redis緩存
                RedisHelper.Publish(key, result.ToJson()); //發布事件
            }
            else
            {
                _cache.Remove(key);
            }
        }
    }
}

現在基本實現整個緩存的更新策略,只要配合后台管理界面,在相關緩存原始數據發送變更時,調用對應接口即可完成redis緩存的更新,並自動通知集群的所有本機清理緩存等待重新獲取。

接口的調用方式參考之前我寫的配置信息接口變更那篇即可。

四、性能測試

完成了改造后,我們拿改造前網關、改造后網關、原始Ocelot、直接調用API四個環境分別測試性能指標,由於測試環境有效,我直接使用本機環境,然后是Apache ab測試工具測試下相關性能(本測試不一定准確,只作為參考指標),測試的方式是使用100個並發請求10000次,測試結果分別如下。

1、改造前網關性能

2、改造后網關性能

3、Ocelot默認網關性能

4、直接調用API性能

本測試僅供參考,因為由於網關和服務端都在本機環境部署,所以使用網關和不使用網關性能差別非常小,如果分開部署可能性別差別會明顯寫,這不是本篇討論的重點。

從測試中可以看到,重構的網關改造前和改造后性能有2倍多的提升,且與原生的Ocelot性能非常接近。

五、總結

本篇主要講解了如何使用redis的發布訂閱來實現二級緩存功能,並提供了緩存的更新相關接口供外部程序調用,避免出現集群環境下無法更新緩存數據導致提取數據不一致情況,但是針對每個客戶端獨立限流這塊集群環境目前還是采用的redis的方式未使用本地緩存,如果有寫的不對或有更好方式的,也希望多提寶貴意見。

本篇相關源碼地址:https://github.com/jinyancao/czar.gateway


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM