PHP+Hadoop+Hive+Thrift+Mysql實現數據統計分析


 

安裝

Hadoop安裝: http://www.powerxing.com/install-hadoop/ 
Hadoop集群配置: http://www.powerxing.com/install-hadoop-cluster/ 
Hive安裝: https://chu888chu888.gitbooks.io/hadoopstudy/content/Content/8/chapter0807.html

安裝具體教程請看上面鏈接,本地測試只用了單機配置,集群配置(后面的flume用到)看上面的詳細鏈接, 因為之前沒有接觸過java的相關,這里說下遇到的幾個問題.

HadoopHive的1.x和2.x版本要對應 
JAVA/Hadoop相關的環境變量配置,習慣了PHP的童鞋在這塊可能容易忽略 
啟動Hadoop提示Starting namenodes on [],namenodes為空,是因為沒有指定ip或端口,修改hadoop/core-site.xml如下

<configuration>
<property>
<name>dfs.namenode.rpc-address</name>
<value>127.0.0.0:9001</value>
</property>
</configuration>

安裝完成后輸入jps可以查看到NameNode,DataNode等

 

上報和接收

swoole和workerman都有簡單版本實現的數據監控,包括上報,接收,存儲,展示, 主要使用udp上傳(swoole版本已升級為tcp長連接),redis緩存,文件持久化,highcharts展示,可以作為思路參考 
swoole-statistics : https://github.com/smalleyes/statistics 
workerman-statistics : https://github.com/walkor/workerman-statistics 
本例使用swoole提供的接口實現UDP傳輸,因為上報數據是一定程度可以容錯,所以選擇UDP效率優先 
接收數據臨時存儲在Redis中,每隔幾分鍾刷到文件中存儲,文件名按模塊和時間分割存儲,字段|分割(后面與hive對應)

 

數據轉存

創建Hive數據表

根據文件數據格式編寫Hive數據表, TERMINATED BY字段與前面文件字段分隔符想對應 
對表按日期分區PARTITIONED BY

CREATE TABLE login (
    time int comment '登陸時間', 
    type string comment '類型,email,username,qq等', 
    device string comment '登陸設備,pc,android,ios', 
    ip string comment '登陸ip', 
    uid int comment '用戶id', 
    is_old int comment '是否老用戶'
) 
PARTITIONED BY (
    `date` string COMMENT 'date'
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '|';

定時(Crontab)創建hadoop分區

hive -e "use web_stat; alter table login add if not exists partition (date='${web_stat_day}')"

轉存

Flume監聽文件目錄,將數據傳輸到能訪問Hdfs集群的服務器上,這里傳輸到了224機器的7000端口

#agent3表示代理名稱 login
agent3.sources=source1
agent3.sinks=sink1
agent3.channels=channel1
配置source1

 

配置source1

agent3.sources.source1.type=spooldir
agent3.sources.source1.spoolDir=/data/releases/stat/Data/10001/
agent3.sources.source1.channels=channel1
agent3.sources.source1.fileHeader = false

 

配置sink1

agent3.sinks.sink1.type=avro
agent3.sinks.sink1.hostname=192.168.23.224
agent3.sinks.sink1.port=7000
agent3.sinks.sink1.channel=channel1

 

配置channel1

agent3.channels.channel1.type=file
agent3.channels.channel1.checkpointDir=/data/flume_data/checkpoint_login
agent3.channels.channel1.dataDirs=/data/flume_data/channelData_login

 

啟動flume

加到supervisor守護進程

/home/flume/bin/flume-ng agent -n agent3 -c /home/flume/conf/ -f /home/flume/conf/statistics/login_flume.conf -Dflume.root.logger=info,console

 

224機器監聽7000端口,將數據寫到hdfs集群

agent1表示代理名稱

agent4.sources=source1
agent4.sinks=sink1
agent4.channels=channel1

 

配置source1

agent4.sources.source1.type=avro
agent4.sources.source1.bind=192.168.23.224
agent4.sources.source1.port=7000
agent4.sources.source1.channels=channel1

 

配置sink1

agent4.sinks.sink1.type=hdfs
agent4.sinks.sink1.hdfs.path=hdfs://hdfs/umr-ubvzlf/uhiveubnhq5/warehouse/web_stat.db/login/date\=%Y-%m-%d
agent4.sinks.sink1.hdfs.fileType=DataStream
agent4.sinks.sink1.hdfs.filePrefix=buffer_census_
agent4.sinks.sink1.hdfs.writeFormat=TEXT
agent4.sinks.sink1.hdfs.rollInterval=30
agent4.sinks.sink1.hdfs.inUsePrefix = .
agent4.sinks.sink1.hdfs.rollSize=536870912
agent4.sinks.sink1.hdfs.useLocalTimeStamp = true
agent4.sinks.sink1.hdfs.rollCount=0
agent4.sinks.sink1.channel=channel1

 

配置channel1

agent4.channels.channel1.type=file
agent4.channels.channel1.checkpointDir=/data/flume_data/login_checkpoint
agent4.channels.channel1.dataDirs=/data/flume_data/login_channelData

 

啟動

加到supervisor守護進程

/usr/local/flume/bin/flume-ng agent -n agent4 -c /usr/local/flume/conf/ -f /usr/local/flume/conf/statistics/login_flume.conf -Dflume.root.logger=info,console

清洗數據

通過Thrift的PHP擴展包調用Hive,編寫類SQL的HQL轉換為MapReduce任務讀取計算HDFS里的數據, 將結果存儲在MySQL中 
php-thrift-client下載地址: https://github.com/garamon/php-thrift-hive-client

 

define('THRIFT_HIVE' , ROOT .'/libs/thrift');
$GLOBALS['THRIFT_ROOT'] = THRIFT_HIVE . '/lib';
require_once $GLOBALS['THRIFT_ROOT'] .         '/packages/hive_service/ThriftHive.php';
require_once $GLOBALS['THRIFT_ROOT'] . '/transport/TSocket.php';
require_once $GLOBALS['THRIFT_ROOT'] . '/protocol/TBinaryProtocol.php';
require_once THRIFT_HIVE . '/ThriftHiveClientEx.php';
$transport = new \TSocket('127.0.0.1', 10000);
$transport->setSendTimeout(600 * 1000);
$transport->setRecvTimeout(600 * 1000);
$this->client = new \ThriftHiveClientEx(new \TBinaryProtocol($transport));
$this->client->open();
$this->client->execute("show databases");
$result = $this->client->fetchAll();
var_dump($result);
$this->client->close();

HQL語法說明: https://chu888chu888.gitbooks.io/hadoopstudy/content/Content/8/chapter0803.html

注意的是,盡量要將HQL語句能轉換為MapReduce任務,不然沒利用上Hadoop的大數據計算分析,就沒意義 
例如下面的邏輯,取出來在內存里分析,這樣的邏輯盡量避免,因為sql在hive里執行就是普普通通的數據,沒有轉換為mapreduce

select * from login limit 5;
// php處理
$count = 0;
    foreach ($queryResult as $row) {
      $count ++;
}

一次性轉換為MapReduce,利用Hadoop的計算能力

select type,count(*) from login group by type;  // 這樣就用到了

建表使用了PARTITIONED BY分區斷言后,查詢就可以利用分區剪枝(input pruning)的特性,但是斷言字段必須離where關鍵字最近才能被利用上 
// 如前面的login表使用到了date分區斷言,這里就得把date條件放在第一位

select count(*) from login where date='2016-08-23' and is_old=1;

Hive中不支持等值連表,如下

select * from dual a,dual b where a.key = b.key;

應寫為:

select * from dual a join dual b on a.key = b.key;

Hive中不支持insert,而且邏輯上也不允許,應為hadoop是我們用來做大數據分析,而不應該作為業務細分數據

 

數據報表展示

這一步就簡單了,讀取MySQL數據,使用highcharts等工具做各種展示,也可以用crontab定時執行php腳本發送日報,周報等等

 

后續更新

最近看一些資料和別人溝通發現,清洗數據這一步完全不用php,可以專注於HQL實現清洗邏輯,將結果保存在hadoop中,再用Sqoop將hadoop數據和MySQL數據同步。即簡化了流程,免去mysql手工插入,又做到了數據更實時,為二次清洗邏輯的連表HQL做了鋪墊


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM