Prometheus integration: simpleclient, Pushgateway, and a walkthrough of the client source code


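Ways to expose metrics

Official source repository: https://github.com/prometheus/client_java

  • The target collects its own metrics and exposes a port; the Prometheus server actively pulls (scrapes) the data.
  • The target pushes its metrics to a Pushgateway, and Prometheus scrapes the Pushgateway.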

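Exposing a port on the target

This mode has two implementations:

  • Collect metrics with the plain client and expose an HTTP endpoint.
  • Use Actuator with its built-in Micrometer, together with the Prometheus registry (micrometer-registry-prometheus). For Spring Boot projects this is the more convenient way to instrument business code; since Spring Boot 2.x, Actuator integrates Micrometer internally.

How to use simpleclient, and how it works

  • Dependencies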
<dependency>
  <groupId>io.prometheus</groupId>
  <artifactId>simpleclient</artifactId>
  <version>0.10.0</version>
</dependency>
<!-- Hotspot JVM metrics-->
<dependency>
  <groupId>io.prometheus</groupId>
  <artifactId>simpleclient_hotspot</artifactId>
  <version>0.10.0</version>
</dependency>
<!-- Exposition HTTPServer-->
<dependency>
  <groupId>io.prometheus</groupId>
  <artifactId>simpleclient_httpserver</artifactId>
  <version>0.10.0</version>
</dependency>
<!-- Pushgateway exposition-->
<!--
<dependency>
  <groupId>io.prometheus</groupId>
  <artifactId>simpleclient_pushgateway</artifactId>
  <version>0.10.0</version>
</dependency> 
-->

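As the comments above indicate, once the simpleclient_pushgateway dependency is added (uncommented), the client can push to a Pushgateway.

  • Usage
    See the README in the source repository, which includes examples, so they are not repeated here.
  • How it works
    • The simpleclient_hotspot package collects JVM-related metrics.
    • simpleclient wraps the basic data structures and the collection logic; it is the lowest layer.
    • simpleclient_httpserver exposes the collected data and handles incoming requests.
  • Appetizer: starting from the Prometheus scrape (the httpserver module)

    The httpserver module contains a single class, which exposes the endpoint and handles requests. It is short and worth reading closely, and it is a good reference if you need a lightweight web service, where lightweight means focused on the core functionality.

  • NamedDaemonThreadFactory: the thread factory used by the thread pool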
    static class NamedDaemonThreadFactory implements ThreadFactory {
        private static final AtomicInteger POOL_NUMBER = new AtomicInteger(1);

        private final int poolNumber = POOL_NUMBER.getAndIncrement();
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final ThreadFactory delegate;
        private final boolean daemon;

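        /**
         * @param delegate the factory that actually creates the threads
         * @param daemon   usually false, which creates ordinary user threads; these keep running after the main thread exits.
         *                 If true, the threads are daemon threads: the JVM exits once only daemon threads remain
         *                 (the garbage collector thread is a daemon thread).
         */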
        NamedDaemonThreadFactory(ThreadFactory delegate, boolean daemon) {
            this.delegate = delegate;
            this.daemon = daemon;
        }

        @Override
        public Thread newThread(Runnable r) {
            Thread t = delegate.newThread(r);
            t.setName(String.format("prometheus-http-%d-%d", poolNumber, threadNumber.getAndIncrement()));
            t.setDaemon(daemon);
            return t;
        }

        static ThreadFactory defaultThreadFactory(boolean daemon) {
            return new NamedDaemonThreadFactory(Executors.defaultThreadFactory(), daemon);
        }
    }
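To make the effect of the daemon flag concrete, here is a minimal JDK-only sketch of the same idea. It is not the library's class: `DaemonFactoryDemo`, `named`, and the thread-name prefix are hypothetical names chosen for illustration.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

class DaemonFactoryDemo {
    /** Wraps the JDK default factory, naming each thread and setting its daemon flag. */
    static ThreadFactory named(String prefix, boolean daemon) {
        ThreadFactory delegate = Executors.defaultThreadFactory();
        AtomicInteger counter = new AtomicInteger(1);
        return r -> {
            Thread t = delegate.newThread(r);
            t.setName(prefix + "-" + counter.getAndIncrement());
            // Daemon threads do not keep the JVM alive once all user threads have finished.
            t.setDaemon(daemon);
            return t;
        };
    }

    public static void main(String[] args) {
        Thread t = named("prometheus-http", true).newThread(() -> { });
        System.out.println(t.getName() + " daemon=" + t.isDaemon());
    }
}
```

With `daemon` set to false, a thread pool built from this factory keeps the process alive until the pool is shut down, which is usually what a long-running exporter wants.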
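  • LocalByteArray: extends ThreadLocal, giving each worker thread its own buffer for building the response
    private static class LocalByteArray extends ThreadLocal<ByteArrayOutputStream> {
        // ByteArrayOutputStream is an output stream backed by a byte array; 1 << 20 is an initial capacity of 1 MiB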
        @Override
        protected ByteArrayOutputStream initialValue()
        {
            return new ByteArrayOutputStream(1 << 20);
        }
    }
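The per-thread buffer pattern can be tried in isolation. The sketch below uses only the JDK; `ThreadLocalBufferDemo` and `render` are hypothetical names, and the buffer is smaller than the library's 1 MiB to keep the demo light.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

class ThreadLocalBufferDemo {
    // One buffer per thread, created lazily the first time that thread calls get().
    private static final ThreadLocal<ByteArrayOutputStream> BUFFER =
            ThreadLocal.withInitial(() -> new ByteArrayOutputStream(1 << 10));

    /** Writes the payload into the calling thread's reusable buffer and returns the buffered size. */
    static int render(String payload) {
        ByteArrayOutputStream out = BUFFER.get();
        out.reset(); // discard the bytes left over from the previous request on this thread
        byte[] bytes = payload.getBytes(StandardCharsets.UTF_8);
        out.write(bytes, 0, bytes.length);
        return out.size();
    }

    /** Two get() calls on the same thread return the same buffer instance. */
    static boolean sameBufferOnThisThread() {
        return BUFFER.get() == BUFFER.get();
    }

    public static void main(String[] args) {
        System.out.println(render("hello"));
        System.out.println(render("hi"));
        System.out.println(sameBufferOnThisThread());
    }
}
```

Because the buffer is reused, each request avoids allocating a fresh array, at the cost of keeping one buffer alive per worker thread.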
  • HTTPMetricHandler: handles HTTP requests
    /**
     * Handles Metrics collections from the given registry.
     * CollectorRegistry lives in the simpleclient module.
     */
    static class HTTPMetricHandler implements HttpHandler {
        private final CollectorRegistry registry;
        // LocalByteArray is a ThreadLocal, so each worker thread reuses its own buffer
        private final LocalByteArray response = new LocalByteArray();
        private final static String HEALTHY_RESPONSE = "Exporter is Healthy.";

        HTTPMetricHandler(CollectorRegistry registry) {
          this.registry = registry;
        }

        @Override
        public void handle(HttpExchange t) throws IOException {
            String query = t.getRequestURI().getRawQuery();

            String contextPath = t.getHttpContext().getPath();
            ByteArrayOutputStream response = this.response.get();
            // Clear whatever the previous request left in this thread's buffer
            response.reset();
            OutputStreamWriter osw = new OutputStreamWriter(response, Charset.forName("UTF-8"));
            if ("/-/healthy".equals(contextPath)) { // Health check: the exporter's health check is a plain HTTP request
                osw.write(HEALTHY_RESPONSE);
            } else {
                String contentType = TextFormat.chooseContentType(t.getRequestHeaders().getFirst("Accept"));
                t.getResponseHeaders().set("Content-Type", contentType);
                // The key step: read the samples from the registry and write them to the output stream.
                // filteredMetricFamilySamples returns an Enumeration<Collector.MetricFamilySamples>
                TextFormat.writeFormat(contentType, osw,
                        registry.filteredMetricFamilySamples(parseQuery(query)));
            }

            osw.close();
            // Compress the response with gzip if the client asked for it
            if (shouldUseCompression(t)) {
                t.getResponseHeaders().set("Content-Encoding", "gzip");
                t.sendResponseHeaders(HttpURLConnection.HTTP_OK, 0);
                final GZIPOutputStream os = new GZIPOutputStream(t.getResponseBody());
                try {
                    response.writeTo(os);
                } finally {
                    os.close();
                }
            } else {
                t.getResponseHeaders().set("Content-Length",
                        String.valueOf(response.size()));
                t.sendResponseHeaders(HttpURLConnection.HTTP_OK, response.size());
                response.writeTo(t.getResponseBody());
            }
            t.close();
        }

    }

The code above mentions Enumeration, a JDK interface for iterating over a collection:

public interface Enumeration<E> {
    /**
     * Tests if this enumeration contains more elements.
     *
     * @return  <code>true</code> if and only if this enumeration object
     *           contains at least one more element to provide;
     *          <code>false</code> otherwise.
     */
    boolean hasMoreElements();

    /**
     * Returns the next element of this enumeration if this enumeration
     * object has at least one more element to provide.
     *
     * @return     the next element of this enumeration.
     * @exception  NoSuchElementException  if no more elements exist.
     */
    E nextElement();
}
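As a quick illustration of how an Enumeration is consumed, the sketch below walks one with the same `hasMoreElements`/`nextElement` loop that the text-format writer uses. Plain strings stand in for metric families; `EnumerationDemo` and `joinNames` are hypothetical names.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Enumeration;
import java.util.List;

class EnumerationDemo {
    /** Drains the enumeration and joins its elements with commas. */
    static String joinNames(Enumeration<String> names) {
        StringBuilder sb = new StringBuilder();
        while (names.hasMoreElements()) {
            if (sb.length() > 0) {
                sb.append(',');
            }
            sb.append(names.nextElement());
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        List<String> families = Arrays.asList("gauge", "counter", "summary");
        // Collections.enumeration adapts any Collection to the older Enumeration interface.
        System.out.println(joinNames(Collections.enumeration(families)));
    }
}
```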

That covers the member classes; next, the constructor:

    /**
     * Start a HTTP server serving Prometheus metrics from the given registry using the given {@link HttpServer}.
     * The {@code httpServer} is expected to already be bound to an address
     */
    public HTTPServer(HttpServer httpServer, CollectorRegistry registry, boolean daemon) throws IOException {
        if (httpServer.getAddress() == null)
            throw new IllegalArgumentException("HttpServer hasn't been bound to an address");
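        // HttpServer is com.sun.net.httpserver.HttpServer, shipped with the JDK along with other classes for building a web server
        server = httpServer;
        // The HttpHandler is handed to the server to process requests
        HttpHandler mHandler = new HTTPMetricHandler(registry);
        // Map the paths to the handler
        server.createContext("/", mHandler);
        server.createContext("/metrics", mHandler);
        server.createContext("/-/healthy", mHandler);
        // The thread pool that does the actual work; it only serves Prometheus scrapes, so a few threads are enough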
        executorService = Executors.newFixedThreadPool(5, NamedDaemonThreadFactory.defaultThreadFactory(daemon));
        server.setExecutor(executorService);
        start(daemon);
    }
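  • For more on using the JDK HttpServer, see
    https://www.cnblogs.com/aspwebchh/p/8300945.html
    In short, the httpserver module exposes a port and provides the web service that Prometheus scrapes. It handles requests on a thread pool; for each request the handler reads the data from the registry and sends the response, using a ThreadLocal buffer to hold the response body. The module is started with new HTTPServer(port);
    Where that call goes depends on the application.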
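The request flow described here can be reproduced with the JDK alone. The following is a minimal sketch, not the library's HTTPServer: `MiniMetricsServer` and `scrapeOnce` are hypothetical names, the metrics text is a fixed string rather than the output of a registry, and the server binds to an ephemeral local port so the example can scrape itself once.

```java
import com.sun.net.httpserver.HttpServer;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.net.HttpURLConnection;
import java.net.InetSocketAddress;
import java.net.Proxy;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class MiniMetricsServer {
    /** Serves metricsText on /metrics, scrapes it once over HTTP, and returns the response body. */
    static String scrapeOnce(String metricsText) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        HttpServer server = null;
        try {
            server = HttpServer.create(new InetSocketAddress("127.0.0.1", 0), 0);
            byte[] body = metricsText.getBytes(StandardCharsets.UTF_8);
            server.createContext("/metrics", exchange -> {
                exchange.getResponseHeaders().set("Content-Type", "text/plain; version=0.0.4; charset=utf-8");
                exchange.sendResponseHeaders(HttpURLConnection.HTTP_OK, body.length);
                try (OutputStream os = exchange.getResponseBody()) {
                    os.write(body);
                }
            });
            server.setExecutor(pool); // requests are handled on the pool, not on the caller's thread
            server.start();

            // Play the role of the Prometheus server: one GET against /metrics.
            int port = server.getAddress().getPort();
            HttpURLConnection conn = (HttpURLConnection)
                    URI.create("http://127.0.0.1:" + port + "/metrics").toURL().openConnection(Proxy.NO_PROXY);
            conn.setConnectTimeout(2000);
            conn.setReadTimeout(2000);
            try (InputStream in = conn.getInputStream()) {
                ByteArrayOutputStream out = new ByteArrayOutputStream();
                byte[] chunk = new byte[1024];
                int n;
                while ((n = in.read(chunk)) != -1) {
                    out.write(chunk, 0, n);
                }
                return new String(out.toByteArray(), StandardCharsets.UTF_8);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } finally {
            if (server != null) {
                server.stop(0);
            }
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.print(scrapeOnce("# TYPE gauge gauge\ngauge 1.0\n"));
    }
}
```

In a real exporter the server stays up for the lifetime of the process and Prometheus does the scraping; starting and stopping it around a single request is only done here to keep the sketch self-contained.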

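simpleclient wraps the Prometheus metric types, provides the model and storage classes, and defines Collector, CollectorRegistry, and so on.

The following covers how the model classes relate to each other, how data is stored, and how it is read back.
Keep in mind that the client only holds the current value of each metric; history is stored in Prometheus. Once a metric is created, the client just keeps updating its value.

  • The model first
    Starting point: the basic types are all defined inside Collector.
    Interface and implementation relationships

Main classes:
Sample: a single sample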

 /**
   * A single Sample, with a unique name and set of labels.
   * One data point of a metric.
   */
    public static class Sample {
      public final String name;
      public final List<String> labelNames;
      public final List<String> labelValues;  // Must have same length as labelNames.
      public final double value;
      // Optional timestamp; null unless one is set explicitly
      public final Long timestampMs;  // It's an epoch format with milliseconds value included (this field is subject to change).

      public Sample(String name, List<String> labelNames, List<String> labelValues, double value, Long timestampMs) {
        this.name = name;
        this.labelNames = labelNames;
        this.labelValues = labelValues;
        this.value = value;
        this.timestampMs = timestampMs;
      }

MetricFamilySamples: the metric family itself

/**
   * A metric, and all of its samples.
   * This is the actual metric (family) class; it holds a list of samples.
   */
  static public class MetricFamilySamples {
    public final String name;
    public final String unit;
    public final Type type;
    public final String help;
    // The samples
    public final List<Sample> samples;

    public MetricFamilySamples(String name, String unit, Type type, String help, List<Sample> samples) {
      if (!unit.isEmpty() && !name.endsWith("_" + unit)) {
        throw new IllegalArgumentException("Metric's unit is not the suffix of the metric name: " + name);
      }
      if ((type == Type.INFO || type == Type.STATE_SET) && !unit.isEmpty()) {
        throw new IllegalArgumentException("Metric is of a type that cannot have a unit: " + name);
      }
      List<Sample> mungedSamples = samples;
      // Deal with _total from pre-OM automatically.
      if (type == Type.COUNTER) {
        if (name.endsWith("_total")) {
          name = name.substring(0, name.length() - 6);
        }
        String withTotal = name + "_total";
        mungedSamples = new ArrayList<Sample>(samples.size());
        for (Sample s: samples) {
          String n = s.name;
          if (name.equals(n)) {
            n = withTotal;
          }
          mungedSamples.add(new Sample(n, s.labelNames, s.labelValues, s.value, s.timestampMs));
        }
      }
      this.name = name;
      this.unit = unit;
      this.type = type;
      this.help = help;
      this.samples = mungedSamples;
    }
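The `_total` handling in this constructor is easy to misread, so here is a small JDK-only sketch of just that rule. `CounterNaming`, `familyName`, and `sampleNames` are hypothetical helpers written for illustration; they mirror the logic shown above but are not part of the library.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class CounterNaming {
    private static final String SUFFIX = "_total";

    /** The counter family is stored without a trailing "_total". */
    static String familyName(String name) {
        return name.endsWith(SUFFIX) ? name.substring(0, name.length() - SUFFIX.length()) : name;
    }

    /** A sample named exactly like the family is exposed with "_total" appended; others are unchanged. */
    static List<String> sampleNames(String family, List<String> rawSampleNames) {
        List<String> out = new ArrayList<>(rawSampleNames.size());
        for (String n : rawSampleNames) {
            out.add(n.equals(family) ? family + SUFFIX : n);
        }
        return out;
    }

    public static void main(String[] args) {
        String family = familyName("requests_total");
        System.out.println(family);
        System.out.println(sampleNames(family, Arrays.asList("requests", "requests_created")));
    }
}
```

The effect is that a counter built with either `requests` or `requests_total` ends up with the same family name and the same exposed sample name.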

  • Main interfaces
    Describable: the metric families a collector is expected to return
public interface Describable {
    /**
     *  Provide a list of metric families this Collector is expected to return.
     *
     *  These should exclude the samples. This is used by the registry to
     *  detect collisions and duplicate registrations.
     *
     *  Usually custom collectors do not have to implement Describable. If
     *  Describable is not implemented and the CollectorRegistry was created
     *  with auto describe enabled (which is the case for the default registry)
     *  then {@link collect} will be called at registration time instead of
     *  describe. If this could cause problems, either implement a proper
     *  describe, or if that's not practical have describe return an empty
     *  list.
     */
    List<MetricFamilySamples> describe();
  }

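collect() returns the metric data:

  public abstract List<MetricFamilySamples> collect();
public class Counter extends SimpleCollector<Counter.Child> implements Collector.Describable {

As shown, Counter and the other metric types all extend Collector (through SimpleCollector), which is an abstract class rather than an interface.

  • An httpserver example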
public class ExampleExporter {

    static final Gauge g = Gauge.build().name("gauge").help("blah").register();
    static final Counter c = Counter.build().name("counter").help("meh").register();
    static final Summary s = Summary.build().name("summary").help("meh").register();
    static final Histogram h = Histogram.build().name("histogram").help("meh").register();
    static final Gauge l = Gauge.build().name("labels").help("blah").labelNames("l").register();

    public static void main(String[] args) throws Exception {
        new HTTPServer(1234);
        g.set(1);
        c.inc(2);
        s.observe(3);
        h.observe(4);
        l.labels("foo").inc(5);
    }
}
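  • Reading the data
    As described earlier, the server hands each request to the handler, which does the work:
// Iterate over the collection to obtain the metric data
 TextFormat.writeFormat(contentType, osw,
// filteredMetricFamilySamples returns an Enumeration
                        registry.filteredMetricFamilySamples(parseQuery(query)));


---
// Writing the samples out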
 public static void write004(Writer writer, Enumeration<Collector.MetricFamilySamples> mfs) throws IOException {
    Map<String, Collector.MetricFamilySamples> omFamilies = new TreeMap<String, Collector.MetricFamilySamples>();
    /* See http://prometheus.io/docs/instrumenting/exposition_formats/
     * for the output format specification. */
    while(mfs.hasMoreElements()) {
      Collector.MetricFamilySamples metricFamilySamples = mfs.nextElement();
      String name = metricFamilySamples.name;
      writer.write("# HELP ");
      writer.write(name);
      if (metricFamilySamples.type == Collector.Type.COUNTER) {
        writer.write("_total");
      }
      if (metricFamilySamples.type == Collector.Type.INFO) {
        writer.write("_info");
      }
      writer.write(' ');
      writeEscapedHelp(writer, metricFamilySamples.help);
      writer.write('\n');

      writer.write("# TYPE ");
      writer.write(name);
      if (metricFamilySamples.type == Collector.Type.COUNTER) {
        writer.write("_total");
      }
      if (metricFamilySamples.type == Collector.Type.INFO) {
        writer.write("_info");
      }
      writer.write(' ');
      writer.write(typeString(metricFamilySamples.type));
      writer.write('\n');

      String createdName = name + "_created";
      String gcountName = name + "_gcount";
      String gsumName = name + "_gsum";
      for (Collector.MetricFamilySamples.Sample sample: metricFamilySamples.samples) {
        /* OpenMetrics specific sample, put in a gauge at the end. */
        if (sample.name.equals(createdName)
            || sample.name.equals(gcountName)
            || sample.name.equals(gsumName)) {
          Collector.MetricFamilySamples omFamily = omFamilies.get(sample.name);
          if (omFamily == null) {
            omFamily = new Collector.MetricFamilySamples(sample.name, Collector.Type.GAUGE, metricFamilySamples.help, new ArrayList<Collector.MetricFamilySamples.Sample>());
            omFamilies.put(sample.name, omFamily);
          }
          omFamily.samples.add(sample);
          continue;
        }
        writer.write(sample.name);
        if (sample.labelNames.size() > 0) {
          writer.write('{');
          for (int i = 0; i < sample.labelNames.size(); ++i) {
            writer.write(sample.labelNames.get(i));
            writer.write("=\"");
            writeEscapedLabelValue(writer, sample.labelValues.get(i));
            writer.write("\",");
          }
          writer.write('}');
        }
        writer.write(' ');
        writer.write(Collector.doubleToGoString(sample.value));
        if (sample.timestampMs != null){
          writer.write(' ');
          writer.write(sample.timestampMs.toString());
        }
        writer.write('\n');
      }
    }
    // Write out any OM-specific samples.
    if (!omFamilies.isEmpty()) {
      write004(writer, Collections.enumeration(omFamilies.values()));
    }
  }
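To see what one line of this output looks like, the sketch below formats a single sample the way the loop above does, including the trailing comma inside the braces. It is a simplified stand-in: `TextFormatLine`, `escapeLabelValue`, and `sampleLine` are hypothetical names, timestamps are omitted, and the value is printed with Java's default double formatting rather than the library's `doubleToGoString`.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class TextFormatLine {
    /** Escapes backslash, double quote, and newline inside a label value. */
    static String escapeLabelValue(String v) {
        StringBuilder sb = new StringBuilder();
        for (char c : v.toCharArray()) {
            switch (c) {
                case '\\': sb.append("\\\\"); break;
                case '"':  sb.append("\\\""); break;
                case '\n': sb.append("\\n");  break;
                default:   sb.append(c);
            }
        }
        return sb.toString();
    }

    /** Renders: name{label="value",} value, followed by a newline. */
    static String sampleLine(String name, List<String> labelNames, List<String> labelValues, double value) {
        StringBuilder sb = new StringBuilder(name);
        if (!labelNames.isEmpty()) {
            sb.append('{');
            for (int i = 0; i < labelNames.size(); i++) {
                sb.append(labelNames.get(i)).append("=\"")
                  .append(escapeLabelValue(labelValues.get(i))).append("\",");
            }
            sb.append('}');
        }
        return sb.append(' ').append(value).append('\n').toString();
    }

    public static void main(String[] args) {
        System.out.print(sampleLine("labels", Arrays.asList("l"), Arrays.asList("foo"), 5.0));
        System.out.print(sampleLine("gauge", Collections.<String>emptyList(), Collections.<String>emptyList(), 1.0));
    }
}
```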
  • Storing the data
    The code fragments for each step follow.
 static final Gauge g = Gauge.build().name("gauge").help("blah").register();
---
    /**
     * Create and register the Collector with the given registry.
     * Returns the concrete collector, for example a Gauge.
     */
    public C register(CollectorRegistry registry) {
      // create() builds the concrete metric object, for example a new Gauge
      C sc = create();
      registry.register(sc); // store it in the registry
      return sc;
    }

-----
public class CollectorRegistry {
  /**
   * The default registry.
   */
  public static final CollectorRegistry defaultRegistry = new CollectorRegistry(true);

  private final Object namesCollectorsLock = new Object();
  // e.g. key: the Gauge collector, value: ["gauge"]. Note that Counter, Summary, etc. also provide suffixed names.
  private final Map<Collector, List<String>> collectorsToNames = new HashMap<Collector, List<String>>();
  // e.g. key: "gauge", value: the registered Gauge collector
  private final Map<String, Collector> namesToCollectors = new HashMap<String, Collector>();

  private final boolean autoDescribe;

  public CollectorRegistry() {
    this(false);
  }

  public CollectorRegistry(boolean autoDescribe) {
    this.autoDescribe = autoDescribe;
  }

  /**
   * Register a Collector.
   * <p>
   * A collector can be registered to multiple CollectorRegistries.
   */
  public void register(Collector m) {
    List<String> names = collectorNames(m);
    synchronized (namesCollectorsLock) {
      for (String name : names) {
        if (namesToCollectors.containsKey(name)) {
          throw new IllegalArgumentException("Collector already registered that provides name: " + name);
        }
      }
      // namesToCollectors is where collectors are ultimately stored, keyed by metric name
      for (String name : names) {
        namesToCollectors.put(name, m);
      }
      collectorsToNames.put(m, names);
    }
  }
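The two-map bookkeeping and the duplicate-name check can be exercised without the library. The following is a minimal sketch under simplifying assumptions: `MiniRegistry` is a hypothetical class, collectors are plain objects, and the caller passes the names directly instead of the registry deriving them from `describe()` or `collect()`.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class MiniRegistry {
    private final Map<String, Object> namesToCollectors = new HashMap<>();
    private final Map<Object, List<String>> collectorsToNames = new HashMap<>();

    /** Registers a collector under all of its names, or fails before storing anything. */
    synchronized void register(Object collector, List<String> names) {
        // First pass: reject the whole registration if any name is already taken.
        for (String name : names) {
            if (namesToCollectors.containsKey(name)) {
                throw new IllegalArgumentException("Collector already registered that provides name: " + name);
            }
        }
        // Second pass: only now mutate both maps, so a failure leaves no partial state.
        for (String name : names) {
            namesToCollectors.put(name, collector);
        }
        collectorsToNames.put(collector, names);
    }

    /** Removes a collector and every name it provided. */
    synchronized void unregister(Object collector) {
        List<String> names = collectorsToNames.remove(collector);
        if (names != null) {
            for (String name : names) {
                namesToCollectors.remove(name);
            }
        }
    }

    synchronized int size() {
        return namesToCollectors.size();
    }

    public static void main(String[] args) {
        MiniRegistry registry = new MiniRegistry();
        registry.register("counterA", Arrays.asList("requests_total", "requests_created"));
        try {
            registry.register("counterB", Arrays.asList("requests_total"));
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

The reverse map exists so that unregistering a collector can remove all of its names without scanning the whole name map.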


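The Pushgateway approach

The simpleclient_pushgateway module simply wraps the Pushgateway API for pushing, adding, and deleting metrics: it sends the metrics held in a registry to the Pushgateway over HTTP, where they are stored temporarily.
Usage: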

  void executeBatchJob() throws Exception {
    CollectorRegistry registry = new CollectorRegistry();
    Gauge duration = Gauge.build()
        .name("my_batch_job_duration_seconds").help("Duration of my batch job in seconds.").register(registry);
    Gauge.Timer durationTimer = duration.startTimer();
    try {
      // Your code here.

      // This is only added to the registry after success,
      // so that a previous success in the Pushgateway isn't overwritten on failure.
      Gauge lastSuccess = Gauge.build()
          .name("my_batch_job_last_success").help("Last time my batch job succeeded, in unixtime.").register(registry);
      lastSuccess.setToCurrentTime();
    } finally {
      durationTimer.setDuration();
      PushGateway pg = new PushGateway("127.0.0.1:9091");
      pg.pushAdd(registry, "my_batch_job");
    }
  }

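PushGateway has an HttpConnectionFactory that looks like a connection pool, but it actually creates a new HttpURLConnection for every request and relies on HTTP/1.1 keep-alive for connection reuse, so performance is acceptable.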

public class PushGateway {

  private static final int MILLISECONDS_PER_SECOND = 1000;

  // Visible for testing.
  protected final String gatewayBaseURL;
  // Connection factory; it looks like a pool, but create() opens a new connection each time
  private HttpConnectionFactory connectionFactory = new DefaultHttpConnectionFactory();

----
public class DefaultHttpConnectionFactory implements HttpConnectionFactory {
    @Override
    public HttpURLConnection create(String url) throws IOException {
        return (HttpURLConnection) new URL(url).openConnection();
    }
}

  • In addition, the push methods all go through doRequest, which sends the HTTP request. Here is the implementation:
  void doRequest(CollectorRegistry registry, String job, Map<String, String> groupingKey, String method) throws IOException {
    String url = gatewayBaseURL;
    if (job.contains("/")) {
      url += "job@base64/" + base64url(job);
    } else {
      url += "job/" + URLEncoder.encode(job, "UTF-8");
    }

    if (groupingKey != null) {
      for (Map.Entry<String, String> entry: groupingKey.entrySet()) {
        if (entry.getValue().isEmpty()) {
          url += "/" + entry.getKey() + "@base64/=";
        } else if (entry.getValue().contains("/")) {
          url += "/" + entry.getKey() + "@base64/" + base64url(entry.getValue());
        } else {
          url += "/" + entry.getKey() + "/" + URLEncoder.encode(entry.getValue(), "UTF-8");
        }
      }
    }
    HttpURLConnection connection = connectionFactory.create(url);
    connection.setRequestProperty("Content-Type", TextFormat.CONTENT_TYPE_004);
    if (!method.equals("DELETE")) {
      connection.setDoOutput(true);
    }
    connection.setRequestMethod(method);
    // The connect timeout is 10 s, and the read timeout is also 10 s
    connection.setConnectTimeout(10 * MILLISECONDS_PER_SECOND);
    connection.setReadTimeout(10 * MILLISECONDS_PER_SECOND);
    connection.connect();

    try {
      if (!method.equals("DELETE")) {
        BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(connection.getOutputStream(), "UTF-8"));
        TextFormat.write004(writer, registry.metricFamilySamples());
        writer.flush();
        writer.close();
      }

      int response = connection.getResponseCode();
      if (response/100 != 2) {
        String errorMessage;
        InputStream errorStream = connection.getErrorStream();
        if(errorStream != null) {
          String errBody = readFromStream(errorStream);
          errorMessage = "Response code from " + url + " was " + response + ", response body: " + errBody;
        } else {
          errorMessage = "Response code from " + url + " was " + response;
        }
        throw new IOException(errorMessage);
      }
    } finally {
      connection.disconnect();
    }
  }
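The URL-building part of doRequest is worth isolating, because the path encoding changes depending on whether a value contains a slash. Below is a minimal JDK-only sketch of that rule. `PushUrlDemo` and `pushUrl` are hypothetical names, the base URL is an assumed example value, and `java.util.Base64` stands in for the library's own `base64url` helper.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.LinkedHashMap;
import java.util.Map;

class PushUrlDemo {
    /** URL-safe Base64 of the UTF-8 bytes, used when a value contains '/'. */
    static String base64url(String v) {
        return Base64.getUrlEncoder().encodeToString(v.getBytes(StandardCharsets.UTF_8));
    }

    static String encodeSegment(String v) {
        try {
            return URLEncoder.encode(v, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException("UTF-8 is always supported", e);
        }
    }

    /** Builds base + job segment + one path segment pair per grouping-key entry. */
    static String pushUrl(String base, String job, Map<String, String> groupingKey) {
        StringBuilder url = new StringBuilder(base);
        if (job.contains("/")) {
            url.append("job@base64/").append(base64url(job));
        } else {
            url.append("job/").append(encodeSegment(job));
        }
        for (Map.Entry<String, String> e : groupingKey.entrySet()) {
            String v = e.getValue();
            url.append('/').append(e.getKey());
            if (v.isEmpty()) {
                url.append("@base64/="); // an empty value cannot be a path segment, so it is sent as bare padding
            } else if (v.contains("/")) {
                url.append("@base64/").append(base64url(v));
            } else {
                url.append('/').append(encodeSegment(v));
            }
        }
        return url.toString();
    }

    public static void main(String[] args) {
        Map<String, String> key = new LinkedHashMap<>();
        key.put("instance", "host1");
        key.put("path", "/var/tmp");
        System.out.println(pushUrl("http://127.0.0.1:9091/metrics/", "my_batch_job", key));
    }
}
```

The Base64 branch exists because a literal slash in a job name or label value would otherwise be read as a path separator by the gateway.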

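  • As shown, the connect timeout is fixed at 10 s and the read timeout is also 10 s. If the gateway misbehaves, each push can block the calling thread for that long, which can tie up client resources and make the client appear to hang.

  • Can data be pushed on a schedule?
    No. By default a Gauge sample carries no timestamp, so the time associated with the value is really the time the HTTP request arrived. (The original post showed a packet-capture screenshot here.)

  • Does opening a new connection on every push cause performance problems?
    No, because keep-alive is used.
    Below is the source of the disconnect method that doRequest calls at the end of a push: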
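One way to keep a slow gateway from stalling business code is to run the push on a separate thread and give the caller a shorter time budget. The sketch below shows that idea with the JDK only; it is one possible mitigation, not something the library provides. `BoundedPush` and `pushWithin` are hypothetical names, and the `Callable` stands in for a call such as `pg.push(registry, job)`.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class BoundedPush {
    /** Runs the push on a worker thread; returns false if it fails or exceeds the time budget. */
    static boolean pushWithin(Callable<Void> push, long timeoutMillis) {
        ExecutorService worker = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "metrics-push");
            t.setDaemon(true); // a stuck push must not keep the JVM alive
            return t;
        });
        Future<Void> result = worker.submit(push);
        try {
            result.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            result.cancel(true); // releases the caller; a blocked socket read may still run to its own timeout
            return false;
        } catch (ExecutionException e) {
            return false; // the push itself threw, for example an IOException
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            worker.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(pushWithin(() -> null, 1000));
        System.out.println(pushWithin(() -> { Thread.sleep(5000); return null; }, 100));
    }
}
```

Creating an executor per call keeps the sketch short; a long-running service would more likely share one small executor across pushes.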

 public void disconnect() {
        this.responseCode = -1;
        if (this.pi != null) {
            this.pi.finishTracking();
            this.pi = null;
        }

        if (this.http != null) {
            if (this.inputStream != null) {
                HttpClient var1 = this.http;
                boolean var2 = var1.isKeepingAlive();

                try {
                    this.inputStream.close();
                } catch (IOException var4) {
                }

                if (var2) {
                    var1.closeIdleConnection();
                }
            } else {
                this.http.setDoNotRetry(true);
                this.http.closeServer();
            }

            this.http = null;
            this.connected = false;
        }

        this.cachedInputStream = null;
        if (this.cachedHeaders != null) {
            this.cachedHeaders.reset();
        }

    }

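As shown, HttpURLConnection's disconnect method does not really close the connection in this case; it clears the per-request state so that the connection can be reused next time. The client and server speak HTTP/1.1, the client enables keep-alive by default, and the Pushgateway server supports it as well.
Below is a Wireshark capture of two pushes (the image is not included), followed by the code that issued the requests: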

public class ExamplePushGateway {
  static final CollectorRegistry pushRegistry = new CollectorRegistry();
  static final Gauge g = (Gauge) Gauge.build().name("gauge").help("blah").register(pushRegistry);

  /**
   * Example of how to use the pushgateway, pass in the host:port of a pushgateway.
   */
  public static void main(String[] args) throws Exception {
    PushGateway pg = new PushGateway("127.0.0.1:9091");
    g.set(42);
    pg.push(pushRegistry, "job");

    g.set(45);
    pg.push(pushRegistry,"job");
  }
}

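Capturing a service on the local machine requires a tool that can capture loopback traffic (the original post linked one here).

Filter setting:
tcp.dstport == 9091 or tcp.srcport == 9091

The capture shows that only one TCP handshake took place: disconnect did not close the connection, and the requests were sent with keep-alive.
After a single push, the connection between client and server also stays open for a while (the connection-state screenshot is not included).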

