python with hdfs
hdfs 可以在 linux 本地操作
bin/hdfs dfs -ls /foo
但是這種只能在 命令行 操作。
通常我們需要在程序中實現遠程操作,python 是可以的。需要用到一個模塊 snakebite,目前僅支持 python2
snakebite 有兩種方式遠程操作 hdfs,一種是通過命令行,這里不做介紹,另一種是通過 python 腳本實現。
僅需兩步:1. 連接 hdfs;2. 執行 各種命令,只是要注意,每條操作都返回一個 Iterator,所以需要寫在 for 循環中才能生效。
python 示例
from snakebite.client import Client client = Client('192.168.10.10', 9000) for x in client.ls(['/']): print x # client.mkdir(['/foo2'], create_parent=True) # 沒有 for 循環是不行的 for i in client.mkdir(['/foo/f1', '/input4'], create_parent=True): # 創建多個目錄,而且 create_parent=True 確保每個目錄可以是多層的 print(i) ### 這個 for 循環相當於 挨個 執行創建目錄的操作,如果只循環了一次,就只建一個目錄 client.delete(['/foo', '/input4'], recurse=True).next() # 刪除文件或目錄,而且 recurse=True 確保刪除多層目錄 ### 這里只 next 了一下,只刪除了 一個目錄,input4 並沒有被刪除 client.copyToLocal(['/usr/input/yanshw/README.txt'], 'e:').next() # 下載文件,下載到 e 盤當前目錄,而不是搭建hadoop的服務器【文件是跑在 win 上的】 print client.text(['/usr/input/yanshw/README.txt']).next() # 顯示文件,打印出了文件
9000 是 core-site.xml 中 namenode 指定的端口號
這里涉及到一個權限問題:我的 hadoop 集群是在 linux root 搭建的,我的腳本是在 windows 上跑的,windows 主機名 叫 HP,在 操作目錄時,會提示 HP 沒有權限
當時我為了 測試 以上 API 是否有效,我把 hdfs 根目錄的權限改成了 777
bin/hdfs dfs -chmod -R 777 /
然后我發現可以新增和刪除目錄了,新增的目錄 owner 就是 HP

所以,大家在 搭建 集群時就可以考慮后續遠程操作的問題了。
python with mapreduce
用 python 寫 mapreduce 完成 詞頻統計
數據文件 test.txt
python|thread|process python|xlrd|pyinotiy python|print|c++ c++|java|php node.js|javascript|go
上傳至 hdfs
bin/hadoop fs -put /usr/lib/hadoop-2.6.5/tmp/test.txt /usr/yanshw
數據必須上傳至 hdfs 才能使用
1)mapper 文件
import sys for line in sys.stdin: words = line.strip().split('|') for word in words: print word
從 stdin 逐行讀入,拆分,print,這里的 print 相當於 stdout;
mapper 的 stdout 作為 reducer 的 stdin;
hadoop 內部 用 hadoop steaming 實現標准輸入輸出,使得數據在 map 和 reduce 之間流動
測試 mapper
執行本地測試,確保程序正確
cat tmp/test.txt | python mapper.py
2)reducer 文件
在 reduce 之前 是需要 sort 的,hadoop 內默認是 帶 sort 的
sort 其實是個 分區的過程;
hadoop 自動把 不同的 key 發送給不同的 reducer;
雖然 reduce 之前會 分區,sort,發送給不同的 reducer,但 編寫 reducer 時,不能認為是 一個分區 的數據,還得按照多個分區進行編寫,只是 數據是經過 sort 的
import sys from operator import itemgetter from itertools import groupby def read_mapper_output(files, separator='\t'): for line in files: yield line.strip().split(separator, 1) def main(): data = read_mapper_output(sys.stdin) for key, data in groupby(data, itemgetter(0)): count = 0 for value in data: count += 1 print "{word}\t{count}".format(word=key, count=count) if __name__ == '__main__': main()
逐行讀取 mapper 的 輸出,就是一個個的單詞;
reducer 逐個累加詞的 個數,如果下個詞與當前詞不同,就把當前詞的個數 print,然后統計下個詞的個數,依次直至完畢;
測試 reducer
cat tmp/test.txt | python mapper.py |sort|python reducer.py
把 mapper 的輸出 進行排序后再 送給 reducer;這也是 hadoop 的默認方式
3)運行 mapreduce
需要用 hadoop-streaming 方式來執行 python
命令行方式
hadoop jar share/hadoop/tools/lib/hadoop-streaming-2.6.5.jar
-files /usr/lib/hadoop-2.6.5/mapper.py,/usr/lib/hadoop-2.6.5/reducer.py
-mapper 'python mapper.py' -reducer 'python reducer.py'
-input /usr/input/yanshw
-output /usr/output/yanshuw2
解釋如下
hadoop:bin 下的 hadoop,這里設置了環境變量,所以直接 hadoop 就可以了
jar share/hadoop/tools/lib/hadoop-streaming-2.6.5.jar:運行 java streaming 程序
-files:輸入 mapper 和 reducer 的絕對路徑,hadoop 會把 這倆文件上傳到 datanode,用於執行
// 也可以寫 兩個 -file mapper -file reducer
// 注意兩個文件之間不能有空格
-mapper:后面跟命令
-reducer:后面跟命令
// 這兩個命令是執行 python 文件,如果 python 文件中帶 shebang,直接寫 mapper.py 即可;如果沒有,需要寫 'python mapper.py',引號不能省略,無需帶路徑
-input:hdfs 的輸入路徑
-output:hdfs 的輸出路徑,這個路徑不能事先存在
完整命令
hadoop jar /usr/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming-2.0.0-mr1-cdh4.1.2.jar \ -input /ngrams \ -output /output-streaming \ -mapper mapper.py \ -combiner reducer.py \ -reducer reducer.py \ -jobconf stream.num.map.output.key.fields=3 \ -jobconf stream.num.reduce.output.key.fields=3 \ -jobconf mapred.reduce.tasks=10 \ -file mapper.py \ -file reducer.py
shell 腳本方式
編寫 run.sh 來執行 mapreduce
#!/bin/bash hadoop fs -rm -r -f /usr/output/yanshw102 hadoop jar share/hadoop/tools/lib/hadoop-streaming-2.6.5.jar \ -file /usr/lib/hadoop-2.6.5/mapper2.py \ -file /usr/lib/hadoop-2.6.5/reducer2.py \ -mapper "python mapper2.py" \ -reducer "python reducer2.py" \ -input /usr/yanshw/test.txt \ -output /usr/output/yanshw102 \
第 1 行 是刪除 已經存在的 路徑,這個路徑用於存放 output,不能事先存在
然后直接執行
run2.sh
執行結束后,下載 output
bin/hdfs dfs -getmerge /usr/output/yanshw102 ttt
直接打印
[root@hadoop10 hadoop-2.6.5]# cat ttt c++ 2 go 1 java 1 javascript 1 node.js 1 php 1 print 1 process 1 pyinotiy 1 python 3 thread 1 xlrd 1
一張圖解釋 上文 的 mapreduce

mapreduce 實例
背景描述:兩個文件,file1.txt 記錄學生點擊新聞的信息,file2.txt 記錄活躍學生的信息;
目標:現在有某一天的這倆個文件,統計這一天 使用 ios/android 的 活躍學生 的 總點擊次數
file1.txt
20170001 xiaoming android 331 學費 20170002 xiaohong ios 332 食堂 20170003 xiaohua android 333 考研 20170001 xiaoming android 222 評優 20170001 xiaoming android 225 學費 20170003 xiaohua android 111 期末考試 20170002 xiaohong ios 117 空調安裝
file2.txt
20170001 xiaoming 20 android 20170002 xiaohong 19 ios 20170001 xiaoqiang 20 android
難點是同時利用兩個文件,如何設計 key,能夠很好的進行分區
方案1:把 ios/andriod 設為 key,統計每種手機各個學生的訪問量,取出活躍學生的;可以實現,但是這個 分區 之分 2 個區,效率很低
方案2:把 學號 作為 key,統計每個學生的 不同手機的點擊次數... 貌似不行
方案3:把 ios/andriod 和 學號 作為 key,然后統計每個 key 中 學號是 活躍學生的 點擊次數;其實 直接從 需求看 也能確定這個 key
mapper
ios/andriod 和 學號 作為 key
import sys def main(): for line in sys.stdin: data = line.strip().split(" ") if len(data) == 5: # file1 student_id = data[0] os_name = data[2] key = student_id + '_' + os_name print "\t".join([key,"file1"]) elif len(data) == 4: # file2 student_id = data[0] os_name = data[-1] key = student_id + '_' + os_name print "\t".join([key,"file2"]) if __name__ == "__main__": main()
reducer
按 key 排序后 喂給 reducer,不同的 key 給到不同的 reducer
在寫 reducer 方法時,既定輸入是 排序后的 多個 key 的序列
import sys import json dic_result = {'android': 0, 'ios': 0} pre_key = "" def post_deal(pre_key): # 相同key的數據的處理 global dic # pre_key: student_id + '_' + os_name os_name = pre_key.strip().split("_")[1] if 'click' in dic and dic['click'] == 'yes' and 'active' in dic and dic['active'] == 'yes': # 點擊 的 活躍用戶 +1 dic_result[os_name] += 1 def deal(data): # 對每一行數據的處理 # data:20170001_android file1 global dic if data[1].strip() == "file1": dic['click'] = 'yes' # file1 代表點擊 if data[1].strip() == "file2": dic['active'] = 'yes' # file2 代表活躍 def pre_deal(): # 預處理,用於存儲同一個key的一組value global dic dic = dict() def main(): # sys.stdin = ['20170001_android file1', # '20170001_android file1', # '20170001_android file1', # '20170001_android file2', # '20170001_android file2', # '20170002_ios file1', # '20170002_ios file1', # '20170002_ios file2', # '20170003_android file1', # '20170003_android file1' # ] # sort 之后的 for line in sys.stdin: data = line.strip().split("\t") key = data[0] global pre_key if key != pre_key: # 如果換 key,先給 上一個 key 加1,然后 初始化新 key if pre_key != "": post_deal(pre_key) # 計數 # 初始化 for 下一個 key pre_deal() pre_key = key deal(data) # 來一個學生我先標記,然后計數 post_deal(pre_key) if pre_key != "": post_deal(pre_key) print json.dumps(dic_result) if __name__ == "__main__": main()
本地測試
[root@hadoop10 hadoop-2.6.5]# cat tmp/example1/file1.txt tmp/example1/file2.txt | python tmp/example1/mapper.py | sort| python tmp/example1/reducer.py {"android": 3, "ios": 2}
hadoop 上 運行 mapreduce
這里有些問題,解決方法 見 下面 進階和 最后一個鏈接
具體的解決思路我后面會專門寫一篇博客
進階配置
-jobconf stream.num.map.output.key.fields=2 這行代碼用於指定排序的字段,數字2指定map函數輸出數據的第幾列用於排序,就是例子中的sales字段。 【從 1 開始】
-jobconf num.key.fields.for.partition=1這行代碼指定partition字段,數字1指定map函數輸出數據的第一列用於分區。
參考資料:
https://www.jianshu.com/p/70bd81b2956f
《Hadoop with Python》 我的電子書
http://www.zhangdongshengtech.com/article-detials/236 十分清晰地解釋了 python 怎么玩 mr
https://www.iteye.com/blog/blackproof-2154523 內容比較廣,設計一些不太常用的操作
https://www.cnblogs.com/airnew/p/9574309.html 自定義排序 分區 等的實現
