influxdb操作


一、連接influxdb

package influxdb

import (
    "sync"
    "github.com/influxdata/influxdb-client-go/v2"
    "github.com/influxdata/influxdb-client-go/v2/api/write"
)


var (
    oneInfluxdb           sync.Once
    client                influxdb2.Client.WriteAPI
    writeAPI              influxdb2.Client
    measurement           = "rule_stat"
    org                   = "rule_stat"
    bucket                = "rule_stat_test"
)


func InitInfluxdb(url, token string) {
    oneInfluxdb.Do(func(){
        if client == nil {
            client = influxdb2.NewClient(url, token)
            writeAPI = client.WriteAPI(org, bucket)
        }
    })
}                  

 

 二、influxdb概念

1.基本概念

influxDB名詞 傳統數據庫概念
database 數據庫
measurement 數據表
point 數據行

 

 

 

 

 

database:

數據庫。一個數據庫可以有多個measurement,retention policy, continuous queries以及user。

measurement:

數據表,不需要創建,寫入數據的時候自動創建

point:

influxDB中的point相當於傳統數據庫里的一行數據,由時間戳(time)、數據(field)、標簽(tag)組成。

Point屬性 傳統數據庫概念
time 每個數據記錄時間,是數據庫中的主索引
field 各種記錄值(沒有索引的屬性),例如溫度、濕度
tags 各種有索引的屬性,例如地區、海拔

 

 

 

 

 

示例:

|measurement_name|,tag_set| |field_set| |timestamp|
weather,location=us,server=host1 temperature=82 1465839830100400200

weather是表名。

Tag:

上面的location和server就是tag key,us和host1是tag value,tag是可選的。

不過寫入數據時最好加上tag,因為它可以被索引。

tag value的類型只能是字符串。

Field:

上面的temperature是field key,82是field value。

field value會用於展示,value支持的類型有floats,integers,strings,booleans。

字段必須存在。注意,字段是沒有索引的。

如果使用字段作為查詢條件,會掃描符合查詢條件的所有字段值,性能不及tag。

Timestamp:

格式是:RFC3339 UTC。默認精確到納秒,可選。

2.influxdb概念

Series:

measurement, tag set, retention policy相同的數據集合算做一個 series。

理解這個概念至關重要,因為這些數據存儲在內存中,如果series太多,會導致OOM。

Retention Policy:

保留策略,包括設置數據保存的時間以及在集群中的副本個數。

默認配置是:RP是autogen,保留時間是永久,副本為1。

這些配置在創建數據庫時可以修改。

Shard:

存儲一定時間間隔的數據,每個目錄對應一個shard,目錄的名字就是shard id。每一個shard都有自己的cache、wal、tsm file以及compactor,目的就是通過時間來快速定位到要查詢數據的相關資源,加速查詢的過程,並且也讓之后的批量刪除數據的操作變得非常簡單且高效。

bucket:

即使是相同的measurement、tags、fields、time,即完全相同的point,因為Retention Policy不同,會存儲在不同的地方。

為了解決以上問題,influxdb2.0引入的概念,相當於database + Retention Policy

3.存儲引擎

概述:TSM Tree是在LSM Tree的基礎上稍作修改優化而來。

它主要包含四個部分:cache、wal、tsm file、compactor。

Cache

插入數據時,先往cache中寫入再寫入wal中,可以認為cache是wal文件中的數據在內存中的緩存。

WAL

預寫日志,對比mysql的binlog。

其作用就是為了持久化數據,當系統崩潰后可以通過wal文件恢復cache。

TSM File

每個tsm文件的大小上限是2GB。

當達到cache-snapshot-memory-size,cache-max-memory-size的限制時會觸發將cache寫入tsm文件。

Compactor

主要進行兩種操作,一種是cache數據達到閥值后,進行快照,生成一個新的tsm文件。另外一種就是合並當前的tsm文件,將多個小的tsm文件合並成一個,減少文件的數量,並且進行一些數據刪除操作。 這些操作都在后台自動完成。

4.目錄結構

InfluxDB的數據存儲有三個目錄,分別是meta、wal、data。

meta用於存儲數據庫的一些元數據,meta目錄下有一個meta.db文件。

wal目錄存放預寫日志文件,以.wal結尾。

data目錄存放實際存儲的數據文件,以.tsm結尾。

示例:

 

其中 test是數據庫名稱,autogen是存儲策略名稱,再下一層目錄中的以數字命名的目錄是 shard 的 ID 值,

比如 autogen存儲策略下有兩個 shard,ID 分別為 1 和 2。shard存儲了某一個時間段范圍內的數據。

再下一級的目錄則為具體的文件,分別是 .wal.tsm結尾的文件。

 三、讀寫數據

// insert
func writesPoints(cli client.Client) {
    bp, err := influxdb2.NewBatchPoints(client.BatchPointsConfig{
        Database:  "test",
        Precision: "s", //精度,默認ns
    })
    if err != nil {
        log.Fatal(err)
    }
    measurement := "cpu_usage"
    tags := map[string]string{"cpu": "ih-cpu"}
    fields := map[string]interface{}{
        "idle":   201.1,
        "system": 43.3,
        "user":   86.6,
    }
 
    pt, err := influxdb2.NewPoint(measurement, tags, fields, time.Now())
    if err != nil {
        log.Fatal(err)
    }
    bp.AddPoint(pt)
    err = cli.Write(bp)
    if err != nil {
        log.Fatal(err)
    }
    log.Println("insert success")
}
// query
func queryDB(cli client.Client, cmd string) (res []client.Result, err error) {
    q := influxdb2.Query{
        Command:  cmd,
        Database: "test",
    }
    if response, err := cli.Query(q); err == nil {
        if response.Error() != nil {
            return res, response.Error()
        }
        res = response.Results
    } else {
        return res, err
    }
    return res, nil
}

 

package main
 
import (
    "fmt"
    "log"
    "time"
 
    client "github.com/influxdata/influxdb1-client/v2"
)
 
// influxdb demo
 
func connInflux() client.Client {
    cli, err := client.NewHTTPClient(client.HTTPConfig{
        Addr:     "http://127.0.0.1:8086",
        Username: "admin",
        Password: "",
    })
    if err != nil {
        log.Fatal(err)
    }
    return cli
}
func main() {
    conn := connInflux()
    fmt.Println(conn)
 
    // insert
    writesPoints(conn)
 
    // 獲取10條數據並展示
    qs := fmt.Sprintf("SELECT * FROM %s LIMIT %d", "cpu_usage", 10)
    res, err := queryDB(conn, qs)
    if err != nil {
        log.Fatal(err)
    }
 
    for _, row := range res[0].Series[0].Values {
        for j, value := range row {
            log.Printf("j:%d value:%v\n", j, value)
        }
    }
}

http://testerhome.com/articles/30768

 https://www.sunzhongwei.com/amp/difference-between-bucket-and-measurement-in-influxdb

https://blog.csdn.net/zxxshaycormac/article/details/115497682


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM