問題背景
現在有兩份數據,file1是校園新聞版塊,每一條新聞點擊記錄;file2是校園新聞版塊使用活躍度高的學生記錄。用mr統計出某一天的點擊記錄里,使用ios/android手機的活躍學生的總的點擊次數
原始數據格式如下:
file 1,校園新聞點擊記錄,記錄了每一次學生點擊校園新聞的行為
格式:學號\t姓名\t手機端\t新聞id\新聞關鍵詞
20170001 xiaoming android 331 學費
20170002 xiaohong ios 332 食堂
20170003 xiaohua android 333 考研
20170001 xiaoming android 222 評優
20170001 xiaoming android 225 學費
20170003 xiaohua android 111 期末考試
20170002 xiaohong ios 117 空調安裝
...
file2,校園新聞活躍學生記錄,記錄了對校園新聞版塊點擊活躍度高的學生信息
格式:格式:學號\t姓名\t年齡\t手機端
20170001 xiaoming 20 android
20170002 xiaohong 19 ios
20170001 xiaoqiang 20 android
...
問題分析
1、key的選擇:選取學號+手機端共同作為key。學號是用於對是否活躍用戶的判斷,手機端是用於統計時的分類。注意,這里如果僅僅將手機端作為key有一個壞處,就是key值將只有兩種,這樣不利於將海量的數據分為多個小部分處理,因此選擇學號+手機端共同作為key。
2、關鍵點:在對每一行數據進行處理時,利用兩份數據,做好是否活躍用戶的標記;在對同一個key進行處理時,根據前一步處理的結果,判斷是否進行累加。
代碼實現
1、mapper.py
解析原始數據,將其處理為一系列的鍵值對(key-value pair),這里選取"學號_手機端"作為key。
import sys
def main():
"""
mapper
"""
for line in sys.stdin:
data = line.strip().split("\t")
if len(data) == 5:
student_id = data[0]
os_name = data[2]
key = student_id + '_' + os_name
print"\t".join([key,"file1"])
elif len(data) == 4:
student_id = data[0]
os_name = data[-1]
key = student_id + '_' + os_name
print"\t".join([key,"file2"])
if __name__ == "__main__":
main()
2、reducer.py
由MapReduce框架的機制,同一個key的所有數據都會被傳給同一個reducer,同一個reducer可以接受多個鍵值對。
reducer將相同鍵的一組值,進行處理產生一組規模更小的值(如累加得到一個值)。
import sys
import json
dic_result = {'android':0, 'ios':0}
pre_key = ""
def post_deal(pre_key):
"""
相同key的數據的處理
"""
global dic
os_name = pre_key.strip.split("\t")[1]
if 'click' in dic and dic['click'] == 'yes' and 'active' in dict and dic['active'] == 'yes':
dic_result[os_name] += 1
def deal(data):
"""
對每一行數據的處理
"""
if data[1] == "file1":
dic['click'] = 'yes'
if data[1] == "file2":
dic['active'] = 'yes'
def pre_deal():
"""
預處理,如將用於存儲同一個key的一組value中間結果的字典置零
"""
global dic
dic = dict()
def main():
"""
reducer
"""
for line in sys.stdin:
data = line.strip().split("\t")
key = data[0]
#當同一個key的數據全部deal完,到下一個key時
if key != pre_key:
if pre_key != "":
#處理同一個key所有數據
post_deal(pre_key)
#處理完成后,預處理准備下一輪處理
pre_deal()
#將pre_key更新為下一個要處理的key
pre_key = key
#將同一個key的一組值按需要進行處理/整合
deal(data)
#處理最后一個key的數據
if pre_key != "":
post_deal(pre_key)
#打印結果
print json.dumps(dic_result)
if __name__ == "__main__":
main()
3、線下測試 cat file|map|sort|reduce
STEP1:測試map
cat file1_test file2_test | python mapper.py > map_out
測試中,map_out中的數據將如下所示:
20170001_andorid file1
20170002_ios file1
20170003_andorid file1
20170001_andorid file1
...
20170001_andorid file2
20170002_ios file2
...
STEP2:對map_out按照key(key默認放在第一列)排序,使相同key的數據在一起,然后測試reducer
cat map_out |sort -k1,1|python reducer.py > red_out
reducer的輸出是json,形如:{'ios':1123, 'android':3333}
4、hadoop線上運行 run.sh
#!/bin/bash
WORKROOT="..."
HADOOP="hadoop客戶端路徑"
INPUT1="file1文件路徑"
INPUT2="file2文件路徑"
OUTPUT_DIR="輸出的HDFS路徑"
PYTHON="python路徑"
#設置一系列參數,並啟動MR任務
$HADOOP streaming \
-D mapred.map.tasks=200 \
-D mapred.job.map.capacity=400 \
-D mapred.red.tasks=100 \
-D mapred.job.red.capacity=400 \
-D mapred.job.priority=HIGH \
-D stream.num.map.output.key.fields=1 \
...
-partitionor ... \
-cacheArchive ... \
-input $INPUT1 \
-input $INPUT2 \
-output $OUTPUT_DIR \
-mapper "$PYTHON mapper.py" \
-reducer "$PYTHON reducer.py" \
-file $WORKROOT/mapper.py \
-flle $WORTROOT/reducer.py
#成功運行完成,創建一個空文件,加上成功的標簽
if [ ${?} -eq 0 ]; then
$HADOOP fs -touchz $OUTPUT_DIR succuss.tag
部分參數的解釋:
mapred.map.tasks:map數目
mapred.reduce.tasks:reduce數目
mapred.job.map.capacity:map並發數目
mapred.job.reduce.capacity:educe並發數目
stream.num.map.output.key.fields:設置map程序分隔符的位置,該位置之前的部分作為key,之后的部分作為value