概要:
Oracle Stream Analytics(OSA)是企業級大數據流實時分析計算平台。它可以通過使用復雜的關聯模式,擴充和機器學習算法來自動處理和分析大規模實時信息。流式傳輸的大數據可以源自IoT傳感器,Web管道,日志文件,銷售點設備,ATM機,社交媒體,事務數據庫,NoSQL數據庫或任何其他數據源。
OSA為業務用戶提供了動態創建和實施即時洞察解決方案。它允許用戶通過實時圖表,地圖,可視化視圖來實時瀏覽實時數據,並以圖形方式構建流傳輸管道,而無需進行任何手工編碼。 OSA使用與Oracle連續查詢引擎集成的Apache Kafka和Apache Spark Streaming在可伸縮且高度可用的集群大數據環境中執行。
OSA廣泛應用於以下場景,以解決現代企業中的關鍵實時用例:
- 金融服務:實時反欺詐,實時風控,實時營銷。
- 交通運輸:公共交通,車輛調度,集裝箱追蹤。
- 運營商:精准營銷,網絡管理,“萬物互聯”。
- 零售:個性化推薦,動態定價,貨架管理。
- 制造業:智能庫存,品質管控,預測性維護。
- 公共安全:平安城市。
本文我們將介紹如何使用Oracle Stream Analytics實現實時數據采集,實時數據處理,實時數據可視化以及實時數據同步到大數據平台的整個過程。
Oracle DB→Oracle GoldenGate→Kafka→OSA REST→HBase
具體場景及工具如下:
測試環境說明如下:
整體步驟大致如下:
- 源端數據同步及KAFKA准備
- OSA安裝及HBASE配置
- OSA配置
詳細步驟如下:
源端數據同步及KAFKA准備
1.在源端數據庫Enable GoldenGate並創建測試表
2.配置源端Golden Gate實時數據捕獲
3.部署並配置GoldenGate for BigData
- OGG for BigData無需安裝,只需解壓即可使用
- 編輯配置文件:kafka.props,custom_kafka_producer.properties
4. 拉起extract, pump, replicat進程
Oracle stream analytics安裝及Hbase配置
5. OSA安裝
OSA下載鏈接如下:
Oracle Stream Analytics Downloads
目前最新版本為19.1,按照官方安裝文檔進行安裝。
6.HBase的安裝及測試表創建
安裝並創建同步測試表:
Oracle Stream Analytics配置
下面將詳細介紹OSA配置過程:
7.登錄OSA
osaadmin為OSA預制用戶
8.進入OSA界面,選擇【Catalog】選項
9.選擇創建新連接選項,點擊【Create New Item】
10.創捷連接選項,輸入連接名稱,連接類型選擇【kafka】
並在kafka bootstrap里輸入broker地址,測試連接成功
11.創建新的流數據,點擊創建【Create Stream】
類型選擇【kafka】選擇之前創建的連接,輸入Topic名稱,選擇數據格式【JSON】。
topic名稱為OGG for bigdata配置文件里指定的Topic
12.預先定義好JSON數據格式的文件如下:
下面定義可以從kafka消費信息中獲取,使用bin/kafka-console-consumer.sh
13.OSA支持的數據格式有【CSV】,【JSON】,【AVRO】
這次使用的為上述預定義好的JSON格式
14.創建Custom Jar,選擇創建【Custom Jar】
輸入名稱,選擇類型為【Custom Jar】,在Jar URL上指定預先創建好的java程序
需要使用Custom Jar獲取流數據並且進行Base64加密處理,Hbase上使用Rest進行操作時,僅支持Base64加密過的數據的增刪改查。
注意: Custom Jar所使用的Java程序可以只做針對Event的Base64加密處理
REST可以在OSA上添加REST類型的Stage執行寫入到Hbase
OSA目前版本尚未支持流數據base64加密輸出
本文使用Java示例程序執行Base64加密及REST操作寫入到Hbase
15.創建Pipeline,選擇創建【Pipeline】
指定名稱,選擇上述已創建好的Stream,並且保存。
16.添加已創建的Custom Stage到Pipeline上
右擊Pipeline上的Stream標識,選擇【Custom Stage from Custom Jar】
17. 指定【Custom Stage】名稱,點擊保存
18.在Custom Stage的配置選項,我們選擇之前已經上傳的Custom Jar對象
19.在【Input Mapping】選項,需要把Stream內容和Custom Stage進行關聯
並且點擊【Publish】發布
注意:如果不選擇Publish則無法把OSA輸出進行寫入操作到Hbase
JOB_ID=after_JOB_ID、JOB_TITILE=after_JOB_TITLE、MIN_SALARY= after_ MIN_SALARY 、MAX_SALARY= after_ MAX_SALARY
20.【Publish】的Pipeline將被鎖定,如需更改配置點擊【Unpublish】進行更改
Publish的Pipeline如下:
在下方可以看到實時流數據的監聽
21.至此為止,OSA端的配置已完成。最后一步為測試與演示。
測試與演示
1.源端數據庫插入一條數據並提交
2.OGG源端捕獲確認(Extract和Pump)
3.OGG目標端Kafka投遞確認
4.Kafka端消費信息確認
5.Kafka端顯示以下紀錄已經消費,OP_TYPE代表操作類型,
OP_TYPE=I ,I表示這是一條INSERT插入操作。
6.查看OSA Stream監聽狀態
點擊【kafka-stream】圖標,可以看到該條記錄已經被Kafka Stream正確的監聽
7.查看OSA輸出結果
點擊【restapi】圖標,可以看到OSA輸出結果
8.使用HBase寫入確認
從HBase的HR_JOBS表中,可以看到源端插入的數據已經寫入到該表中
寫在最后
Oracle Stream Analytics 還具有以下產品特性:
1.提供友好的圖形交互頁面,集成可視化並可以通過Java語言進行擴展
2. 提供豐富的內建流模式庫並可通過Java進行擴展
3. 與位置和地理空間功能集成
4. 預測分析與機器學習集成
5. 可對接Druid Superset
Oracle Stream Analytics支持Continuous Query Language (CQL) ,構建於分布式分內存計算網格框架之上,使得查詢處理的性能可得到線性增長。學習成本低,使得業務人員和開發人員可以更大限度的關注業務而非應用技術架構。是您快速構建企業級大數據流實時分析計算平台不二之選。