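Ways to integrate metrics
Official source repository: https://github.com/prometheus/client_java
- The target collects its own metrics and exposes a port; the Prometheus server actively pulls the data
- The target actively pushes to the Pushgateway; Prometheus then pulls from the Pushgateway
Exposing a port on the target
There are two implementations of this mode
- Collect metrics with the plain client and expose an endpoint
- Use Actuator with its built-in Micrometer, plus the Prometheus registry. For Spring Boot projects this is more convenient for instrumenting business code. Since Spring Boot 2.x, Actuator has Micrometer integrated.
How to use simpleclient, and how it works
- Integration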
<dependency>
<groupId>io.prometheus</groupId>
<artifactId>simpleclient</artifactId>
<version>0.10.0</version>
</dependency>
<!-- Hotspot JVM metrics-->
<dependency>
<groupId>io.prometheus</groupId>
<artifactId>simpleclient_hotspot</artifactId>
<version>0.10.0</version>
</dependency>
<!-- Exposition HTTPServer-->
<dependency>
<groupId>io.prometheus</groupId>
<artifactId>simpleclient_httpserver</artifactId>
<version>0.10.0</version>
</dependency>
<!-- Pushgateway exposition-->
<!--
<dependency>
<groupId>io.prometheus</groupId>
<artifactId>simpleclient_pushgateway</artifactId>
<version>0.10.0</version>
</dependency>
-->
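As the comments above suggest, adding the pushgateway dependency enables pushing to the Pushgateway.
- Usage
See the README in the source repository, which includes examples; I won't repeat them here.
- How it works
- The simpleclient_hotspot package mainly collects JVM-related metrics
- simpleclient encapsulates the basic data structures and collection logic; it is the lowest layer
- httpserver exposes the collected data and handles incoming requests
- An appetizer: starting from the Prometheus pull (the httpserver part)
The httpserver part consists of a single class, whose job is to expose an endpoint and accept requests. It is short and well written, and worth reading closely; it suits anyone who needs a lightweight web service (lightweight meaning focused on core functionality).
- NamedDaemonThreadFactory: the thread factory used by the thread pool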
static class NamedDaemonThreadFactory implements ThreadFactory {
private static final AtomicInteger POOL_NUMBER = new AtomicInteger(1);
private final int poolNumber = POOL_NUMBER.getAndIncrement();
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final ThreadFactory delegate;
private final boolean daemon;
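/**
*
* @param delegate the factory that actually creates the threads
* @param daemon usually false, which creates ordinary user threads; the JVM keeps running until all user threads finish, even after the main thread returns.
* If true, the threads are daemon threads, which do not keep the JVM alive: once all user threads have exited, the JVM shuts down and the daemon threads are terminated. The garbage-collector threads are daemon threads.
*/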
NamedDaemonThreadFactory(ThreadFactory delegate, boolean daemon) {
this.delegate = delegate;
this.daemon = daemon;
}
@Override
public Thread newThread(Runnable r) {
Thread t = delegate.newThread(r);
t.setName(String.format("prometheus-http-%d-%d", poolNumber, threadNumber.getAndIncrement()));
t.setDaemon(daemon);
return t;
}
static ThreadFactory defaultThreadFactory(boolean daemon) {
return new NamedDaemonThreadFactory(Executors.defaultThreadFactory(), daemon);
}
}
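To make the effect of the daemon flag concrete, here is a minimal sketch that uses only the JDK. The class name `DaemonFlagDemo`, the `named` helper, and the `demo-http` prefix are made up for illustration and are not part of client_java.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

/** Sketch: a factory that names its threads and sets the daemon flag (hypothetical names). */
class DaemonFlagDemo {
    static ThreadFactory named(String prefix, boolean daemon) {
        ThreadFactory delegate = Executors.defaultThreadFactory();
        AtomicInteger counter = new AtomicInteger(1);
        return runnable -> {
            Thread t = delegate.newThread(runnable);
            t.setName(prefix + "-" + counter.getAndIncrement());
            t.setDaemon(daemon);
            return t;
        };
    }

    public static void main(String[] args) {
        Thread worker = named("demo-http", true).newThread(() -> {
            try {
                Thread.sleep(60_000); // pretend to wait for requests
            } catch (InterruptedException ignored) {
                // nothing to clean up in this sketch
            }
        });
        worker.start();
        System.out.println(worker.getName() + " daemon=" + worker.isDaemon());
        // main returns here. Because the worker is a daemon thread, the JVM
        // exits without waiting for the sleep to finish. With daemon=false
        // the JVM would stay alive until the worker ends.
    }
}
```

For an exporter that should keep a process alive on its own, a non-daemon pool is the natural choice; for an exporter embedded in an application that has its own lifecycle, daemon threads avoid blocking shutdown.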
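- LocalByteArray: extends ThreadLocal, used to hold a reusable per-thread buffer
private static class LocalByteArray extends ThreadLocal<ByteArrayOutputStream> {
// ByteArrayOutputStream is an output stream backed by a byte array; the initial capacity here is 1 << 20 bytes (1 MiB)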
@Override
protected ByteArrayOutputStream initialValue()
{
return new ByteArrayOutputStream(1 << 20);
}
}
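The point of the ThreadLocal is that each worker thread allocates its buffer once and then reuses it for every request. A minimal JDK-only sketch of that pattern follows; `ThreadLocalBufferDemo`, `render`, and `currentBuffer` are hypothetical names, not library code.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

/** Sketch: one reusable buffer per thread, reset before each use (hypothetical names). */
class ThreadLocalBufferDemo {
    private static final ThreadLocal<ByteArrayOutputStream> BUFFER =
            ThreadLocal.withInitial(() -> new ByteArrayOutputStream(1 << 20));

    /** Writes the body into this thread's buffer and returns a copy of the bytes. */
    static byte[] render(String body) {
        ByteArrayOutputStream out = BUFFER.get();
        out.reset(); // drop whatever the previous request left behind
        byte[] bytes = body.getBytes(StandardCharsets.UTF_8);
        out.write(bytes, 0, bytes.length);
        return out.toByteArray();
    }

    /** Exposes the current thread's buffer so reuse can be observed. */
    static ByteArrayOutputStream currentBuffer() {
        return BUFFER.get();
    }
}
```

The trade-off is memory: each thread that ever handles a request keeps its buffer for as long as the thread lives, which is acceptable with a small fixed pool.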
- HTTPMetricHandler: handles HTTP requests
/**
* Handles Metrics collections from the given registry.
* The registry class lives in the simpleclient package.
*/
static class HTTPMetricHandler implements HttpHandler {
private final CollectorRegistry registry;
// LocalByteArray is a ThreadLocal, so each worker thread reuses its own buffer
private final LocalByteArray response = new LocalByteArray();
private final static String HEALTHY_RESPONSE = "Exporter is Healthy.";
HTTPMetricHandler(CollectorRegistry registry) {
this.registry = registry;
}
@Override
public void handle(HttpExchange t) throws IOException {
String query = t.getRequestURI().getRawQuery();
String contextPath = t.getHttpContext().getPath();
ByteArrayOutputStream response = this.response.get();
// Clear the buffer left over from the previous request on this thread
response.reset();
OutputStreamWriter osw = new OutputStreamWriter(response, Charset.forName("UTF-8"));
if ("/-/healthy".equals(contextPath)) { // Health check; note that it is served over plain HTTP
osw.write(HEALTHY_RESPONSE);
} else {
String contentType = TextFormat.chooseContentType(t.getRequestHeaders().getFirst("Accept"));
t.getResponseHeaders().set("Content-Type", contentType);
// The main event: fetch the samples from the registry and write them to the output stream.
// filteredMetricFamilySamples returns an Enumeration<Collector.MetricFamilySamples>
TextFormat.writeFormat(contentType, osw,
registry.filteredMetricFamilySamples(parseQuery(query)));
}
osw.close();
// If the client's request indicates that it accepts compression, gzip the response
if (shouldUseCompression(t)) {
t.getResponseHeaders().set("Content-Encoding", "gzip");
t.sendResponseHeaders(HttpURLConnection.HTTP_OK, 0);
final GZIPOutputStream os = new GZIPOutputStream(t.getResponseBody());
try {
response.writeTo(os);
} finally {
os.close();
}
} else {
t.getResponseHeaders().set("Content-Length",
String.valueOf(response.size()));
t.sendResponseHeaders(HttpURLConnection.HTTP_OK, response.size());
response.writeTo(t.getResponseBody());
}
t.close();
}
}
The code above mentions Enumeration, a JDK interface for iterating over a collection
public interface Enumeration<E> {
/**
* Tests if this enumeration contains more elements.
*
* @return <code>true</code> if and only if this enumeration object
* contains at least one more element to provide;
* <code>false</code> otherwise.
*/
boolean hasMoreElements();
/**
* Returns the next element of this enumeration if this enumeration
* object has at least one more element to provide.
*
* @return the next element of this enumeration.
* @exception NoSuchElementException if no more elements exist.
*/
E nextElement();
}
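For readers who have only used Iterator, this is how an Enumeration is typically produced and consumed. The sketch uses only the JDK; `EnumerationDemo` and `drain` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Enumeration;
import java.util.List;

/** Sketch: wrapping a collection in an Enumeration and walking through it. */
class EnumerationDemo {
    /** Copies every remaining element of the enumeration into a list. */
    static <T> List<T> drain(Enumeration<T> e) {
        List<T> out = new ArrayList<>();
        while (e.hasMoreElements()) {
            out.add(e.nextElement());
        }
        return out;
    }

    public static void main(String[] args) {
        Enumeration<String> names =
                Collections.enumeration(Arrays.asList("gauge", "counter_total"));
        System.out.println(drain(names));
    }
}
```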
That covers the nested helper classes; now for the constructor
/**
* Start a HTTP server serving Prometheus metrics from the given registry using the given {@link HttpServer}.
* The {@code httpServer} is expected to already be bound to an address
*/
public HTTPServer(HttpServer httpServer, CollectorRegistry registry, boolean daemon) throws IOException {
if (httpServer.getAddress() == null)
throw new IllegalArgumentException("HttpServer hasn't been bound to an address");
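// HttpServer here is com.sun.net.httpserver.HttpServer, which ships with the JDK along with the other classes needed to build a small web server
server = httpServer;
// The HttpHandler is handed to the server to process requests
HttpHandler mHandler = new HTTPMetricHandler(registry);
// Map each context path to the handler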
server.createContext("/", mHandler);
server.createContext("/metrics", mHandler);
server.createContext("/-/healthy", mHandler);
// The thread pool that actually handles requests; since it mainly serves Prometheus scrapes, it does not need many threads
executorService = Executors.newFixedThreadPool(5, NamedDaemonThreadFactory.defaultThreadFactory(daemon));
server.setExecutor(executorService);
start(daemon);
}
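- For general usage of the JDK HttpServer, see
https://www.cnblogs.com/aspwebchh/p/8300945.html
In short, the httpserver part exposes a port and provides a web service that Prometheus scrapes. Internally it uses a thread pool to process requests; for each request, the handler fetches the data from the registry and sends the response, using a ThreadLocal buffer to hold the response body. The module is started with new HTTPServer(port);
Where it gets started depends on the application.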
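The same shape (JDK HttpServer, a fixed thread pool, one handler on /metrics) can be reproduced in a few lines without the Prometheus library. The following is a rough sketch under that assumption: `MiniMetricsServer`, its methods, and the hard-coded `demo_gauge` body are invented for illustration and stand in for the registry-backed handler.

```java
import com.sun.net.httpserver.HttpServer;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.InetSocketAddress;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Sketch: a tiny scrape endpoint built on the JDK HttpServer (hypothetical names). */
class MiniMetricsServer {
    /** Starts a server on the given port (0 = any free port) serving a fixed body. */
    static HttpServer start(int port, String body, ExecutorService executor) throws IOException {
        HttpServer server = HttpServer.create(new InetSocketAddress("127.0.0.1", port), 0);
        server.createContext("/metrics", exchange -> {
            byte[] bytes = body.getBytes(StandardCharsets.UTF_8);
            exchange.getResponseHeaders().set("Content-Type", "text/plain; version=0.0.4; charset=utf-8");
            exchange.sendResponseHeaders(200, bytes.length);
            try (OutputStream os = exchange.getResponseBody()) {
                os.write(bytes);
            }
        });
        server.setExecutor(executor);
        server.start();
        return server;
    }

    /** Performs one GET and returns the body, roughly what a single scrape does. */
    static String scrape(String url) throws IOException {
        HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection();
        try (InputStream in = conn.getInputStream()) {
            ByteArrayOutputStream buf = new ByteArrayOutputStream();
            byte[] chunk = new byte[4096];
            int n;
            while ((n = in.read(chunk)) != -1) {
                buf.write(chunk, 0, n);
            }
            return new String(buf.toByteArray(), StandardCharsets.UTF_8);
        }
    }

    public static void main(String[] args) throws IOException {
        ExecutorService pool = Executors.newFixedThreadPool(5);
        HttpServer server = start(0, "demo_gauge 1.0\n", pool);
        int port = server.getAddress().getPort();
        System.out.print(scrape("http://127.0.0.1:" + port + "/metrics"));
        server.stop(0);
        pool.shutdown();
    }
}
```

Binding to port 0 lets the operating system pick a free port, which is handy for trying this out locally; a real exporter would use a fixed, documented port.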
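simpleclient wraps the Prometheus metric types, provides the model and storage classes, and defines Collector, the registry, and so on.
The following mainly covers how the model classes relate to each other, and how data is stored and read back.
Keep in mind that the client only holds the current value of each metric; historical values are stored in Prometheus. Once the client has created a metric, it just keeps updating that value.
- First, the model
Entry point: the basic types are all defined inside Collector
Interface and implementation relationships
Main classes:
Sample: a single sample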
/**
* A single Sample, with a unique name and set of labels.
*/
public static class Sample {
public final String name;
public final List<String> labelNames;
public final List<String> labelValues; // Must have same length as labelNames.
public final double value;
// Optional timestamp in milliseconds; null when the sample carries no explicit timestamp
public final Long timestampMs; // It's an epoch format with milliseconds value included (this field is subject to change).
public Sample(String name, List<String> labelNames, List<String> labelValues, double value, Long timestampMs) {
this.name = name;
this.labelNames = labelNames;
this.labelValues = labelValues;
this.value = value;
this.timestampMs = timestampMs;
}
MetricFamilySamples: the actual metric class
/**
* A metric, and all of its samples.
* This is the actual metric class; it holds the list of samples for one metric family.
*/
static public class MetricFamilySamples {
public final String name;
public final String unit;
public final Type type;
public final String help;
// The samples
public final List<Sample> samples;
public MetricFamilySamples(String name, String unit, Type type, String help, List<Sample> samples) {
if (!unit.isEmpty() && !name.endsWith("_" + unit)) {
throw new IllegalArgumentException("Metric's unit is not the suffix of the metric name: " + name);
}
if ((type == Type.INFO || type == Type.STATE_SET) && !unit.isEmpty()) {
throw new IllegalArgumentException("Metric is of a type that cannot have a unit: " + name);
}
List<Sample> mungedSamples = samples;
// Deal with _total from pre-OM automatically.
if (type == Type.COUNTER) {
if (name.endsWith("_total")) {
name = name.substring(0, name.length() - 6);
}
String withTotal = name + "_total";
mungedSamples = new ArrayList<Sample>(samples.size());
for (Sample s: samples) {
String n = s.name;
if (name.equals(n)) {
n = withTotal;
}
mungedSamples.add(new Sample(n, s.labelNames, s.labelValues, s.value, s.timestampMs));
}
}
this.name = name;
this.unit = unit;
this.type = type;
this.help = help;
this.samples = mungedSamples;
}
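The counter branch of that constructor is easy to misread, so here is the naming rule on its own: the family name loses a trailing `_total`, and any sample that uses the bare family name gets `_total` appended. This is a simplified sketch that works on plain strings instead of Sample objects; `CounterNameDemo` and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch: the "_total" name handling for counters, reduced to strings. */
class CounterNameDemo {
    private static final String SUFFIX = "_total";

    /** Strips a trailing "_total" so the family is known by its base name. */
    static String familyName(String name) {
        return name.endsWith(SUFFIX)
                ? name.substring(0, name.length() - SUFFIX.length())
                : name;
    }

    /** Renames samples that use the bare family name to "<family>_total". */
    static List<String> sampleNames(String rawFamilyName, List<String> rawSampleNames) {
        String family = familyName(rawFamilyName);
        List<String> out = new ArrayList<>(rawSampleNames.size());
        for (String n : rawSampleNames) {
            out.add(family.equals(n) ? family + SUFFIX : n);
        }
        return out;
    }
}
```

For example, a counter declared as `requests` or as `requests_total` ends up as the same family `requests`, whose value sample is named `requests_total`; other samples such as `requests_created` keep their names.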
- Main interfaces
Describable: the metric families a collector expects to return
public interface Describable {
/**
* Provide a list of metric families this Collector is expected to return.
*
* These should exclude the samples. This is used by the registry to
* detect collisions and duplicate registrations.
*
* Usually custom collectors do not have to implement Describable. If
* Describable is not implemented and the CollectorRegistry was created
* with auto describe enabled (which is the case for the default registry)
* then {@link collect} will be called at registration time instead of
* describe. If this could cause problems, either implement a proper
* describe, or if that's not practical have describe return an empty
* list.
*/
List<MetricFamilySamples> describe();
}
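collect(): returns the metric data
public abstract List<MetricFamilySamples> collect();
public class Counter extends SimpleCollector<Counter.Child> implements Collector.Describable {
As the declaration shows, Counter and the other metric types extend Collector (through SimpleCollector) and implement Describable.
- Here is an HTTPServer example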
public class ExampleExporter {
static final Gauge g = Gauge.build().name("gauge").help("blah").register();
static final Counter c = Counter.build().name("counter").help("meh").register();
static final Summary s = Summary.build().name("summary").help("meh").register();
static final Histogram h = Histogram.build().name("histogram").help("meh").register();
static final Gauge l = Gauge.build().name("labels").help("blah").labelNames("l").register();
public static void main(String[] args) throws Exception {
new HTTPServer(1234);
g.set(1);
c.inc(2);
s.observe(3);
h.observe(4);
l.labels("foo").inc(5);
}
}
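- Reading the data
As noted earlier, the server processes requests through the handler, which does the following
// Iterate over the collection and write out the metric data
TextFormat.writeFormat(contentType, osw,
// Returns an Enumeration
registry.filteredMetricFamilySamples(parseQuery(query)));
---
// Reading the data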
public static void write004(Writer writer, Enumeration<Collector.MetricFamilySamples> mfs) throws IOException {
Map<String, Collector.MetricFamilySamples> omFamilies = new TreeMap<String, Collector.MetricFamilySamples>();
/* See http://prometheus.io/docs/instrumenting/exposition_formats/
* for the output format specification. */
while(mfs.hasMoreElements()) {
Collector.MetricFamilySamples metricFamilySamples = mfs.nextElement();
String name = metricFamilySamples.name;
writer.write("# HELP ");
writer.write(name);
if (metricFamilySamples.type == Collector.Type.COUNTER) {
writer.write("_total");
}
if (metricFamilySamples.type == Collector.Type.INFO) {
writer.write("_info");
}
writer.write(' ');
writeEscapedHelp(writer, metricFamilySamples.help);
writer.write('\n');
writer.write("# TYPE ");
writer.write(name);
if (metricFamilySamples.type == Collector.Type.COUNTER) {
writer.write("_total");
}
if (metricFamilySamples.type == Collector.Type.INFO) {
writer.write("_info");
}
writer.write(' ');
writer.write(typeString(metricFamilySamples.type));
writer.write('\n');
String createdName = name + "_created";
String gcountName = name + "_gcount";
String gsumName = name + "_gsum";
for (Collector.MetricFamilySamples.Sample sample: metricFamilySamples.samples) {
/* OpenMetrics specific sample, put in a gauge at the end. */
if (sample.name.equals(createdName)
|| sample.name.equals(gcountName)
|| sample.name.equals(gsumName)) {
Collector.MetricFamilySamples omFamily = omFamilies.get(sample.name);
if (omFamily == null) {
omFamily = new Collector.MetricFamilySamples(sample.name, Collector.Type.GAUGE, metricFamilySamples.help, new ArrayList<Collector.MetricFamilySamples.Sample>());
omFamilies.put(sample.name, omFamily);
}
omFamily.samples.add(sample);
continue;
}
writer.write(sample.name);
if (sample.labelNames.size() > 0) {
writer.write('{');
for (int i = 0; i < sample.labelNames.size(); ++i) {
writer.write(sample.labelNames.get(i));
writer.write("=\"");
writeEscapedLabelValue(writer, sample.labelValues.get(i));
writer.write("\",");
}
writer.write('}');
}
writer.write(' ');
writer.write(Collector.doubleToGoString(sample.value));
if (sample.timestampMs != null){
writer.write(' ');
writer.write(sample.timestampMs.toString());
}
writer.write('\n');
}
}
// Write out any OM-specific samples.
if (!omFamilies.isEmpty()) {
write004(writer, Collections.enumeration(omFamilies.values()));
}
}
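Stripped of the HELP/TYPE headers and the OpenMetrics handling, the inner loop writes one line per sample in the form `name{label="value",} value`. A minimal sketch of just that step follows. `TextLineDemo` and its methods are hypothetical names, and the value is passed in as an already formatted string, which sidesteps the library's own number formatting.

```java
import java.util.List;

/** Sketch: formatting a single sample line of the text exposition format. */
class TextLineDemo {
    /** Escapes backslash, double quote and newline inside a label value. */
    static String escapeLabelValue(String v) {
        return v.replace("\\", "\\\\").replace("\"", "\\\"").replace("\n", "\\n");
    }

    /** Builds "name{l1="v1",l2="v2",} value\n"; the braces are omitted without labels. */
    static String sampleLine(String name, List<String> labelNames,
                             List<String> labelValues, String value) {
        StringBuilder sb = new StringBuilder(name);
        if (!labelNames.isEmpty()) {
            sb.append('{');
            for (int i = 0; i < labelNames.size(); i++) {
                sb.append(labelNames.get(i))
                  .append("=\"")
                  .append(escapeLabelValue(labelValues.get(i)))
                  .append("\",");
            }
            sb.append('}');
        }
        return sb.append(' ').append(value).append('\n').toString();
    }
}
```

Note the trailing comma after the last label: the loop above emits it just as the original code does.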
- Storing the data
Code fragments for each step follow
static final Gauge g = Gauge.build().name("gauge").help("blah").register();
---
/**
* Create and register the Collector with the given registry.
* Returns the created collector, for example a Gauge
*/
public C register(CollectorRegistry registry) {
// create() builds the concrete metric object, for example new Gauge(...)
C sc = create();
registry.register(sc); // store it in the registry
return sc;
}
-----
public class CollectorRegistry {
/**
* The default registry.
*/
public static final CollectorRegistry defaultRegistry = new CollectorRegistry(true);
private final Object namesCollectorsLock = new Object();
// e.g. key: the Gauge collector, value: ["gauge"]; note that Counter, Summary, etc. also register suffixed names
private final Map<Collector, List<String>> collectorsToNames = new HashMap<Collector, List<String>>();
// e.g. key: "gauge", value: the Gauge collector registered under that name
private final Map<String, Collector> namesToCollectors = new HashMap<String, Collector>();
private final boolean autoDescribe;
public CollectorRegistry() {
this(false);
}
public CollectorRegistry(boolean autoDescribe) {
this.autoDescribe = autoDescribe;
}
/**
* Register a Collector.
* <p>
* A collector can be registered to multiple CollectorRegistries.
*/
public void register(Collector m) {
List<String> names = collectorNames(m);
synchronized (namesCollectorsLock) {
for (String name : names) {
if (namesToCollectors.containsKey(name)) {
throw new IllegalArgumentException("Collector already registered that provides name: " + name);
}
}
// namesToCollectors is where the collectors are ultimately stored
for (String name : names) {
namesToCollectors.put(name, m);
}
collectorsToNames.put(m, names);
}
}
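The registration logic boils down to two maps guarded by one lock, with a check-then-insert so that a name collision leaves the registry unchanged. Here is a stripped-down, generic sketch of that idea; `MiniRegistry` and `lookup` are hypothetical, and the names are passed in directly instead of being derived from the collector.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Sketch: a two-map registry with duplicate-name detection (hypothetical class). */
class MiniRegistry<C> {
    private final Object lock = new Object();
    private final Map<C, List<String>> collectorsToNames = new HashMap<>();
    private final Map<String, C> namesToCollectors = new HashMap<>();

    /** Registers the collector under all of its names, or throws without changing anything. */
    void register(C collector, List<String> names) {
        synchronized (lock) {
            // First pass: reject the whole registration if any name is taken.
            for (String name : names) {
                if (namesToCollectors.containsKey(name)) {
                    throw new IllegalArgumentException(
                            "Collector already registered that provides name: " + name);
                }
            }
            // Second pass: only now mutate the maps.
            for (String name : names) {
                namesToCollectors.put(name, collector);
            }
            collectorsToNames.put(collector, names);
        }
    }

    /** Returns the collector registered under the name, or null if there is none. */
    C lookup(String name) {
        synchronized (lock) {
            return namesToCollectors.get(name);
        }
    }
}
```

Checking all names before inserting any of them is what keeps a failed registration from leaving half of a collector's names behind.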
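The Pushgateway approach
This is essentially a wrapper around the Pushgateway API (push, delete, and add metrics): it sends the metrics in a registry to the Pushgateway for temporary storage, using HTTP requests
Usage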
* void executeBatchJob() throws Exception {
* CollectorRegistry registry = new CollectorRegistry();
* Gauge duration = Gauge.build()
* .name("my_batch_job_duration_seconds").help("Duration of my batch job in seconds.").register(registry);
* Gauge.Timer durationTimer = duration.startTimer();
* try {
* // Your code here.
*
* // This is only added to the registry after success,
* // so that a previous success in the Pushgateway isn't overwritten on failure.
* Gauge lastSuccess = Gauge.build()
* .name("my_batch_job_last_success").help("Last time my batch job succeeded, in unixtime.").register(registry);
* lastSuccess.setToCurrentTime();
* } finally {
* durationTimer.setDuration();
* PushGateway pg = new PushGateway("127.0.0.1:9091");
* pg.pushAdd(registry, "my_batch_job");
* }
* }
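PushGateway holds an HttpConnectionFactory. It is not a true connection pool: every push creates a new HttpURLConnection, and any reuse of the underlying TCP connection relies on HTTP/1.1 keep-alive, so performance is acceptable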
public class PushGateway {
private static final int MILLISECONDS_PER_SECOND = 1000;
// Visible for testing.
protected final String gatewayBaseURL;
// Connection factory (creates one HttpURLConnection per request)
private HttpConnectionFactory connectionFactory = new DefaultHttpConnectionFactory();
----
public class DefaultHttpConnectionFactory implements HttpConnectionFactory {
@Override
public HttpURLConnection create(String url) throws IOException {
return (HttpURLConnection) new URL(url).openConnection();
}
}
- In addition, the push methods all go through doRequest, which sends the HTTP request. The implementation follows
void doRequest(CollectorRegistry registry, String job, Map<String, String> groupingKey, String method) throws IOException {
String url = gatewayBaseURL;
if (job.contains("/")) {
url += "job@base64/" + base64url(job);
} else {
url += "job/" + URLEncoder.encode(job, "UTF-8");
}
if (groupingKey != null) {
for (Map.Entry<String, String> entry: groupingKey.entrySet()) {
if (entry.getValue().isEmpty()) {
url += "/" + entry.getKey() + "@base64/=";
} else if (entry.getValue().contains("/")) {
url += "/" + entry.getKey() + "@base64/" + base64url(entry.getValue());
} else {
url += "/" + entry.getKey() + "/" + URLEncoder.encode(entry.getValue(), "UTF-8");
}
}
}
HttpURLConnection connection = connectionFactory.create(url);
connection.setRequestProperty("Content-Type", TextFormat.CONTENT_TYPE_004);
if (!method.equals("DELETE")) {
connection.setDoOutput(true);
}
connection.setRequestMethod(method);
// The connect timeout is 10 s, and the read timeout is also 10 s
connection.setConnectTimeout(10 * MILLISECONDS_PER_SECOND);
connection.setReadTimeout(10 * MILLISECONDS_PER_SECOND);
connection.connect();
try {
if (!method.equals("DELETE")) {
BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(connection.getOutputStream(), "UTF-8"));
TextFormat.write004(writer, registry.metricFamilySamples());
writer.flush();
writer.close();
}
int response = connection.getResponseCode();
if (response/100 != 2) {
String errorMessage;
InputStream errorStream = connection.getErrorStream();
if(errorStream != null) {
String errBody = readFromStream(errorStream);
errorMessage = "Response code from " + url + " was " + response + ", response body: " + errBody;
} else {
errorMessage = "Response code from " + url + " was " + response;
}
throw new IOException(errorMessage);
}
} finally {
connection.disconnect();
}
}
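The first half of doRequest is plain URL construction: values containing "/" are sent base64url-encoded under a `@base64` key, empty values become `@base64/=`, and everything else is URL-encoded. The sketch below isolates that rule. `PushUrlDemo` and its methods are hypothetical, and `java.util.Base64`'s URL encoder is used here as a stand-in for the library's own base64url helper, so padding details may differ from the real client.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Map;

/** Sketch: building a Pushgateway URL from a job name and grouping key. */
class PushUrlDemo {
    static String base64url(String v) {
        // Assumption: the JDK URL-safe encoder approximates the library's helper.
        return Base64.getUrlEncoder().encodeToString(v.getBytes(StandardCharsets.UTF_8));
    }

    static String encode(String v) {
        try {
            return URLEncoder.encode(v, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException("UTF-8 is always available", e);
        }
    }

    /** baseUrl is expected to end with "/metrics/"; groupingKey may be null. */
    static String pushUrl(String baseUrl, String job, Map<String, String> groupingKey) {
        StringBuilder url = new StringBuilder(baseUrl);
        if (job.contains("/")) {
            url.append("job@base64/").append(base64url(job));
        } else {
            url.append("job/").append(encode(job));
        }
        if (groupingKey != null) {
            for (Map.Entry<String, String> e : groupingKey.entrySet()) {
                String value = e.getValue();
                if (value.isEmpty()) {
                    url.append('/').append(e.getKey()).append("@base64/=");
                } else if (value.contains("/")) {
                    url.append('/').append(e.getKey()).append("@base64/").append(base64url(value));
                } else {
                    url.append('/').append(e.getKey()).append('/').append(encode(value));
                }
            }
        }
        return url.toString();
    }
}
```

With a base URL of `http://127.0.0.1:9091/metrics/`, the job `my_batch_job` maps to `.../metrics/job/my_batch_job`, while a job such as `a/b` goes through the base64 branch so that the slash cannot be mistaken for a path separator.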
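- As the code shows, the connect timeout is 10 s and the read timeout is also 10 s. If the gateway misbehaves, each push can block its calling thread for that long, which can tie up client resources and leave the client hanging.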
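One possible mitigation, sketched here with the JDK only, is to run the push on a separate executor and give up waiting after a time budget, so the business thread is never blocked for the full timeout. `BoundedPushDemo` and `runWithBudget` are hypothetical names; in a real application the task would wrap a call such as pg.push(registry, "job").

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Sketch: waiting for a push with a time budget instead of blocking the caller. */
class BoundedPushDemo {
    /** Returns true if the task finished in time, false if it failed or ran over budget. */
    static boolean runWithBudget(ExecutorService executor, Callable<Void> task, long budgetMillis) {
        Future<Void> future = executor.submit(task);
        try {
            future.get(budgetMillis, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            // Interrupts the worker. Blocking socket I/O may ignore the interrupt,
            // in which case the worker is freed only when the socket timeout fires.
            future.cancel(true);
            return false;
        } catch (ExecutionException e) {
            return false; // the task itself threw, e.g. an IOException from the push
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            future.cancel(true);
            return false;
        }
    }
}
```

This bounds how long the caller waits, not how long the worker thread is occupied, so the executor should be small and dedicated to pushing.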
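- Can data be pushed on a schedule?
No. By default a Gauge does not carry a timestamp, so the time that gets stored is actually the arrival time of the HTTP request.
[Screenshot: packet capture]
- Does creating a new connection for every push cause performance problems?
No; the connection is kept alive.
Below is the source of the disconnect method that is called at the end of push's doRequest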
public void disconnect() {
this.responseCode = -1;
if (this.pi != null) {
this.pi.finishTracking();
this.pi = null;
}
if (this.http != null) {
if (this.inputStream != null) {
HttpClient var1 = this.http;
boolean var2 = var1.isKeepingAlive();
try {
this.inputStream.close();
} catch (IOException var4) {
}
if (var2) {
var1.closeIdleConnection();
}
} else {
this.http.setDoNotRetry(true);
this.http.closeServer();
}
this.http = null;
this.connected = false;
}
this.cachedInputStream = null;
if (this.cachedHeaders != null) {
this.cachedHeaders.reset();
}
}
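As the code shows, HttpURLConnection's disconnect method does not necessarily close the underlying connection; it mostly clears per-request state so the connection can be reused. The client and server speak HTTP/1.1; the client enables keep-alive by default, and the Pushgateway server supports it as well.
Below is a Wireshark capture of two pushes
The code that issues the requests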
public class ExamplePushGateway {
static final CollectorRegistry pushRegistry = new CollectorRegistry();
static final Gauge g = (Gauge) Gauge.build().name("gauge").help("blah").register(pushRegistry);
/**
* Example of how to use the pushgateway, pass in the host:port of a pushgateway.
*/
public static void main(String[] args) throws Exception {
PushGateway pg = new PushGateway("127.0.0.1:9091");
g.set(42);
pg.push(pushRegistry, "job");
g.set(45);
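pg.push(pushRegistry,"job");
}
}
To capture traffic on the local machine, use this
Filter setting
tcp.dstport == 9091 or tcp.srcport == 9091
The capture shows that only one TCP handshake took place: disconnect did not close the connection, and keep-alive was in effect
Below is the connection state for a period of time after one push; the connection between client and server has not been closed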
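If Wireshark is not at hand, connection reuse can also be probed from Java: a local server records the client-side port of every request, and identical ports across requests mean the same TCP connection was reused. This is a rough sketch using only the JDK; `KeepAliveProbe` and `clientPorts` are hypothetical names, the server is a stand-in for the Pushgateway, and whether the ports actually match depends on the JDK version and on whether disconnect() is called, so the sketch makes no claim about the result.

```java
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.InetSocketAddress;
import java.net.URL;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

/** Sketch: observing connection reuse by recording the client port per request. */
class KeepAliveProbe {
    /** Sends sequential GETs to a local server and returns the client port seen for each. */
    static List<Integer> clientPorts(int requests, boolean callDisconnect) throws IOException {
        List<Integer> ports = new CopyOnWriteArrayList<>();
        HttpServer server = HttpServer.create(new InetSocketAddress("127.0.0.1", 0), 0);
        server.createContext("/", exchange -> {
            ports.add(exchange.getRemoteAddress().getPort());
            exchange.sendResponseHeaders(200, -1); // -1 means no response body
            exchange.close();
        });
        server.start();
        try {
            String url = "http://127.0.0.1:" + server.getAddress().getPort() + "/";
            for (int i = 0; i < requests; i++) {
                HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection();
                conn.getResponseCode();
                try (InputStream in = conn.getInputStream()) {
                    while (in.read() != -1) {
                        // drain the body so the connection is eligible for reuse
                    }
                }
                if (callDisconnect) {
                    conn.disconnect();
                }
            }
        } finally {
            server.stop(0);
        }
        return ports;
    }

    public static void main(String[] args) throws IOException {
        System.out.println("without disconnect: " + clientPorts(2, false));
        System.out.println("with disconnect:    " + clientPorts(2, true));
    }
}
```

Comparing the two printed lists on the JDK in use gives a quick, local check of the keep-alive behaviour described above.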