Analyzing the Skywalking Query Protocol Through a Metrics Query Example
Overview
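Skywalking's query protocol is based on GraphQL by default. If needed, it can also be extended with a custom implementation by providing a query module that implements org.apache.skywalking.oap.server.core.query.QueryModule.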
Capturing a request sent by the Skywalking UI
- Request path
POST http://127.0.0.1:8080/graphql
- Request body
{
"query": "query queryData($condition: MetricsCondition!, $duration: Duration!) {\n readMetricsValues: readMetricsValues(condition: $condition, duration: $duration) {\n label\n values {\n values {value}\n }\n }}",
"variables": {
"duration": {
"start": "2021-07-03 1320",
"end": "2021-07-03 1321",
"step": "MINUTE"
},
"condition": {
"name": "instance_jvm_thread_runnable_thread_count",
"entity": {
"scope": "ServiceInstance",
"serviceName": "business-zone::projectA",
"serviceInstanceName": "e8cf34a1d54a4058a8c98505877770e2@192.168.50.113",
"normal": true
}
}
}
}
- Response
{
"data": {
"readMetricsValues": {
"values": {
"values": [
{
"value": 22
},
{
"value": 22
}
]
}
}
}
}
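The captured request can also be replayed outside the browser. The following is a minimal sketch, assuming Java 11+ (for java.net.http.HttpClient) and that the endpoint from the capture above is reachable. The class name MetricsQueryClient and its helpers quote, buildBody, and post are hypothetical, and the JSON is assembled by hand only to keep the example free of extra dependencies.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class MetricsQueryClient {
    // Query template taken from the captured request (whitespace condensed).
    static final String QUERY =
        "query queryData($condition: MetricsCondition!, $duration: Duration!) {"
        + " readMetricsValues: readMetricsValues(condition: $condition, duration: $duration) {"
        + " label values { values { value } } } }";

    // Hypothetical helper: wraps a value in JSON quotes, escaping backslashes and quotes.
    static String quote(String s) {
        return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    // Builds the same body shape as the captured request, with a MINUTE step.
    static String buildBody(String metric, String service, String instance,
                            String start, String end) {
        return "{\"query\":" + quote(QUERY)
            + ",\"variables\":{"
            + "\"duration\":{\"start\":" + quote(start)
            + ",\"end\":" + quote(end) + ",\"step\":\"MINUTE\"},"
            + "\"condition\":{\"name\":" + quote(metric) + ",\"entity\":{"
            + "\"scope\":\"ServiceInstance\","
            + "\"serviceName\":" + quote(service) + ","
            + "\"serviceInstanceName\":" + quote(instance) + ","
            + "\"normal\":true}}}}";
    }

    // Sends the body as a JSON POST and returns the raw response text.
    static String post(String url, String body) throws Exception {
        HttpRequest request = HttpRequest.newBuilder(URI.create(url))
            .header("Content-Type", "application/json")
            .POST(HttpRequest.BodyPublishers.ofString(body))
            .build();
        HttpResponse<String> response = HttpClient.newHttpClient()
            .send(request, HttpResponse.BodyHandlers.ofString());
        return response.body();
    }

    public static void main(String[] args) throws Exception {
        String body = buildBody(
            "instance_jvm_thread_runnable_thread_count",
            "business-zone::projectA",
            "e8cf34a1d54a4058a8c98505877770e2@192.168.50.113",
            "2021-07-03 1320", "2021-07-03 1321");
        System.out.println(post("http://127.0.0.1:8080/graphql", body));
    }
}
```

In real code a JSON library would be a safer way to build the body; the hand-rolled quote helper covers only backslashes and double quotes.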
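Finding the corresponding GraphQL definition in the Skywalking source code
Open the directory oap-server/server-query-plugin/query-graphql-plugin/src/main/resources/query-protocol and search for the keyword readMetricsValues, taken from the query template in the request body.
The corresponding definition is found in oap-server/server-query-plugin/query-graphql-plugin/src/main/resources/query-protocol/metrics-v2.graphqls: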
extend type Query {
# etc...
# Read time-series values in the duration of required metrics
readMetricsValues(condition: MetricsCondition!, duration: Duration!): MetricsValues!
# etc...
}
Input parameter definitions
input MetricsCondition {
# Metrics name, which should be defined in OAL script
# Such as:
# Endpoint_avg = from(Endpoint.latency).avg()
# Then, `Endpoint_avg`
name: String!
# Follow entity definition description.
entity: Entity!
}
input Entity {
# 1. scope=All, no name is required.
# 2. scope=Service, ServiceInstance and Endpoint, set neccessary serviceName/serviceInstanceName/endpointName
# 3. Scope=ServiceRelation, ServiceInstanceRelation and EndpointRelation
# serviceName/serviceInstanceName/endpointName is/are the source(s)
# destServiceName/destServiceInstanceName/destEndpointName is/are destination(s)
# set necessary names of sources and destinations.
scope: Scope!
serviceName: String
# Normal service is the service having installed agent or metrics reported directly.
# Unnormal service is conjectural service, usually detected by the agent.
normal: Boolean
serviceInstanceName: String
endpointName: String
destServiceName: String
# Normal service is the service having installed agent or metrics reported directly.
# Unnormal service is conjectural service, usually detected by the agent.
destNormal: Boolean
destServiceInstanceName: String
destEndpointName: String
}
# The Duration defines the start and end time for each query operation.
# Fields: `start` and `end`
# represents the time span. And each of them matches the step.
# ref https://www.ietf.org/rfc/rfc3339.txt
# The time formats are
# `SECOND` step: yyyy-MM-dd HHmmss
# `MINUTE` step: yyyy-MM-dd HHmm
# `HOUR` step: yyyy-MM-dd HH
# `DAY` step: yyyy-MM-dd
# `MONTH` step: yyyy-MM
# Field: `step`
# represents the accurate time point.
# e.g.
# if step==HOUR , start=2017-11-08 09, end=2017-11-08 19
# then
# metrics from the following time points expected
# 2017-11-08 9:00 -> 2017-11-08 19:00
# there are 11 time points (hours) in the time span.
input Duration {
start: String!
end: String!
step: Step!
}
enum Step {
DAY
HOUR
MINUTE
SECOND
}
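The relationship between step and the start/end format can be sketched as follows. This is a minimal illustration, not Skywalking's own implementation: the class DurationSketch and its methods format and timePoints are hypothetical, and only the four values present in the Step enum above are covered (the schema comment also mentions a MONTH format, which the enum does not list).

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;

public class DurationSketch {
    // Each step carries the time format from the schema comment and its unit.
    enum Step {
        DAY("yyyy-MM-dd", ChronoUnit.DAYS),
        HOUR("yyyy-MM-dd HH", ChronoUnit.HOURS),
        MINUTE("yyyy-MM-dd HHmm", ChronoUnit.MINUTES),
        SECOND("yyyy-MM-dd HHmmss", ChronoUnit.SECONDS);

        final DateTimeFormatter formatter;
        final ChronoUnit unit;

        Step(String pattern, ChronoUnit unit) {
            this.formatter = DateTimeFormatter.ofPattern(pattern);
            this.unit = unit;
        }
    }

    // Formats a timestamp the way the `start` and `end` fields expect for this step.
    static String format(LocalDateTime time, Step step) {
        return time.format(step.formatter);
    }

    // Counts the time points in the span, both ends included.
    static long timePoints(LocalDateTime start, LocalDateTime end, Step step) {
        return step.unit.between(start.truncatedTo(step.unit), end.truncatedTo(step.unit)) + 1;
    }

    public static void main(String[] args) {
        LocalDateTime start = LocalDateTime.of(2017, 11, 8, 9, 0);
        LocalDateTime end = LocalDateTime.of(2017, 11, 8, 19, 0);
        System.out.println(format(start, Step.HOUR));          // 2017-11-08 09
        System.out.println(timePoints(start, end, Step.HOUR)); // 11
    }
}
```

The second line of output matches the schema comment: 09:00 through 19:00 with an HOUR step gives 11 time points.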
Return type definition
type MetricsValues {
# Could be null if no label assigned in the query condition
label: String
# Values of this label value.
values: IntValues
}
type IntValues {
values: [KVInt!]!
}
type KVInt {
id: ID!
# This is the value, the caller must understand the Unit.
# Such as:
# 1. If ask for cpm metric, the unit and result should be count.
# 2. If ask for response time (p99 or avg), the unit should be millisecond.
value: Long!
}
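Verifying the Skywalking UI request with the GraphQL IDEA plugin
Using the approach from the section "GraphQL in Skywalking", imitate the request that the front end sent in the section "Capturing a request sent by the Skywalking UI".
- Request template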
query queryData($condition: MetricsCondition!, $duration: Duration!) {
readMetricsValues: readMetricsValues(duration: $duration, condition: $condition) {
label values { values { id value }}
}
}
- Request parameters
{
"duration": {
"start": "2021-07-03 1400",
"end": "2021-07-03 1401",
"step": "MINUTE"
},
"condition": {
"name": "instance_jvm_thread_runnable_thread_count",
"entity": {
"scope": "ServiceInstance",
"serviceName": "business-zone::projectA",
"serviceInstanceName": "e8cf34a1d54a4058a8c98505877770e2@192.168.50.113",
"normal": true
}
}
}
- Response
{
"data": {
"readMetricsValues": {
"values": {
"values": [
{
"id": "202107031400_YnVzaW5lc3Mtem9uZTo6cHJvamVjdEE=.1_ZThjZjM0YTFkNTRhNDA1OGE4Yzk4NTA1ODc3NzcwZTJAMTkyLjE2OC41MC4xMTM=",
"value": 22
},
{
"id": "202107031401_YnVzaW5lc3Mtem9uZTo6cHJvamVjdEE=.1_ZThjZjM0YTFkNTRhNDA1OGE4Yzk4NTA1ODc3NzcwZTJAMTkyLjE2OC41MC4xMTM=",
"value": 22
}
]
}
}
}
}
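The id values in this response appear to follow a readable layout: a time bucket, then the Base64-encoded service name with a numeric flag after a dot, then the Base64-encoded instance name, joined by underscores. The sketch below decodes one of them. The layout is inferred only from the sample ids above and may differ between Skywalking versions; the class MetricsIdSketch and its methods are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

public class MetricsIdSketch {
    // Decodes a standard Base64 string into UTF-8 text.
    static String decode(String base64) {
        return new String(Base64.getDecoder().decode(base64), StandardCharsets.UTF_8);
    }

    // Returns {timeBucket, serviceName, flag, serviceInstanceName}.
    // Assumed layout: <timeBucket>_<base64(service)>.<flag>_<base64(instance)>
    static String[] parse(String id) {
        String[] parts = id.split("_", 3);
        int dot = parts[1].lastIndexOf('.');
        return new String[] {
            parts[0],
            decode(parts[1].substring(0, dot)),
            parts[1].substring(dot + 1),
            decode(parts[2])
        };
    }

    public static void main(String[] args) {
        String id = "202107031400_YnVzaW5lc3Mtem9uZTo6cHJvamVjdEE=.1_"
            + "ZThjZjM0YTFkNTRhNDA1OGE4Yzk4NTA1ODc3NzcwZTJAMTkyLjE2OC41MC4xMTM=";
        for (String field : parse(id)) {
            System.out.println(field);
        }
    }
}
```

Decoding the first id yields the time bucket 202107031400 followed by the same serviceName and serviceInstanceName that were passed in the request parameters.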
PS: if the query is written inline rather than as a template with variables, the plugin offers code completion while you type:
query queryData {
readMetricsValues(
duration: {start: "2021-07-03 1400",end: "2021-07-03 1401", step: MINUTE},
condition: {
name: "instance_jvm_thread_runnable_thread_count",
entity: {
scope: ServiceInstance,
serviceName: "business-zone::projectA",
serviceInstanceName: "e8cf34a1d54a4058a8c98505877770e2@192.168.50.113",
normal: true
}
}
) {
label values{ values{ id value }}
}
}
How the GraphQL schema file is loaded into the program
Search for metrics-v2.graphqls; the loading code is found in oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/GraphQLQueryProvider.java:
// Initialize the GraphQL engine
@Override
public void prepare() throws ServiceNotProvidedException, ModuleStartException {
GraphQLSchema schema = SchemaParser.newParser()
// etc...
.file("query-protocol/metrics-v2.graphqls")
.resolvers(new MetricsQuery(getManager())) // MetricsQuery implements the com.coxautodev.graphql.tools.GraphQLQueryResolver interface
// etc...
.build()
.makeExecutableSchema();
this.graphQL = GraphQL.newGraphQL(schema).build();
}
In the class org.apache.skywalking.oap.query.graphql.resolver.MetricsQuery, find the readMetricsValues method:
/**
* Read time-series values in the duration of required metrics
*/
public MetricsValues readMetricsValues(MetricsCondition condition, Duration duration) throws IOException {
if (MetricsType.UNKNOWN.equals(typeOfMetrics(condition.getName())) || !condition.getEntity().isValid()) {
final List<PointOfTime> pointOfTimes = duration.assembleDurationPoints();
MetricsValues values = new MetricsValues();
pointOfTimes.forEach(pointOfTime -> {
String id = pointOfTime.id(
condition.getEntity().isValid() ? condition.getEntity().buildId() : "ILLEGAL_ENTITY"
);
final KVInt kvInt = new KVInt();
kvInt.setId(id);
kvInt.setValue(0);
values.getValues().addKVInt(kvInt);
});
return values;
}
return getMetricsQueryService().readMetricsValues(condition, duration);
}
private MetricsQueryService getMetricsQueryService() {
if (metricsQueryService == null) {
this.metricsQueryService = moduleManager.find(CoreModule.NAME)
.provider()
.getService(MetricsQueryService.class);
}
return metricsQueryService;
}
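The guard clause at the top of readMetricsValues means that an unknown metric name or an invalid entity does not produce an error; the caller receives one zero-valued point per time bucket instead. A simplified, self-contained sketch of that behavior follows. The class ZeroFillSketch and its Point type are hypothetical stand-ins for the real classes, and the id format (time bucket, underscore, entity id) is an assumption based on the ids in the response shown earlier.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ZeroFillSketch {
    // Simplified stand-in for KVInt: an id plus a long value.
    static final class Point {
        final String id;
        final long value;

        Point(String id, long value) {
            this.id = id;
            this.value = value;
        }
    }

    // Builds one zero-valued point per time bucket, mirroring the fallback branch.
    static List<Point> zeroFilled(List<Long> timeBuckets, String entityId, boolean entityValid) {
        String suffix = entityValid ? entityId : "ILLEGAL_ENTITY";
        List<Point> points = new ArrayList<>();
        for (long bucket : timeBuckets) {
            // Assumption: ids are "<time bucket>_<entity id>".
            points.add(new Point(bucket + "_" + suffix, 0L));
        }
        return points;
    }

    public static void main(String[] args) {
        List<Long> buckets = Arrays.asList(202107031400L, 202107031401L);
        for (Point p : zeroFilled(buckets, "unused", false)) {
            System.out.println(p.id + " -> " + p.value);
        }
    }
}
```

One practical consequence: a run of zeros in a chart can mean either that the metric really was zero or that the metric name or entity in the request was wrong.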
org.apache.skywalking.oap.server.core.query.MetricsQueryService#readMetricsValues
/**
* Read time-series values in the duration of required metrics
*/
public MetricsValues readMetricsValues(MetricsCondition condition, Duration duration) throws IOException {
return getMetricQueryDAO().readMetricsValues(
condition, ValueColumnMetadata.INSTANCE.getValueCName(condition.getName()), duration);
}
private IMetricsQueryDAO getMetricQueryDAO() {
if (metricQueryDAO == null) {
metricQueryDAO = moduleManager.find(StorageModule.NAME).provider().getService(IMetricsQueryDAO.class);
}
return metricQueryDAO;
}
According to the Extend storage documentation, IMetricsQueryDAO is the data access object for metrics queries:
# Implement all DAOs
# Here is the list of all DAO interfaces in storage
IServiceInventoryCacheDAO
IServiceInstanceInventoryCacheDAO
IEndpointInventoryCacheDAO
INetworkAddressInventoryCacheDAO
IBatchDAO
StorageDAO
IRegisterLockDAO
ITopologyQueryDAO
IMetricsQueryDAO
ITraceQueryDAO
IMetadataQueryDAO
IAggregationQueryDAO
IAlarmQueryDAO
IHistoryDeleteDAO
IMetricsDAO
IRecordDAO
IRegisterDAO
ILogQueryDAO
ITopNRecordsQueryDAO
IBrowserLogQueryDAO
The class diagram shows that IMetricsQueryDAO has four implementations: ES, ES7, InfluxDB, and SQL.
How the GraphQL engine is registered with the Jetty server
// Register the GraphQL query handler with the Jetty server
@Override
public void start() throws ServiceNotProvidedException, ModuleStartException {
JettyHandlerRegister service = getManager().find(CoreModule.NAME)
.provider()
.getService(JettyHandlerRegister.class);
service.addHandler(new GraphQLQueryHandler(config.getPath(), graphQL));
}
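Examining GraphQLQueryProvider shows that this class is the Provider of QueryModule (the query module).
This also confirms the statement in the "Overview" section:
Skywalking's query protocol is based on GraphQL by default. If needed, it can also be extended with a custom implementation by providing a query module that implements org.apache.skywalking.oap.server.core.query.QueryModule.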
@Override
public String name() {
return "graphql";
}
@Override
public Class<? extends ModuleDefine> module() {
return QueryModule.class;
}
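The name() and module() pair is what lets several providers coexist for a single module. The toy model below illustrates the idea of picking one provider by module and name. It is only a sketch under stated assumptions: Module, QueryModuleStub, Provider, and select are hypothetical stand-ins, not Skywalking's real ModuleDefine, ModuleProvider, or module manager, and the real selection mechanism may work differently.

```java
import java.util.Arrays;
import java.util.List;

public class ProviderSelectionSketch {
    // Hypothetical stand-ins; not Skywalking's real module types.
    interface Module { }

    static final class QueryModuleStub implements Module { }

    interface Provider {
        String name();

        Class<? extends Module> module();
    }

    // Creates a provider that reports the given name and module.
    static Provider provider(String name, Class<? extends Module> module) {
        return new Provider() {
            @Override
            public String name() {
                return name;
            }

            @Override
            public Class<? extends Module> module() {
                return module;
            }
        };
    }

    // Picks the provider registered for the module under the requested name.
    static Provider select(List<Provider> candidates, Class<? extends Module> module, String name) {
        for (Provider candidate : candidates) {
            if (candidate.module() == module && candidate.name().equals(name)) {
                return candidate;
            }
        }
        throw new IllegalArgumentException(
            "No provider '" + name + "' for " + module.getSimpleName());
    }

    public static void main(String[] args) {
        List<Provider> providers = Arrays.asList(
            provider("graphql", QueryModuleStub.class),
            provider("custom", QueryModuleStub.class)); // hypothetical custom protocol
        System.out.println(select(providers, QueryModuleStub.class, "custom").name());
    }
}
```

In this model a custom query protocol is just another provider whose module() returns the same module class under a different name.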
package org.apache.skywalking.oap.query.graphql;
import com.google.gson.Gson;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.reflect.TypeToken;
import graphql.ExecutionInput;
import graphql.ExecutionResult;
import graphql.GraphQL;
import graphql.GraphQLError;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.lang.reflect.Type;
import java.util.List;
import java.util.Map;
import javax.servlet.http.HttpServletRequest;
import lombok.RequiredArgsConstructor;
import org.apache.skywalking.oap.server.library.server.jetty.JettyJsonHandler;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@RequiredArgsConstructor
public class GraphQLQueryHandler extends JettyJsonHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(GraphQLQueryHandler.class);
private static final String QUERY = "query";
private static final String VARIABLES = "variables";
private static final String DATA = "data";
private static final String ERRORS = "errors";
private static final String MESSAGE = "message";
private final Gson gson = new Gson();
private final Type mapOfStringObjectType = new TypeToken<Map<String, Object>>() {
}.getType();
private final String path;
private final GraphQL graphQL;
@Override
public String pathSpec() {
return path;
}
@Override
protected JsonElement doGet(HttpServletRequest req) {
throw new UnsupportedOperationException("GraphQL only supports POST method");
}
@Override
protected JsonElement doPost(HttpServletRequest req) throws IOException {
BufferedReader reader = new BufferedReader(new InputStreamReader(req.getInputStream()));
String line;
StringBuilder request = new StringBuilder();
while ((line = reader.readLine()) != null) {
request.append(line);
}
JsonObject requestJson = gson.fromJson(request.toString(), JsonObject.class);
return execute(requestJson.get(QUERY)
.getAsString(), gson.fromJson(requestJson.get(VARIABLES), mapOfStringObjectType));
}
private JsonObject execute(String request, Map<String, Object> variables) {
try {
ExecutionInput executionInput = ExecutionInput.newExecutionInput()
.query(request)
.variables(variables)
.build();
// Execute the query with the GraphQL engine
ExecutionResult executionResult = graphQL.execute(executionInput);
LOGGER.debug("Execution result is {}", executionResult);
// Wrap the result (data and errors) into the JSON response
Object data = executionResult.getData();
List<GraphQLError> errors = executionResult.getErrors();
JsonObject jsonObject = new JsonObject();
if (data != null) {
jsonObject.add(DATA, gson.fromJson(gson.toJson(data), JsonObject.class));
}
if (CollectionUtils.isNotEmpty(errors)) {
JsonArray errorArray = new JsonArray();
errors.forEach(error -> {
JsonObject errorJson = new JsonObject();
errorJson.addProperty(MESSAGE, error.getMessage());
errorArray.add(errorJson);
});
jsonObject.add(ERRORS, errorArray);
}
return jsonObject;
} catch (final Throwable e) {
LOGGER.error(e.getMessage(), e);
JsonObject jsonObject = new JsonObject();
JsonArray errorArray = new JsonArray();
JsonObject errorJson = new JsonObject();
errorJson.addProperty(MESSAGE, e.getMessage());
errorArray.add(errorJson);
jsonObject.add(ERRORS, errorArray);
return jsonObject;
}
}
}
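The webapp gateway forwards GraphQL requests to the OAP
In v8.6.0 and earlier the gateway was zuul; from v8.7.0 onward it was replaced by Spring Cloud Gateway. Since this is not the focus of this article, it is not covered further here.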
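Summary
Skywalking's query protocol is implemented by default with GraphQL, a highly general-purpose query language, so clients can easily select exactly the data they need.
For Skywalking, whose query requirements follow relatively fixed patterns and change infrequently, it is a good fit.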