InfluxDB Java Client
The latest InfluxDB Docker image is currently 1.8.3, while the latest official release is 2.0.
Note: We recommend using the new client libraries on this page to leverage the new read (via Flux) and write APIs and prepare for conversion to InfluxDB 2.0 and InfluxDB Cloud 2.0. For more information, see InfluxDB 2.0 API compatibility endpoints. Client libraries for InfluxDB 1.7 and earlier may continue to work, but are not maintained by InfluxData.
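In other words, the official docs recommend the new client libraries, with their new read (Flux) and write APIs, as the way to prepare for InfluxDB 2.0.
According to the official documentation, the Java client is:
influxdb-client-java
Keeping up with the times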
<dependency>
<groupId>com.influxdb</groupId>
<artifactId>influxdb-client-java</artifactId>
<version>1.12.0</version>
</dependency>
Creating a connection
InfluxDBClient influxDBClient = InfluxDBClientFactory.create("http://localhost:8086", token);
//
// Create bucket "iot-bucket" with data retention set to 3,600 seconds
//
BucketRetentionRules retention = new BucketRetentionRules();
retention.setEverySeconds(3600);
Bucket bucket = influxDBClient.getBucketsApi().createBucket("iot-bucket", retention, "12bdc4164c2e8141");
How to access version 1.8
Query data in InfluxDB 1.8.0+ using the InfluxDB 2.0 API and Flux (endpoint should be enabled by flux-enabled option)
Although the InfluxDB 2.0 API is compatible with InfluxDB 1.8.0, Flux still has to be enabled in the server configuration:
[http]
enabled = true
bind-address = ":8086"
auth-enabled = true
log-enabled = true
suppress-write-log = false
write-tracing = false
flux-enabled = true # this is the key setting
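What is Flux? What is InfluxQL?
Flux is an alternative to InfluxQL and other SQL-like query languages for querying and analyzing data. Flux uses functional language patterns, which makes it powerful and flexible and lets it overcome many of InfluxQL's limitations.
It looks like the official recommendation leans toward Flux syntax.
Flux syntax
Example (bucket is a concept introduced in InfluxDB 2.0; InfluxDB 1.x does not have it):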
data = from(bucket: "db/rp")
|> range(start: -1h)
|> filter(fn: (r) =>
r._measurement == "example-measurement" and
r._field == "example-field"
)
Flux in 1.8 and 2.0
Key-Concepts
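That article shows that 2.0 merges the database and the retention policy into a single bucket. The Flink metrics are therefore collected under flink/flink.
So what is an Organization?
An InfluxDB organization is a workspace for a group of users. All dashboards, tasks, buckets, and users belong to an organization. For more information about organizations, see Manage organizations.
InfluxDB 1.8 has no concept of an organization. As the example below shows, with version 1.8 the org is simply "-".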
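The naming convention described above can be sketched in plain Java. This is only an illustration: the class V1Bucket and its methods are hypothetical names and not part of influxdb-client-java. The "database/retentionPolicy" bucket form and the "-" organization are taken from the text above.

```java
import java.util.Objects;

/** Hypothetical helper (not part of influxdb-client-java) for 1.8 compatibility naming. */
final class V1Bucket {
    /** Placeholder organization used with InfluxDB 1.8, which has no organizations. */
    static final String ORG = "-";

    private V1Bucket() {}

    /** Joins a 1.x database and retention policy into the "db/rp" bucket name. */
    static String name(String database, String retentionPolicy) {
        Objects.requireNonNull(database, "database");
        Objects.requireNonNull(retentionPolicy, "retentionPolicy");
        return database + "/" + retentionPolicy;
    }

    /** Builds a minimal Flux query that reads the last hour from that bucket. */
    static String lastHourQuery(String database, String retentionPolicy) {
        return String.format("from(bucket: \"%s\") |> range(start: -1h)",
                name(database, retentionPolicy));
    }
}
```

For instance, V1Bucket.name("telegraf", "autogen") yields the bucket name "telegraf/autogen".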
Accessing InfluxDB 1.8
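import com.influxdb.client.InfluxDBClient;
import com.influxdb.client.InfluxDBClientFactory;
import com.influxdb.client.WriteApi;
import com.influxdb.client.write.Point;
import com.influxdb.query.FluxTable;

import java.util.List;

public class InfluxDB18Example {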
public static void main(final String[] args) {
String database = "telegraf";
String retentionPolicy = "autogen";
InfluxDBClient client = InfluxDBClientFactory.createV1("http://localhost:8086",
"username",
"password".toCharArray(),
database,
retentionPolicy);
System.out.println("*** Write Points ***");
try (WriteApi writeApi = client.getWriteApi()) {
Point point = Point.measurement("mem")
.addTag("host", "host1")
.addField("used_percent", 29.43234543);
System.out.println(point.toLineProtocol());
writeApi.writePoint(point);
}
System.out.println("*** Query Points ***");
String query = String.format("from(bucket: \"%s/%s\") |> range(start: -1h)", database, retentionPolicy);
List<FluxTable> tables = client.getQueryApi().query(query);
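// Iterate over every returned table; tables.get(0) would throw if the query matched nothing.
tables.forEach(table -> table.getRecords()
.forEach(record -> System.out.println(String.format("%s %s: %s %s",
record.getTime(), record.getMeasurement(), record.getField(), record.getValue()))));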
client.close();
}
}
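For reference, the text printed by point.toLineProtocol() follows InfluxDB line protocol: the measurement, comma-separated tags, a space, then the fields. The sketch below rebuilds that shape with the standard library only, for one tag and one floating-point field. LineProtocolSketch is a hypothetical class, not the client's implementation; it leaves out timestamps and other field types, so the real client should still be used for actual writes.

```java
/** Hypothetical, simplified line-protocol formatter; not the client's implementation. */
final class LineProtocolSketch {
    private LineProtocolSketch() {}

    /** Escapes commas and spaces, the special characters in a measurement name. */
    static String escapeMeasurement(String s) {
        return s.replace(",", "\\,").replace(" ", "\\ ");
    }

    /** Escapes commas, equals signs, and spaces in tag keys, tag values, and field keys. */
    static String escapeKey(String s) {
        return s.replace(",", "\\,").replace("=", "\\=").replace(" ", "\\ ");
    }

    /** Formats one point with a single tag and a single float field, without a timestamp. */
    static String format(String measurement, String tagKey, String tagValue,
                         String fieldKey, double fieldValue) {
        return escapeMeasurement(measurement)
                + "," + escapeKey(tagKey) + "=" + escapeKey(tagValue)
                + " " + escapeKey(fieldKey) + "=" + fieldValue;
    }
}
```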
Querying the Flink checkpoint path metric
public static List<JobLastCheckpointExternalPath> getCheckPoints(String jobId) {
InfluxDbConfig config = new InfluxDbConfig();
config.setHost("http://10.11.159.156:8099");
config.setDatabase("flink");
config.setPassword("flink");
config.setUsername("flink");
config.setRetentionPolicy("one_hour");
String database = config.getDatabase();
String retentionPolicy = config.getRetentionPolicy();
InfluxDBClient client = InfluxDBClientFactory.createV1(config.getHost(),
config.getUsername(),
config.getPassword().toCharArray(),
database,
retentionPolicy);
client.setLogLevel(LogLevel.BASIC);
QueryApi queryApi = client.getQueryApi();
String query = String.format("from(bucket: \"%s/%s\") |> range(start: -30m) |> filter(fn: (r) =>\n" +
" r._measurement == \"jobmanager_job_lastCheckpointExternalPath\" and\n" +
" r.job_id == \"%s\"\n" +
" ) |> sort(columns: [\"_time\"], desc: true) |> limit(n:100)", database, retentionPolicy, jobId);
//
// Query data
//
List<JobLastCheckpointExternalPath> tables = queryApi.query(query, JobLastCheckpointExternalPath.class);
client.close();
return tables;
}
@Measurement(name = "jobmanager_job_lastCheckpointExternalPath")
public static class JobLastCheckpointExternalPath {
@Column(timestamp = true)
Instant time;
@Column(name = "job_name")
String jobName;
@Column(name = "job_id")
String jobId;
@Column(name = "value")
String value;
@Override
public String toString() {
return "JobLastCheckpointExternalPath{" +
"time=" + time +
", jobName='" + jobName + '\'' +
", jobId='" + jobId + '\'' +
", value='" + value + '\'' +
'}';
}
}
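The query in getCheckPoints is assembled with String.format, so a jobId containing a double quote would break out of the Flux string literal. One possible guard, sketched below, is to escape the value before embedding it. FluxStrings is a hypothetical helper, not a client API, and it assumes the escape sequences documented for Flux string literals (backslash, double quote, newline, carriage return, tab, and the "${" interpolation marker).

```java
/** Hypothetical helper for embedding Java strings in Flux source text. */
final class FluxStrings {
    private FluxStrings() {}

    /** Returns the value as a quoted Flux string literal with special characters escaped. */
    static String quote(String value) {
        StringBuilder sb = new StringBuilder("\"");
        for (int i = 0; i < value.length(); i++) {
            char c = value.charAt(i);
            switch (c) {
                case '\\': sb.append("\\\\"); break;
                case '"':  sb.append("\\\""); break;
                case '\n': sb.append("\\n"); break;
                case '\r': sb.append("\\r"); break;
                case '\t': sb.append("\\t"); break;
                case '$':
                    // "${" starts string interpolation in Flux, so escape the dollar sign there.
                    if (i + 1 < value.length() && value.charAt(i + 1) == '{') {
                        sb.append("\\$");
                    } else {
                        sb.append('$');
                    }
                    break;
                default:
                    sb.append(c);
            }
        }
        return sb.append('"').toString();
    }
}
```

A filter clause could then be built as "r.job_id == " + FluxStrings.quote(jobId) instead of placing %s between hand-written quotes.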
Update (Flux syntax)
fields = ["3361e97159f5d473f273b38a96e7ba06"]
from(bucket: "flink/one_hour") |> range(start: -1h)
|> filter(fn: (r) => r._measurement == "jobmanager_job_lastCheckpointExternalPath" and contains(value: r.job_id, set: fields)
and r["_value"] != "n/a"
)
|> keep(columns: ["_time", "_value", "job_id", "job_name"])
|> limit(n:1000)
|> sort(columns: ["_time"], desc: true)
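Note: in practice I found that writing sort before limit made the resulting order unreliable, even though the official documentation itself puts sort before limit.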
sort() and limit()
SQL's IN query corresponds to contains:
contains(value: r.job_id, set: fields)
SQL's SELECT projection corresponds to keep:
keep(columns: ["_time", "_value", "job_id", "job_name"])
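When the job IDs come from Java code, the set passed to contains has to be rendered as a Flux array literal. A minimal sketch follows. JobIdSet is a hypothetical name, and the check assumes that Flink job IDs are 32 hexadecimal characters, like the one in the query above; rejecting anything else also keeps arbitrary text out of the query.

```java
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

/** Hypothetical helper that renders Flink job IDs as a Flux array literal. */
final class JobIdSet {
    /** Assumed job ID shape: exactly 32 hexadecimal characters. */
    private static final Pattern JOB_ID = Pattern.compile("[0-9a-fA-F]{32}");

    private JobIdSet() {}

    /** Returns e.g. ["id1", "id2"], rejecting any value that does not look like a job ID. */
    static String toFluxArray(List<String> jobIds) {
        return jobIds.stream()
                .map(id -> {
                    if (!JOB_ID.matcher(id).matches()) {
                        throw new IllegalArgumentException("not a job ID: " + id);
                    }
                    return "\"" + id + "\"";
                })
                .collect(Collectors.joining(", ", "[", "]"));
    }
}
```

The result can be spliced into the query as "fields = " + JobIdSet.toFluxArray(ids).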