一.數據處理架構
如圖,數據流轉主要有兩條線,實時計算流程和離線計算流程
- 實時計算:事件(hive表)----(使用dw-event-to-collector.sh發送事件)---->收數工具collector-------->flume分發-------->kafka緩存-------->flink計算-------->hbase-------->elasticsearch
- 離線計算:事件(hive表)----(主動讀hive表)---->hdfs-------->flink計算-------->hbase-------->elasticsearch
二.實時計算過程中工具使用
1.hive
- 進入hive數倉: hive
- 查看當前數據庫: show databases;
- 切換到cdp庫: use cdp;
- 創建表(SMH前端的導出事件配置中有自動生成的語句):
CREATE TABLE IF NOT EXISTS tablename(
uid string,
event_time bigint,
touch_point_id string
)partitioned by (process_date string)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
LINES TERMINATED BY '\n'
STORED AS TEXTFILE;
- 查看建表命令: show create table c8_shopping;
- 查看當前表: show tables;
- 查看表中列名: desc tablename;
- 把事件插入對應hive表中: load data local inpath "/home/hadoop/shopping.txt" into table tablename partition(process_date="2019-07-22");
- 查詢表中數據: select * from tablename where process_date = '2019-04-26' limit 10;
- 查詢前執行該命令列名和數據一起顯示: set hive.cli.print.header=true;
- 刪除表中數據: truncate table tablename;
- 刪除表: drop table tablename;
2.kafka
查詢kafka消費情況,路徑:/home/hadoop/kafka_2.11-0.10.2.0/bin
命令: sh kafka-console-consumer.sh --topic event_c8 --from-beginning --bootstrap-server 172.00.0.000:9092 > event_c8
3.flink
- 重啟flink任務,路徑:/home/hadoop/cdp-etl-jobs/bin/job/realtime
- 關閉flink任務:yarn application -kill 任務id
- 啟動flink任務:sh indexing-trait.sh sh calculate-trait.sh
4.hbase
- 進入hbase:hbase shell
- 查看已存在的表:list
- 查詢某特性的值:scan 'trait_c8',{COLUMNS=>['d:t1425','d:uid']}
- 查詢某uid刪除狀態:scan 'trait_c8', {COLUMNS => 'd:delete_status',FILTER => "ValueFilter(=,'substring:true')"}
- 查詢某個uid: get 'trait_c8','fff144eb653e7348f051307cde7db169'
- 刪除表中數據:truncate "tablename"; flush "tablename";
- 刪除表:disable table; drop table;
- hbase全量同步到es:cdp/cdp-etl-jobs/bin/job/batch/trait-crowd-calc.sh -calcType sync 增量為:incr
5.elasticsearch
查詢工具可以使用kibana或者elasticsearch head插件,常用命令:
- 查詢特性:
GET /trait_c39/trait_c39/_search?size=1000
{
"query": {
"match_all": {}
},
"_source": ["t596"]
} -
查詢人群:
GET /trait_c39/trait_c39/_search?size=1000
{
"query": {
"match_all": {}
},
"post_filter": {"term": {
"crowds_code": "cr197"
}}
} -
查詢某個uid:
GET /trait_c33/trait_c33/uid-1
三.離線計算過程中工具使用
1.hdfs
前端頁面查詢地址:http://172.23.x.xxx:50070/explorer.html#/cdp/warehouse
查看目錄:hadoop fs -ls /cdp/warehouse/c8/offline/
查看文件:hadoop fs -cat /cdp/warehouse/c8/offline/shopping.txt
下載數據:hadoop fs -get /cdp/warehouse/c8/offline/
刪除文件:hadoop fs -rm -r /cdp/warehouse/c8/offline/shopping.txt
2.azkaban
- cdp-batch-process 離線批處理數據
dw-etl-process 數倉etl開始
dw-event-to-hdfs 主動讀取事件導入到hdfs
user-delete 用戶刪除
event-ub-to-hbase 發送事件到hbase,用於用戶檔案數據展示
common-jobs-config 生成作業配置信息,地址:/home/hadoop/cdp-etl-jobs/jobs-tmp/codes/
ALL_EVENT_TRAIT 所有事件到達時觸發的特性list
ALL_ACC_TRAIT 除timeline外,所有事件觸發類的累加特性list
ALL_REF_TRAIT 所有特性變化觸發的特性list
ALL_CROWD 渠道內全量人群list
CALC_EVENT_TRAIT 事件到達時觸發且需重新計算的特性list
CALC_TRAIT 特性變化時觸發且需重新計算的特性list
CALC_CROWD 當天需要計算的人群list,包括重新計算的人群,符合周期的人群
CLEAN_CROWD 需刪除的人群list
CLEAN_TRAIT 需刪除的特性list
EXPORT_TRAIT idmapping時需要導出的特性list
CANCELED_TRAIT 取消授權影響的特性list
event-trait-calc-full 全量重跑數據,traitupdate判斷歷史最新數據賦值給特性
event-trait-calc-incr 計算每日數倉增量的數據,traitupdate只發送當天的數據
event-trait-calc-init 對事件到達時觸發特性的重新計算,traitupdate只發送當天的數據
trait-crowd-calc 計算人群,對特性變化時觸發的特性重新計算,timeline類型特性,更新節點管理員/運營專員數據
id-mapping-clean 刪除作廢的mapping關系
id-mapping-init idMapping初始化和建立mapping關系
id-mapping-copy idMapping后的特性拷貝
report-crowd-count 更新人群數量到mysql,cdp_crowd表crowd_scale列
report-metric 定時計算所有人群長期跟蹤指標及全渠道的看板指標
cdp-batch-process - cdp-clean-jobs 清除臨時文件,人群導出過期文件
- crowd-export 人群導出
- init-channels 初始化渠道
- trait-import 特性導入