InfluxDB的簡單使用


InfluxDB是一個時間序列數據庫,它被設計用於處理高寫入和查詢負載。

本文簡單介紹了如何下載、配置、啟動InfluxDB,以及如何使用InfluxDB客戶端進行數據操作。開發環境為:Windows10,influxdb-1.8.4,VS2015,Vibrant.InfluxDB.Client 3.5.1。

1、下載安裝啟動

(1) 下載

InfluxDB官網為:https://www.influxdata.com/,本文使用的是influxdb-1.8.4_windows_amd64。

對於InfluxDB 2.0,在下載頁面中,選擇好Version(當前為2.0.8)及Platform(Windows Binaries(64-bit) - using PowerShell)之后,網頁會刷新安裝命令,從中可以提取InfluxDB下載地址

wget https://dl.influxdata.com/influxdb/releases/influxdb2-2.0.8-windows-amd64.zip -UseBasicParsing -OutFile influxdb2-2.0.8-windows-amd64.zip
Expand-Archive .\influxdb2-2.0.8-windows-amd64.zip -DestinationPath 'C:\Program Files\InfluxData\influxdb\'

InfluxDB 2.0參考https://docs.influxdata.com/influxdb/v2.0/get-started/

(2) 配置

Windows下的InfluxDB下載完成並解壓后是一個綠色軟件,僅僅做一下配置就能夠運行,配置文件為influxdb.conf。

- 綁定端口

# bind-address = "127.0.0.1:8088"
bind-address = ":8088"

- 修改[http]節點

enabled = true
bind-address = ":8086"
auth-enabled = true
realm = "InfluxDB"
log-enabled = true

(3) 啟動

配置完成后,將InfluxDB的啟動寫入批處理可以快速點擊啟動,當然也可以配置成Windows服務。

influxd.exe -config influxdb.conf

此外,InfluxDB Client也可以通過批處理的形式快速啟動。

@ECHO OFF
SETLOCAL
SET HOME=%~dp0
"%~dp0\influx.exe" %*
ENDLOCAL

InfluxDB Client遠程登錄如下:

@ECHO OFF
influx.exe -host remoteServerIp -port remoteServerPort
pause

InfluxDB Client需要使用auth命令進行授權,默認用戶名、密碼是admin、password。授權完成后,就可以使用InfluxDB的常用命令進行操作了。

2、客戶端

(1) 安裝客戶端

使用NuGet搜索Vibrant.InfluxDB.Client並安裝(本文使用3.5.1)。

(2) 封裝客戶端

下面對InfluxClient類進行封裝,抽象出初始化數據庫、異步添加數據、異步查詢個數、異步獲取數據等方法。

using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using AccelerateSensor.Database.Tools;
using Vibrant.InfluxDB.Client;

namespace AccelerateSensor.Database.InfluxDb
{
    public class InfluxDbHelper
    {
        private readonly InfluxClient _influxClient;
        private string _databaseName;

        public InfluxDbHelper(string influxHost, string username, string password)
        {
            _influxClient = new InfluxClient(new Uri(influxHost), username, password);
        }

        #region Init

        public void Init(string databaseName)
        {
            InitAsync(databaseName).GetAwaiter();
        }

        private async Task InitAsync(string databaseName)
        {
            //創建數據庫
            _databaseName = databaseName;
            await _influxClient.CreateDatabaseAsync(_databaseName);
        }

        #endregion

        #region AddData

        /// <summary>
        /// 添加數據
        /// </summary>
        /// <typeparam name="TInfluxData">泛型數據類型</typeparam>
        /// <param name="measurementName">表名稱</param>
        /// <param name="dataArray">數據數組</param>
        /// <param name="databaseName">數據庫名稱(可選)</param>
        public void AddData<TInfluxData>(string measurementName, TInfluxData[] dataArray, string databaseName = "")
            where TInfluxData : new()
        {
            AddDataAsync(databaseName, measurementName, dataArray).GetAwaiter();
        }

        private async Task AddDataAsync<TInfluxData>(
            string databaseName, string measurementName,
            TInfluxData[] dataArray) where TInfluxData : new()
        {
            try
            {
                databaseName = string.IsNullOrWhiteSpace(databaseName) ? _databaseName : databaseName;
                await _influxClient.WriteAsync(databaseName, measurementName, dataArray);
            }
            catch (Exception e)
            {
                LogHelper.AddErrorLog(e.Message);
            }
        }

        #endregion

        /// <summary>
        /// 獲取數據個數
        /// </summary>
        /// <param name="query">查詢條件</param>
        /// <param name="databaseName">數據庫名稱(可選)</param>
        /// <returns>數據個數</returns>
        public async Task<int> GetDataCountAsync(string query, string databaseName = "")
        {
            try
            {
                databaseName = string.IsNullOrWhiteSpace(databaseName) ? _databaseName : databaseName;
                var resultSet = await _influxClient.ReadAsync<CountInfo>(databaseName, query);

                // resultSet will contain 1 result in the Results collection (or multiple if you execute multiple queries at once)
                var result = resultSet.Results[0];
                if (!result.Succeeded)
                {
                    LogHelper.AddErrorLog(result.ErrorMessage);
                    return 0;
                }

                // result will contain 1 series in the Series collection (or potentially multiple if you specify a GROUP BY clause)
                var series = result.Series[0];
                return series.Rows[0].Count;
            }
            catch (Exception e)
            {
                LogHelper.AddErrorLog(e.Message);
                return 0;
            }
        }

        /// <summary>
        /// 異步獲取數據
        /// </summary>
        /// <typeparam name="TInfluxData">數據類型</typeparam>
        /// <param name="query">查詢條件</param>
        /// <param name="databaseName">數據庫名稱(可選)</param>
        /// <returns>數據集合</returns>
        public async Task<List<TInfluxData>> GetDataAsync<TInfluxData>(string query, string databaseName = "")
            where TInfluxData : new()
        {
            try
            {
                databaseName = string.IsNullOrWhiteSpace(databaseName) ? _databaseName : databaseName;
                var resultSet = await _influxClient.ReadAsync<TInfluxData>(databaseName, query);

                // resultSet will contain 1 result in the Results collection (or multiple if you execute multiple queries at once)
                var result = resultSet.Results[0];
                if (!result.Succeeded)
                {
                    LogHelper.AddErrorLog(result.ErrorMessage);
                    return new List<TInfluxData>();
                }

                // result will contain 1 series in the Series collection (or potentially multiple if you specify a GROUP BY clause)
                var series = result.Series[0];

                var dataList = new List<TInfluxData>();
                dataList.AddRange(series.Rows);
                return dataList;
            }
            catch (Exception e)
            {
                LogHelper.AddErrorLog(e.Message);
                return new List<TInfluxData>();
            }
        }

        /// <summary>
        /// 獲取表數據個數的輔助類
        /// </summary>
        private class CountInfo
        {
            [InfluxTimestamp]
            // ReSharper disable once UnusedMember.Local
            public DateTime Timestamp { get; set; }

            [InfluxField("count")]
            // ReSharper disable once UnusedAutoPropertyAccessor.Local
            public int Count { get; set; }
        }
    }
}

(3) 使用封裝類

首先定義Vibrant.InfluxDB.Client數據庫表類型。

using System;
using Vibrant.InfluxDB.Client;

namespace AccelerateSensor.Service.DbProxy.InfluxDb.Models
{
    public class NodeData
    {
        [InfluxTimestamp]
        public DateTime Timestamp { get; set; }

        /// <summary>
        /// 節點編號
        /// </summary>
        [InfluxTag("NodeUuid")]
        public string NodeUuid { get; set; }
        
        /// <summary>
        /// 數據類型
        /// </summary>
        [InfluxField("AcquireDataType")]
        public int AcquireDataType { get; set; }

        /// <summary>
        /// 節點值
        /// </summary>
        [InfluxField("Value")]
        public int Value { get; set; }

        /// <summary>
        /// 更新時間
        /// </summary>
        [InfluxField("UpdateTime")]
        public DateTime UpdateTime { get; set; }
    }
}

然后通過InfluxDbHelper實現數據庫初始化(創建)、添加數據、分頁查詢數據、條件查詢數據等功能。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using AccelerateSensor.Database.InfluxDb;
using AccelerateSensor.Service.Tools;
using InfluxNodeData = AccelerateSensor.Service.DbProxy.InfluxDb.Models.NodeData;

namespace AccelerateSensor.Service.DbProxy.InfluxDb
{
    internal class InfluxDbProxy
    {
        private readonly InfluxDbHelper _influxDbHelper;

        public InfluxDbProxy()
        {
            _influxDbHelper = new InfluxDbHelper(Constants.InfluxHost, Constants.Username, Constants.Password);
        }

        public void Init()
        {
            _influxDbHelper.Init(Constants.DatabaseName);
        }

        public void AddNodeData(InfluxNodeData influxNodeData)
        {
            _influxDbHelper.AddData(Constants.MeasurementName.NodeData, new[] { influxNodeData });
        }

        public async Task<int> GetNodeDataCountAsync(string nodeUuid)
        {
            var query = $"SELECT count(AcquireDataType) FROM {Constants.MeasurementName.NodeData} " +
                        $"WHERE NodeUuid = '{nodeUuid}'";
            return await _influxDbHelper.GetDataCountAsync(query);
        }

        public async Task<List<NodeData>> GetPageNodeDataAsync(
            string nodeUuid, int countPerPage, int curPage, int count)
        {
            var offset = countPerPage * (curPage - 1);
            var query = $"SELECT * FROM {Constants.MeasurementName.NodeData} " +
                        $"WHERE NodeUuid = '{nodeUuid}' " +
                        @"ORDER BY time DESC " +
                        $"LIMIT {count} OFFSET {offset}";
            var influxNodeDataList = await _influxDbHelper.GetDataAsync<InfluxNodeData>(query);
            return influxNodeDataList.Select(GetNodeData).ToList();
        }

        public async Task<List<InfluxNodeData>> GetNodeDataAsync(string nodeUuid, DateTime start, DateTime stop)
        {
            var query = $"SELECT * FROM {Constants.MeasurementName.NodeData} " +
                        $"WHERE NodeUuid = '{nodeUuid}' " +
                        $"AND time >= '{start:yyyy-MM-dd hh:mm:ss}' " +
                        $"AND time <= '{stop:yyyy-MM-dd hh:mm:ss}'";
            var influxNodeDataList = await _influxDbHelper.GetDataAsync<InfluxNodeData>(query);
            return influxNodeDataList.Select(GetNodeData).ToList();
        }
    }
}

3、注意事項

需要注意的是在Vibrant.InfluxDB.Client數據庫表對應的類定義中,對數據類型是有要求的。帶有InfluxTimestamp屬性的字段是必須的,表示添加數據的時間戳,是InluxDB中measurement(表)的主鍵。帶有InfluxTag屬性的字段會被建立索引,只能是string類型。帶有InfluxField屬性的字段是常規的表字段,可以表示具體行的若干數值。

4、附:InfluxDB Client常用命令 

(1) 登錄
> auth
> admin
> password

(2) 查看用戶
> show users

(3) 查看數據庫
> show databases

(4) 使用數據庫
> use database_name

(5) 查看數據庫表
> show measurements

(6) 查看field及tag
> show field keys from measurement_name;
> show tag keys from measurement_name;

(7) 格式化時間戳
> precision rfc3339

(8) 查詢數據: 按時間降序,查詢第101-200條記錄
> select * from measurement_name where tag_name = 'tag_value' order by time desc limit 100 offset 100


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM