Python實現MapReduce,wordcount實例,MapReduce實現兩表的Join


Python實現MapReduce

下面使用mapreduce模式實現了一個簡單的統計日志中單詞出現次數的程序: 

from functools import reduce from multiprocessing import Pool from collections import Counter def read_inputs(file): for line in file: line = line.strip() yield line.split() def count(file_name): file = open(file_name) lines = read_inputs(file) c = Counter() for words in lines: for word in words: c[word] += 1 return c def do_task(): job_list = ['log.txt'] * 10000 pool = Pool(8) return reduce(lambda x, y: x+y, pool.map(count, job_list)) if __name__ == "__main__": rv = do_task()







一個python實現的mapreduce程序

版權聲明:本文為博主原創文章,未經博主允許不得轉載。 https://blog.csdn.net/weijianpeng2013_2015/article/details/71908340

map:

# !/usr/bin/env python import sys for line in sys.stdin: line = line.strip() words = line.split() for word in words: print ("%s\t%s") % (word, 1) 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

reduce:

#!/usr/bin/env python import operator import sys current_word = None curent_count = 0 word = None for line in sys.stdin: line = line.strip() word, count = line.split('\t', 1) try: count = int(count) except ValueError: continue if current_word == word: curent_count += count else: if current_word: print '%s\t%s' % (current_word,curent_count) current_word=word curent_count=count if current_word==word: print '%s\t%s' % (current_word,curent_count)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

測試:

[root@node1 input]# echo "foo foo quux labs foo bar zoo zoo hying" | /home/hadoop/input/max_map.py | sort | /home/hadoop/input/max_reduce.py
  • 1

這里寫圖片描述

執行:可將其寫入腳本文件

 //注意\-file之間一定不能空格
hadoop jar /hadoop64/hadoop-2.7.1/share/hadoop/tools/lib/hadoop-*streaming*.jar -D stream.non.zero.exit.is.failure=false \-file /home/hadoop/input/max_map.py -mapper /home/hadoop/input/max_map.py \-file /home/hadoop/input/max_reduce.py -reducer /home/hadoop/input/max_reduce.py \-input /input/temperature/ -output /output/temperature
  • 1
  • 2

這里寫圖片描述

 

 
 
 
 
 

Hadoop(三):MapReduce程序(python)

 

使用python語言進行MapReduce程序開發主要分為兩個步驟,一是編寫程序,二是用Hadoop Streaming命令提交任務。

還是以詞頻統計為例

一、程序開發
1、Mapper

1 for line in sys.stdin:
2     filelds = line.strip.split(' ')
3     for item in fileds:
4         print item+' '+'1'

2、Reducer

復制代碼
 1 import sys
 2 
 3 result={}
 4 for line in  sys.stdin:
 5     kvs = line.strip().split(' ')
 6     k = kvs[0]
 7     v = kvs[1]
 8     if k in result:
 9         result[k]+=1
10     else:
11         result[k] = 1
12 for k,v in result.items():
13     print k+' '+v
復制代碼

....

寫完發現其實只用map就可以處理了...reduce只用cat就好了

3、運行腳本

1)Streaming簡介

  Hadoop的MapReduce和HDFS均采用Java進行實現,默認提供Java編程接口,用戶通過這些編程接口,可以定義map、reduce函數等等。
  但是如果希望使用其他語言編寫map、reduce函數怎么辦呢?
  Hadoop提供了一個框架Streaming,Streaming的原理是用Java實現一個包裝用戶程序的MapReduce程序,該程序負責調用hadoop提供的Java編程接口。

2)運行命令

  /.../bin/hadoop streaming

  -input /..../input

  -output /..../output

  -mapper "mapper.py"

  -reducer "reducer.py"

  -file mapper.py

  -file reducer.py

  -D mapred.job.name ="wordcount"

  -D mapred.reduce.tasks = "1"

3)Streaming常用命令

(1)-input <path>:指定作業輸入,path可以是文件或者目錄,可以使用*通配符,-input選項可以使用多次指定多個文件或目錄作為輸入。

(2)-output <path>:指定作業輸出目錄,path必須不存在,而且執行作業的用戶必須有創建該目錄的權限,-output只能使用一次。

(3)-mapper:指定mapper可執行程序或Java類,必須指定且唯一。

(4)-reducer:指定reducer可執行程序或Java類,必須指定且唯一。

(5)-file, -cacheFile, -cacheArchive:分別用於向計算節點分發本地文件、HDFS文件和HDFS壓縮文件,具體使用方法參考文件分發與打包

(6)numReduceTasks:指定reducer的個數,如果設置-numReduceTasks 0或者-reducer NONE則沒有reducer程序,mapper的輸出直接作為整個作業的輸出。

(7)-jobconf | -D NAME=VALUE:指定作業參數,NAME是參數名,VALUE是參數值,可以指定的參數參考hadoop-default.xml。

   -jobconf mapred.job.name='My Job Name'設置作業名

   -jobconf mapred.job.priority=VERY_HIGH | HIGH | NORMAL | LOW | VERY_LOW設置作業優先級

   -jobconf mapred.job.map.capacity=M設置同時最多運行M個map任務

   -jobconf mapred.job.reduce.capacity=N設置同時最多運行N個reduce任務

   -jobconf mapred.map.tasks 設置map任務個數

   -jobconf mapred.reduce.tasks 設置reduce任務個數   

   -jobconf mapred.compress.map.output 設置map的輸出是否壓縮

   -jobconf mapred.map.output.compression.codec 設置map的輸出壓縮方式   

   -jobconf mapred.output.compress 設置reduce的輸出是否壓縮

   -jobconf mapred.output.compression.codec 設置reduce的輸出壓縮方式

   -jobconf stream.map.output.field.separator 設置map輸出分隔符

    例子:-D stream.map.output.field.separator=: \  以冒號進行分隔

            -D stream.num.map.output.key.fields=2 \  指定在第二個冒號處進行分隔,也就是第二個冒號之前的作為key,之后的作為value

(8)-combiner:指定combiner Java類,對應的Java類文件打包成jar文件后用-file分發。

(9)-partitioner:指定partitioner Java類,Streaming提供了一些實用的partitioner實現,參考KeyBasedFiledPartitonerIntHashPartitioner

(10)-inputformat, -outputformat:指定inputformat和outputformat Java類,用於讀取輸入數據和寫入輸出數據,分別要實現InputFormat和OutputFormat接口。如果不指定,默認使用TextInputFormat和TextOutputFormat。

(11)cmdenv NAME=VALUE:給mapper和reducer程序傳遞額外的環境變量,NAME是變量名,VALUE是變量值。

(12)-mapdebug, -reducedebug:分別指定mapper和reducer程序失敗時運行的debug程序。

(13)-verbose:指定輸出詳細信息,例如分發哪些文件,實際作業配置參數值等,可以用於調試。

 

 
 
 
 
 
 

使用python實現MapReduce的wordcount實例

版權聲明:本文為博主原創文章,未經博主允許不得轉載。 https://blog.csdn.net/sinat_33741547/article/details/54428025

Hadopp的基本框架是用java實現的,而各類書籍基本也是以java為例實現mapreduce,但筆者日常工作都是用python,故此找了一些資料來用python實現mapreduce實例。

一、環境

1、Hadoop-2.7.3完全分布式搭建

2、python3.5

二、基本思想介紹

使用python實現mapreduce調用的是Hadoop Stream,主要利用STDIN(標准輸入),STDOUT(標准輸出)來實現在map函數和reduce函數之間的數據傳遞。

我們需要做的是利用python的sys.stdin讀取輸入數據,並把輸入傳遞到sys.stdout,其他的工作Hadoop的流API會為我們處理。

三、實例

以下是在hadoop官網下載的python版本mapper函數和reducer函數,文件位置默認在/usr/local/working中,

1、mapper.py

(1)代碼

 

  1.  
    import sys
  2.  
    #輸入為標准輸入stdin
  3.  
    for line in sys.stdin:
  4.  
    #刪除開頭和結果的空格
  5.  
    line = line.strip( )
  6.  
    #以默認空格分隔行單詞到words列表
  7.  
    words = line.split( )
  8.  
    for word in words:
  9.  
    #輸出所有單詞,格式為“單詞,1”以便作為reduce的輸入
  10.  
    print( '%s\t%s' % (word,1))
(2)可對代碼進行檢驗

 

echo "aa bb cc dd aa cc" | python mapper.py

2、reducer.py

(1)代碼

 

  1.  
    import sys
  2.  
     
  3.  
    current_word = None
  4.  
    current_count = 0
  5.  
    word = None
  6.  
     
  7.  
    #獲取標准輸入,即mapper.py的輸出
  8.  
    for line in sys.stdin:
  9.  
    line = line.strip()
  10.  
    #解析mapper.py輸出作為程序的輸入,以tab作為分隔符
  11.  
    word,count = line.split( '\t',1)
  12.  
    #轉換count從字符型成整型
  13.  
    try:
  14.  
    count = int(count)
  15.  
    except ValueError:
  16.  
    #非字符時忽略此行
  17.  
    continue
  18.  
    #要求mapper.py的輸出做排序(sort)操作,以便對連續的word做判斷
  19.  
    if current_word == word:
  20.  
    current_count +=count
  21.  
    else:
  22.  
    if current_word:
  23.  
    #輸出當前word統計結果到標准輸出
  24.  
    print( '%s\t%s' % (current_word,current_count))
  25.  
    current_count =count
  26.  
    current_word =word
  27.  
     
  28.  
    #輸出最后一個word統計
  29.  
    if current_word ==word:
  30.  
    print( '%s\t%s' % (current_word,current_count))
(2)對代碼進行檢驗

 

echo "aa aa bb cc dd dd" | python mapper.py | python reducer.py

3、確保已經搭建好完全分布式的hadoop環境,在HDFS中新建文件夾

bin/hdfs dfs -mkdir /temp/

bin/hdfs dfs -mkdir /temp/hdin

4、將hadoop目錄中的LICENSE.txt文件上傳到HDFS中

bin/hdfs dfs -copyFromLocal LICENSE.txt /temp/hdin

5、執行文件work.sh

(1)代碼

 

  1.  
    #!/bin/bash  
  2.  
    #mapper函數和reducer函數文件地址
  3.  
    export CURRENT=/usr/local/working
  4.  
    #先刪除輸出目錄  
  5.  
    $HADOOP_HOME/bin/hdfs dfs -rm -r /temp/hdout   
  6.  
    $HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2.7.3.jar \
  7.  
    -input "/temp/hdin/*" \
  8.  
    -output "/temp/hdout" \
  9.  
    -mapper "python mapper.py" \
  10.  
    -reducer "python reducer.py" \
  11.  
    -file "$CURRENT/mapper.py" \
  12.  
    -file "$CURRENT/reducer.py"
(2)執行代碼

 

sh work.sh

6、查看結果

bin/hdfs dfs -cat /temp/hdout/*

 

  1.  
    "AS 16
  2.  
    "COPYRIGHTS 1
  3.  
    "Contribution" 2
  4.  
    "Contributor" 2
  5.  
    "Derivative 1
  6.  
    "Legal 1
  7.  
    "License" 1
  8.  
    "License"); 1
  9.  
    "Licensed 1
  10.  
    "Licensor" 1
  11.  
    "Losses") 1
  12.  
    "NOTICE" 1
  13.  
    "Not 1
  14.  
    ...
  15.  

 
 
 














Python結合Shell/Hadoop實現MapReduce

 

基本流程為: 

cat data | map | sort | reduce

cat devProbe | ./mapper.py | sort| ./reducer.py

echo "foo foo quux labs foo bar quux" | ./mapper.py | sort -k1,1 | ./reducer.py

# -k, -key=POS1[,POS2]     鍵以pos1開始,以pos2結束

 

如不執行下述命令,可以再py文件前加上python調用

chmod +x mapper.py
chmod +x reducer.py

 

對於分布式環境下,可以使用以下命令:

hadoop jar /[YOUR_PATH]/hadoop/tools/lib/hadoop-streaming-2.6.0-cdh5.4.4.jar \
 -file mapper.py -mapper mapper.py \
 -file reducer.py -reducer reducer.py \
 -input [IN_FILE]    -output [OUT_DIR]

 

 

mapper.py

復制代碼
#!/usr/bin/python
# -*- coding: UTF-8 -*-

__author__ = 'Manhua'

import sys
for line in sys.stdin:
    line = line.strip()
    item = line.split('`')
    print "%s\t%s" % (item[0]+'`'+item[1], 1)
復制代碼

 

reducer.py

復制代碼
#!/usr/bin/python
# -*- coding: UTF-8 -*-

__author__ = 'Manhua'


import sys

current_word = None
current_count = 0
word = None

for line in sys.stdin:
    line = line.strip()
    word, count = line.split('\t', 1)
    try:
        count = int(count)
    except ValueError:  #count如果不是數字的話,直接忽略掉
        continue
    if current_word == word:
        current_count += count
    else:
        if current_word:
            print "%s\t%s" % (current_word, current_count)
        current_count = count
        current_word = word

if word == current_word:  #不要忘記最后的輸出
    print "%s\t%s" % (current_word, current_count)
復制代碼

 

 

 

 

其它:

Python+Hadoop Streaming實現MapReduce任務:https://blog.csdn.net/czl389/article/details/77247534

用Python編寫MapReduce代碼與調用-某一天之前的所有活躍用戶統計https://blog.csdn.net/babyfish13/article/details/53841990

Python 實踐之 400 行 Python 寫一個類 Hadoop 的 MapReduce 框架:https://www.v2ex.com/t/149803,https://github.com/xiaojiaqi/py_hadoop

http://xiaorui.cc/2014/11/14/python使用mrjob實現hadoop上的mapreduce/ 








MapReduce實現兩表的Join--原理及python和java代碼實現

版權聲明:本文為博主原創文章,未經博主允許不得轉載。 https://blog.csdn.net/yimingsilence/article/details/70242604

用Hive一句話搞定的,但是有時必須要用mapreduce

1. 概述

在傳統數據庫(如:MYSQL)中,JOIN操作是非常常見且非常耗時的。而在HADOOP中進行JOIN操作,同樣常見且耗時,由於Hadoop的獨特設計思想,當進行JOIN操作時,有一些特殊的技巧。
本文首先介紹了Hadoop上通常的JOIN實現方法,然后給出了幾種針對不同輸入數據集的優化方法。

2. 常見的join方法介紹

假設要進行join的數據分別來自File1和File2.

2.1 reduce side join

reduce side join是一種最簡單的join方式,其主要思想如下:
在map階段,map函數同時讀取兩個文件File1和File2,為了區分兩種來源的key/value數據對,對每條數據打一個標簽(tag),比如:tag=0表示來自文件File1,tag=2表示來自文件File2。即:map階段的主要任務是對不同文件中的數據打標簽。
在reduce階段,reduce函數獲取key相同的來自File1和File2文件的value list, 然后對於同一個key,對File1和File2中的數據進行join(笛卡爾乘積)。即:reduce階段進行實際的連接操作。

2.2 map side join

之所以存在reduce side join,是因為在map階段不能獲取所有需要的join字段,即:同一個key對應的字段可能位於不同map中。Reduce side join是非常低效的,因為shuffle階段要進行大量的數據傳輸。
Map side join是針對以下場景進行的優化:兩個待連接表中,有一個表非常大,而另一個表非常小,以至於小表可以直接存放到內存中。這樣,我們可以將小表復制多份,讓每個map task內存中存在一份(比如存放到hash table中),然后只掃描大表:對於大表中的每一條記錄key/value,在hash table中查找是否有相同的key的記錄,如果有,則連接后輸出即可。
為了支持文件的復制,Hadoop提供了一個類DistributedCache,使用該類的方法如下:
(1)用戶使用靜態方法DistributedCache.addCacheFile()指定要復制的文件,它的參數是文件的URI(如果是HDFS上的文件,可以這樣:hdfs://namenode:9000/home/XXX/file,其中9000是自己配置的NameNode端口號)。JobTracker在作業啟動之前會獲取這個URI列表,並將相應的文件拷貝到各個TaskTracker的本地磁盤上。(2)用戶使用DistributedCache.getLocalCacheFiles()方法獲取文件目錄,並使用標准的文件讀寫API讀取相應的文件。

2.3 SemiJoin

SemiJoin,也叫半連接,是從分布式數據庫中借鑒過來的方法。它的產生動機是:對於reduce side join,跨機器的數據傳輸量非常大,這成了join操作的一個瓶頸,如果能夠在map端過濾掉不會參加join操作的數據,則可以大大節省網絡IO。
實現方法很簡單:選取一個小表,假設是File1,將其參與join的key抽取出來,保存到文件File3中,File3文件一般很小,可以放到內存中。在map階段,使用DistributedCache將File3復制到各個TaskTracker上,然后將File2中不在File3中的key對應的記錄過濾掉,剩下的reduce階段的工作與reduce side join相同。
更多關於半連接的介紹,可參考:半連接介紹:http://wenku.baidu.com/view/ae7442db7f1922791688e877.html

2.4 reduce side join + BloomFilter

在某些情況下,SemiJoin抽取出來的小表的key集合在內存中仍然存放不下,這時候可以使用BloomFiler以節省空間。
BloomFilter最常見的作用是:判斷某個元素是否在一個集合里面。它最重要的兩個方法是:add() 和contains()。最大的特點是不會存在false negative,即:如果contains()返回false,則該元素一定不在集合中,但會存在一定的true negative,即:如果contains()返回true,則該元素可能在集合中。
因而可將小表中的key保存到BloomFilter中,在map階段過濾大表,可能有一些不在小表中的記錄沒有過濾掉(但是在小表中的記錄一定不會過濾掉),這沒關系,只不過增加了少量的網絡IO而已。
更多關於BloomFilter的介紹,可參考:http://blog.csdn.net/jiaomeng/article/details/1495500

3. 二次排序

在Hadoop中,默認情況下是按照key進行排序,如果要按照value進行排序怎么辦?即:對於同一個key,reduce函數接收到的value list是按照value排序的。這種應用需求在join操作中很常見,比如,希望相同的key中,小表對應的value排在前面。
有兩種方法進行二次排序,分別為:buffer and in memory sort和 value-to-key conversion。
對於buffer and in memory sort,主要思想是:在reduce()函數中,將某個key對應的所有value保存下來,然后進行排序。 這種方法最大的缺點是:可能會造成out of memory。
對於value-to-key conversion,主要思想是:將key和部分value拼接成一個組合key(實現WritableComparable接口或者調用setSortComparatorClass函數),這樣reduce獲取的結果便是先按key排序,后按value排序的結果,需要注意的是,用戶需要自己實現Paritioner,以便只按照key進行數據划分。Hadoop顯式的支持二次排序,在Configuration類中有個setGroupingComparatorClass()方法,可用於設置排序group的key值,

 

reduce-side-join python代碼 

hadoop有個工具叫做steaming,能夠支持python、shell、C++、PHP等其他任何支持標准輸入stdin及標准輸出stdout的語言,其運行原理可以通過和標准java的map-reduce程序對比來說明:

 

使用原生java語言實現Map-reduce程序
  1. hadoop准備好數據后,將數據傳送給java的map程序
  2. java的map程序將數據處理后,輸出O1
  3. hadoop將O1打散、排序,然后傳給不同的reduce機器




免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM