InfluxDB是一個時間序列數據庫,它被設計用於處理高寫入和查詢負載。
本文簡單介紹了如何下載、配置、啟動InfluxDB,以及如何使用InfluxDB客戶端進行數據操作。開發環境為:Windows10,influxdb-1.8.4,VS2015,Vibrant.InfluxDB.Client 3.5.1。
1、下載安裝啟動
(1) 下載
InfluxDB官網為:https://www.influxdata.com/,本文使用的是influxdb-1.8.4_windows_amd64。
對於InfluxDB 2.0,在下載頁面中,選擇好Version(當前為2.0.8)及Platform(Windows Binaries(64-bit) - using PowerShell)之后,網頁會刷新安裝命令,從中可以提取InfluxDB下載地址。
wget https://dl.influxdata.com/influxdb/releases/influxdb2-2.0.8-windows-amd64.zip -UseBasicParsing -OutFile influxdb2-2.0.8-windows-amd64.zip
Expand-Archive .\influxdb2-2.0.8-windows-amd64.zip -DestinationPath 'C:\Program Files\InfluxData\influxdb\'
InfluxDB 2.0參考https://docs.influxdata.com/influxdb/v2.0/get-started/。
(2) 配置
Windows下的InfluxDB下載完成並解壓后是一個綠色軟件,僅僅做一下配置就能夠運行,配置文件為influxdb.conf。
- 綁定端口
# bind-address = "127.0.0.1:8088" bind-address = ":8088"
- 修改[http]節點
enabled = true bind-address = ":8086" auth-enabled = true realm = "InfluxDB" log-enabled = true
(3) 啟動
配置完成后,將InfluxDB的啟動寫入批處理可以快速點擊啟動,當然也可以配置成Windows服務。
influxd.exe -config influxdb.conf
此外,InfluxDB Client也可以通過批處理的形式快速啟動。
@ECHO OFF SETLOCAL SET HOME=%~dp0 "%~dp0\influx.exe" %* ENDLOCAL
InfluxDB Client遠程登錄如下:
@ECHO OFF influx.exe -host remoteServerIp -port remoteServerPort pause
InfluxDB Client需要使用auth命令進行授權,默認用戶名、密碼是admin、password。授權完成后,就可以使用InfluxDB的常用命令進行操作了。
2、客戶端
(1) 安裝客戶端
使用NuGet搜索Vibrant.InfluxDB.Client並安裝(本文使用3.5.1)。
(2) 封裝客戶端
下面對InfluxClient類進行封裝,抽象出初始化數據庫、異步添加數據、異步查詢個數、異步獲取數據等方法。
using System; using System.Collections.Generic; using System.Threading.Tasks; using AccelerateSensor.Database.Tools; using Vibrant.InfluxDB.Client; namespace AccelerateSensor.Database.InfluxDb { public class InfluxDbHelper { private readonly InfluxClient _influxClient; private string _databaseName; public InfluxDbHelper(string influxHost, string username, string password) { _influxClient = new InfluxClient(new Uri(influxHost), username, password); } #region Init public void Init(string databaseName) { InitAsync(databaseName).GetAwaiter(); } private async Task InitAsync(string databaseName) { //創建數據庫 _databaseName = databaseName; await _influxClient.CreateDatabaseAsync(_databaseName); } #endregion #region AddData /// <summary> /// 添加數據 /// </summary> /// <typeparam name="TInfluxData">泛型數據類型</typeparam> /// <param name="measurementName">表名稱</param> /// <param name="dataArray">數據數組</param> /// <param name="databaseName">數據庫名稱(可選)</param> public void AddData<TInfluxData>(string measurementName, TInfluxData[] dataArray, string databaseName = "") where TInfluxData : new() { AddDataAsync(databaseName, measurementName, dataArray).GetAwaiter(); } private async Task AddDataAsync<TInfluxData>( string databaseName, string measurementName, TInfluxData[] dataArray) where TInfluxData : new() { try { databaseName = string.IsNullOrWhiteSpace(databaseName) ? _databaseName : databaseName; await _influxClient.WriteAsync(databaseName, measurementName, dataArray); } catch (Exception e) { LogHelper.AddErrorLog(e.Message); } } #endregion /// <summary> /// 獲取數據個數 /// </summary> /// <param name="query">查詢條件</param> /// <param name="databaseName">數據庫名稱(可選)</param> /// <returns>數據個數</returns> public async Task<int> GetDataCountAsync(string query, string databaseName = "") { try { databaseName = string.IsNullOrWhiteSpace(databaseName) ? _databaseName : databaseName; var resultSet = await _influxClient.ReadAsync<CountInfo>(databaseName, query); // resultSet will contain 1 result in the Results collection (or multiple if you execute multiple queries at once) var result = resultSet.Results[0]; if (!result.Succeeded) { LogHelper.AddErrorLog(result.ErrorMessage); return 0; } // result will contain 1 series in the Series collection (or potentially multiple if you specify a GROUP BY clause) var series = result.Series[0]; return series.Rows[0].Count; } catch (Exception e) { LogHelper.AddErrorLog(e.Message); return 0; } } /// <summary> /// 異步獲取數據 /// </summary> /// <typeparam name="TInfluxData">數據類型</typeparam> /// <param name="query">查詢條件</param> /// <param name="databaseName">數據庫名稱(可選)</param> /// <returns>數據集合</returns> public async Task<List<TInfluxData>> GetDataAsync<TInfluxData>(string query, string databaseName = "") where TInfluxData : new() { try { databaseName = string.IsNullOrWhiteSpace(databaseName) ? _databaseName : databaseName; var resultSet = await _influxClient.ReadAsync<TInfluxData>(databaseName, query); // resultSet will contain 1 result in the Results collection (or multiple if you execute multiple queries at once) var result = resultSet.Results[0]; if (!result.Succeeded) { LogHelper.AddErrorLog(result.ErrorMessage); return new List<TInfluxData>(); } // result will contain 1 series in the Series collection (or potentially multiple if you specify a GROUP BY clause) var series = result.Series[0]; var dataList = new List<TInfluxData>(); dataList.AddRange(series.Rows); return dataList; } catch (Exception e) { LogHelper.AddErrorLog(e.Message); return new List<TInfluxData>(); } } /// <summary> /// 獲取表數據個數的輔助類 /// </summary> private class CountInfo { [InfluxTimestamp] // ReSharper disable once UnusedMember.Local public DateTime Timestamp { get; set; } [InfluxField("count")] // ReSharper disable once UnusedAutoPropertyAccessor.Local public int Count { get; set; } } } }
(3) 使用封裝類
首先定義Vibrant.InfluxDB.Client數據庫表類型。
using System; using Vibrant.InfluxDB.Client; namespace AccelerateSensor.Service.DbProxy.InfluxDb.Models { public class NodeData { [InfluxTimestamp] public DateTime Timestamp { get; set; } /// <summary> /// 節點編號 /// </summary> [InfluxTag("NodeUuid")] public string NodeUuid { get; set; } /// <summary> /// 數據類型 /// </summary> [InfluxField("AcquireDataType")] public int AcquireDataType { get; set; } /// <summary> /// 節點值 /// </summary> [InfluxField("Value")] public int Value { get; set; } /// <summary> /// 更新時間 /// </summary> [InfluxField("UpdateTime")] public DateTime UpdateTime { get; set; } } }
然后通過InfluxDbHelper實現數據庫初始化(創建)、添加數據、分頁查詢數據、條件查詢數據等功能。
using System; using System.Collections.Generic; using System.Linq; using System.Threading.Tasks; using AccelerateSensor.Database.InfluxDb; using AccelerateSensor.Service.Tools; using InfluxNodeData = AccelerateSensor.Service.DbProxy.InfluxDb.Models.NodeData; namespace AccelerateSensor.Service.DbProxy.InfluxDb { internal class InfluxDbProxy { private readonly InfluxDbHelper _influxDbHelper; public InfluxDbProxy() { _influxDbHelper = new InfluxDbHelper(Constants.InfluxHost, Constants.Username, Constants.Password); } public void Init() { _influxDbHelper.Init(Constants.DatabaseName); } public void AddNodeData(InfluxNodeData influxNodeData) { _influxDbHelper.AddData(Constants.MeasurementName.NodeData, new[] { influxNodeData }); } public async Task<int> GetNodeDataCountAsync(string nodeUuid) { var query = $"SELECT count(AcquireDataType) FROM {Constants.MeasurementName.NodeData} " + $"WHERE NodeUuid = '{nodeUuid}'"; return await _influxDbHelper.GetDataCountAsync(query); } public async Task<List<NodeData>> GetPageNodeDataAsync( string nodeUuid, int countPerPage, int curPage, int count) { var offset = countPerPage * (curPage - 1); var query = $"SELECT * FROM {Constants.MeasurementName.NodeData} " + $"WHERE NodeUuid = '{nodeUuid}' " + @"ORDER BY time DESC " + $"LIMIT {count} OFFSET {offset}"; var influxNodeDataList = await _influxDbHelper.GetDataAsync<InfluxNodeData>(query); return influxNodeDataList.Select(GetNodeData).ToList(); } public async Task<List<InfluxNodeData>> GetNodeDataAsync(string nodeUuid, DateTime start, DateTime stop) { var query = $"SELECT * FROM {Constants.MeasurementName.NodeData} " + $"WHERE NodeUuid = '{nodeUuid}' " + $"AND time >= '{start:yyyy-MM-dd hh:mm:ss}' " + $"AND time <= '{stop:yyyy-MM-dd hh:mm:ss}'"; var influxNodeDataList = await _influxDbHelper.GetDataAsync<InfluxNodeData>(query); return influxNodeDataList.Select(GetNodeData).ToList(); } } }
3、注意事項
需要注意的是在Vibrant.InfluxDB.Client數據庫表對應的類定義中,對數據類型是有要求的。帶有InfluxTimestamp屬性的字段是必須的,表示添加數據的時間戳,是InluxDB中measurement(表)的主鍵。帶有InfluxTag屬性的字段會被建立索引,只能是string類型。帶有InfluxField屬性的字段是常規的表字段,可以表示具體行的若干數值。
4、附:InfluxDB Client常用命令
(1) 登錄
> auth
> admin
> password
(2) 查看用戶
> show users
(3) 查看數據庫
> show databases
(4) 使用數據庫
> use database_name
(5) 查看數據庫表
> show measurements
(6) 查看field及tag
> show field keys from measurement_name;
> show tag keys from measurement_name;
(7) 格式化時間戳
> precision rfc3339
(8) 查詢數據: 按時間降序,查詢第101-200條記錄
> select * from measurement_name where tag_name = 'tag_value' order by time desc limit 100 offset 100