根據官方wiki文檔,sentinel控制台的實時監控數據,默認僅存儲 5 分鍾以內的數據。如需持久化,需要定制實現相關接口。
https://github.com/alibaba/Sentinel/wiki/在生產環境中使用-Sentinel-控制台 也給出了指導步驟:
1.自行擴展實現 MetricsRepository 接口;
2.注冊成 Spring Bean 並在相應位置通過 @Qualifier 注解指定對應的 bean name 即可。
本文使用時序數據庫InfluxDB來進行持久化,從下載開始,一步步編寫一個基於InfluxDB的存儲實現。
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
InfluxDB官網:https://www.influxdata.com
關鍵詞:
高性能時序數據庫
go語言編寫沒有外部依賴
支持HTTP API讀寫
支持類SQL查詢語法
通過數據保留策略(Retention Policies)支持自動清理歷史數據
通過連續查詢(Continuous Queries)支持數據歸檔
最新版本:1.6.4
下載
windows:wget https://dl.influxdata.com/influxdb/releases/influxdb-1.6.4_windows_amd64.zip
linux:wget https://dl.influxdata.com/influxdb/releases/influxdb-1.6.4_linux_amd64.tar.gz
注:windows下載安裝wget https://eternallybored.org/misc/wget/
在windows環境,解壓zip文件至D:\influxdb\influxdb-1.6.4-1目錄:
打開cmd命令行窗口,在D:\influxdb\influxdb-1.6.4-1執行命令啟動influxdb服務端:influxd
再打開一個cmd窗口,在目錄下輸入influx啟動客戶端: // 后面可以帶上參數:-precision rfc3339 指定時間格式顯示
show databases發現只有系統的2個數據庫,這里我們新建一個sentinel_db,輸入命令:create database sentinel_db
use sentinel_db 使用sentinel_db數據庫
show measurements 查看數據庫中的數據表(measurement)
可以看到,這幾個InfluxDB命令跟MySQL很相似。
==============================================================
InfluxDB名詞概念:
database:數據庫 // 關系數據庫的database
measurement:數據庫中的表 // 關系數據庫中的table
point:表里的一行數據 // 關系數據庫中的row
point由3部分組成:
time:每條數據記錄的時間,也是數據庫自動生成的主索引;// 類似主鍵
fields:各種記錄的值;// 沒有索引的字段
tags:各種有索引的屬性 // 有索引的字段
==============================================================
在官方github上,有一個java的客戶端庫:
https://github.com/influxdata/influxdb-java
在sentinel-dashboard的pom.xml中,加入maven依賴:
<dependency> <groupId>org.influxdb</groupId> <artifactId>influxdb-java</artifactId> <version>2.14</version> </dependency>
封裝一個工具類:存儲InfluxDB連接信息以及方便調用
/** * @author cdfive * @date 2018-10-19 */ @Component public class InfluxDBUtils { private static Logger logger = LoggerFactory.getLogger(InfluxDBUtils.class); private static String url; private static String username; private static String password; private static InfluxDBResultMapper resultMapper = new InfluxDBResultMapper(); @Value("${influxdb.url}") public void setUrl(String url) { InfluxDBUtils.url = url; } @Value("${influxdb.username}") public void setUsername(String username) { InfluxDBUtils.username = username; } @Value("${influxdb.password}") public void setPassword(String password) { InfluxDBUtils.password = password; } public static void init(String url, String username, String password) { InfluxDBUtils.url = url; InfluxDBUtils.username = username; InfluxDBUtils.password = password; } public static <T> T process(String database, InfluxDBCallback callback) { InfluxDB influxDB = null; T t = null; try { influxDB = InfluxDBFactory.connect(url, username, password); influxDB.setDatabase(database); t = callback.doCallBack(database, influxDB); } catch (Exception e) { logger.error("[process exception]", e); } finally { if (influxDB != null) { try { influxDB.close(); } catch (Exception e) { logger.error("[influxDB.close exception]", e); } } } return t; } public static void insert(String database, InfluxDBInsertCallback influxDBInsertCallback) { process(database, new InfluxDBCallback() { @Override public <T> T doCallBack(String database, InfluxDB influxDB) { influxDBInsertCallback.doCallBack(database, influxDB); return null; } }); } public static QueryResult query(String database, InfluxDBQueryCallback influxDBQueryCallback) { return process(database, new InfluxDBCallback() { @Override public <T> T doCallBack(String database, InfluxDB influxDB) { QueryResult queryResult = influxDBQueryCallback.doCallBack(database, influxDB); return (T) queryResult; } }); } public static <T> List<T> queryList(String database, String sql, Map<String, Object> paramMap, Class<T> clasz) { QueryResult queryResult = query(database, new InfluxDBQueryCallback() { @Override public QueryResult doCallBack(String database, InfluxDB influxDB) { BoundParameterQuery.QueryBuilder queryBuilder = BoundParameterQuery.QueryBuilder.newQuery(sql); queryBuilder.forDatabase(database); if (paramMap != null && paramMap.size() > 0) { Set<Map.Entry<String, Object>> entries = paramMap.entrySet(); for (Map.Entry<String, Object> entry : entries) { queryBuilder.bind(entry.getKey(), entry.getValue()); } } return influxDB.query(queryBuilder.create()); } }); return resultMapper.toPOJO(queryResult, clasz); } public interface InfluxDBCallback { <T> T doCallBack(String database, InfluxDB influxDB); } public interface InfluxDBInsertCallback { void doCallBack(String database, InfluxDB influxDB); } public interface InfluxDBQueryCallback { QueryResult doCallBack(String database, InfluxDB influxDB); } }
其中:
url、username、password用於存儲InfluxDB的連接、用戶名、密碼信息,定義為static屬性,因此在set方法上使用@Value注解從配置文件讀取屬性值;
resultMapper用於查詢結果到實體類的映射;
init方法用於初始化url、username、password;
process為通用的處理方法,負責打開關閉連接,並且調用InfluxDBCallback回調方法;
insert為插入數據方法,配合InfluxDBInsertCallback回調使用;
query為通用的查詢方法,配合InfluxDBQueryCallback回調方法使用,返回QueryResult對象;
queryList為查詢列表方法,調用query得到QueryResult,再通過resultMapper轉換為List<實體類>;
在resources目錄下的application.properties文件中,增加InfluxDB的配置:
influxdb.url=${influxdb.url} influxdb.username=${influxdb.username} influxdb.password=${influxdb.password}
用${xxx}占位符,這樣可以通過maven的pom.xml添加profile配置不同環境(開發、測試、生產) 或 從配置中心讀取參數。
在datasource.entity包下,新建influxdb包,下面新建sentinel_metric數據表(measurement)對應的實體類MetricPO:
package com.taobao.csp.sentinel.dashboard.datasource.entity.influxdb; import org.influxdb.annotation.Column; import org.influxdb.annotation.Measurement; import java.time.Instant; /** * @author cdfive * @date 2018-10-19 */ @Measurement(name = "sentinel_metric") public class MetricPO { @Column(name = "time") private Instant time; @Column(name = "id") private Long id; @Column(name = "gmtCreate") private Long gmtCreate; @Column(name = "gmtModified") private Long gmtModified; @Column(name = "app", tag = true) private String app; @Column(name = "resource", tag = true) private String resource; @Column(name = "passQps") private Long passQps; @Column(name = "successQps") private Long successQps; @Column(name = "blockQps") private Long blockQps; @Column(name = "exceptionQps") private Long exceptionQps; @Column(name = "rt") private double rt; @Column(name = "count") private int count; @Column(name = "resourceCode") private int resourceCode; // getter setter省略 }
該類參考MetricEntity創建,加上influxdb-java包提供的注解,通過@Measurement(name = "sentinel_metric")指定數據表(measurement)名稱,
time作為時序數據庫的時間列;
app、resource設置為tag列,通過注解標識為tag=true;
其它字段為filed列;
接着在InMemoryMetricsRepository所在的repository.metric包下新建InfluxDBMetricsRepository類,實現MetricsRepository<MetricEntity>接口:
package com.taobao.csp.sentinel.dashboard.repository.metric; import com.alibaba.csp.sentinel.util.StringUtil; import com.taobao.csp.sentinel.dashboard.datasource.entity.MetricEntity; import com.taobao.csp.sentinel.dashboard.datasource.entity.influxdb.MetricPO; import com.taobao.csp.sentinel.dashboard.util.InfluxDBUtils; import org.apache.commons.lang.time.DateFormatUtils; import org.apache.commons.lang.time.DateUtils; import org.influxdb.InfluxDB; import org.influxdb.dto.Point; import org.springframework.stereotype.Repository; import org.springframework.util.CollectionUtils; import java.util.*; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; /** * metrics數據InfluxDB存儲實現 * @author cdfive * @date 2018-10-19 */ @Repository("influxDBMetricsRepository") public class InfluxDBMetricsRepository implements MetricsRepository<MetricEntity> { /**時間格式*/ private static final String DATE_FORMAT_PATTERN = "yyyy-MM-dd HH:mm:ss.SSS"; /**數據庫名稱*/ private static final String SENTINEL_DATABASE = "sentinel_db"; /**數據表名稱*/ private static final String METRIC_MEASUREMENT = "sentinel_metric"; /**北京時間領先UTC時間8小時 UTC: Universal Time Coordinated,世界統一時間*/ private static final Integer UTC_8 = 8; @Override public void save(MetricEntity metric) { if (metric == null || StringUtil.isBlank(metric.getApp())) { return; } InfluxDBUtils.insert(SENTINEL_DATABASE, new InfluxDBUtils.InfluxDBInsertCallback() { @Override public void doCallBack(String database, InfluxDB influxDB) { if (metric.getId() == null) { metric.setId(System.currentTimeMillis()); } doSave(influxDB, metric); } }); } @Override public void saveAll(Iterable<MetricEntity> metrics) { if (metrics == null) { return; } Iterator<MetricEntity> iterator = metrics.iterator(); boolean next = iterator.hasNext(); if (!next) { return; } InfluxDBUtils.insert(SENTINEL_DATABASE, new InfluxDBUtils.InfluxDBInsertCallback() { @Override public void doCallBack(String database, InfluxDB influxDB) { while (iterator.hasNext()) { MetricEntity metric = iterator.next(); if (metric.getId() == null) { metric.setId(System.currentTimeMillis()); } doSave(influxDB, metric); } } }); } @Override public List<MetricEntity> queryByAppAndResourceBetween(String app, String resource, long startTime, long endTime) { List<MetricEntity> results = new ArrayList<MetricEntity>(); if (StringUtil.isBlank(app)) { return results; } if (StringUtil.isBlank(resource)) { return results; } StringBuilder sql = new StringBuilder(); sql.append("SELECT * FROM " + METRIC_MEASUREMENT); sql.append(" WHERE app=$app"); sql.append(" AND resource=$resource"); sql.append(" AND time>=$startTime"); sql.append(" AND time<=$endTime"); Map<String, Object> paramMap = new HashMap<String, Object>(); paramMap.put("app", app); paramMap.put("resource", resource); paramMap.put("startTime", DateFormatUtils.format(new Date(startTime), DATE_FORMAT_PATTERN)); paramMap.put("endTime", DateFormatUtils.format(new Date(endTime), DATE_FORMAT_PATTERN)); List<MetricPO> metricPOS = InfluxDBUtils.queryList(SENTINEL_DATABASE, sql.toString(), paramMap, MetricPO.class); if (CollectionUtils.isEmpty(metricPOS)) { return results; } for (MetricPO metricPO : metricPOS) { results.add(convertToMetricEntity(metricPO)); } return results; } @Override public List<String> listResourcesOfApp(String app) { List<String> results = new ArrayList<>(); if (StringUtil.isBlank(app)) { return results; } StringBuilder sql = new StringBuilder(); sql.append("SELECT * FROM " + METRIC_MEASUREMENT); sql.append(" WHERE app=$app"); sql.append(" AND time>=$startTime"); Map<String, Object> paramMap = new HashMap<String, Object>(); long startTime = System.currentTimeMillis() - 1000 * 60; paramMap.put("app", app); paramMap.put("startTime", DateFormatUtils.format(new Date(startTime), DATE_FORMAT_PATTERN)); List<MetricPO> metricPOS = InfluxDBUtils.queryList(SENTINEL_DATABASE, sql.toString(), paramMap, MetricPO.class); if (CollectionUtils.isEmpty(metricPOS)) { return results; } List<MetricEntity> metricEntities = new ArrayList<MetricEntity>(); for (MetricPO metricPO : metricPOS) { metricEntities.add(convertToMetricEntity(metricPO)); } Map<String, MetricEntity> resourceCount = new HashMap<>(32); for (MetricEntity metricEntity : metricEntities) { String resource = metricEntity.getResource(); if (resourceCount.containsKey(resource)) { MetricEntity oldEntity = resourceCount.get(resource); oldEntity.addPassQps(metricEntity.getPassQps()); oldEntity.addRtAndSuccessQps(metricEntity.getRt(), metricEntity.getSuccessQps()); oldEntity.addBlockQps(metricEntity.getBlockQps()); oldEntity.addExceptionQps(metricEntity.getExceptionQps()); oldEntity.addCount(1); } else { resourceCount.put(resource, MetricEntity.copyOf(metricEntity)); } } // Order by last minute b_qps DESC. return resourceCount.entrySet() .stream() .sorted((o1, o2) -> { MetricEntity e1 = o1.getValue(); MetricEntity e2 = o2.getValue(); int t = e2.getBlockQps().compareTo(e1.getBlockQps()); if (t != 0) { return t; } return e2.getPassQps().compareTo(e1.getPassQps()); }) .map(Map.Entry::getKey) .collect(Collectors.toList()); } private MetricEntity convertToMetricEntity(MetricPO metricPO) { MetricEntity metricEntity = new MetricEntity(); metricEntity.setId(metricPO.getId()); metricEntity.setGmtCreate(new Date(metricPO.getGmtCreate())); metricEntity.setGmtModified(new Date(metricPO.getGmtModified())); metricEntity.setApp(metricPO.getApp()); metricEntity.setTimestamp(Date.from(metricPO.getTime().minusMillis(TimeUnit.HOURS.toMillis(UTC_8))));// 查詢數據減8小時 metricEntity.setResource(metricPO.getResource()); metricEntity.setPassQps(metricPO.getPassQps()); metricEntity.setSuccessQps(metricPO.getSuccessQps()); metricEntity.setBlockQps(metricPO.getBlockQps()); metricEntity.setExceptionQps(metricPO.getExceptionQps()); metricEntity.setRt(metricPO.getRt()); metricEntity.setCount(metricPO.getCount()); return metricEntity; } private void doSave(InfluxDB influxDB, MetricEntity metric) { influxDB.write(Point.measurement(METRIC_MEASUREMENT) .time(DateUtils.addHours(metric.getTimestamp(), UTC_8).getTime(), TimeUnit.MILLISECONDS)// 因InfluxDB默認UTC時間,按北京時間算寫入數據加8小時 .tag("app", metric.getApp()) .tag("resource", metric.getResource()) .addField("id", metric.getId()) .addField("gmtCreate", metric.getGmtCreate().getTime()) .addField("gmtModified", metric.getGmtModified().getTime()) .addField("passQps", metric.getPassQps()) .addField("successQps", metric.getSuccessQps()) .addField("blockQps", metric.getBlockQps()) .addField("exceptionQps", metric.getExceptionQps()) .addField("rt", metric.getRt()) .addField("count", metric.getCount()) .addField("resourceCode", metric.getResourceCode()) .build()); } }
其中:
save、saveAll方法通過調用InfluxDBUtils.insert和InfluxDBInsertCallback回調方法,往sentinel_db庫的sentinel_metric數據表寫數據;
saveAll方法不是循環調用save方法,而是在回調內部循環Iterable<MetricEntity> metrics處理,這樣InfluxDBFactory.connect連接只打開關閉一次;
doSave方法中,.time(DateUtils.addHours(metric.getTimestamp(), 8).getTime(), TimeUnit.MILLISECONDS)
因InfluxDB的UTC時間暫時沒找到修改方法,所以這里time時間列加了8個小時時差;
queryByAppAndResourceBetween、listResourcesOfApp里面的查詢方法,使用InfluxDB提供的類sql語法,編寫查詢語句即可。
最后一步,在MetricController、MetricFetcher兩個類,找到metricStore屬性,在@Autowired注解上面加上@Qualifier("jpaMetricsRepository")注解:
@Qualifier("influxDBMetricsRepository") @Autowired private MetricsRepository<MetricEntity> metricStore;
來驗證下成果:
設置sentinel-dashboard工程啟動參數:-Dserver.port=8080 -Dcsp.sentinel.dashboard.server=localhost:8080 -Dproject.name=sentinel-dashboard
啟動工程,打開http://localhost:8080,查看各頁面均顯示正常,
在命令行通過InfluxDB客戶端命令,show measurements,可以看到已經生成了sentinel_metric數據表(measurement);
查詢總數:select count(id) from sentinel_metric
查詢最新5行數據:select * from sentinel_metric order by time desc limit 5
注:命令行語句結束不用加分號
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
代碼參考:https://github.com/cdfive/Sentinel/tree/winxuan_develop/sentinel-dashboard
擴展:
1.考慮以什么時間維度歸檔歷史數據;
2.結合grafana將監控數據進行多維度的統計和呈現。
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
參考:
Sentinel官方文檔:
https://github.com/alibaba/Sentinel/wiki/控制台
https://github.com/alibaba/Sentinel/wiki/在生產環境中使用-Sentinel-控制台
InfluxDB官網文檔 https://docs.influxdata.com/influxdb/v1.6/introduction/getting-started/
InfluxDB簡明手冊 https://xtutu.gitbooks.io/influxdb-handbook/content/