在 Spark 中使用 IPython Notebook


本文是從 IPython Notebook 轉化而來,效果沒有本來那么好。

主要為體驗 IPython Notebook。至於題目,改成《在 IPython Notebook 中使用 Spark》也可以,沒什么差別。為什么是 Spark?因為這兩天在看《Spark 機器學習》這本書第 3 章,所以就順便做個筆記。

簡單介紹下,IPython notebook 對數據科學家來說是個交互地呈現科學和理論工作的必備工具,它集成了文本和 Python 代碼。Spark 是個通用的集群計算框架,通過將大量數據集計算任務分配到多台計算機上,提供高效內存計算。

搭建環境

一台阿里雲服務器,配置如下,108 元/月,然后在 Windows 7 上使用 Putty 和遠程桌面操作服務器。

CPU:1核
內存:2048 MB
操作系統:Ubuntu 14.04 64位
固定帶寬:1Mbps

IPython Notebook 的安裝很簡單,強烈推薦一個預編譯的科學 Python 套件 Anaconda,按照官方網站安裝,然后在 Terminal 里執行 ipython notebook 即可。

在我的 Ubuntu 服務器上打開 ipython notebook 時報錯了:socket.error Errno 99 Cannot assign requested address。
這個問題耗了我一天時間。關於這個問題, ,但只有下面兩種方法管用。

一種是,在命令后加參數 --ip:ipython notebook --ip=127.0.0.1

另一種是,先生成 notebook 配置文件:命令行執行 jupyter notebook --generate-config,然后打開生成的文件: vi ~/.jupyter/jupyter_notebook_config.py,修改 c.NotebookApp.ip = '127.0.0.1'。

如果想外網也可以訪問,ip 就設為外網 IP 地址。我選擇的是第二種,設的外網 IP 地址,這樣就可以在 Windows 上編輯 ipython notebook 文件了,非常方便。

Spark 的安裝也很簡單,具體安裝和使用可參考我之前的筆記官方網站

如何在 Spark 中使用 IPython Notebook,或者如何在 IPython Notebook 中使用 spark,也耗費了我一天時間。

網上很多文章都是建議:1、執行 ipython profile create spark;2、創建 ~/.ipython/profile_spark/startup/00-pyspark-setup.py 文件並修改;3、啟動 IPython notebook:ipython notebook --profile spark。
但這種方法在我這行不通,百般折騰,就是各種不行。
后來終於發現一種簡單可行的方法,那就是修改 ~/.bashrc 文件,添加以下內容:

export PYSPARK_DRIVER_PYTHON=ipython2 # As pyspark only works with python2 and not python3
export PYSPARK_DRIVER_PYTHON_OPTS="notebook"

然后source .bashrc,就可以通過啟動 pyspark 來啟動 IPython Notebook 了。也就可以在 IPython Notebook 中使用 pyspark 了。

下面我們通過實例演示 Spark 在 IPython Notebook 中的使用。

Spark 上數據的探索和處理

MovieLens 100k 數據集

這里要使用的是著名的 MovieLens 100k 數據集,該數據集包含用戶對電影的 10 萬次評分數據,也包含電影元數據和用戶屬性數據。數據集不大,壓縮文件不到 5M,常用於推薦系統研究。

可到官方網站下載,解壓后會創建一個名為 ml-100k 的文件夾,該目錄中重要的文件有 u.user(用戶屬性文件)、u.item(電影元數據) 和 u.data(用戶對電影的評分)。

數據集的更多信息可以從 README 獲得,包括每個數據文件里的變量定義。我們可以使用 head 命令來查看各個文件的內容。

先來看 u.user:

#head -5 u.user
1|24|M|technician|85711
2|53|F|other|94043
3|23|M|writer|32067
4|24|M|technician|43537
5|33|F|other|15213

可以看到 u.user 文件包含 user id、age、gender、occupation 和 ZIP code 這些屬性,各屬性之間用管道符(|)隔開。

再來看 u.item:

# head -5 u.item
1|Toy Story (1995)|01-Jan-1995||http://us.imdb.com/M/title-exact?Toy%20Story%20(1995)|0|0|0|1|1|1|0|0|0|0|0|0|0|0|0|0|0|0|0
2|GoldenEye (1995)|01-Jan-1995||http://us.imdb.com/M/title-exact?GoldenEye%20(1995)|0|1|1|0|0|0|0|0|0|0|0|0|0|0|0|0|1|0|0
3|Four Rooms (1995)|01-Jan-1995||http://us.imdb.com/M/title-exact?Four%20Rooms%20(1995)|0|0|0|0|0|0|0|0|0|0|0|0|0|0|0|0|1|0|0
4|Get Shorty (1995)|01-Jan-1995||http://us.imdb.com/M/title-exact?Get%20Shorty%20(1995)|0|1|0|0|0|1|0|0|1|0|0|0|0|0|0|0|0|0|0
5|Copycat (1995)|01-Jan-1995||http://us.imdb.com/M/title-exact?Copycat%20(1995)|0|0|0|0|0|0|1|0|1|0|0|0|0|0|0|0|1|0|0

u.item 文件包含 movie id、title、release date 以及若干與 IMDB Link 和電影分類相關的屬性,各屬性之間也用 | 符號分隔。電影分類的屬性有 unknown | Action | Adventure | Animation | Children's | Comedy | Crime | Documentary | Drama | Fantasy | Film-Noir | Horror | Musical | Mystery | Romance | Sci-Fi | Thriller | War | Western。

最后 u.data:

# head -5 u.data
196 242 3 881250949
186 302 3 891717742
22 377 1 878887116
244 51 2 880606923
166 346 1 886397596

u.data 文件包含 user id、movie id、rating(從 1 到 5)和 timestamp 屬性。各屬性間用指標符分隔。timestamp 是從 1/1/1970 UTC 開始的秒數。

探索與可視化數據集

先來探索用戶數據

user_data = sc.textFile("ml-100k/u.user") 
user_data.first()  #此處如能輸出數據文件首行,則說明環境搭建沒問題
u'1|24|M|technician|85711'

sc 是 Spark shell 啟動時自動創建的一個 SparkContext 對象,shell 通過該對象來訪問 Spark。可以通過下列方法輸出 sc 來查看它的類型。

sc
<pyspark.context.SparkContext at 0x7fb65c173450>

一旦有了 SparkContext,你就可以用它來創建 RDD。RDD 是彈性分布式數據集(Resilient Distributed Dataset),在 Spark 中,我們通過對 RDD 的操作來表達我們的計算意圖,這些計算會自動地在集群上並行進行。

如上面代碼創建了一個名為 user_data 的 RDD,然后使用 user_data.first() 輸出了 RDD 中的第一個元素。

下面用“|”字符來分隔各行數據。這將生成一個 RDD,其中每一個記錄對應一個 Python 列表,各列表由用戶 ID、年齡、性別、職業和郵編五個屬性構成。之后再統計用戶、性別、職業和郵編的數目。可通過下列代碼實現。

user_fields = user_data.map(lambda line: line.split("|"))
num_users = user_fields.map(lambda fields: fields[0]).count()
num_genders = user_fields.map(lambda fields: fields[2]).distinct().count()
num_occupations = user_fields.map(lambda fields: fields[3]).distinct().count()
num_zipcodes = user_fields.map(lambda fields: fields[4]).distinct().count()
print "Users: %d, genders: %d, occupations: %d, ZIP codes: %d" % (num_users, num_genders, num_occupations, num_zipcodes)
Users: 943, genders: 2, occupations: 21, ZIP codes: 795

接着用 Matplotlib 的 hist 函數來創建一個直方圖,以分析用戶年齡的分布情況。

%pylab inline
Populating the interactive namespace from numpy and matplotlib
ages = user_fields.map(lambda x: int(x[1])).collect()
hist(ages, bins=20, color='lightblue', normed=True)
fig = matplotlib.pyplot.gcf()
fig.set_size_inches(16, 10)

這里 hist 函數的輸入參數有 ages 數組、直方圖的 bins 數目(即區間數,這里為 20),同時,還使用了 normed=True 參數來正則化直方圖,即讓每個方條表示年齡在該區間內的數量占總數量的比。

從中可以看出 MovieLens 的大量用戶處於 15 到 55 之間。

若想了解用戶的職業分布情況,可以用如下代碼來實現。首先利用 MapReduce 方法來計算數據集中各種職業的出現次數,然后用 matplotlib 的 bar 函數來會繪制一個不同職業的數量的條形圖。

count_by_occupation = user_fields.map(lambda fields: (fields[3], 1)).reduceByKey(lambda x, y: x + y).collect()
x_axis1 = np.array([c[0] for c in count_by_occupation])
y_axis1 = np.array([c[1] for c in count_by_occupation])
x_axis = x_axis1[np.argsort(y_axis1)]
y_axis = y_axis1[np.argsort(y_axis1)]

pos = np.arange(len(x_axis))
width = 1.0

ax = plt.axes()
ax.set_xticks(pos + (width / 2))
ax.set_xticklabels(x_axis)

plt.bar(pos, y_axis, width, color='lightblue')
plt.xticks(rotation=30)
fig = matplotlib.pyplot.gcf()
fig.set_size_inches(16, 10)

count_by_occupation
[(u'administrator', 79),
 (u'writer', 45),
 (u'retired', 14),
 (u'student', 196),
 (u'doctor', 7),
 (u'entertainment', 18),
 (u'marketing', 26),
 (u'executive', 32),
 (u'none', 9),
 (u'scientist', 31),
 (u'educator', 95),
 (u'lawyer', 12),
 (u'healthcare', 16),
 (u'technician', 27),
 (u'librarian', 51),
 (u'programmer', 66),
 (u'artist', 28),
 (u'salesman', 12),
 (u'other', 105),
 (u'homemaker', 7),
 (u'engineer', 67)]
x_axis1
array([u'administrator', u'writer', u'retired', u'student', u'doctor',
       u'entertainment', u'marketing', u'executive', u'none', u'scientist',
       u'educator', u'lawyer', u'healthcare', u'technician', u'librarian',
       u'programmer', u'artist', u'salesman', u'other', u'homemaker',
       u'engineer'], 
      dtype='<U13')
y_axis1
array([ 79,  45,  14, 196,   7,  18,  26,  32,   9,  31,  95,  12,  16,
        27,  51,  66,  28,  12, 105,   7,  67])

從中可以看出,數量最多的職業是 student、other、educator、administrator、engineer 和 programmer。

Spark 對 RDD 提供了一個名為 countByValue 的便捷函數,它會計算 RDD 里各不同值所分別出現的次數,並將其以 Python dict 函數的形式返回給驅動程序。

count_by_occupation2 = user_fields.map(lambda fields: fields[3]).countByValue()
print "Map-reduce approach:"
print dict(count_by_occupation2)
print ""
print "countByValue approach:"
print dict(count_by_occupation)
Map-reduce approach:
{u'administrator': 79, u'retired': 14, u'lawyer': 12, u'healthcare': 16, u'marketing': 26, u'executive': 32, u'scientist': 31, u'student': 196, u'technician': 27, u'librarian': 51, u'programmer': 66, u'salesman': 12, u'homemaker': 7, u'engineer': 67, u'none': 9, u'doctor': 7, u'writer': 45, u'entertainment': 18, u'other': 105, u'educator': 95, u'artist': 28}

countByValue approach:
{u'administrator': 79, u'executive': 32, u'retired': 14, u'doctor': 7, u'entertainment': 18, u'marketing': 26, u'writer': 45, u'none': 9, u'healthcare': 16, u'scientist': 31, u'homemaker': 7, u'student': 196, u'educator': 95, u'technician': 27, u'librarian': 51, u'programmer': 66, u'artist': 28, u'salesman': 12, u'other': 105, u'lawyer': 12, u'engineer': 67}

可以看到,上述兩種方式的結果相同。

接下來探索電影數據,跟之前一樣,先簡單看下第一行記錄,然后再統計電影總數。

movie_data = sc.textFile("ml-100k/u.item")
print movie_data.first()
num_movies = movie_data.count()
print "Movies: %d" % num_movies
1|Toy Story (1995)|01-Jan-1995||http://us.imdb.com/M/title-exact?Toy%20Story%20(1995)|0|0|0|1|1|1|0|0|0|0|0|0|0|0|0|0|0|0|0
Movies: 1682

在此要繪制電影年齡分布圖,電影年齡即其發行年份相對於現在過了多少年(在本數據中現在是 1998 年)。電影數據有些不完整,需要一個函數來處理解析 release date 時可能出現的解析錯誤。這里命名該函數為 convert_year。

def convert_year(x):
    try:
        return int(x[-4:])
    except:
        return 1900 # there is a 'bad' data point with a blank year, which we set to 1900 and will filter out later

有了以上函數來解析發行年份后,便可在調用電影數據進行 map 轉換時應用該函數,並取回其結果。

movie_fields = movie_data.map(lambda lines: lines.split("|"))
years = movie_fields.map(lambda fields: fields[2]).map(lambda x: convert_year(x))
# we filter out any 'bad' data points here
years_filtered = years.filter(lambda x: x != 1900)
# plot the movie ages histogram
movie_ages = years_filtered.map(lambda yr: 1998-yr).countByValue()
values = movie_ages.values()
bins = movie_ages.keys()
hist(values, bins=bins, color='lightblue', normed=True)
fig = matplotlib.pyplot.gcf()
fig.set_size_inches(16,10)

從圖中可以看到,大部分電影發行於 1998 年的前幾年。

現實的數據經常會有不規整的情況,對其解析時就需要進一步處理。上面即是一個很好的例子。事實上,這也表明了數據探索的重要性所在,即它有助於發現數據在完整性和質量上的問題。

現在來探索評級數據

rating_data_raw = sc.textFile("ml-100k/u.data")
print rating_data_raw.first()
num_ratings = rating_data_raw.count()
print "Ratings: %d" % num_ratings
196	242	3	881250949
Ratings: 100000

可以看到評級次數共有 10 萬條。另外和用戶與電影數據不同,評分記錄用“\t”分隔。

rating_data = rating_data_raw.map(lambda line: line.split("\t"))
ratings = rating_data.map(lambda fields: int(fields[2]))
max_rating = ratings.reduce(lambda x, y: max(x, y))
min_rating = ratings.reduce(lambda x, y: min(x, y))
mean_rating = ratings.reduce(lambda x, y: x + y) / float(num_ratings)
median_rating = np.median(ratings.collect())
ratings_per_user = num_ratings / num_users
ratings_per_movie = num_ratings / num_movies
print "Min rating: %d" % min_rating
print "Max rating: %d" % max_rating
print "Average rating: %2.2f" % mean_rating
print "Median rating: %d" % median_rating
print "Average # of ratings per user: %2.2f" % ratings_per_user
print "Average # of ratings per movie: %2.2f" % ratings_per_movie
Min rating: 1
Max rating: 5
Average rating: 3.53
Median rating: 4
Average # of ratings per user: 106.00
Average # of ratings per movie: 59.00

從中可以看到,最低的評級為 1,而最大的評級為 5.這並不意外,因為評級的范圍便是從 1 到 5。

Spark 對 RDD 也提供一個名為 states 的函數。該函數包含一個數值變量用於做類似的統計:

ratings.stats()
(count: 100000, mean: 3.52986, stdev: 1.12566797076, max: 5.0, min: 1.0)

可以看出,用戶對電影的平均評級是 3.5 左右,而評分中位數為 4,。說明評級的分布稍傾向高分。要驗證這點,可創建一個評級值分布的條形圖。

# create plot of counts by rating value
count_by_rating = ratings.countByValue()
x_axis = np.array(count_by_rating.keys())
y_axis = np.array([float(c) for c in count_by_rating.values()])
# we normalize the y-axis here to percentages
y_axis_normed = y_axis / y_axis.sum()

pos = np.arange(len(x_axis))
width = 1.0

ax = plt.axes()
ax.set_xticks(pos + (width / 2))
ax.set_xticklabels(x_axis)

plt.bar(pos, y_axis_normed, width, color='lightblue')
plt.xticks(rotation=30)
fig = matplotlib.pyplot.gcf()
fig.set_size_inches(16, 10)

其特征和之前的期待相同,評分分布確實偏向中等以上。

同樣,也可以求各個用戶評級次數的分布情況。計算各用戶評級次數的分布時,先從 rating_data RDD 里提取出以用戶 ID 為主鍵、評級為值的鍵值對。之后調用 Spark 的 groupByKey 函數,來對評級以用戶 ID 為主鍵進行分組。

# to compute the distribution of ratings per user, we first group the ratings by user id
user_ratings_grouped = rating_data.map(lambda fields: (int(fields[0]), int(fields[2]))).groupByKey() 

接着求出每一個主鍵(用戶 ID)對應的評級集合的大小,這會給出各用戶評級的次數:

# then, for each key (user id), we find the size of the set of ratings, which gives us the # ratings for that user 
user_ratings_byuser = user_ratings_grouped.map(lambda (k, v): (k, len(v)))
user_ratings_byuser.take(5)
[(1, 272), (2, 62), (3, 54), (4, 24), (5, 175)]

最后,用 hist 來繪制用戶評級分布的直方圖。

# and finally plot the histogram
user_ratings_byuser_local = user_ratings_byuser.map(lambda (k, v): v).collect()
hist(user_ratings_byuser_local, bins=200, color='lightblue', normed=True)
fig = matplotlib.pyplot.gcf()
fig.set_size_inches(16,10)

可以看出,大部分用戶的評級次數少於 100,但也表明仍然有較多用戶做出過上百次的評級。

參考資料

  1. Spark入門(Python版)
  2. Ipython-Spark setup for pyspark application
  3. Spark 機器學習》 第 3 章


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM