業務場景大概是這樣,我需要在公司hadoop集群上對博文進行結巴分詞。我的數據是存儲在hive表格中的,數據量涉及到五百萬用戶三個月內發的所有博文。
首先對於數據來說,很簡單,在hive表格中就是兩列,一列代表的是uid,一列代表的是博文內容。舉個例子如下:
uid content
12345 今天天氣真好啊
23456 中午的食物真不錯啊
... ...
對於hive表格,我在使用hadoop的時候,方法一般使用的是hive+python的形式,也就是從hive中一行行的讀取數據,每一行都經過python文件進行處理,然后將結果插入到另一個新的表格中去。
在這個過程中,我們在python文件中需要導入結巴分詞(或者其他比較好的分詞包),即 import jieba;但是在運行程序的時候發現程序報錯,通過查詢錯誤(查詢的方法很笨拙,第一感覺就是包出了問題,只要程序中沒有包的時候沒有錯誤,加入結巴分詞有了錯誤,就知道錯在哪里了)也就是說在公司集群的python運行環境下並沒有結巴分詞。
因為使用的是公司的集群,我沒有權限加載新的包,這就需要我找到別的辦法。
通過谷歌,我知道的絕大多是的解決辦法都是基於hadoop mapreduce的(簡單來講,就是通過一個mapper文件,一個reducer文件實現一個程序的執行)我在這里是實現了一個簡單的python版本的mapreduce程序
正如我在開頭提到的,我在這里使用的是hive+python的方式處理hive表格,正式的名稱我們可以稱它為hadoop streaming(從本質上來講hadoop streaming 和mapreudce是一樣的,只不過使用的方式不同)。也正是基於兩者的本質是一樣的,所以經過我的苦思冥想,終於靈光一現,從hadoop mapreduce中的解決辦法中類比出了在hadoo streaming 中的解決辦法。 簡單講一下,mapreduce中解決辦法就是在最后的執行程序中加上一行
hadoop -file jieba.mod
從本質上來講,這句話的作用就是把本地的文件傳輸到集群上面,讓這個hadoop集群有能力使用這個文件。我為什么這么理解呢? 從上面那個簡單的python版mapreduce中的程序中,我們知道最后的執行命令大概是這樣的:
$HADOOP_CMD jar $STREAM_JAR_PATH \
-input $INPUT_FILE_PATH \
-output $OUTPUT_PATH \
-mapper "python mapper.py" \
-reducer "python reducer.py" \
-file ./mapper.py \
-file ./reducer.py
具體上面的命令是什么意思,在這里就不詳細講了,可以去我那邊文章中學一下。我這邊就從感覺上講一下這個東西。我們在程序中是使用了mapper文件和mapper文件的。這個文件也是在本地上,如果我們想要在集群上使用這兩個文件,就需要我們使用命令將這兩個文件從本地上傳到集群上。我們使用的命令就是hadoop -file.
類比的,我們就知道hadoop -file jieba.mod 的作用就是把結巴壓縮包上傳到集群上去。
這一點就是最關鍵的地方!
在hadoop streaming中(hive+python),我們在使用hive UDF(hive 自定義函數)的時候,一定會用到add file這個命令。比如
add file ./process.py
process.py就是python處理數據的文件。我們為什么使用到這個命令呢?就是因為這個處理文件是在本地的。如果想要在集群上使用這個命令,就需要把這個文件上傳到集群上去,在streaming我們就是使用add file 這個命令來把文件上傳到集群上去的。
這樣,我們就把兩者的聯系在了一起,也就是說,如果我們想把結巴分詞這個包上傳到集群上去,在mapreduce中我們使用的是haddop -file,在streaming中我們就要使用的是add file.
邏輯理清了,接下來就非常清楚了,就是寫代碼的工作了。
當然具體的博文數據肯定是比這個更加復雜的,比如在博文中會有着表情內容等等。我們假設上面的這博文表格名稱為 temp_zida_blog_content.
大致的思想就是這樣,接下來上代碼:
首先我們從官網下載結巴分詞包,點這里.
下載的文件名字為:jieba-0.39.zip 可能版本不同,名字不同。解壓之后,進入這個文件,找到一個文件夾名字為'jieba'(不帶版本號)(在這里一定要注意我們一定要要進入到解壓縮之后的文件中找到這個不帶版本號的文件,這個才是我們要壓縮的文件,如果我們直接使用從官網下載之后的壓縮文件,是會出錯誤的),對這個不帶版本號的文件夾進行壓縮,並改成mod后綴名:
zip -r jieba.zip jieba
mv jieba.zip jieba.mod
為了在python中使用結巴分詞,我們在python代碼中引入的就是jieba.mod。需要注意的一點就是這個jieba.mod需要和python代碼在同一個目錄下,所以為了處理數據方便,最好是把處理數據需要的所有代碼和相應配置文件全部放在一個文件夾中。
這樣在python代碼中,我們就可以這樣引入結巴分詞包:
zida.py
import sys,math,random
reload(sys)
sys.setdefaultencoding('utf-8')
sys.path.append('./') #這個步驟非常的重要,點名了導入文件的路徑
import zipimport
importer = zipimport.zipimporter('jieba.mod')
jieba = importer.load_module('jieba')
import jieba
import re
##下面的代碼就是對每一行導進來的hive表格內容進行處理,然后返回相應的處理之后的數據
正如我之前所說的,只是python中引入這個包,是不夠的,因為我們現在python文件中使用的包是從本地中導進去的,那么也就是說並不能適用在公司的hadoop的集群中,所以我們還需要在shell腳本中進行相應的處理(這一步也就是我這個業務場景和我在谷歌找到解決辦法很不一樣的地方,他們都是在執行mapreduce的那一步加入了本地相應的文件)
##在下面這個函數中,最重要的就是add file ./jieba.mod;,它代表的就是使用上本地的包文件。這一步其實很容易通過dd file ./zida.py這行代碼來理解。因為首先我們的shell腳本是在集群上運行的,為了處理hive表格數據,我們使用本地的python腳本,為了讓python腳本能夠在集群上跑,我們使用了這行代碼,類似於把python腳本拋到集群上。類比去想,如果我們想在python腳本中使用相應的包,那么只需要在這里把相應的包拋上去就好了,然后在python腳本中導入就好,因為已經被拋到集群中了,當然可以被引用。
function process_data(){
cat <<EOF
add file ./jieba.mod;
add file ./zida.py;
INSERT OVERWRITE TABLE temp_zida_uids_bowen_content
select transform(tmp.*) using 'python zida.py test'
AS uid,bowen
FROM(
select a.uid,b.extend,b.content from
(select distinct(uid) from temp_zida_uids_day)a
left outer join
(select uid,extend,content from ods_tblog_content where dt>=dt_ods_tblog_content_90days_ago and dt<=dt_ods_tblog_content)b
on a.uid=b.uid
)tmp
EOF
}
hive -e "`process_data`"
echo "process_data"
其實可以引申一下,如果我們想在python腳本中使用或者說讀進來本地的文件,比如說停用詞,我們還需要在shell腳本中add這個停用詞文件,然后才能從python腳本中讀取。具體可以看我這里的講解如何在處理hive表格的python代碼中導入外部文件
公司的python運行環境比較老舊了,還是2的版本,不支持中文,搞得我代碼正確的情況下,失敗了好多次,浪費了大量時間,一定要記住在python腳本文件最上面寫上:
#coding=utf-8
接下來我列取一些我參考的網頁
這個網頁是我主要參考的部分-給了我很多的思考-使用Hadoop的MapReduce和jieba分詞統計西游記中的詞頻-這個網頁中很厲害,它不只是導入了結巴分詞,還導入了Jieba.analyse,同時進行了相應的改變,比我這個要復雜一些
How can I include a python package with Hadoop streaming job?
[How to use Cascading with Hadoop Streaming]-谷歌上其他的解決辦法基本上都是基於這個網頁的解答,給了我很多啟發(http://eigenjoy.com/2009/11/18/how-to-use-cascading-with-hadoop-streaming/)-從這個網頁學到了在下載完結巴分詞之后,解壓之后還需要進去,把沒有帶版本號的那個文件夾進行一個壓縮.