MongoDB按照天數或小時聚合
需求
最近接到需求,需要對用戶賬戶下的設備狀態,分別按照天以及小時進行聚合,以此為基礎繪制設備狀態趨勢圖.
實現思路是啟動定時任務,對各用戶的設備狀態數據分別按照小時以及天進行聚合,並存儲進數據庫中供用戶后續查詢.
涉及到的技術棧分別為:Spring Boot
,MongoDB
,Morphia
.
數據模型
@Data
@Builder
@Entity(value = "rawDevStatus", noClassnameStored = true)
// 設備狀態索引
@Indexes({
// 設置數據超時時間(TTL,MongoDB根據TTL在后台進行數據刪除操作)
@Index(fields = @Field("time"), options = @IndexOptions(expireAfterSeconds = 3600 * 24 * 72)),
@Index(fields = {@Field("userId"), @Field(value = "time", type = IndexType.DESC)})
})
public class RawDevStatus {
@Id
@JsonProperty(access = JsonProperty.Access.WRITE_ONLY)
private ObjectId objectId;
private String userId;
private Instant time;
@Embedded("points")
List<Point> protocolPoints;
@Data
@AllArgsConstructor
public static class Point {
/**
* 協議類型
*/
private Protocol protocol;
/**
* 設備總數
*/
private Integer total;
/**
* 設備在線數目
*/
private Integer onlineNum;
/**
* 處於啟用狀態設備數目
*/
private Integer enableNum;
}
}
上述代碼是設備狀態
實體類,其中設備狀態數據是按照設備所屬協議
進行區分的.
@Data
@Builder
@Entity(value = "aggregationDevStatus", noClassnameStored = true)
@Indexes({
@Index(fields = @Field("expireAt"), options = @IndexOptions(expireAfterSeconds = 0)),
@Index(fields = {@Field("userId"), @Field(value = "time", type = IndexType.DESC)})
})
public class AggregationDevStatus {
@Id
@JsonProperty(access = JsonProperty.Access.WRITE_ONLY)
private ObjectId objectId;
/**
* 用戶ID
*/
private String userId;
/**
* 設備總數
*/
private Double total;
/**
* 設備在線數目
*/
private Double onlineNum;
/**
* 處於啟用狀態設備數目
*/
private Double enableNum;
/**
* 聚合類型(按照小時還是按照天聚合)
*/
@Property("aggDuration")
private AggregationDuration aggregationDuration;
private Instant time;
/**
* 動態設置文檔過期時間
*/
private Instant expireAt;
}
上述代碼是期待的聚合結果,其中構建兩個索引:(1)超時索引;(2)復合索引,程序會根據用戶名以及時間查詢設備狀態聚合結果.
聚合操作符介紹
聚合操作類似於管道,管道中的每一步操作產生的中間結果作為下一步的輸入源,最終輸出聚合結果.
此次聚合主要涉及以下操作:
$project
:指定輸出文檔中的字段.$unwind
:拆分數據中的數組;match
:選擇要處理的文檔數據;group
:根據key
分組聚合結果.
原始聚合語句
db.getCollection('raw_dev_status').aggregate([
{$match:
{
time:{$gte: ISODate("2019-06-27T00:00:00Z")},
}
},
{$unwind: "$points"},
{$project:
{
userId:1,points:1,
tmp: {$dateToString: { format: "%Y:%m:%dT%H:00:00Z", date: "$time" } }
}
},
{$project:
{
userId:1,points:1,
groupTime: {$dateFromString: { dateString: "$tmp", format: "%Y:%m:%dT%H:%M:%SZ", } }
}
},
{$group:
{
_id:{user_id:'$userId', cal_time:'$groupTime'},
devTotal:{'$avg':'$points.total'},
onlineTotal:{'$avg':'$points.onlineNum'},
enableTotal:{'$avg':'$points.enableNum'}
}
},
])
上述代碼是按小時聚合數據,以下來逐步介紹處理思路:
(1) $match
根據小時聚合數據,因為只需要獲取近24小時的聚合結果,所以對數據進行初步篩選.
(2) $unwind
raw_dev_status
中的設備狀態是按照協議區分的數組,因此需要對其進行展開,以便下一步進行篩選;
(3) $project
{$project:
{
userId:1,points:1,
tmp: {$dateToString: { format: "%Y:%m:%dT%H:00:00Z", date: "$time" } }
}
}
選擇需要輸出的數據,分別為:userId
,points
以及tmp
.
需要注意,為了按照時間聚合,對$time
屬性進行操作,提取%Y:%m:%dT%H
時信息至$tmp
作為下一步的聚合依據.
如果需要按天聚合,則
format
數據可修改為:%Y:%m:%dT00:00:00Z
即可滿足要求.
(4) $project
{$project:
{
userId:1,points:1,
groupTime: {$dateFromString: { dateString: "$tmp", format: "%Y:%m:%dT%H:%M:%SZ", } }
}
}
因為上一步project
操作中,tmp
為字符串數據,最終的聚合結果需要時間戳(主要懶,不想在程序中進行轉換操作).
因此,此處對$tmp
進行操作,轉換為時間類型數據,即groupTime
.
(5) $group
對聚合結果進行分類操作,並生成最終輸出結果.
{$group:
{
# 根據_id進行分組操作,依據是`user_id`以及`$groupTime`
_id:{user_id:'$userId', cal_time:'$groupTime'},
# 求設備總數平均值
devTotal:{'$avg':'$points.total'},
# 求設備在線數平均值
onlineTotal:{'$avg':'$points.onlineNum'},
# ...
enableTotal:{'$avg':'$points.enableNum'}
}
}
代碼編寫
此處ODM
選擇Morphia
,亦可以使用MongoTemplate
,原理類似.
/**
* 創建聚合條件
*
* @param pastTime 過去時間段
* @param dateToString 格式化字符串(%Y:%m:%dT%H:00:00Z或%Y:%m:%dT00:00:00Z)
* @return 聚合條件
*/
private AggregationPipeline createAggregationPipeline(Instant pastTime, String dateToString, String stringToDate) {
Query<RawDevStatus> query = datastore.createQuery(RawDevStatus.class);
return datastore.createAggregation(RawDevStatus.class)
.match(query.field("time").greaterThanOrEq(pastTime))
.unwind("points", new UnwindOptions().preserveNullAndEmptyArrays(false))
.match(query.field("points.protocol").equal("ALL"))
.project(Projection.projection("userId"),
Projection.projection("points"),
Projection.projection("convertTime",
Projection.expression("$dateToString",
new BasicDBObject("format", dateToString)
.append("date", "$time"))
)
)
.project(Projection.projection("userId"),
Projection.projection("points"),
Projection.projection("convertTime",
Projection.expression("$dateFromString",
new BasicDBObject("format", stringToDate)
.append("dateString", "$convertTime"))
)
)
.group(
Group.id(Group.grouping("userId"), Group.grouping("convertTime")),
Group.grouping("total", Group.average("points.total")),
Group.grouping("onlineNum", Group.average("points.onlineNum")),
Group.grouping("enableNum", Group.average("points.enableNum"))
);
}
/**
* 獲取聚合結果
*
* @param pipeline 聚合條件
* @return 聚合結果
*/
private List<AggregationMidDevStatus> getAggregationResult(AggregationPipeline pipeline) {
List<AggregationMidDevStatus> statuses = new ArrayList<>();
Iterator<AggregationMidDevStatus> resultIterator = pipeline.aggregate(
AggregationMidDevStatus.class, AggregationOptions.builder().allowDiskUse(true).build());
while (resultIterator.hasNext()) {
statuses.add(resultIterator.next());
}
return statuses;
}
//......................................................................................
// 獲取聚合結果(省略若干代碼)
AggregationPipeline pipeline = createAggregationPipeline(pastTime, dateToString, stringToDate);
List<AggregationMidDevStatus> midStatuses = getAggregationResult(pipeline);
if (CollectionUtils.isEmpty(midStatuses)) {
log.warn("Can not get dev status aggregation result.");
return;
}
PS:
如果您覺得我的文章對您有幫助,請關注我的微信公眾號,謝謝!