SkyWalking-11: The SkyWalking Query Protocol, a Case Study


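This article uses a metrics query as a case study to analyze the SkyWalking query protocol.

Overview

The SkyWalking query protocol is based on GraphQL by default. If needed, it can also be replaced with a custom extension: provide a query module that implements org.apache.skywalking.oap.server.core.query.QueryModule.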

Capturing the request sent by the SkyWalking UI

  • Request path
POST http://127.0.0.1:8080/graphql
  • Request body
{
  "query": "query queryData($condition: MetricsCondition!, $duration: Duration!) {\n  readMetricsValues: readMetricsValues(condition: $condition, duration: $duration) {\n    label\n    values {\n      values {value}\n    }\n  }}",
  "variables": {
    "duration": {
      "start": "2021-07-03 1320",
      "end": "2021-07-03 1321",
      "step": "MINUTE"
    },
    "condition": {
      "name": "instance_jvm_thread_runnable_thread_count",
      "entity": {
        "scope": "ServiceInstance",
        "serviceName": "business-zone::projectA",
        "serviceInstanceName": "e8cf34a1d54a4058a8c98505877770e2@192.168.50.113",
        "normal": true
      }
    }
  }
}
  • Response
{
  "data": {
    "readMetricsValues": {
      "values": {
        "values": [
          {
            "value": 22
          },
          {
            "value": 22
          }
        ]
      }
    }
  }
}
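
The captured request can also be reproduced outside the browser. The following is a minimal sketch, assuming Java 11+ and only the JDK. The class GraphQLRequestSketch and its methods jsonEscape, buildBody, and post are hypothetical names made up for illustration; they are not part of SkyWalking. The sketch wraps a GraphQL document and an already-serialized variables object into the {"query": ..., "variables": ...} body shown above and posts it to the endpoint.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

class GraphQLRequestSketch {

    /** Escapes a string so it can be embedded in a JSON string literal. */
    static String jsonEscape(String s) {
        StringBuilder sb = new StringBuilder();
        for (char c : s.toCharArray()) {
            switch (c) {
                case '"':  sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n"); break;
                case '\r': sb.append("\\r"); break;
                case '\t': sb.append("\\t"); break;
                default:
                    if (c < 0x20) {
                        sb.append(String.format("\\u%04x", (int) c));
                    } else {
                        sb.append(c);
                    }
            }
        }
        return sb.toString();
    }

    /** Builds the POST body from a GraphQL document and a serialized variables object. */
    static String buildBody(String query, String variablesJson) {
        return "{\"query\":\"" + jsonEscape(query) + "\",\"variables\":" + variablesJson + "}";
    }

    /** Sends the body to the endpoint; this needs a reachable SkyWalking instance. */
    static String post(String endpoint, String body) throws Exception {
        HttpRequest request = HttpRequest.newBuilder(URI.create(endpoint))
            .header("Content-Type", "application/json")
            .POST(HttpRequest.BodyPublishers.ofString(body))
            .build();
        return HttpClient.newHttpClient()
            .send(request, HttpResponse.BodyHandlers.ofString())
            .body();
    }

    public static void main(String[] args) {
        String query = "query queryData($duration: Duration!) {\n  version\n}";
        String variables = "{\"duration\":{\"start\":\"2021-07-03 1320\","
            + "\"end\":\"2021-07-03 1321\",\"step\":\"MINUTE\"}}";
        // Only prints the body; call post(...) when a server is available.
        System.out.println(buildBody(query, variables));
    }
}
```

Calling post("http://127.0.0.1:8080/graphql", body) requires a running SkyWalking instance at that address; buildBody alone can be tried offline.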

Finding the corresponding GraphQL definition in the SkyWalking source

Open the directory oap-server/server-query-plugin/query-graphql-plugin/src/main/resources/query-protocol and search for the keyword readMetricsValues taken from the query in the request body.
The matching definition is in oap-server/server-query-plugin/query-graphql-plugin/src/main/resources/query-protocol/metrics-v2.graphqls:

extend type Query {
    # etc...
    # Read time-series values in the duration of required metrics
    readMetricsValues(condition: MetricsCondition!, duration: Duration!): MetricsValues!
    # etc...
}

Input parameter definitions

input MetricsCondition {
    # Metrics name, which should be defined in OAL script
    # Such as:
    # Endpoint_avg = from(Endpoint.latency).avg()
    # Then, `Endpoint_avg`
    name: String!
    # Follow entity definition description.
    entity: Entity!
}

input Entity {
    # 1. scope=All, no name is required.
    # 2. scope=Service, ServiceInstance and Endpoint, set neccessary serviceName/serviceInstanceName/endpointName
    # 3. Scope=ServiceRelation, ServiceInstanceRelation and EndpointRelation
    #    serviceName/serviceInstanceName/endpointName is/are the source(s)
    #    destServiceName/destServiceInstanceName/destEndpointName is/are destination(s)
    #    set necessary names of sources and destinations.
    scope: Scope!
    serviceName: String
    # Normal service is the service having installed agent or metrics reported directly.
    # Unnormal service is conjectural service, usually detected by the agent.
    normal: Boolean
    serviceInstanceName: String
    endpointName: String
    destServiceName: String
    # Normal service is the service having installed agent or metrics reported directly.
    # Unnormal service is conjectural service, usually detected by the agent.
    destNormal: Boolean
    destServiceInstanceName: String
    destEndpointName: String
}

# The Duration defines the start and end time for each query operation.
# Fields: `start` and `end`
#   represents the time span. And each of them matches the step.
#   ref https://www.ietf.org/rfc/rfc3339.txt
#   The time formats are
#       `SECOND` step: yyyy-MM-dd HHmmss
#       `MINUTE` step: yyyy-MM-dd HHmm
#       `HOUR` step: yyyy-MM-dd HH
#       `DAY` step: yyyy-MM-dd
#       `MONTH` step: yyyy-MM
# Field: `step`
#   represents the accurate time point.
# e.g.
#   if step==HOUR , start=2017-11-08 09, end=2017-11-08 19
#   then
#       metrics from the following time points expected
#       2017-11-08 9:00 -> 2017-11-08 19:00
#       there are 11 time points (hours) in the time span.
input Duration {
    start: String!
    end: String!
    step: Step!
}

enum Step {
    DAY
    HOUR
    MINUTE
    SECOND
}
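
The Duration rules above can be sketched as follows. This is a minimal sketch assuming only the JDK's java.time package; DurationPointsSketch, its Step enum, and timePoints are hypothetical names for illustration and do not reproduce SkyWalking's own Duration.assembleDurationPoints. It parses start and end with the pattern that belongs to the step and lists every time point in the inclusive span.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;
import java.time.temporal.ChronoField;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.List;

class DurationPointsSketch {

    /** Each step pairs the time format from the schema comment with its unit. */
    enum Step {
        DAY("yyyy-MM-dd", ChronoUnit.DAYS),
        HOUR("yyyy-MM-dd HH", ChronoUnit.HOURS),
        MINUTE("yyyy-MM-dd HHmm", ChronoUnit.MINUTES),
        SECOND("yyyy-MM-dd HHmmss", ChronoUnit.SECONDS);

        final DateTimeFormatter formatter;
        final ChronoUnit unit;

        Step(String pattern, ChronoUnit unit) {
            // Fields missing from the pattern default to zero so that
            // every step can be parsed into a LocalDateTime.
            this.formatter = new DateTimeFormatterBuilder()
                .appendPattern(pattern)
                .parseDefaulting(ChronoField.HOUR_OF_DAY, 0)
                .parseDefaulting(ChronoField.MINUTE_OF_HOUR, 0)
                .parseDefaulting(ChronoField.SECOND_OF_MINUTE, 0)
                .toFormatter();
            this.unit = unit;
        }
    }

    /** Lists all time points from start to end (both inclusive) in the step's format. */
    static List<String> timePoints(String start, String end, Step step) {
        LocalDateTime from = LocalDateTime.parse(start, step.formatter);
        LocalDateTime to = LocalDateTime.parse(end, step.formatter);
        List<String> points = new ArrayList<>();
        for (LocalDateTime t = from; !t.isAfter(to); t = t.plus(1, step.unit)) {
            points.add(t.format(step.formatter));
        }
        return points;
    }

    public static void main(String[] args) {
        System.out.println(timePoints("2017-11-08 09", "2017-11-08 19", Step.HOUR).size());
        System.out.println(timePoints("2021-07-03 1320", "2021-07-03 1321", Step.MINUTE));
    }
}
```

With step HOUR, start 2017-11-08 09, and end 2017-11-08 19, the sketch yields the 11 time points mentioned in the schema comment.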

Return type definitions

type MetricsValues {
    # Could be null if no label assigned in the query condition
    label: String
    # Values of this label value.
    values: IntValues
}

type IntValues {
    values: [KVInt!]!
}

type KVInt {
    id: ID!
    # This is the value, the caller must understand the Unit.
    # Such as:
    # 1. If ask for cpm metric, the unit and result should be count.
    # 2. If ask for response time (p99 or avg), the unit should be millisecond.
    value: Long!
}

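Verifying the SkyWalking UI request with the GraphQL plugin for IDEA

Following the approach from the section "Using GraphQL in SkyWalking", reproduce the request that the front end sent in the section "Capturing the request sent by the SkyWalking UI".

  • Query template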
query queryData($condition: MetricsCondition!, $duration: Duration!) {
    readMetricsValues: readMetricsValues(duration: $duration, condition: $condition) {
        label values { values { id value }}
    }
}
  • Query variables
{
  "duration": {
    "start": "2021-07-03 1400",
    "end": "2021-07-03 1401", 
    "step": "MINUTE"
  },
  "condition": {
    "name": "instance_jvm_thread_runnable_thread_count",
    "entity": {
      "scope": "ServiceInstance",
      "serviceName": "business-zone::projectA",
      "serviceInstanceName": "e8cf34a1d54a4058a8c98505877770e2@192.168.50.113",
      "normal": true
    }
  }
}
  • Response
{
  "data": {
    "readMetricsValues": {
      "values": {
        "values": [
          {
            "id": "202107031400_YnVzaW5lc3Mtem9uZTo6cHJvamVjdEE=.1_ZThjZjM0YTFkNTRhNDA1OGE4Yzk4NTA1ODc3NzcwZTJAMTkyLjE2OC41MC4xMTM=",
            "value": 22
          },
          {
            "id": "202107031401_YnVzaW5lc3Mtem9uZTo6cHJvamVjdEE=.1_ZThjZjM0YTFkNTRhNDA1OGE4Yzk4NTA1ODc3NzcwZTJAMTkyLjE2OC41MC4xMTM=",
            "value": 22
          }
        ]
      }
    }
  }
}
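
The id values in this response can be taken apart to see what they carry. The following is a minimal sketch using only the JDK; MetricsIdSketch and parse are hypothetical names, and the layout (time bucket, then Base64 service name plus a ".1" flag, then Base64 instance name, joined by underscores) is an assumption inferred from the two ids above, not taken from SkyWalking documentation.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

class MetricsIdSketch {

    /** Decodes a standard Base64 string into UTF-8 text. */
    static String decode(String base64) {
        return new String(Base64.getDecoder().decode(base64), StandardCharsets.UTF_8);
    }

    /**
     * Splits an id of the assumed form
     * timeBucket_base64(serviceName).flag_base64(instanceName)
     * and returns {timeBucket, serviceName, flag, instanceName}.
     */
    static String[] parse(String id) {
        String[] parts = id.split("_", 3);
        if (parts.length != 3) {
            throw new IllegalArgumentException("Unexpected id layout: " + id);
        }
        int dot = parts[1].lastIndexOf('.');
        if (dot < 0) {
            throw new IllegalArgumentException("Missing flag in service part: " + parts[1]);
        }
        return new String[] {
            parts[0],
            decode(parts[1].substring(0, dot)),
            parts[1].substring(dot + 1),
            decode(parts[2])
        };
    }

    public static void main(String[] args) {
        String id = "202107031400_YnVzaW5lc3Mtem9uZTo6cHJvamVjdEE=.1_"
            + "ZThjZjM0YTFkNTRhNDA1OGE4Yzk4NTA1ODc3NzcwZTJAMTkyLjE2OC41MC4xMTM=";
        for (String field : parse(id)) {
            System.out.println(field);
        }
    }
}
```

For the first id in the response, the decoded fields are the time bucket 202107031400, the service name business-zone::projectA, the flag 1, and the instance name e8cf34a1d54a4058a8c98505877770e2@192.168.50.113, which match the serviceName and serviceInstanceName sent in the query variables. The flag plausibly corresponds to normal: true, but that reading is a guess.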

PS: if you write the query inline instead of using a template with variables, the plugin offers code completion:

query queryData {
    readMetricsValues(
        duration: {start: "2021-07-03 1400",end: "2021-07-03 1401", step: MINUTE},
        condition: {
            name: "instance_jvm_thread_runnable_thread_count",
            entity: {
                scope: ServiceInstance,
                serviceName: "business-zone::projectA",
                serviceInstanceName: "e8cf34a1d54a4058a8c98505877770e2@192.168.50.113",
                normal: true
            }
        }
    ) {
        label values{ values{ id value }}
    }
}

How the GraphQL schema files are loaded into the program

Searching for metrics-v2.graphqls leads to the loading code in oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/GraphQLQueryProvider.java:

    // Initialize the GraphQL engine
    @Override
    public void prepare() throws ServiceNotProvidedException, ModuleStartException {
        GraphQLSchema schema = SchemaParser.newParser()
                                           // etc...
                                           .file("query-protocol/metrics-v2.graphqls")
                                           .resolvers(new MetricsQuery(getManager())) // MetricsQuery implements the com.coxautodev.graphql.tools.GraphQLQueryResolver interface
                                           // etc...
                                           .build()
                                           .makeExecutableSchema();
        this.graphQL = GraphQL.newGraphQL(schema).build();
    }
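
The link between a schema field such as readMetricsValues and a resolver class is essentially a lookup by name. The following toy model, using only JDK reflection, illustrates that idea; ResolverDispatchSketch, ToyMetricsQuery, and dispatch are hypothetical names, and the code is a deliberately simplified assumption, not how graphql-java-tools is actually implemented.

```java
import java.lang.reflect.Method;

class ResolverDispatchSketch {

    /** Hypothetical stand-in for a resolver class such as MetricsQuery. */
    public static class ToyMetricsQuery {
        public String readMetricsValues(String metricName, String step) {
            return metricName + "@" + step;
        }
    }

    /**
     * Finds a public method on the resolver whose name equals the GraphQL
     * field name and whose parameter count matches, then invokes it.
     */
    static Object dispatch(Object resolver, String fieldName, Object... args) {
        for (Method method : resolver.getClass().getMethods()) {
            if (method.getName().equals(fieldName)
                && method.getParameterCount() == args.length) {
                try {
                    return method.invoke(resolver, args);
                } catch (ReflectiveOperationException e) {
                    throw new IllegalStateException("Resolver call failed: " + fieldName, e);
                }
            }
        }
        throw new IllegalArgumentException("No resolver method for field " + fieldName);
    }

    public static void main(String[] args) {
        Object result = dispatch(new ToyMetricsQuery(), "readMetricsValues",
            "instance_jvm_thread_runnable_thread_count", "MINUTE");
        System.out.println(result);
    }
}
```

A real schema parser additionally checks argument and return types against the schema; the sketch skips that to keep the name-based lookup visible.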

In the class org.apache.skywalking.oap.query.graphql.resolver.MetricsQuery, find the readMetricsValues method:

    /**
     * Read time-series values in the duration of required metrics
     */
    public MetricsValues readMetricsValues(MetricsCondition condition, Duration duration) throws IOException {
        if (MetricsType.UNKNOWN.equals(typeOfMetrics(condition.getName())) || !condition.getEntity().isValid()) {
            final List<PointOfTime> pointOfTimes = duration.assembleDurationPoints();
            MetricsValues values = new MetricsValues();
            pointOfTimes.forEach(pointOfTime -> {
                String id = pointOfTime.id(
                    condition.getEntity().isValid() ? condition.getEntity().buildId() : "ILLEGAL_ENTITY"
                );
                final KVInt kvInt = new KVInt();
                kvInt.setId(id);
                kvInt.setValue(0);
                values.getValues().addKVInt(kvInt);
            });
            return values;
        }
        return getMetricsQueryService().readMetricsValues(condition, duration);
    }

    private MetricsQueryService getMetricsQueryService() {
        if (metricsQueryService == null) {
            this.metricsQueryService = moduleManager.find(CoreModule.NAME)
                                                    .provider()
                                                    .getService(MetricsQueryService.class);
        }
        return metricsQueryService;
    }
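
The first branch of readMetricsValues is worth a closer look: for an unknown metric or an invalid entity it does not fail but returns one zero value per time point. A minimal standalone sketch of that behaviour follows; ZeroFillSketch, KV, and zeroFill are hypothetical names, and joining the time bucket and the entity id with an underscore is an assumption based on the ids seen in the response earlier.

```java
import java.util.ArrayList;
import java.util.List;

class ZeroFillSketch {

    /** Simplified stand-in for KVInt: an id plus a numeric value. */
    static final class KV {
        final String id;
        final long value;

        KV(String id, long value) {
            this.id = id;
            this.value = value;
        }
    }

    /**
     * Returns one zero-valued entry per time bucket. A null entityId models
     * an invalid entity and is replaced by the ILLEGAL_ENTITY placeholder.
     */
    static List<KV> zeroFill(List<String> timeBuckets, String entityId) {
        String suffix = entityId != null ? entityId : "ILLEGAL_ENTITY";
        List<KV> values = new ArrayList<>();
        for (String bucket : timeBuckets) {
            values.add(new KV(bucket + "_" + suffix, 0L));
        }
        return values;
    }

    public static void main(String[] args) {
        for (KV kv : zeroFill(List.of("202107031400", "202107031401"), null)) {
            System.out.println(kv.id + " = " + kv.value);
        }
    }
}
```

One consequence of this design is that a mistyped metric name produces a well-formed series of zeros rather than an error, so a flat zero line in the UI can mean "no such metric" as well as "no data".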

org.apache.skywalking.oap.server.core.query.MetricsQueryService#readMetricsValues

    /**
     * Read time-series values in the duration of required metrics
     */
    public MetricsValues readMetricsValues(MetricsCondition condition, Duration duration) throws IOException {
        return getMetricQueryDAO().readMetricsValues(
            condition, ValueColumnMetadata.INSTANCE.getValueCName(condition.getName()), duration);
    }

    private IMetricsQueryDAO getMetricQueryDAO() {
        if (metricQueryDAO == null) {
            metricQueryDAO = moduleManager.find(StorageModule.NAME).provider().getService(IMetricsQueryDAO.class);
        }
        return metricQueryDAO;
    }

According to the Extend storage documentation, IMetricsQueryDAO is the data access object for metrics queries:

# Implement all DAOs
# Here is the list of all DAO interfaces in storage
IServiceInventoryCacheDAO
IServiceInstanceInventoryCacheDAO
IEndpointInventoryCacheDAO
INetworkAddressInventoryCacheDAO
IBatchDAO
StorageDAO
IRegisterLockDAO
ITopologyQueryDAO
IMetricsQueryDAO
ITraceQueryDAO
IMetadataQueryDAO
IAggregationQueryDAO
IAlarmQueryDAO
IHistoryDeleteDAO
IMetricsDAO
IRecordDAO
IRegisterDAO
ILogQueryDAO
ITopNRecordsQueryDAO
IBrowserLogQueryDAO

The class diagram shows that IMetricsQueryDAO has four kinds of implementations: ES, ES7, InfluxDB, and SQL.
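
The getService(IMetricsQueryDAO.class) calls seen earlier are what keep the query layer independent of the storage backend: the caller asks for an interface and receives whatever implementation the active storage module registered. The following toy registry sketches that pattern with only the JDK; ServiceLookupSketch, ToyMetricsQueryDAO, InMemoryDAO, and ToyProvider are hypothetical names and do not mirror SkyWalking's real module system.

```java
import java.util.HashMap;
import java.util.Map;

class ServiceLookupSketch {

    /** Hypothetical, much reduced counterpart of a query DAO interface. */
    interface ToyMetricsQueryDAO {
        long[] readValues(String metricName, int points);
    }

    /** One possible backend; others could talk to a database instead. */
    static class InMemoryDAO implements ToyMetricsQueryDAO {
        @Override
        public long[] readValues(String metricName, int points) {
            long[] values = new long[points];
            java.util.Arrays.fill(values, 22L);
            return values;
        }
    }

    /** Maps service interfaces to the implementation a provider registered. */
    static class ToyProvider {
        private final Map<Class<?>, Object> services = new HashMap<>();

        <T> void registerService(Class<T> type, T implementation) {
            services.put(type, implementation);
        }

        <T> T getService(Class<T> type) {
            Object implementation = services.get(type);
            if (implementation == null) {
                throw new IllegalStateException("Service not provided: " + type.getName());
            }
            return type.cast(implementation);
        }
    }

    public static void main(String[] args) {
        ToyProvider storage = new ToyProvider();
        storage.registerService(ToyMetricsQueryDAO.class, new InMemoryDAO());
        // The caller only knows the interface, never the concrete backend.
        ToyMetricsQueryDAO dao = storage.getService(ToyMetricsQueryDAO.class);
        System.out.println(dao.readValues("instance_jvm_thread_runnable_thread_count", 2).length);
    }
}
```

Swapping the backend then means registering a different implementation under the same interface; the calling code stays unchanged.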

How the GraphQL engine is registered with the Jetty server

    // Register the GraphQL query handler with the Jetty server
    @Override
    public void start() throws ServiceNotProvidedException, ModuleStartException {
        JettyHandlerRegister service = getManager().find(CoreModule.NAME)
                                                   .provider()
                                                   .getService(JettyHandlerRegister.class);
        service.addHandler(new GraphQLQueryHandler(config.getPath(), graphQL));
    }

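Looking at GraphQLQueryProvider as a whole, it turns out to be the provider class of QueryModule (the query module).

This confirms the statement made in the "Overview" section:

The SkyWalking query protocol is based on GraphQL by default. If needed, it can also be replaced with a custom extension: provide a query module that implements org.apache.skywalking.oap.server.core.query.QueryModule.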

    @Override
    public String name() {
        return "graphql";
    }

    @Override
    public Class<? extends ModuleDefine> module() {
        return QueryModule.class;
    }

The complete source of GraphQLQueryHandler, the handler that start() registers:

package org.apache.skywalking.oap.query.graphql;

import com.google.gson.Gson;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.reflect.TypeToken;
import graphql.ExecutionInput;
import graphql.ExecutionResult;
import graphql.GraphQL;
import graphql.GraphQLError;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.lang.reflect.Type;
import java.util.List;
import java.util.Map;
import javax.servlet.http.HttpServletRequest;
import lombok.RequiredArgsConstructor;
import org.apache.skywalking.oap.server.library.server.jetty.JettyJsonHandler;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RequiredArgsConstructor
public class GraphQLQueryHandler extends JettyJsonHandler {

    private static final Logger LOGGER = LoggerFactory.getLogger(GraphQLQueryHandler.class);

    private static final String QUERY = "query";
    private static final String VARIABLES = "variables";
    private static final String DATA = "data";
    private static final String ERRORS = "errors";
    private static final String MESSAGE = "message";

    private final Gson gson = new Gson();
    private final Type mapOfStringObjectType = new TypeToken<Map<String, Object>>() {
    }.getType();

    private final String path;

    private final GraphQL graphQL;

    @Override
    public String pathSpec() {
        return path;
    }

    @Override
    protected JsonElement doGet(HttpServletRequest req) {
        throw new UnsupportedOperationException("GraphQL only supports POST method");
    }

    @Override
    protected JsonElement doPost(HttpServletRequest req) throws IOException {
        BufferedReader reader = new BufferedReader(new InputStreamReader(req.getInputStream()));
        String line;
        StringBuilder request = new StringBuilder();
        while ((line = reader.readLine()) != null) {
            request.append(line);
        }

        JsonObject requestJson = gson.fromJson(request.toString(), JsonObject.class);

        return execute(requestJson.get(QUERY)
                                  .getAsString(), gson.fromJson(requestJson.get(VARIABLES), mapOfStringObjectType));
    }

    private JsonObject execute(String request, Map<String, Object> variables) {
        try {
            ExecutionInput executionInput = ExecutionInput.newExecutionInput()
                                                          .query(request)
                                                          .variables(variables)
                                                          .build();
            // Execute the query with the GraphQL engine
            ExecutionResult executionResult = graphQL.execute(executionInput);
            LOGGER.debug("Execution result is {}", executionResult);
            // Wrap the result into the response
            Object data = executionResult.getData();
            List<GraphQLError> errors = executionResult.getErrors();
            JsonObject jsonObject = new JsonObject();
            if (data != null) {
                jsonObject.add(DATA, gson.fromJson(gson.toJson(data), JsonObject.class));
            }

            if (CollectionUtils.isNotEmpty(errors)) {
                JsonArray errorArray = new JsonArray();
                errors.forEach(error -> {
                    JsonObject errorJson = new JsonObject();
                    errorJson.addProperty(MESSAGE, error.getMessage());
                    errorArray.add(errorJson);
                });
                jsonObject.add(ERRORS, errorArray);
            }
            return jsonObject;
        } catch (final Throwable e) {
            LOGGER.error(e.getMessage(), e);
            JsonObject jsonObject = new JsonObject();
            JsonArray errorArray = new JsonArray();
            JsonObject errorJson = new JsonObject();
            errorJson.addProperty(MESSAGE, e.getMessage());
            errorArray.add(errorJson);
            jsonObject.add(ERRORS, errorArray);
            return jsonObject;
        }
    }
}
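
The envelope logic in execute() boils down to two rules: "data" is added only when the execution produced data, and "errors" is added only when there is at least one error, each reduced to its message. The following minimal sketch restates those rules with plain JDK collections instead of Gson; ResponseEnvelopeSketch and envelope are hypothetical names for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class ResponseEnvelopeSketch {

    /**
     * Builds the response envelope: "data" only if data is non-null,
     * "errors" only if at least one error message is present.
     */
    static Map<String, Object> envelope(Object data, List<String> errorMessages) {
        Map<String, Object> result = new LinkedHashMap<>();
        if (data != null) {
            result.put("data", data);
        }
        if (errorMessages != null && !errorMessages.isEmpty()) {
            List<Map<String, String>> errors = new ArrayList<>();
            for (String message : errorMessages) {
                // singletonMap tolerates a null message, as getMessage() may return null.
                errors.add(Collections.singletonMap("message", message));
            }
            result.put("errors", errors);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(envelope(Map.of("readMetricsValues", "..."), List.of()));
        System.out.println(envelope(null, List.of("Validation error")));
    }
}
```

Because both keys are optional and independent, a client should check for "errors" even when "data" is present.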

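The webapp gateway forwards GraphQL requests to the OAP

Up to and including v8.6.0 the gateway was Zuul; from v8.7.0 on it was replaced with Spring Cloud Gateway. Since this is not the focus of this article, it is not covered further here.

Summary

By default, SkyWalking's query protocol is implemented with GraphQL, which is highly general-purpose and lets clients easily select exactly the data they need.
For a system like SkyWalking, whose query requirements follow a relatively fixed schema and change infrequently, it is a good fit.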
