Python實現MapReduce
下面使用mapreduce模式實現了一個簡單的統計日志中單詞出現次數的程序:
from functools import reduce from multiprocessing import Pool from collections import Counter def read_inputs(file): for line in file: line = line.strip() yield line.split() def count(file_name): file = open(file_name) lines = read_inputs(file) c = Counter() for words in lines: for word in words: c[word] += 1 return c def do_task(): job_list = ['log.txt'] * 10000 pool = Pool(8) return reduce(lambda x, y: x+y, pool.map(count, job_list)) if __name__ == "__main__": rv = do_task()
一個python實現的mapreduce程序
map:
# !/usr/bin/env python import sys for line in sys.stdin: line = line.strip() words = line.split() for word in words: print ("%s\t%s") % (word, 1)
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
reduce:
#!/usr/bin/env python import operator import sys current_word = None curent_count = 0 word = None for line in sys.stdin: line = line.strip() word, count = line.split('\t', 1) try: count = int(count) except ValueError: continue if current_word == word: curent_count += count else: if current_word: print '%s\t%s' % (current_word,curent_count) current_word=word curent_count=count if current_word==word: print '%s\t%s' % (current_word,curent_count)
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
測試:
[root@node1 input]# echo "foo foo quux labs foo bar zoo zoo hying" | /home/hadoop/input/max_map.py | sort | /home/hadoop/input/max_reduce.py
- 1
執行:可將其寫入腳本文件
//注意\-file之間一定不能空格
hadoop jar /hadoop64/hadoop-2.7.1/share/hadoop/tools/lib/hadoop-*streaming*.jar -D stream.non.zero.exit.is.failure=false \-file /home/hadoop/input/max_map.py -mapper /home/hadoop/input/max_map.py \-file /home/hadoop/input/max_reduce.py -reducer /home/hadoop/input/max_reduce.py \-input /input/temperature/ -output /output/temperature
- 1
- 2
Hadoop(三):MapReduce程序(python)
使用python語言進行MapReduce程序開發主要分為兩個步驟,一是編寫程序,二是用Hadoop Streaming命令提交任務。
還是以詞頻統計為例
一、程序開發
1、Mapper
1 for line in sys.stdin: 2 filelds = line.strip.split(' ') 3 for item in fileds: 4 print item+' '+'1'
2、Reducer
1 import sys 2 3 result={} 4 for line in sys.stdin: 5 kvs = line.strip().split(' ') 6 k = kvs[0] 7 v = kvs[1] 8 if k in result: 9 result[k]+=1 10 else: 11 result[k] = 1 12 for k,v in result.items(): 13 print k+' '+v
....
寫完發現其實只用map就可以處理了...reduce只用cat就好了
3、運行腳本
1)Streaming簡介
Hadoop的MapReduce和HDFS均采用Java進行實現,默認提供Java編程接口,用戶通過這些編程接口,可以定義map、reduce函數等等。
但是如果希望使用其他語言編寫map、reduce函數怎么辦呢?
Hadoop提供了一個框架Streaming,Streaming的原理是用Java實現一個包裝用戶程序的MapReduce程序,該程序負責調用hadoop提供的Java編程接口。
2)運行命令
/.../bin/hadoop streaming
-input /..../input
-output /..../output
-mapper "mapper.py"
-reducer "reducer.py"
-file mapper.py
-file reducer.py
-D mapred.job.name ="wordcount"
-D mapred.reduce.tasks = "1"
3)Streaming常用命令
(1)-input <path>:指定作業輸入,path可以是文件或者目錄,可以使用*通配符,-input選項可以使用多次指定多個文件或目錄作為輸入。
(2)-output <path>:指定作業輸出目錄,path必須不存在,而且執行作業的用戶必須有創建該目錄的權限,-output只能使用一次。
(3)-mapper:指定mapper可執行程序或Java類,必須指定且唯一。
(4)-reducer:指定reducer可執行程序或Java類,必須指定且唯一。
(5)-file, -cacheFile, -cacheArchive:分別用於向計算節點分發本地文件、HDFS文件和HDFS壓縮文件,具體使用方法參考文件分發與打包。
(6)numReduceTasks:指定reducer的個數,如果設置-numReduceTasks 0或者-reducer NONE則沒有reducer程序,mapper的輸出直接作為整個作業的輸出。
(7)-jobconf | -D NAME=VALUE:指定作業參數,NAME是參數名,VALUE是參數值,可以指定的參數參考hadoop-default.xml。
-jobconf mapred.job.name='My Job Name'設置作業名
-jobconf mapred.job.priority=VERY_HIGH | HIGH | NORMAL | LOW | VERY_LOW設置作業優先級
-jobconf mapred.job.map.capacity=M設置同時最多運行M個map任務
-jobconf mapred.job.reduce.capacity=N設置同時最多運行N個reduce任務
-jobconf mapred.map.tasks 設置map任務個數
-jobconf mapred.reduce.tasks 設置reduce任務個數
-jobconf mapred.compress.map.output 設置map的輸出是否壓縮
-jobconf mapred.map.output.compression.codec 設置map的輸出壓縮方式
-jobconf mapred.output.compress 設置reduce的輸出是否壓縮
-jobconf mapred.output.compression.codec 設置reduce的輸出壓縮方式
-jobconf stream.map.output.field.separator 設置map輸出分隔符
例子:-D stream.map.output.field.separator=: \ 以冒號進行分隔
-D stream.num.map.output.key.fields=2 \ 指定在第二個冒號處進行分隔,也就是第二個冒號之前的作為key,之后的作為value
(8)-combiner:指定combiner Java類,對應的Java類文件打包成jar文件后用-file分發。
(9)-partitioner:指定partitioner Java類,Streaming提供了一些實用的partitioner實現,參考KeyBasedFiledPartitoner和IntHashPartitioner。
(10)-inputformat, -outputformat:指定inputformat和outputformat Java類,用於讀取輸入數據和寫入輸出數據,分別要實現InputFormat和OutputFormat接口。如果不指定,默認使用TextInputFormat和TextOutputFormat。
(11)cmdenv NAME=VALUE:給mapper和reducer程序傳遞額外的環境變量,NAME是變量名,VALUE是變量值。
(12)-mapdebug, -reducedebug:分別指定mapper和reducer程序失敗時運行的debug程序。
(13)-verbose:指定輸出詳細信息,例如分發哪些文件,實際作業配置參數值等,可以用於調試。
使用python實現MapReduce的wordcount實例
Hadopp的基本框架是用java實現的,而各類書籍基本也是以java為例實現mapreduce,但筆者日常工作都是用python,故此找了一些資料來用python實現mapreduce實例。
一、環境
2、python3.5
二、基本思想介紹
使用python實現mapreduce調用的是Hadoop Stream,主要利用STDIN(標准輸入),STDOUT(標准輸出)來實現在map函數和reduce函數之間的數據傳遞。
我們需要做的是利用python的sys.stdin讀取輸入數據,並把輸入傳遞到sys.stdout,其他的工作Hadoop的流API會為我們處理。
三、實例
以下是在hadoop官網下載的python版本mapper函數和reducer函數,文件位置默認在/usr/local/working中,
1、mapper.py
(1)代碼
-
import sys
-
#輸入為標准輸入stdin
-
for line in sys.stdin:
-
#刪除開頭和結果的空格
-
line = line.strip( )
-
#以默認空格分隔行單詞到words列表
-
words = line.split( )
-
for word in words:
-
#輸出所有單詞,格式為“單詞,1”以便作為reduce的輸入
-
print( '%s\t%s' % (word,1))
echo "aa bb cc dd aa cc" | python mapper.py
2、reducer.py
(1)代碼
-
import sys
-
-
current_word = None
-
current_count = 0
-
word = None
-
-
#獲取標准輸入,即mapper.py的輸出
-
for line in sys.stdin:
-
line = line.strip()
-
#解析mapper.py輸出作為程序的輸入,以tab作為分隔符
-
word,count = line.split( '\t',1)
-
#轉換count從字符型成整型
-
try:
-
count = int(count)
-
except ValueError:
-
#非字符時忽略此行
-
continue
-
#要求mapper.py的輸出做排序(sort)操作,以便對連續的word做判斷
-
if current_word == word:
-
current_count +=count
-
else:
-
if current_word:
-
#輸出當前word統計結果到標准輸出
-
print( '%s\t%s' % (current_word,current_count))
-
current_count =count
-
current_word =word
-
-
#輸出最后一個word統計
-
if current_word ==word:
-
print( '%s\t%s' % (current_word,current_count))
echo "aa aa bb cc dd dd" | python mapper.py | python reducer.py
3、確保已經搭建好完全分布式的hadoop環境,在HDFS中新建文件夾
bin/hdfs dfs -mkdir /temp/
bin/hdfs dfs -mkdir /temp/hdin
4、將hadoop目錄中的LICENSE.txt文件上傳到HDFS中
bin/hdfs dfs -copyFromLocal LICENSE.txt /temp/hdin
5、執行文件work.sh
(1)代碼
-
#!/bin/bash
-
#mapper函數和reducer函數文件地址
-
export CURRENT=/usr/local/working
-
#先刪除輸出目錄
-
$HADOOP_HOME/bin/hdfs dfs -rm -r /temp/hdout
-
$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2.7.3.jar \
-
-input "/temp/hdin/*" \
-
-output "/temp/hdout" \
-
-mapper "python mapper.py" \
-
-reducer "python reducer.py" \
-
-file "$CURRENT/mapper.py" \
-
-file "$CURRENT/reducer.py"
sh work.sh
6、查看結果
bin/hdfs dfs -cat /temp/hdout/*
-
"AS 16
-
"COPYRIGHTS 1
-
"Contribution" 2
-
"Contributor" 2
-
"Derivative 1
-
"Legal 1
-
"License" 1
-
"License"); 1
-
"Licensed 1
-
"Licensor" 1
-
"Losses") 1
-
"NOTICE" 1
-
"Not 1
-
...
-
Python結合Shell/Hadoop實現MapReduce
基本流程為:
cat data | map | sort | reduce
cat devProbe | ./mapper.py | sort| ./reducer.py
echo "foo foo quux labs foo bar quux" | ./mapper.py | sort -k1,1 | ./reducer.py
# -k, -key=POS1[,POS2] 鍵以pos1開始,以pos2結束
如不執行下述命令,可以再py文件前加上python調用
chmod +x mapper.py
chmod +x reducer.py
對於分布式環境下,可以使用以下命令:
hadoop jar /[YOUR_PATH]/hadoop/tools/lib/hadoop-streaming-2.6.0-cdh5.4.4.jar \
-file mapper.py -mapper mapper.py \
-file reducer.py -reducer reducer.py \
-input [IN_FILE] -output [OUT_DIR]
mapper.py
#!/usr/bin/python # -*- coding: UTF-8 -*- __author__ = 'Manhua' import sys for line in sys.stdin: line = line.strip() item = line.split('`') print "%s\t%s" % (item[0]+'`'+item[1], 1)
reducer.py
#!/usr/bin/python # -*- coding: UTF-8 -*- __author__ = 'Manhua' import sys current_word = None current_count = 0 word = None for line in sys.stdin: line = line.strip() word, count = line.split('\t', 1) try: count = int(count) except ValueError: #count如果不是數字的話,直接忽略掉 continue if current_word == word: current_count += count else: if current_word: print "%s\t%s" % (current_word, current_count) current_count = count current_word = word if word == current_word: #不要忘記最后的輸出 print "%s\t%s" % (current_word, current_count)
其它:
Python+Hadoop Streaming實現MapReduce任務:https://blog.csdn.net/czl389/article/details/77247534
用Python編寫MapReduce代碼與調用-某一天之前的所有活躍用戶統計https://blog.csdn.net/babyfish13/article/details/53841990
Python 實踐之 400 行 Python 寫一個類 Hadoop 的 MapReduce 框架:https://www.v2ex.com/t/149803,https://github.com/xiaojiaqi/py_hadoop
http://xiaorui.cc/2014/11/14/python使用mrjob實現hadoop上的mapreduce/
MapReduce實現兩表的Join--原理及python和java代碼實現
用Hive一句話搞定的,但是有時必須要用mapreduce
1. 概述
在傳統數據庫(如:MYSQL)中,JOIN操作是非常常見且非常耗時的。而在HADOOP中進行JOIN操作,同樣常見且耗時,由於Hadoop的獨特設計思想,當進行JOIN操作時,有一些特殊的技巧。本文首先介紹了Hadoop上通常的JOIN實現方法,然后給出了幾種針對不同輸入數據集的優化方法。
2. 常見的join方法介紹
假設要進行join的數據分別來自File1和File2.2.1 reduce side join
reduce side join是一種最簡單的join方式,其主要思想如下:在map階段,map函數同時讀取兩個文件File1和File2,為了區分兩種來源的key/value數據對,對每條數據打一個標簽(tag),比如:tag=0表示來自文件File1,tag=2表示來自文件File2。即:map階段的主要任務是對不同文件中的數據打標簽。
在reduce階段,reduce函數獲取key相同的來自File1和File2文件的value list, 然后對於同一個key,對File1和File2中的數據進行join(笛卡爾乘積)。即:reduce階段進行實際的連接操作。
2.2 map side join
之所以存在reduce side join,是因為在map階段不能獲取所有需要的join字段,即:同一個key對應的字段可能位於不同map中。Reduce side join是非常低效的,因為shuffle階段要進行大量的數據傳輸。Map side join是針對以下場景進行的優化:兩個待連接表中,有一個表非常大,而另一個表非常小,以至於小表可以直接存放到內存中。這樣,我們可以將小表復制多份,讓每個map task內存中存在一份(比如存放到hash table中),然后只掃描大表:對於大表中的每一條記錄key/value,在hash table中查找是否有相同的key的記錄,如果有,則連接后輸出即可。
為了支持文件的復制,Hadoop提供了一個類DistributedCache,使用該類的方法如下:
(1)用戶使用靜態方法DistributedCache.addCacheFile()指定要復制的文件,它的參數是文件的URI(如果是HDFS上的文件,可以這樣:hdfs://namenode:9000/home/XXX/file,其中9000是自己配置的NameNode端口號)。JobTracker在作業啟動之前會獲取這個URI列表,並將相應的文件拷貝到各個TaskTracker的本地磁盤上。(2)用戶使用DistributedCache.getLocalCacheFiles()方法獲取文件目錄,並使用標准的文件讀寫API讀取相應的文件。
2.3 SemiJoin
SemiJoin,也叫半連接,是從分布式數據庫中借鑒過來的方法。它的產生動機是:對於reduce side join,跨機器的數據傳輸量非常大,這成了join操作的一個瓶頸,如果能夠在map端過濾掉不會參加join操作的數據,則可以大大節省網絡IO。實現方法很簡單:選取一個小表,假設是File1,將其參與join的key抽取出來,保存到文件File3中,File3文件一般很小,可以放到內存中。在map階段,使用DistributedCache將File3復制到各個TaskTracker上,然后將File2中不在File3中的key對應的記錄過濾掉,剩下的reduce階段的工作與reduce side join相同。
更多關於半連接的介紹,可參考:半連接介紹:http://wenku.baidu.com/view/ae7442db7f1922791688e877.html
2.4 reduce side join + BloomFilter
在某些情況下,SemiJoin抽取出來的小表的key集合在內存中仍然存放不下,這時候可以使用BloomFiler以節省空間。BloomFilter最常見的作用是:判斷某個元素是否在一個集合里面。它最重要的兩個方法是:add() 和contains()。最大的特點是不會存在false negative,即:如果contains()返回false,則該元素一定不在集合中,但會存在一定的true negative,即:如果contains()返回true,則該元素可能在集合中。
因而可將小表中的key保存到BloomFilter中,在map階段過濾大表,可能有一些不在小表中的記錄沒有過濾掉(但是在小表中的記錄一定不會過濾掉),這沒關系,只不過增加了少量的網絡IO而已。
更多關於BloomFilter的介紹,可參考:http://blog.csdn.net/jiaomeng/article/details/1495500
3. 二次排序
在Hadoop中,默認情況下是按照key進行排序,如果要按照value進行排序怎么辦?即:對於同一個key,reduce函數接收到的value list是按照value排序的。這種應用需求在join操作中很常見,比如,希望相同的key中,小表對應的value排在前面。有兩種方法進行二次排序,分別為:buffer and in memory sort和 value-to-key conversion。
對於buffer and in memory sort,主要思想是:在reduce()函數中,將某個key對應的所有value保存下來,然后進行排序。 這種方法最大的缺點是:可能會造成out of memory。
對於value-to-key conversion,主要思想是:將key和部分value拼接成一個組合key(實現WritableComparable接口或者調用setSortComparatorClass函數),這樣reduce獲取的結果便是先按key排序,后按value排序的結果,需要注意的是,用戶需要自己實現Paritioner,以便只按照key進行數據划分。Hadoop顯式的支持二次排序,在Configuration類中有個setGroupingComparatorClass()方法,可用於設置排序group的key值,
reduce-side-join python代碼
hadoop有個工具叫做steaming,能夠支持python、shell、C++、PHP等其他任何支持標准輸入stdin及標准輸出stdout的語言,其運行原理可以通過和標准java的map-reduce程序對比來說明:
使用原生java語言實現Map-reduce程序
- hadoop准備好數據后,將數據傳送給java的map程序
- java的map程序將數據處理后,輸出O1
- hadoop將O1打散、排序,然后傳給不同的reduce機器