package cn.test.jmeter.util; import org.influxdb.InfluxDB; import org.influxdb.InfluxDBFactory; import org.influxdb.dto.BatchPoints; import org.influxdb.dto.Point; import org.influxdb.dto.Point.Builder; import org.influxdb.dto.Query; import org.influxdb.dto.QueryResult; import java.util.ArrayList; import java.util.Date; import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeUnit; /** * @author weisy * @date 2021/5/11 * @Description */ public class InfluxDbUtil { private static String openurl = "http://IP:8086";//連接地址 private static String username = "admin";//用戶名 private static String password = "admin";//密碼 private static String database = "jmeter";//數據庫 private String measurement;//表名 private InfluxDB influxDB; public InfluxDbUtil(String username, String password, String openurl, String database) { this.username = username; this.password = password; this.openurl = openurl; this.database = database; } public static InfluxDbUtil setUp() { //創建 連接 InfluxDbUtil influxDbUtil = new InfluxDbUtil(username, password, openurl, database); influxDbUtil.influxDbBuild(); // influxDbUtil.createRetentionPolicy(); // influxDB.deleteDB(database); // influxDB.createDB(database); return influxDbUtil; } /** * 連接時序數據庫;獲得InfluxDB **/ public InfluxDB influxDbBuild() { if (influxDB == null) { influxDB = InfluxDBFactory.connect(openurl, username, password); influxDB.createDatabase(database); // influxDB.setRetentionPolicy("one_month"); } return influxDB; } /** * * 設置數據保存策略 * * defalut 策略名 /database 數據庫名/ 30d 數據保存時限30天/ 1 副本個數為1/ 結尾DEFAULT 表示 設為默認的策略 * */ // public void createRetentionPolicy() { // String command = String.format("CREATE RETENTION POLICY \"%s\" ON \"%s\" DURATION %s REPLICATION %s DEFAULT", // "defalut", database, "30d", 1); // this.query(command); // } /** * * 查詢 * * @param command 查詢語句 * * @return * */ public QueryResult query(String command) { return influxDB.query(new Query(command, database)); } /** * * 插入 * * @param tags 標簽 * * @param fields 字段 * */ /* * 單條插入:多個tag多個field * */ public void insert1(Map<String, String> tags, Map<String, Object> fields,String measurement) { Builder builder = Point.measurement(measurement); builder.tag(tags); builder.fields(fields); influxDB.write(database, "", builder.build()); } /* * 單條插入:多個tag多個field自定義時間戳 * */ public void insert2(Map<String, String> tags, Map<String, Object> fields,String measurement,Long timeStamp,TimeUnit timeUnit) { Builder builder = Point.measurement(measurement); builder.tag(tags); builder.fields(fields); if (0 != timeStamp) { builder.time(timeStamp, timeUnit); } influxDB.write(database, "", builder.build()); } /* * 單條插入:單條tag單條field * */ public void insert3(String tags, Integer fields,String measurement,Long timeStamp) { Point point = Point.measurement(measurement) .time(timeStamp, TimeUnit.MILLISECONDS) .tag("label", tags) .addField("value", fields).build(); influxDB.write("jmeter", "", point); } /* * 批量插入 * */ public void insertBatch(ArrayList<Map<String, String>> sqlserverlist, String measurement) { BatchPoints batchPoints = BatchPoints .database("jmeter") .build(); //遍歷sqlserver獲取數據 for(Map<String, String> map : sqlserverlist) { //創建單條數據對象——表名 Point point = Point.measurement(measurement) .time(Long.parseLong(map.get("timeStamp")), TimeUnit.MILLISECONDS) //tag屬性——只能存儲String類型 .tag("label", map.get("label")) //field存儲數據 .addField("value", 1) .addField("rt", map.get("rt")) .build(); //將單條數據存儲到集合中 batchPoints.point(point); } influxDB.write(batchPoints); } public void insertBatch2(ArrayList<Map<String, String>> sqlserverlist, String measurement) { BatchPoints batchPoints = BatchPoints .database("jmeter") .build(); //遍歷sqlserver獲取數據 for(Map<String, String> map : sqlserverlist) { //創建單條數據對象——表名 Point point = Point.measurement(measurement) .time(Long.parseLong(map.get("timeStamp")), TimeUnit.MILLISECONDS) //tag屬性——只能存儲String類型 .tag("label", map.get("label")) //field存儲數據 .addField("value", Integer.valueOf(map.get("value"))) .build(); //將單條數據存儲到集合中 batchPoints.point(point); } influxDB.write(batchPoints); } /** * * 刪除 * * @param command 刪除語句 * * @return 返回錯誤信息 * */ public String deleteMeasurementData(String command) { QueryResult result = influxDB.query(new Query(command, database)); return result.getError(); } /** * * 創建數據庫 * * @param dbName * */ @SuppressWarnings("deprecation") public void createDB(String dbName) { influxDB.createDatabase(dbName); } /** * * 刪除數據庫 * * @param dbName * */ public void deleteDB(String dbName) { influxDB.deleteDatabase(dbName); } public String getUsername() { return username; } public void setUsername(String username) { this.username = username; } public String getPassword() { return password; } public void setPassword(String password) { this.password = password; } public String getOpenurl() { return openurl; } public void setOpenurl(String openurl) { this.openurl = openurl; } public void setDatabase(String database) { this.database = database; } public static void main(String[] args) { InfluxDbUtil influxDB = InfluxDbUtil.setUp(); Map<String, String> tags = new HashMap<>(); Map<String, Object> fields = new HashMap<>(); tags.put("TAG_NAME","abc"); fields.put("TAG_VALUE","111"); // try { // fields.put("TIMAMPEST", DateUtil.getCurrentDateStr()); // } catch (Exception e) { // e.printStackTrace(); // } influxDB.insert1(tags, fields,"table2"); //查詢 QueryResult result= influxDB.query("select *from table2 order by time"); System.out.println(result); } }