基於Morphia實現MongoDB按小時、按天聚合操作


MongoDB按照天數或小時聚合

需求

最近接到需求,需要對用戶賬戶下的設備狀態,分別按照天以及小時進行聚合,以此為基礎繪制設備狀態趨勢圖.
實現思路是啟動定時任務,對各用戶的設備狀態數據分別按照小時以及天進行聚合,並存儲進數據庫中供用戶后續查詢.
涉及到的技術棧分別為:Spring Boot,MongoDB,Morphia.

數據模型

@Data
@Builder
@Entity(value = "rawDevStatus", noClassnameStored = true)
// 設備狀態索引
@Indexes({
        // 設置數據超時時間(TTL,MongoDB根據TTL在后台進行數據刪除操作)
        @Index(fields = @Field("time"), options = @IndexOptions(expireAfterSeconds = 3600 * 24 * 72)),
        @Index(fields = {@Field("userId"), @Field(value = "time", type = IndexType.DESC)})
})
public class RawDevStatus {

    @Id
    @JsonProperty(access = JsonProperty.Access.WRITE_ONLY)
    private ObjectId objectId;

    private String userId;

    private Instant time;

    @Embedded("points")
    List<Point> protocolPoints;

    @Data
    @AllArgsConstructor
    public static class Point {
        /**
         * 協議類型
         */
        private Protocol protocol;

        /**
         * 設備總數
         */
        private Integer total;

        /**
         * 設備在線數目
         */
        private Integer onlineNum;

        /**
         * 處於啟用狀態設備數目
         */
        private Integer enableNum;
    }
}

上述代碼是設備狀態實體類,其中設備狀態數據是按照設備所屬協議進行區分的.

@Data
@Builder
@Entity(value = "aggregationDevStatus", noClassnameStored = true)
@Indexes({
        @Index(fields = @Field("expireAt"), options = @IndexOptions(expireAfterSeconds = 0)),
        @Index(fields = {@Field("userId"), @Field(value = "time", type = IndexType.DESC)})
})
public class AggregationDevStatus {

    @Id
    @JsonProperty(access = JsonProperty.Access.WRITE_ONLY)
    private ObjectId objectId;

    /**
     * 用戶ID
     */
    private String userId;

    /**
     * 設備總數
     */
    private Double total;

    /**
     * 設備在線數目
     */
    private Double onlineNum;

    /**
     * 處於啟用狀態設備數目
     */
    private Double enableNum;

    /**
     * 聚合類型(按照小時還是按照天聚合)
     */
    @Property("aggDuration")
    private AggregationDuration aggregationDuration;

    private Instant time;

    /**
     * 動態設置文檔過期時間
     */
    private Instant expireAt;
}

上述代碼是期待的聚合結果,其中構建兩個索引:(1)超時索引;(2)復合索引,程序會根據用戶名以及時間查詢設備狀態聚合結果.

聚合操作符介紹

聚合操作類似於管道,管道中的每一步操作產生的中間結果作為下一步的輸入源,最終輸出聚合結果.
此次聚合主要涉及以下操作:

  • $project:指定輸出文檔中的字段.
  • $unwind:拆分數據中的數組;
  • match:選擇要處理的文檔數據;
  • group:根據key分組聚合結果.

原始聚合語句

db.getCollection('raw_dev_status').aggregate([
    {$match:
        {
            time:{$gte: ISODate("2019-06-27T00:00:00Z")},
        }
    },
    {$unwind: "$points"},
    {$project:
        {
            userId:1,points:1,
            tmp: {$dateToString: { format: "%Y:%m:%dT%H:00:00Z", date: "$time" } }
        }
    },
    {$project:
        {
            userId:1,points:1,
            groupTime: {$dateFromString: { dateString: "$tmp", format: "%Y:%m:%dT%H:%M:%SZ", } }
        }
    },
    {$group:
        {
            _id:{user_id:'$userId', cal_time:'$groupTime'},
            devTotal:{'$avg':'$points.total'},
            onlineTotal:{'$avg':'$points.onlineNum'},
            enableTotal:{'$avg':'$points.enableNum'}
        }
    },
])

上述代碼是按小時聚合數據,以下來逐步介紹處理思路:

(1) $match

根據小時聚合數據,因為只需要獲取近24小時的聚合結果,所以對數據進行初步篩選.

(2) $unwind

raw_dev_status中的設備狀態是按照協議區分的數組,因此需要對其進行展開,以便下一步進行篩選;

(3) $project

    {$project:
        {
            userId:1,points:1,
            tmp: {$dateToString: { format: "%Y:%m:%dT%H:00:00Z", date: "$time" } }
        }
    }

選擇需要輸出的數據,分別為:userId,points以及tmp.
需要注意,為了按照時間聚合,對$time屬性進行操作,提取%Y:%m:%dT%H時信息至$tmp作為下一步的聚合依據.

如果需要按天聚合,則format數據可修改為:%Y:%m:%dT00:00:00Z即可滿足要求.

(4) $project

    {$project:
        {
            userId:1,points:1,
            groupTime: {$dateFromString: { dateString: "$tmp", format: "%Y:%m:%dT%H:%M:%SZ", } }
        }
    }

因為上一步project操作中,tmp為字符串數據,最終的聚合結果需要時間戳(主要懶,不想在程序中進行轉換操作).
因此,此處對$tmp進行操作,轉換為時間類型數據,即groupTime.

(5) $group

對聚合結果進行分類操作,並生成最終輸出結果.

    {$group:
        {
            # 根據_id進行分組操作,依據是`user_id`以及`$groupTime`
            _id:{user_id:'$userId', cal_time:'$groupTime'},
            # 求設備總數平均值
            devTotal:{'$avg':'$points.total'},
            # 求設備在線數平均值
            onlineTotal:{'$avg':'$points.onlineNum'},
            # ...
            enableTotal:{'$avg':'$points.enableNum'}
        }
    }

代碼編寫

此處ODM選擇Morphia,亦可以使用MongoTemplate,原理類似.

    /**
     * 創建聚合條件
     *
     * @param pastTime     過去時間段
     * @param dateToString 格式化字符串(%Y:%m:%dT%H:00:00Z或%Y:%m:%dT00:00:00Z)
     * @return 聚合條件
     */
    private AggregationPipeline createAggregationPipeline(Instant pastTime, String dateToString, String stringToDate) {
        Query<RawDevStatus> query = datastore.createQuery(RawDevStatus.class);
        return datastore.createAggregation(RawDevStatus.class)
                .match(query.field("time").greaterThanOrEq(pastTime))
                .unwind("points", new UnwindOptions().preserveNullAndEmptyArrays(false))
                .match(query.field("points.protocol").equal("ALL"))
                .project(Projection.projection("userId"),
                        Projection.projection("points"),
                        Projection.projection("convertTime",
                                Projection.expression("$dateToString",
                                        new BasicDBObject("format", dateToString)
                                                .append("date", "$time"))
                        )
                )
                .project(Projection.projection("userId"),
                        Projection.projection("points"),
                        Projection.projection("convertTime",
                                Projection.expression("$dateFromString",
                                        new BasicDBObject("format", stringToDate)
                                                .append("dateString", "$convertTime"))
                        )
                )
                .group(
                        Group.id(Group.grouping("userId"), Group.grouping("convertTime")),
                        Group.grouping("total", Group.average("points.total")),
                        Group.grouping("onlineNum", Group.average("points.onlineNum")),
                        Group.grouping("enableNum", Group.average("points.enableNum"))
                );
    }

    /**
     * 獲取聚合結果
     *
     * @param pipeline 聚合條件
     * @return 聚合結果
     */
    private List<AggregationMidDevStatus> getAggregationResult(AggregationPipeline pipeline) {
        List<AggregationMidDevStatus> statuses = new ArrayList<>();
        Iterator<AggregationMidDevStatus> resultIterator = pipeline.aggregate(
                AggregationMidDevStatus.class, AggregationOptions.builder().allowDiskUse(true).build());
        while (resultIterator.hasNext()) {
            statuses.add(resultIterator.next());
        }
        return statuses;
    }

    //......................................................................................
    // 獲取聚合結果(省略若干代碼)
    AggregationPipeline pipeline = createAggregationPipeline(pastTime, dateToString, stringToDate);
    List<AggregationMidDevStatus> midStatuses = getAggregationResult(pipeline);
    if (CollectionUtils.isEmpty(midStatuses)) {
        log.warn("Can not get dev status aggregation result.");
        return;
    }

PS:
如果您覺得我的文章對您有幫助,請關注我的微信公眾號,謝謝!
程序員打怪之路


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM