influxdb操作


一、连接influxdb

package influxdb

import (
    "sync"
    "github.com/influxdata/influxdb-client-go/v2"
    "github.com/influxdata/influxdb-client-go/v2/api/write"
)


var (
    oneInfluxdb           sync.Once
    client                influxdb2.Client.WriteAPI
    writeAPI              influxdb2.Client
    measurement           = "rule_stat"
    org                   = "rule_stat"
    bucket                = "rule_stat_test"
)


func InitInfluxdb(url, token string) {
    oneInfluxdb.Do(func(){
        if client == nil {
            client = influxdb2.NewClient(url, token)
            writeAPI = client.WriteAPI(org, bucket)
        }
    })
}                  

 

 二、influxdb概念

1.基本概念

influxDB名词 传统数据库概念
database 数据库
measurement 数据表
point 数据行

 

 

 

 

 

database:

数据库。一个数据库可以有多个measurement,retention policy, continuous queries以及user。

measurement:

数据表,不需要创建,写入数据的时候自动创建

point:

influxDB中的point相当于传统数据库里的一行数据,由时间戳(time)、数据(field)、标签(tag)组成。

Point属性 传统数据库概念
time 每个数据记录时间,是数据库中的主索引
field 各种记录值(没有索引的属性),例如温度、湿度
tags 各种有索引的属性,例如地区、海拔

 

 

 

 

 

示例:

|measurement_name|,tag_set| |field_set| |timestamp|
weather,location=us,server=host1 temperature=82 1465839830100400200

weather是表名。

Tag:

上面的location和server就是tag key,us和host1是tag value,tag是可选的。

不过写入数据时最好加上tag,因为它可以被索引。

tag value的类型只能是字符串。

Field:

上面的temperature是field key,82是field value。

field value会用于展示,value支持的类型有floats,integers,strings,booleans。

字段必须存在。注意,字段是没有索引的。

如果使用字段作为查询条件,会扫描符合查询条件的所有字段值,性能不及tag。

Timestamp:

格式是:RFC3339 UTC。默认精确到纳秒,可选。

2.influxdb概念

Series:

measurement, tag set, retention policy相同的数据集合算做一个 series。

理解这个概念至关重要,因为这些数据存储在内存中,如果series太多,会导致OOM。

Retention Policy:

保留策略,包括设置数据保存的时间以及在集群中的副本个数。

默认配置是:RP是autogen,保留时间是永久,副本为1。

这些配置在创建数据库时可以修改。

Shard:

存储一定时间间隔的数据,每个目录对应一个shard,目录的名字就是shard id。每一个shard都有自己的cache、wal、tsm file以及compactor,目的就是通过时间来快速定位到要查询数据的相关资源,加速查询的过程,并且也让之后的批量删除数据的操作变得非常简单且高效。

bucket:

即使是相同的measurement、tags、fields、time,即完全相同的point,因为Retention Policy不同,会存储在不同的地方。

为了解决以上问题,influxdb2.0引入的概念,相当于database + Retention Policy

3.存储引擎

概述:TSM Tree是在LSM Tree的基础上稍作修改优化而来。

它主要包含四个部分:cache、wal、tsm file、compactor。

Cache

插入数据时,先往cache中写入再写入wal中,可以认为cache是wal文件中的数据在内存中的缓存。

WAL

预写日志,对比mysql的binlog。

其作用就是为了持久化数据,当系统崩溃后可以通过wal文件恢复cache。

TSM File

每个tsm文件的大小上限是2GB。

当达到cache-snapshot-memory-size,cache-max-memory-size的限制时会触发将cache写入tsm文件。

Compactor

主要进行两种操作,一种是cache数据达到阀值后,进行快照,生成一个新的tsm文件。另外一种就是合并当前的tsm文件,将多个小的tsm文件合并成一个,减少文件的数量,并且进行一些数据删除操作。 这些操作都在后台自动完成。

4.目录结构

InfluxDB的数据存储有三个目录,分别是meta、wal、data。

meta用于存储数据库的一些元数据,meta目录下有一个meta.db文件。

wal目录存放预写日志文件,以.wal结尾。

data目录存放实际存储的数据文件,以.tsm结尾。

示例:

 

其中 test是数据库名称,autogen是存储策略名称,再下一层目录中的以数字命名的目录是 shard 的 ID 值,

比如 autogen存储策略下有两个 shard,ID 分别为 1 和 2。shard存储了某一个时间段范围内的数据。

再下一级的目录则为具体的文件,分别是 .wal.tsm结尾的文件。

 三、读写数据

// insert
func writesPoints(cli client.Client) {
    bp, err := influxdb2.NewBatchPoints(client.BatchPointsConfig{
        Database:  "test",
        Precision: "s", //精度,默认ns
    })
    if err != nil {
        log.Fatal(err)
    }
    measurement := "cpu_usage"
    tags := map[string]string{"cpu": "ih-cpu"}
    fields := map[string]interface{}{
        "idle":   201.1,
        "system": 43.3,
        "user":   86.6,
    }
 
    pt, err := influxdb2.NewPoint(measurement, tags, fields, time.Now())
    if err != nil {
        log.Fatal(err)
    }
    bp.AddPoint(pt)
    err = cli.Write(bp)
    if err != nil {
        log.Fatal(err)
    }
    log.Println("insert success")
}
// query
func queryDB(cli client.Client, cmd string) (res []client.Result, err error) {
    q := influxdb2.Query{
        Command:  cmd,
        Database: "test",
    }
    if response, err := cli.Query(q); err == nil {
        if response.Error() != nil {
            return res, response.Error()
        }
        res = response.Results
    } else {
        return res, err
    }
    return res, nil
}

 

package main
 
import (
    "fmt"
    "log"
    "time"
 
    client "github.com/influxdata/influxdb1-client/v2"
)
 
// influxdb demo
 
func connInflux() client.Client {
    cli, err := client.NewHTTPClient(client.HTTPConfig{
        Addr:     "http://127.0.0.1:8086",
        Username: "admin",
        Password: "",
    })
    if err != nil {
        log.Fatal(err)
    }
    return cli
}
func main() {
    conn := connInflux()
    fmt.Println(conn)
 
    // insert
    writesPoints(conn)
 
    // 获取10条数据并展示
    qs := fmt.Sprintf("SELECT * FROM %s LIMIT %d", "cpu_usage", 10)
    res, err := queryDB(conn, qs)
    if err != nil {
        log.Fatal(err)
    }
 
    for _, row := range res[0].Series[0].Values {
        for j, value := range row {
            log.Printf("j:%d value:%v\n", j, value)
        }
    }
}

http://testerhome.com/articles/30768

 https://www.sunzhongwei.com/amp/difference-between-bucket-and-measurement-in-influxdb

https://blog.csdn.net/zxxshaycormac/article/details/115497682


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM