前言
很多時候,我們會根據用戶最近一段時間的行為,做出一些相應的策略,從而改變系統的運動軌跡。
舉個簡單的例子來說明一下:
假設A公司現在有兩個合作伙伴(B和C),B和C都是提供天氣數據的,現在A公司做了一個聚合接口,把B和C的接口融合了,那么這個時候,要怎么去B和C公司獲取數據呢?
其實這個要考慮的東西有很多很多,下面根據本文的主題,拿出其中一個點來討論說明。
最簡單的做法就是,隨機調用。當然不是那么簡單的隨機調用。
根據調用的最近一百條數據的得到成功率,耗時等指標,再根據這些指標去判斷一次查詢要去那個公司獲取數據。
思路已經有了,這個時候就是怎么實踐的問題了。
本文介紹的做法是借助redis來完成的。
如何用redis來處理
redis的list類型可以說非常適合用來處理這個情況。
首先,可以把查詢按順序寫進去,一個個的入隊。
其次,寫進去之后可以對它進行裁剪,保留最近的100條數據。(換句話說,我們可以保證在這個list里面,最多就是100條數據)
最后,獲取這個list里面的100條數據,進行計算即可。
正常情況下,我們不會把計算放在查詢的過程里面,在查詢的時候,只需要一個決策的結果值就可以了,當然這個結果值也是計算后寫進redis的。
所以要將這個計算的過程從查詢中獨立出來,定時去執行即可。
總結上面所說的,大概可以畫出下面這樣一樣圖。
其中的第三步操作,將查詢記錄寫進list,然后進行裁剪這兩個操作,可以直接操作redis,也可以考慮通過MQ去寫,雖說沒什么太大的必要。
簡單的示例代碼
查詢的控制器
[Route("api/[controller]")]
[ApiController]
public class AreaController : ControllerBase
{
private readonly ILogger _logger;
private readonly IRedisCachingProvider _provider;
public AreaController(ILoggerFactory loggerFactory, IRedisCachingProvider provider)
{
_logger = loggerFactory.CreateLogger<AreaController>();
_provider = provider;
}
// GET api/area/11
[HttpGet("provinceId")]
public async Task<string> GetAsync(string provinceId)
{
// get datasource
var datasource = await GetQueryDataSourceIdAsync(provinceId);
if (string.IsNullOrWhiteSpace(datasource)) return "not support";
var beginTime = DateTime.Now;
// query
var (val, isSucceed) = await QueryDataSourceAsync(datasource);
var endTime = DateTime.Now;
// datasource
var dsInfo = new DataSourceInfo
{
Cost = (long)endTime.Subtract(endTime).TotalMilliseconds,
IsSucceed = isSucceed
};
// record
_ = Task.Run(async () =>
{
try
{
await _provider.LPushAsync($"info:{datasource}", new List<DataSourceInfo> { dsInfo });
await _provider.LTrimAsync($"info:{datasource}", 0, 99);
}
catch (Exception ex)
{
_logger.LogError(ex, $"record #{datasource}# error");
}
});
return val;
}
private async Task<string> GetQueryDataSourceIdAsync(string provinceId)
{
var datasourceIds = GetDataSourceIdProvinceId(provinceId);
if (datasourceIds.Count <= 0) return string.Empty;
var cacheKey = "dskpi";
var kpis = await _provider.HMGetAsync(cacheKey, datasourceIds);
var datasource = datasourceIds.First();
if (kpis != null && kpis.Any())
{
// policy
datasource = kpis.OrderByDescending(x => x.Value).First().Key;
}
return datasource;
}
private async Task<(string val, bool isSucceed)> QueryDataSourceAsync(string datasource)
{
await Task.Delay(100);
var rd = new Random().NextDouble();
return (datasource, rd > 0.5d);
}
private List<string> GetDataSourceIdProvinceId(string provinceId)
{
return new List<string> { "100", "900" };
}
}
由調度系統觸發的計算控制器
[Route("api/cal")]
[ApiController]
public class CalculatiionController : ControllerBase
{
private readonly ILogger _logger;
private readonly IRedisCachingProvider _provider;
public CalculatiionController(ILoggerFactory loggerFactory, IRedisCachingProvider provider)
{
_logger = loggerFactory.CreateLogger<CalculatiionController>();
_provider = provider;
}
// GET api/cal/
[HttpGet]
public string Get()
{
var id = Guid.NewGuid().ToString("N");
_ = Task.Run(async () => await CalAsync(id));
return "ok";
}
private async Task CalAsync(string id)
{
_logger.LogInformation($"{id} begin at {DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")}");
var datasourceIds = GetAllDataSourceIds();
foreach (var item in datasourceIds)
{
try
{
var topN = await _provider.LRangeAsync<DataSourceInfo>($"info:{item}", 0, 99);
var cost = topN.Average(x => x.Cost);
var rate = topN.Count(x => x.IsSucceed) / 100;
var score = GetScore(cost, rate);
await _provider.HSetAsync($"dskpi", item, score.ToString());
}
catch (Exception ex)
{
_logger.LogError(ex, $"{id} {item} calculate fail ...");
}
}
_logger.LogInformation($"{id} end at {DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")}");
}
private int GetScore(double cost, int rate)
{
return new Random().Next(1, 100);
}
private List<string> GetAllDataSourceIds()
{
return new List<string> { "100", "900" };
}
}
也可以在Github上面找到上面的示例代碼 RecentRecordsDemo