Taste 是 Apache Mahout 提供的一個協同過濾算法的高效實現,它是一個基於Java實現的可擴展的高效的推薦引擎。
該推薦引擎是用<userid,itemid,preference>這樣簡單的數據格式表達用戶對物品的偏好。
以此為輸入數據,計算后就可以得到為每個user推薦的items列表。
他提供了方便的單機版的編程接口,也提供了基於hadoop的分布式的實現。
單機版的編程接口主要適用於寫demo和做算法的評估,若處理大規模數據,還是需分布式的實現。
以下是對org.apache.mahout.cf.taste.hadoop.item.RecommenderJob的各MapReduce步驟的一個解讀。
Taste 實現一個分布式的協同過濾推薦共經歷了如下12個MapReduce步驟。
以下分析了各步驟的Mapper和Reducer都做了哪些工作,並有什么格式的數據輸出。
代碼分析:
1、計算item的itemid_index和最小itemid值
1.1、ItemIDIndexMapper.class, VarIntWritable.class, VarLongWritable.class,
用原始輸入,將userid,itemid,pref數據轉成itemid_index,itemid
1.2、ItemIDIndexReducer.class, VarIntWritable.class, VarLongWritable.class,
在itemid_index,Iterator<itemid>中找最小的itemid,輸出itemid_index,minimum_itemid
此處只是保存一個int型的itemid_index索引和對應的long型的itemid的映射
2、計算各user的item偏好向量,即Vector<item,pref>
2.1、ToItemPrefsMapper.class, VarLongWritable.class, booleanData ? VarLongWritable.class : EntityPrefWritable.class,
用原始輸入,讀入偏好數據,得到userid,<itemid,pref>
2.2、ToUserVectorReducer.class, VarLongWritable.class, VectorWritable.class,
將userid,Iterator<itemid,pref>中的itemid變成itemid_index,得到userid,Vector<itemid_index,pref>,后者用RandomAccessSparseVector來存。
3、統計數據中有多少個user
3.1、CountUsersMapper.class,CountUsersKeyWritable.class,VarLongWritable.class,
用步驟2的輸出,統計獨立userid數目,先轉換數據為userid,userid
3.2、CountUsersReducer.class,VarIntWritable.class,NullWritable.class,
通過CountUsersPartitioner將所有數據發到一個區,一個Reducer來處理
由於userid都已排序,所以可以用極簡單的方式來統計出獨立userid數
輸出只有一個值,即用戶數
4、計算item的user偏好向量,即Vector<userid,pref>,也即拿步驟2的結果做矩陣的修剪和轉置
4.1、MaybePruneRowsMapper.class,IntWritable.class,DistributedRowMatrix.MatrixEntryWritable.class,
用步驟2的輸出,按指定的maxCooccurrences參數值來修剪Vector的數目,目的是控制計算的規模,減少計算量
然后轉為以userid_index為列號、itemid_index為行號、pref為值的矩陣,用MatrixEntryWritable表示矩陣。
輸出為itemid_index,Matrix<userid_index,itemid_index,pref>
4.2、ToItemVectorsReducer.class,IntWritable.class,VectorWritable.class,
輸出為itemid_index,Vector<userid_index,pref>,相當於對步驟2的結果進行了矩陣的轉置,
有了偏好矩陣數據,接下來會調用RowSimilarityJob來計算行的相似度
此處的行是item,所以默認是item-base的CF。
但其實可以通過傳入是否轉置的參數來對步驟1進行調整,將userid和itemid轉換,就可以實現user-base的CF。
此處也可以通過similarityClassname參數來指定用哪種算法來計算相似度。
RowSimilarityJob將通過接下來的3個步驟來實現:
5、用相似度算法給向量賦權
5.1、RowWeightMapper.class,VarIntWritable.class,WeightedOccurrence.class,
用相應的相似度算法來計算步驟4的輸出,計算每個itemid_index所對應的Vector<userid_index,pref>的weight。
輸出為userid_index,WeightedOccurrence<itemid_index,pref,weight>,WeightedOccurrence是一個簡單的數據封裝類。
5.2、WeightedOccurrencesPerColumnReducer.class,VarIntWritable.class,WeightedOccurrenceArray.class,
將Iterator<WeightedOccurrence>簡單變為WeightedOccurrenceArray,后者只是簡單繼承了ArrayWritable。
最后輸出結果為userid_index,WeightedOccurrenceArray,數組的數據項是WeightedOccurrence<itemid_index,pref,weight>
6、用相似度算法計算相似度,得到相似度矩陣
6.1、CooccurrencesMapper.class,WeightedRowPair.class,Cooccurrence.class,
取出步驟5的結果,將WeightedOccurrenceArray的數據雙重循環,拼裝如下的KV數據結構
WeightedRowPair<itemid_indexA,itemid_indexB,weightA,weightB>,Cooccurrence<userid_index,prefA,prefB>
6.2、SimilarityReducer.class,SimilarityMatrixEntryKey.class,DistributedRowMatrix.MatrixEntryWritable.class,
此步驟的Map輸出,也即Reduce的輸入是WeightedRowPair<itemid_indexA,itemid_indexB,weightA,weightB>, Iterator<Cooccurrence<userid_index,prefA,prefB>>
也即itemA和itemB的weight,以及不同user對itemA和itemB的pref。
相應的Similarity實例就可以利用以上數據計算itemA與itemB的相似度評分similarityValue
輸出結果為SimilarityMatrixEntryKey<itemid_indexA,similarityValue>,Matrix<itemid_indexA,itemid_indexB,similarityValue>
也就是不同item和itemA的倆倆相似度,得到一個相似度矩陣
7、將相似度矩陣轉為向量存儲
7.1、Mapper.class,SimilarityMatrixEntryKey.class,DistributedRowMatrix.MatrixEntryWritable.class,
將步驟6的結果簡單讀入,item相似度矩陣
7.2、EntriesToVectorsReducer.class,IntWritable.class,VectorWritable.class,
輸出為itemid_indexA,Vector<itemid_indexX,similarityValue>,Vector用SequentialAccessSparseVector存儲。
也就是輸出為不同的其他item與itemA之間的相似度值
8、PartialMultiply的預處理1,填充vector部分的數據
8.1、SimilarityMatrixRowWrapperMapper.class, VarIntWritable.class, VectorOrPrefWritable.class,
用步驟7的相似度數據,輸出itemid_index,VectorOrPrefWritable(vector,null,null)
8.2、Reducer.class, VarIntWritable.class, VectorOrPrefWritable.class,
默認Reducer,直接輸出Mapper的輸出
9、PartialMultiply的預處理2,填充userid和pref部分的數據
9.1、UserVectorSplitterMapper.class, VarIntWritable.class, VectorOrPrefWritable.class,
如果提供了一個userid列表文件,Mapper初始化時會先讀入該文件到FastIDSet<userid>中
如果userid不在這個Set中,則會直接return,也就是只會為該列表中的user做推薦
用步驟2的用戶對各item的偏好數據,輸出itemid_index,VectorOrPrefWritable(null,userid,pref)
9.2、Reducer.class, VarIntWritable.class, VectorOrPrefWritable.class,
默認Reducer,直接輸出Mapper的輸出
10、拼裝兩個PartialMultiply預處理的數據
10.1、Mapper.class, VarIntWritable.class, VectorOrPrefWritable.class,
用FileInputFormat.setInputPaths指定多個路徑,將步驟8和9的輸出同時作為輸入
10.2、ToVectorAndPrefReducer.class, VarIntWritable.class, VectorAndPrefsWritable.class,
將VectorOrPrefWritable(vector,null,null)和VectorOrPrefWritable(null,userid,pref)
變為VectorAndPrefsWritable(vector,List<userid>,List<pref>)
最后的輸出是itemid_index,VectorAndPrefsWritable(vector,List<userid>,List<pref>)
11、如果設置了item過濾文件則讀取,作為黑名單
11.1、ItemFilterMapper.class, VarLongWritable.class, VarLongWritable.class,
簡單讀入item過濾文件,輸出為itemid,userid,這相當於“黑”名單,用於后面推薦結果的過濾。
11.2、ItemFilterAsVectorAndPrefsReducer.class, VarIntWritable.class, VectorAndPrefsWritable.class,
輸出為itemid_index,VectorAndPrefsWritable(vector,List<userid>,List<pref>)
其中vector的值為vector(itemid_index,Double.NaN),pref的值都用1.0f來填充。
注意,vector的第二項數據,也即similarityValue被設置為Double.NaN,后面將會用這個來判斷這是否是黑名單。
12、用相似度矩陣的PartialMultiply做推薦計算
12.1、PartialMultiplyMapper.class, VarLongWritable.class, PrefAndSimilarityColumnWritable.class,
如果步驟11存在,則用FileInputFormat.setInputPaths指定多個路徑,將步驟10和11的輸出同時作為輸入
也即輸入為itemid_index,VectorAndPrefsWritable(vector,List<userid>,List<pref>),其中vector的值為Vector<itemid_index,similarityValue>
輸出為userid,PrefAndSimilarityColumnWritable(pref,vector<itemid_index,similarityValue>)
12.2、AggregateAndRecommendReducer.class, VarLongWritable.class, RecommendedItemsWritable.class,
初始化時,會讀入步驟1的結果,是一個HashMap<itemid_index,itemid>,也即index和itemid的映射
若設置了item白名單文件,則初始化時也會讀入文件到FastIDSet<itemid>,推薦結果必須在這里邊。和步驟11的黑名單相反。
Reducer在處理時會區分是否是booleanData而用不同的處理邏輯,此處我們主要討論非booleanData,也即有實際pref數據的情況而不是默認用1.0f來填充的pref。
Reducer中進行PartialMultiply,按乘積得到的推薦度的大小取出最大的幾個item。
處理的過程中需要將itemid_index通過HashMap轉換回itemid,並且用“黑”“白”名單進行過濾。
白名單很容易理解,用集合是否為空和集合的contains();
黑名單是判斷Float.isNaN(similarityValue),因為此前在步驟11的輸出時黑名單的similarityValue被設置為了Double.NaN。
對於非booleanData,是用pref和相似度矩陣的PartialMultiply得到推薦度的值來進行排序。
而booleanData的pref值都是1.0f,所以去計算矩陣相乘的過程沒有意義,直接累加相似度的值即可。
用這個數據排序就可得到推薦結果。
輸出為userid,RecommendedItemsWritable,后者實際是List<RecommendedItem<itemid,pref>>,
這里的pref是相似度矩陣的PartialMultiply或是相似度累加計算出來的值而非實際值。
后注:
以上提到的FastIDSet,SequentialAccessSparseVector,RandomAccessSparseVector等等數據結構,
是Mahout提供的一些大數據量存儲和處理的一些高效實現,
針對數據的特點而做的有針對性的優化,同時解決性能和空間的問題。
在Mahout in Action的討論CF和Cluster等的“數據的表達”章節中有專門的闡述,此處不再詳細解釋。