使用Redis實現最近N條數據的決策


前言

很多時候,我們會根據用戶最近一段時間的行為,做出一些相應的策略,從而改變系統的運動軌跡。

舉個簡單的例子來說明一下:

假設A公司現在有兩個合作伙伴(B和C),B和C都是提供天氣數據的,現在A公司做了一個聚合接口,把B和C的接口融合了,那么這個時候,要怎么去B和C公司獲取數據呢?

其實這個要考慮的東西有很多很多,下面根據本文的主題,拿出其中一個點來討論說明。

最簡單的做法就是,隨機調用。當然不是那么簡單的隨機調用。

根據調用的最近一百條數據的得到成功率,耗時等指標,再根據這些指標去判斷一次查詢要去那個公司獲取數據。

思路已經有了,這個時候就是怎么實踐的問題了。

本文介紹的做法是借助redis來完成的。

如何用redis來處理

redis的list類型可以說非常適合用來處理這個情況。

首先,可以把查詢按順序寫進去,一個個的入隊。

其次,寫進去之后可以對它進行裁剪,保留最近的100條數據。(換句話說,我們可以保證在這個list里面,最多就是100條數據)

最后,獲取這個list里面的100條數據,進行計算即可。

正常情況下,我們不會把計算放在查詢的過程里面,在查詢的時候,只需要一個決策的結果值就可以了,當然這個結果值也是計算后寫進redis的。

所以要將這個計算的過程從查詢中獨立出來,定時去執行即可。

總結上面所說的,大概可以畫出下面這樣一樣圖。

其中的第三步操作,將查詢記錄寫進list,然后進行裁剪這兩個操作,可以直接操作redis,也可以考慮通過MQ去寫,雖說沒什么太大的必要。

簡單的示例代碼

查詢的控制器

[Route("api/[controller]")]
[ApiController]
public class AreaController : ControllerBase
{
    private readonly ILogger _logger;
    private readonly IRedisCachingProvider _provider;

    public AreaController(ILoggerFactory loggerFactory, IRedisCachingProvider provider)
    {
        _logger = loggerFactory.CreateLogger<AreaController>();
        _provider = provider;
    }

    // GET api/area/11
    [HttpGet("provinceId")]
    public async Task<string> GetAsync(string provinceId)
    {
        // get datasource
        var datasource = await GetQueryDataSourceIdAsync(provinceId);

        if (string.IsNullOrWhiteSpace(datasource)) return "not support";

        var beginTime = DateTime.Now;
       
        // query
        var (val, isSucceed) = await QueryDataSourceAsync(datasource);

        var endTime = DateTime.Now;

        // datasource 
        var dsInfo = new DataSourceInfo
        {
            Cost = (long)endTime.Subtract(endTime).TotalMilliseconds,
            IsSucceed = isSucceed
        };

        // record
        _ = Task.Run(async () =>
        {
            try
            {
                await _provider.LPushAsync($"info:{datasource}", new List<DataSourceInfo> { dsInfo });
                await _provider.LTrimAsync($"info:{datasource}", 0, 99);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, $"record #{datasource}# error");
            }
        });

        return val;
    }

    private async Task<string> GetQueryDataSourceIdAsync(string provinceId)
    {
        var datasourceIds = GetDataSourceIdProvinceId(provinceId);

        if (datasourceIds.Count <= 0) return string.Empty;
       
        var cacheKey = "dskpi";

        var kpis = await _provider.HMGetAsync(cacheKey, datasourceIds);

        var datasource = datasourceIds.First();

        if (kpis != null && kpis.Any())
        {
            // policy
            datasource = kpis.OrderByDescending(x => x.Value).First().Key;
        }

        return datasource;
    }

    private async Task<(string val, bool isSucceed)> QueryDataSourceAsync(string datasource)
    {
        await Task.Delay(100);

        var rd = new Random().NextDouble();

        return (datasource, rd > 0.5d);
    }

    private List<string> GetDataSourceIdProvinceId(string provinceId)
    {
        return new List<string> { "100", "900" };
    }
}

由調度系統觸發的計算控制器

[Route("api/cal")]
[ApiController]
public class CalculatiionController : ControllerBase
{
    private readonly ILogger _logger;
    private readonly IRedisCachingProvider _provider;

    public CalculatiionController(ILoggerFactory loggerFactory, IRedisCachingProvider provider)
    {
        _logger = loggerFactory.CreateLogger<CalculatiionController>();
        _provider = provider;
    }

    // GET api/cal/
    [HttpGet]
    public string Get()
    {
        var id = Guid.NewGuid().ToString("N");

        _ = Task.Run(async () => await CalAsync(id));

        return "ok";
    }

    private async Task CalAsync(string id)
    {
        _logger.LogInformation($"{id} begin at {DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")}");

        var datasourceIds = GetAllDataSourceIds();
        
        foreach (var item in datasourceIds)
        {
            try
            {
                var topN = await _provider.LRangeAsync<DataSourceInfo>($"info:{item}", 0, 99);

                var cost = topN.Average(x => x.Cost);
                var rate = topN.Count(x => x.IsSucceed) / 100;

                var score = GetScore(cost, rate);

                await _provider.HSetAsync($"dskpi", item, score.ToString());

            }
            catch (Exception ex)
            {
                _logger.LogError(ex, $"{id} {item} calculate fail ...");
            }
        }

        _logger.LogInformation($"{id} end at {DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")}");
    }

    private int GetScore(double cost, int rate)
    {
        return new Random().Next(1, 100);
    }

    private List<string> GetAllDataSourceIds()
    {
        return new List<string> { "100", "900" };
    }
}

也可以在Github上面找到上面的示例代碼 RecentRecordsDemo


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM