Flink+Druid構建實時OLAP的探索


場景

k12在線教育公司的業務場景中,有一些業務場景需要實時統計和分析,如分析在線上課老師數量、學生數量,實時銷售額,課堂崩潰率等,需要實時反應上課的質量問題,以便於對整個公司的業務情況有大致的了解。

方案對比

對比了很多解決方案,如下幾種,列出來供參考。

方案 實時入庫 SQL支持度
Spark+CarbonData 支持 Spark SQL語法豐富
Kylin 不支持 支持join
Flink+Druid 支持 0.15以前不支持SQL,不支持join
  1. 上一篇文章所示,使用Spark+CarbonData也是一種解決方案,但是他的缺點也是比較明顯,如不能和Flink進行結合,因為我們整個的大數據規划的大致方向是,Spark用來作為離線計算,Flink作為實時計算,並且這兩個大方向短時間內不會改變;
  2. Kylin一直是老牌OLAP引擎,但是有個缺點無法滿足我們的需求,就是在技術選型的那個時間點kylin還不支持實時入庫(后續2.0版本支持實時入庫),所以就選擇了放棄;
  3. 使用Flink+Druid方式實現,這個時間選擇這個方案,簡直是順應潮流呀,Flink現在如日中天,各大廠都在使用,Druid是OLAP的新貴,關於它的文章也有很多,我也不贅述太多。有興趣的可以看下這篇文章,我的博客其它文章也有最新版本的安裝教程,實操方案哦。

設計方案

實時處理采用Flink SQL,實時入庫Druid方式采用 druid-kafka-indexing-service,另一種方式入庫方式,Tranquility,這種方式測試下來問題多多,放棄了。數據流向如下圖。

 

場景舉例

實時計算課堂連接掉線率。此事件包含兩個埋點上報,進入教室和掉線分別上報數據。druid設計的字段

flink的處理

將上報的數據進行解析,上報使用的是json格式,需要解析出所需要的字段然后發送到kafka。字段包含如下

sysTime,DateTime格式 pt,格式yyyy-MM-dd eventId,事件類型(enterRoom|disconnect) lessonId,課程ID 
Druid處理

啟動Druid Supervisor,消費Kafka里的數據,使用預聚合,配置如下

{
  "type": "kafka",
  "dataSchema": {
    "dataSource": "sac_core_analyze_v1",
    "parser": {
      "parseSpec": {
        "dimensionsSpec": {
          "spatialDimensions": [],
          "dimensions": [
            "eventId",
            "pt"
          ]
        },
        "format": "json",
        "timestampSpec": {
          "column": "sysTime",
          "format": "auto"
        }
      },
      "type": "string"
    },
    "metricsSpec": [
      {
            "filter": {
                "type": "selector",
                "dimension": "msg_type",
                "value": "disconnect"
            },
            "aggregator": {
                "name": "lesson_offline_molecule_id",
                "type": "cardinality",
                "fields": ["lesson_id"]
            },
            "type": "filtered"
        }, {
            "filter": {
                "type": "selector",
                "dimension": "msg_type",
                "value": "enterRoom"
            },
            "aggregator": {
                "name": "lesson_offline_denominator_id",
                "type": "cardinality",
                "fields": ["lesson_id"]
            },
            "type": "filtered"
        }
    ],
    "granularitySpec": {
      "type": "uniform",
      "segmentGranularity": "DAY",
      "queryGranularity": {
        "type": "none"
      },
      "rollup": true,
      "intervals": null
    },
    "transformSpec": {
      "filter": null,
      "transforms": []
    }
  },
  "tuningConfig": {
    "type": "kafka",
    "maxRowsInMemory": 1000000,
    "maxBytesInMemory": 0,
    "maxRowsPerSegment": 5000000,
    "maxTotalRows": null,
    "intermediatePersistPeriod": "PT10M",
    "basePersistDirectory": "/tmp/1564535441619-2",
    "maxPendingPersists": 0,
    "indexSpec": {
      "bitmap": {
        "type": "concise"
      },
      "dimensionCompression": "lz4",
      "metricCompression": "lz4",
      "longEncoding": "longs"
    },
    "buildV9Directly": true,
    "reportParseExceptions": false,
    "handoffConditionTimeout": 0,
    "resetOffsetAutomatically": false,
    "segmentWriteOutMediumFactory": null,
    "workerThreads": null,
    "chatThreads": null,
    "chatRetries": 8,
    "httpTimeout": "PT10S",
    "shutdownTimeout": "PT80S",
    "offsetFetchPeriod": "PT30S",
    "intermediateHandoffPeriod": "P2147483647D",
    "logParseExceptions": false,
    "maxParseExceptions": 2147483647,
    "maxSavedParseExceptions": 0,
    "skipSequenceNumberAvailabilityCheck": false
  },
  "ioConfig": {
    "topic": "sac_druid_analyze_v2",
    "replicas": 2,
    "taskCount": 1,
    "taskDuration": "PT600S",
    "consumerProperties": {
      "bootstrap.servers": "bd-prod-kafka01:9092,bd-prod-kafka02:9092,bd-prod-kafka03:9092"
    },
    "pollTimeout": 100,
    "startDelay": "PT5S",
    "period": "PT30S",
    "useEarliestOffset": false,
    "completionTimeout": "PT1200S",
    "lateMessageRejectionPeriod": null,
    "earlyMessageRejectionPeriod": null,
    "stream": "sac_druid_analyze_v2",
    "useEarliestSequenceNumber": false
  },
  "context": null,
  "suspended": false
}
View Code

 

最重要的配置是metricsSpec,他主要定義了預聚合的字段和條件。

數據查詢

數據格式如下

pt eventId lesson_offline_molecule_id lesson_offline_denominator_id
2019-08-09 enterRoom "AQAAAAAAAA==" "AQAAAAAAAA=="
2019-08-09 disconnect "AQAAAAAAAA==" "AQAAAAAAAA=="

結果可以按照這樣的SQL出

SELECT pt,CAST(APPROX_COUNT_DISTINCT(lesson_offline_molecule_id) AS DOUBLE)/CAST(APPROX_COUNT_DISTINCT(lesson_offline_denominator_id) AS DOUBLE) from sac_core_analyze_v1 group by pt 

可以使用Druid的接口查詢結果,肥腸的方便~


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM