實時離線一體大數據在資產租賃saas服務中使用


流水查詢需求

  • 需求第一期:

基於TB級的在線數據,支持繳費帳單明細在線查詢。大家都知道,像銀行帳單流水一樣,查幾年的流水是常有的事。

支持的維度查詢:帳期、欠費狀態、日期范圍、費用科目類型、房屋分類、房屋所屬項目、關聯合同信息、統計列

什么是實時數據

實時可以分為:實時采集、實時計算、高性能,底延時的產出結果數據。實時數據指從源系統中實時采集的數據,以及對實時采集的數據進行實時計算直接產生的中間數據或結果數據。實時數據具有時間有效性,隨着時間的推移,實時數據會失效。

即時查詢系統

房屋租賃費用、水電費用、物業管理費用等數據的有效期,一般是不定的,比如辦公租賃可能預交費用5年、10年。那么這種數據,對於業務來說,仍然屬於線上數據,是不可歸檔的數據。 長時間無法歸檔數據,會造成數據越積越大,對於輕量級數據庫MySQL來說,是個很大的挑戰。就算做好分庫分表的准備。條件復雜的查詢在聚合的時候也一樣容易搞爆內存。何況系統在dal層設計得有所欠缺。為滿足流水查詢而重構,太得不償失。從需求來看,不涉及OLTP,只需實現OLAP的解決方案。為了不影響業務系統的改造、數據庫重構等方面。決定引入了即時查詢系統解決方案。

業務需求轉化技術需求:

  • 帳單明細查詢可由七十多個條件的隨機組合,不能使用類似kylin這樣的預處理技術來解決。支持N年范圍的在線查詢
  • 支持復雜條件查詢,如:聯合多表,嵌套多層left join
  • 為減少業務側的sql改動量,需要盡可能的支持標准SQL
  • 頻繁變更的業務數據需要實時同步更新

根據以上技術需求點和經過技術的篩選后,落地的架構是這樣:

即時查詢系統架構

架構實現

數據實時同步—Confluent Platform架構實現

debezuim:業務庫使用的是MySql,如果在即時查詢系統中查詢到的結果與業務系統查詢結果同等,需要實時同步業務數據,並實時提供查詢能力。debezium是一個低延遲的流式處理工具,能夠捕獲數據庫更改,並且利用Kafka和Kafka Connect記錄到kafka中,實現了自己的持久性、可靠性和容錯性。

Confluent Platform:Mysql到Kudu,需要穩定高效、可彈性伸縮、在異構數據源之間高速穩定同步能力的數據集成解決方案。基於紅火的kafka之上,Kafka Connect是首選。它使得能夠快速定義將大量數據集合移入和移出Kafka的連接器變得簡單。當在distributed的工作模式下,具有高擴展性,和自動容錯機制。confluent platform支持了很多Kafka connect的實現,為后續擴展數據集成服務提供了便利,debezium-connector就是其中之一。除此之外,confluent platform使用Kafka Schema Registry提供Avro序列化支持,為序列化提高了性能。但是Confluent平台的社區版本提供的功能還是比較有限的,比如提供監控界面化管理的confluent center是屬於商業版的。為此,我們自研了含有監控運維功能的《數據集成服務》,后續文章將詳細介紹並提供在線體驗。

Kudu-connector:confluent platform中雖然提供了Kudu Connector (Source and Sink),但是需要依賴Impala和Hive。而從需求和架構上看,並不需要這些東西,為遵守輕量原則、為避免太多依賴,我們自己實現了輕量級的Kudu-connector(源碼地址:https://github.com/dengbp/big-well)。Kudu-Connector需要借助同步管理配置好需要同步的表、同步規則,並創建好目標表,最后創建連接器同步數據。Kudu-Connector暫時沒有自動創建目標表能力。下個版本實現。

(網絡圖)

創建source和sink連接器參數:

//source connector:
curl -i -X POST  -H  "Content-Type:application/json" http://localhost:8083/connectors/  -d '{

"name": "test_data-bill_rating_rule_def-connector-souces",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "127.0.0.1",
"database.port": "3306",
"database.user": "platform",
"database.password": "platform@0708",
"database.server.id": "169798195",
"database.server.name": "test_data_connector",
"database.whitelist": "test_data",
"table.whitelist": "test_data.bill_rating_rule_def",
"database.history.kafka.bootstrap.servers": "broker161:9092,broker162:9092,broker163:9092,broker166:9092,broker164:9092,cdh6-slave1:9092,cmhk-b-sl-236:9092",
"database.history.kafka.topic": "dbhistory.inventory",
"include.schema.changes": "true",
"database.history.skip.unparseable.ddl": "true",
"decimal.handling.mode": "string",
"event.deserialization.failure.handling.mode:": "ERROR"
}
}


//sink connector:
curl -i -X POST -H "Content-Type:application/json" http://localhost:8083/connectors/ -d '

{
"name": "test_data-bill_rating_rule_def-connector-sink-49",
"config": {
"connector.class": "com.yr.connector.KuduSinkConnector",
"tasks.max": "16",
"topics": "test_data_connector.test_data.bill_rating_rule_def",
"topic.table.map": "test_data_connector.test_data.bill_rating_rule_def:bill_rating_rule_def",
"table.list": "bill_rating_rule_def",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.storage.StringConverter",
"database.history.kafka.bootstrap.servers": "broker161:9092,broker162:9092,broker163:9092,broker166:9092,broker164:9092,cdh6-slave1:9092,cmhk-b-sl-236:9092",
"database.history.kafka.topic": "dbhistory.inventory",
"kudu.masters": "kudu167:7051,cdh6-slave1:7051,cmhk-b-sl-236:7051,cdh6-slave2:7051,cdh6-slave3:7051,cdh6-slave4:7051,cdh6-slave5:7051,cdh6-slave6:7051,cdh6-slave7:7051,cdh6-slave8:7051,cdh6-slave9:7051",
"max.retries": "3",
"retry.backoff.ms": "1000",
"behavior.on.exception": "FAIL",
"linger.ms": "1000",
"batch.size": "5000",
"max.buffered.records": "8000",
"flush.timeout.ms": "6000",
"offset.flush.timeout.ms": "5000"
}
}'

 

 實時數倉—kudu

分布式列式存儲,支持行級更新,在OLAP場景下,能快速處理,跟其它大數據框架緊密集成(下文會與Hive Metastore集成,為上層開發訪問下層數據的提供統一入口),本身既具有高可用性,又是Cloudera家族的大數據生態圈成員,為系統自身擴展提供了極大空間。本次需求,主要是同步帳單表數據,和帳單查詢信息用到的關聯表數據,如:租賃合同數據、項目數據、房屋數據、帳單類型等數據。從業務數據特點分析,需要對帳單表ID和帳單類型做哈希分區,對帳單創建時間做范圍分區來創建帳單目標表,這樣既可以實現數據分布均勻,又可以在每個分片中保留指定的數據,同時對時間分區繼續擴展。其它關聯的數據表,根據查詢關聯特點,同樣使用哈希分區和范圍分區組合。

                                 (網絡圖)

查詢引擎—實現秒級響應—Presto

不依賴hive元數據服務,部署簡單。在sql語法方面,雖然存在小部分與標准相違背(如:分頁需要 ORDER BY、時間比較需要用TIMESTAMP先轉換等),但整體支持標准sql度極高。這對於當前業務系統改動成本小。業務接入時,除了部分sql在性能上需要做優化外,只需要配置多個JDBC數據源即可。這只是理想狀態,由於整個業務系統使用的是msyql數據庫,所以慢長的開發過程中,難免會用到mysql特有的語法,這就造成更麻煩的sql兼容問題。在這方面,我們選擇對官方提供的presto-jdbc做二開,使其盡可能多的支持mysql語法,如group by、時間大小比較等。

擴大業務覆蓋率

由最初的帳單明細查詢,發展成整個平台通用的即時查詢系統。所有涉及OLAP查詢,TB級以上的數據都接入了即時查詢系統。服務部署也由原來的十幾個節點,發展到了三十多個節點。

部署配置
服務名稱 節點數 配置
Confluent Platform 7 32核64g
Kudu 11 32核64g
Presto 15 32核64g
Zeppelin 1 6核32g

大數據需求

  • 需求第二期:

在資產租賃管理服務中,除了要了解客戶投訴情況、滿意度調查、水電使用情況、設備故障等統計分析之外,還需要幫客戶做租賃業務的精准營銷,網絡爬取同行公開數據,提供競品數據分析,指導客戶業務決策。

實時離線一體化系統之技術架構

實時離線一體化系統之技術架構 

實時離線一體化系統之數據流

實時離線一體化系統之數據流

實時離線一體化接入

大數據的來源主要分為三個:

  • 第一個來源是內部系統的Mysql數據庫(業務分析)
  • 第二個來源是應用App(用戶軌跡)
  • 第三個來源是外部系統網絡采集(同行數據,用於競品分析,行業分析)
  • 日志文件(業務訪問、打印在日志文件上的業務數據)

有些實時數據,只需簡單的清洗就可以產出,比如:異構數據同步、上面講到的即時查詢系統等這類數據是不需要進入ODS層的。為了能跟即時查詢系統的接入統一化。所有來源數據統一由集成服務實時接入ODS層(hdfs)或APP層(Kudu)。

數據倉庫分層規范化

數據分層大家都流行以四層划分(關於數倉分層,不了解的同學需要自己去找文章補腦),這里也不例外,只是我們每層的存儲和訪問需要解決整合問題,原因跟我們用的技術架構有關系。接下來我們講下每種數據流進來以后和經過層層分析后怎么存儲。先上個直觀圖:

對於要求實時的數據,進入到kafka后,經過ETL直接輸出應用數據到Kudu或Mysql,提供給應用使用。相當於在先前的即時查詢系統中加入了ETL功能,不再是之前簡單的kafka Connector了。需要做離線分析、定制查詢或實時性要求不高的數據分析,通過數據集成通道后通過Hive進入到ODS。然后再由已開發好的程序經過預計算出的結果往數據上層上放(DW和APP層),我們的原則是:越往上層的數據,越往實時倉Kudu上放。對於離線計算,可以固化的查詢,如果隨着數據量和計算復雜度的增長,即使我們用了上面的即時查詢系統,在響應時間上也不能得到保證(就算可以增加計算節點,如果查詢樹無法再拆分的情況下),所以我們選擇預計算方案

預計算方案(Kylin+Kudu)

大家都知道,企業中的查詢一般分為即席查詢和定制查詢兩種。對於即時查詢需求我們用presto和Impala做為引擎(為什么會用到兩個?這個問題跟我們的需求演化和公司系統架構有關系,presto從支持標准的sql上看,可以減輕業務側對現有的功能sql改造,簡單來說就是為了兼容現狀。部署的環境依賴也比較簡單,方便部署;而Impala主要是用在大數據需求新功能上,又方便檢索冷熱數據的聚合)。而定制查詢,它的場景多數是對用戶的操作或是對下線的業務數據做出實時分析,如果用Hive或SparkSQL作為查詢引擎,估計要花上數分鍾甚至數十分鍾的時間才能響應,顯然是不能滿足需求的。在很長一段時間里,企業只能對數據倉庫中的數據進行提前計算,再將算好后的結果存儲在APP層或DW層上,再提供給用戶進行查詢。但是上面我們也說了,當業務復雜度和數據量逐漸升高后,使用這套方案的開發成本和維護成本都顯著上升。因此,對於已經固化下來的查詢進行亞秒級返回的解決辦法。我們使用了Apache Kylin,我們只需要提前定義好查詢維度,Kylin就能幫助我們進行計算,並將結果存儲到結果表中。這樣不僅很好地解決了海量數據快速查詢的問題,也減少了手動開發和維護提前計算程序的成本。

但是Kylin默認將計算結果放入到Hbase中,從上圖看,沒有看到Hbase,而是Kudu。因為我們自己實現了Kylin與Kudu的整合。

Kylin使用Kudu存儲引擎

存儲引擎,我們引入自研的storage-kudu模塊替代默認的storage-hbase。Kylin依賴的三大模塊:數據源、構建引擎、存儲引擎。數據源我們還是使用Hive, 至於在kudu中的數據,因為上面已經解決了Hive支持kudu的方案,所以Kylin通過Hive也可以加載到Kudu中的數據。構建引擎我們使用了Kylin支持的spark計算引擎。而spark同時也是支持與Kudu整合的。從源碼上看,Kylin架構要求擴展存儲引擎需要實現IStorage接口,這接口有兩個函數一個是指定構建Cube引擎接口adaptToBuildEngine和能夠查詢Cube的createQuery接口,剩下的數據在Kudu的存取細節基本都直接使用spark支持Kudu的api。

實時離線開發統一訪問數據入口

部分分析數據,比如用戶的滿意度調查、水電費使用統計等,在即時查詢系統中已經存在,不就需要再同步一份數據到hdfs當中。為了減少存儲空間成本,避免數據多份存儲,那么就至少需要解決在Kudu中的數據能讓hive能訪問到。但是我們使用的hive版本中,hive並不支持Kudu表的操作,預告最新的hive4.0版本中,也未開發完成。

需要解決的問題:

  • 即時系統中存在Kudu表數據,需要通過Hive能訪問,這點仿照Impala,創建外部表 ,將kudu的表映射到Hive上
  • Hive能像Impala一樣,能創建表、查詢、更新、刪除操作
  • Kylin能使用Kudu表
  • 保證數據結構和元數據信息的一致性

Hive、Kudu元數據整合:

從Hive官網公布信息和源碼分析來看,核心類KuduStorageHandler、KuduSerDe、KuduInputFormat、KuduOutputFormat已經實現一分部功能,KuduMetaHook還沒有,保證 meta 的一致性需要必須實現HiveMetaHook。從源碼上看KuduStorageHandler已經繼承了DefaultStorageHandler和實現了HiveStoragePredicateHandler,再實現與HMS的交互就可以對 Kudu meta 的相關操作和可以發現Kudu的表並進行操作了(與《CDH6.3.2升級Hive到4.0.0》文章中使用同個版本)。

其中即時系統實時同步到Kudu的表數據,也需要創建Hive外部表,把kudu表映射到Hive來,也是在KuduStorageHandler中實現,包括數據的查詢、修改、刪除。通過在數據集成服務的《同步管理》模塊中,每次創建數據同步任務時,都會去連接Hive並創建Kudu的外部映射表。

 如此一來,不管上層使用的SparkSQL、Kylin還是HQL訪問hdfs或kudu的表,對開發者或對數據使用者來說都是統一的入口。

透明的數據分層存儲

整個系統架構里,有兩個地方可以存儲數據,一個是Kudu,另一個是HDFS。而Kudu存儲的數據大多是即時查詢系統數據和經過業務處理分析后的APP層、DWS層數據。實時數據當不在有變更時,就可以刷到HDFS上;APP層等這些數據隨着時間的推移,也是逐漸變成冷數據。那么等變冷的數據,就需要遷移到HDFS上。而數據遷移后將面臨查詢數據不完整性、如何實現數據的平滑遷移,又不影響查詢其完整性呢?

一部分數據在Kudu,一部分數據在HDFS,解決查詢的完整性,主要通過View實現。

每天把Kudu里的冷數據遷移到HDFS上,如何識別哪些是冷數據,由業務提供,根據業務情況,業務側自行為每個表提供一個冷熱數據的時間周期。當超過時間周期的數據將被程序遷移進HDFS。每次遷移完成后都需要創建或修改View。不然數據就查不到了。View需要定義好Kudu和HDFS上的查詢時間范圍。如:

create view  tb_uhome_acct_item_view as
SELECT COMMUNITY_ID,STAGE_ID,NAME,UNIT,HOUSE_NAME,BILL_AREA,PAY_USERID,BILLING_CYCLE,FEE_ITEM_TYPE_ID,RULE_NAME,RES_INST_NAME,HOUSE_STATUS_TYPE,HOUSE_STATUS,REAL_CYCLE,CONCAT( BILL_DATE_START, BILL_DATE_END ),LEASE_POSITION,OBJ_CODE
FROM tb_uhome_acct_item
WHERE create_date >= "2017-01-01"
UNION ALL
SELECT COMMUNITY_ID,STAGE_ID,NAME,UNIT,HOUSE_NAME,BILL_AREA,PAY_USERID,BILLING_CYCLE,FEE_ITEM_TYPE_ID,RULE_NAME,RES_INST_NAME,HOUSE_STATUS_TYPE,HOUSE_STATUS,REAL_CYCLE,CONCAT( BILL_DATE_START, BILL_DATE_END ),LEASE_POSITION,OBJ_CODE
FROM tb_uhome_acct_item_hdfs
WHERE create_date < "2017-01-01"

每一邊的數據都有表字段create_date做了范圍限制,然后等遷移成功后再修改View,這樣無何什么時候查數據,都不會出現部分數據檢索不到。

再補充一點,先前的即時查詢系統中,通過連接器同步過來的Kudu表數據,在同步的時候,在數據集成系統中,要創建Impala的外部表,將kudu的表映射到impala上,這樣Impala才能查到。 

展望未來

1、基於整合后的架構,未來我們可以提供更多的能力,讓更多的存儲引擎支持Hive Metastore,使HMS的元數據服務支持豐富化。

2、數據延遲監控,對kafka每個topic消息的延遲、lag監控,做到整個數據鏈路的延遲監控。

3、Hive支持Kudu繼續優化。通過Hive查詢部分數據在Kudu和部分在hdfs中的數據view實現還未完善,還有部分ddl需要完善。

4、Kylin繼續二開,根據數據集成服務中采集到用戶的維度和度量需求,使用Spark 動態構建Cube。

 

【版權聲明】

本文版權歸作者和博客園共有,歡迎轉載,但未經作者同意必須在文章頁面給出原文鏈接,否則保留追究法律責任的權利。如您有任何商業合作或者授權方面的協商,請給我留言:siqing0822@163.com


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM