MapReduce 實現數據join操作


前段時間有一個業務需求,要在外網商品(TOPB2C)信息中加入 聯營自營 識別的字段。但存在的一個問題是,商品信息  自營聯營標示數據是 兩份數據;商品信息較大,是存放在hbase中。他們之前唯一的關聯是url。所以考慮用urlkey將兩者做join,將 聯營自營標識 信息加入的商品信息中,最終生成我需要的數據;

 

一,首先展示一下兩份數據的demo example

1. 自營聯營標識數據(下面開始就叫做unionseller.txt

http://cn.abc.www/product43675,1

http://cn.abc.www/product43710,0

 

(是兩列的數據,其中第一列是url也即是join操作要使用的key,后面的0,1是表示自營聯營標識,0表示自營,1表示聯營)

 

2. 商品信息數據(下面開始叫做shangpin_hbase.txt

ROWKEY=http://cn.abc.www/product43675^_

 

image:image_url@1375896006814=http://*/*.jpg^_

image:is_default_image@1375897937280=0^_

image:thumbnail@1375897937280=**^_

parser:attribute@1375896006814={

......

...... 

uri:url@1375896006814=http://www.baby.com.cn/product-35520^_

^^

(上面是一條商品數據的樣例,使用的是spider團隊提供的一個jar包從hbase中導出來的,但也是比較標准的模式,可以使用hbase中HBaseDocInputFormat的類進行讀取,其中貌似笑臉的那些field separator和line separator實際上是不可見字符,這里主要是為了展示而用可見字符代替;處於隱私原因,將一些內容用*代替)

 

由於商品數據是在hbase里面的,所以首先將商品數據從hbase中dump出來,通過rowkey來訪問hbase大致有兩種方式:

1. 通過單個row key訪問:即按照某個row key鍵值進行get操作;

2. 通過row key的range進行scan;在范圍內進行掃描;

一般來說,scan的方式會快一些。由於商品數據集的數量較為龐大(會有2000多萬條url的商品及屬性),並且商品是以url(host反轉的方式,例如http://cn.abc.www/product43675)作為rowkey進行存儲的。所以采用scan的方式從hbase中取出上商品數據集。

 

接下來獲取自營聯營識別數據,由於自營聯營標識在hbase中是沒有的,是spider團隊在進行商品數據dump的時候計算出來的,所以直接用python寫個streaming程序從dump數據中取得url字段和自營聯營標識就可以了。較為簡單,略過,本文主要來講解join操作和hbase字段解析的過程。

 

二,現在我們手頭上有了shangpin_hbase.txt以及unionseller.txt下面就可以考慮實現join操作的mapreduce了。

用mapreduce實現join比較常見的有兩種方法,map端join和reduce端join

1. Mapjoin所針對的場景是,兩個要進行join的數據集中,其中一個非常大,另一個非常小,以至於我們可以將小的數據集放到內存中。這樣我們可以將小數據集復制多份,每個運行map task的內存中都存在一份(可以使用hash table),這樣map task可以只掃描大的數據,對於大數據中的每一條記錄,去小數據中找到相應的key的數據,然后連接輸出;如果想讓小數據集復制到每個map task中,可以使用mapreduce提供的Distributed Cache機制。

​2. Reduce端join是一種比較簡單和容易想到的方式,適合的環境也更為普遍。基本思路是在mapper中為每一個記錄打上標記,並且使用連接鍵作為map輸出鍵,使鍵相同的記錄能夠被分到同一個reducer中。

Reduce端join比較簡單,但缺點是兩個數據集都要經過mapreduce的shuffle過程(里面涉及到寫磁盤,歸並排序,還有網絡傳輸等)。所以reduce端的join操作,效率往往低些。本文采用簡單一些的reduce端join的實現。

 

​1. 首先是mapper編寫,由於join操作至少有兩份數據集,所以常需要使用mapReduce的MultipleInputs.addInputPath()來添加多個輸入文件的路徑。也需要兩個mapper類來實現對不同數據集的處理,首先來看一下處理unionseller.txt的mapper程序:

alt

由於unionseller.txt的每一行是以逗號分割的兩列,所以只要用逗號將兩者分隔開就好,得到作為連接鍵的urllst[0]和自營聯營識別標識項lst[1]Map的的輸出keyTextPair類型,輸出valuePut類型。這兩個重點說一下:

​(1)TextPair類型

map的輸出即reduce的輸入,有相同key的數據會被運送到同一個reduce來處理。為了在reduce端能夠區分unionseller.txtshangpin_hbase.txt類型,在TextPair中增加了一個識別字段,unionseller.txt “0”,即new TextPair(lst[0],”0”);而shangpin_hbase.txt字段是”1”,這樣在reduce端就可以有依據來區分數據的類型。

TextPair是一個自定義的數據類型,其包含兩個成員變量,firstsecond,兩個成員變量都是Hadoop自帶的Text類型。自己實現自定義的Key是要有些限制的,原因是:

(aMapper會根據key進行hash操作來決定記錄被分配到哪一個reducer中去;

(b)在MapReduce內部機制中,在Mapper以及Reducer階段都涉及到將數據寫到磁盤的IO操作(spill階段)和根據key對數據進行排序的操作。所以這就要求Mapper中的key是可以比較的,並且keyvalue是都是可以序列化的。Hadoop對自帶的數據類型(Text,IntWritable等)都實現了序列化方法和內置的比較方法,但是對於自定義類型,就必須自己去實現相應的接口,並且重寫方法。TextPair代碼如下:

alt

altalt

由於key需要進行比較並且需要序列化,所以實現了WritableComparable接口,其中compareTo()是實現了比較的方法,而readFieldswrite方法則是用於序列化的,告訴hadoop怎樣讀和寫自定義的數據結構。這里解決了對key比較和序列化的問題,那么mapper對數據進行分發(決定到將記錄發送到哪個reduce)的操作該怎樣實現,這個需要集成Partitioner類並重寫getPartition方法,實現自己的分發策略。

alt

​這里面自己定義了hash方法。

​(2)介紹完了TextPair類,說一下Put類型,說Put類型之前,先看下處理shangpin_hbase.txtmapper實現:

alt

Put類型是Hbase自帶的一個類型,由於shangpin_hbase.txt是從Hbase中導出來的,需要Hbase中的HBaseDocInputFormat.class進行解析,解析之后會成為<ImmutableBytesWritable, Put>的鍵值對,所以Mapperkeyvalue分別為ImmutableBytesWritablePut類型。Put”row”就是這條數據的rowkeyPut本質上是一個<byte[], List<KeyValue>結構的map,其中KeyValue也是Hbase所定義的一個數據類型,KeyValue是將Hbasetimestampfamilyqualifiervalue扁平化存儲的數據結構,要細講就多了,感興趣的可以看一下hbase的源代碼。本文在后面會一個將Put轉化為String的代碼。處理shangpin_hbase.txtmap輸出的textpair的標識是”1”

​2. 兩個Mapper的工作介紹完了,接下來就要編寫Reducer了。想一想,Reducer的任務比較簡單,含有同樣key的數據都在一起了。只要根據”0””1”的,取出自營,聯營的識別標識,然后將這個塞到商品信息中就好了。

但這里面還有一個坑,就是shuffle的group過程,其實MapReduce框架中,在Reduce之前有一個group操作,將數據進行分組,同一個分組的數據會在一次reduce函數中被處理。group默認會使用keycompareTo方法來進行分組操作,按照上面TextPaircompareTo方法,url相同的”0””1”數據是分不到一個group里面的。這樣從業務邏輯上分析是有問題的,所以我們需要對group的比較方法進行調整,MapReduce框架中也可以自定義group的比較方法:

alt

這里我們設置,只有url相同,數據就會被放到同一個group里面。

下面是Reduce的代碼以及 將Hbase中Put類型轉化為String的方法:

alt

alt

至此,這次join操作就講完了。附件中有實例代碼。其實這個需求實現起來也可以不使用這么多的自定義函數,只不過文章中的實現更有助於了解MapReduce的原理。

 

By the way,用hive實現join是更簡單的。。。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM