一、BPR算法的原理:
1、貝葉斯個性化排序(BPR)算法小結
https://www.cnblogs.com/pinard/p/9128682.html
2、Bayesian Personalized Ranking 算法解析及Python實現
https://www.cnblogs.com/wkang/p/10217172.html
3、推薦系統中的排序學習
https://lumingdong.cn/learning-to-rank-in-recommendation-system.html?unapproved=401&moderation-hash=fcc3652917e1688fe59997e11c916297#配對法(Pairwise)
4、Recommender system using Bayesian personalized ranking
https://towardsdatascience.com/recommender-system-using-bayesian-personalized-ranking-d30e98bba0b9
5、推薦算法之貝葉斯個性化排序 BPR
https://www.biaodianfu.com/bpr.html
二、算法中的注意點
根據完整性和反對稱性,優化目標的第一部分
可以簡化為:
如何理解這個轉換:
1、\((u,i,j) \in (U \times I \times I)\)情況下,\(P(i >_u j|\theta)^{\delta((u,i,j) \in Ds)}\)和\((1-P(i >_u j|\theta))^{\delta((u,j,i) \not\in Ds) }\)提供了相同的信息。
這句話又如何理解:
論文中的訓練集 \(D_s:=\left\{(u,i,j)\mid i \in I_u^+ \wedge j \in I \backslash I_u^+ \right\}\)。由於加了限定條件,\((u,i,j)\) 中的 \(i\) 和 \(j\) 都是有限制的。
其中 \((u,i,j) \in D_s\) 表示用戶 \(u\) 更喜歡 \(i\) 勝過 \(j\);\(\wedge\) 表示邏輯「且」,即兩個條件同時成立;\(I \backslash I_u^+\) 表示物品集 \(I\) 除正例外其他剩余的樣本,即用戶沒有評分過的 item。
\((u, i, j)\)只能取特定的值,而不是遍歷整個三維數組(1, 1, 1)-(m, n, n)。
以\(u_1\)為例,其訓練集合\(Ds={(u1,i2,j1),(u1,i2,j4),(u1,i3,j1),(u1,i3,j4)}\)
根據 \(u1×I×I\),它的負例為:\({(u,j,i)∉ Ds}:={(u1,j2,i1),(u1,j2,i4),(u1,j3,i1),(u1,j3,i4)}\)。
可見\(i,j\)互換了位置。因此當第二部分的指數部分為1時:\((u,j,i) \not\in D_s\),必有\((u,i,j) \in Ds\)。即當負例存在的時候,正例必定存在。所以,二者提供了相同的信息。
2、只有當\((u,i,j) \in D_s\)時,\(\delta((u,i,j) \in D_s) = 1\),因此可以完成上述轉換。
三、pytorch實現
源碼來自: https://github.com/guoyang9/BPR-pytorch
1、讀取數據
import ast
import os
import time

import numpy as np
import pandas as pd
import scipy.sparse as sp
import torch
import torch.nn as nn
import torch.utils.data as data
# Dataset name and the input/output file locations derived from it.
dataset = 'ml-1m'
main_path = './Data/'

train_rating = f'{main_path}{dataset}.train.rating'
test_rating = f'{main_path}{dataset}.test.rating'
test_negative = f'{main_path}{dataset}.test.negative'

model_path = './models/'
BPR_model_path = f'{model_path}BPR.pth'
# 1. Load the training ratings: one tab-separated (user, item) pair per line.
train_data = pd.read_csv(train_rating, sep='\t', header=None, names=['user', 'item'],
    usecols=[0, 1], dtype={0: np.int32, 1: np.int32})

# IDs are 0-based, so matrix dimensions are (max id + 1).
user_num = train_data['user'].max() + 1
item_num = train_data['item'].max() + 1
train_data = train_data.values.tolist()

# 2. Mirror the observed interactions into a sparse user x item indicator
#    matrix; used later to reject false negatives during sampling.
train_mat = sp.dok_matrix((user_num, item_num), dtype=np.float32)
for user, item in train_data:
    train_mat[user, item] = 1.0
# 3. Load the test set: each line is "(u, pos_item)\tneg1\tneg2\t...", i.e.
#    one rated item plus 99 unrated (negative) items for each user.
test_data = []
with open(test_negative, 'r') as fd:
    for line in fd:
        line = line.rstrip('\n')
        if not line:
            continue
        arr = line.split('\t')
        # ast.literal_eval safely parses the "(user, item)" tuple; the
        # original used eval(), which would execute arbitrary code found
        # in the data file.
        u, pos_item = ast.literal_eval(arr[0])
        test_data.append([u, pos_item])
        for neg in arr[1:]:
            test_data.append([u, int(neg)])
2、構建BPR數據類
# 根據繼承pytorch的Dataset類定義BPR數據類
# Dataset wrapper for BPR. In training mode, ng_sample() must be called once
# per epoch to draw fresh negatives before iterating the loader.
class BPRData(data.Dataset):
    """Serve (user, item_i, item_j) triples for BPR training or evaluation.

    features  : list of [user, item] pairs (positives for training, the
                1 positive + 99 negative candidates for testing).
    num_item  : total number of items (upper bound for negative sampling).
    train_mat : sparse matrix of observed interactions, used to reject
                false negatives while sampling.
    num_ng    : number of negatives drawn per positive pair (training only).
    is_training : selects between sampled triples and raw features.
    """

    def __init__(self, features,
                 num_item, train_mat=None, num_ng=0, is_training=None):
        super(BPRData, self).__init__()
        # Labels are not needed here; triples are built in ng_sample().
        self.features = features
        self.num_item = num_item
        self.train_mat = train_mat
        self.num_ng = num_ng
        self.is_training = is_training

    def ng_sample(self):
        """Expand every positive (u, i) into num_ng triples (u, i, j)."""
        assert self.is_training, 'no need to sampling when testing'
        triples = []
        for user, pos in self.features:
            for _ in range(self.num_ng):
                neg = np.random.randint(self.num_item)
                # Resample until the candidate was never rated by this user.
                while (user, neg) in self.train_mat:
                    neg = np.random.randint(self.num_item)
                triples.append([user, pos, neg])
        self.features_fill = triples

    def __len__(self):
        if self.is_training:
            return self.num_ng * len(self.features)
        return len(self.features)

    def __getitem__(self, idx):
        if self.is_training:
            user, item_i, item_j = self.features_fill[idx]
        else:
            user, item_i = self.features[idx][0], self.features[idx][1]
            item_j = item_i  # placeholder; ignored at test time
        return user, item_i, item_j
3、生成DataLoader
# The training loader re-draws negatives each epoch via ng_sample(); the test
# loader serves each user's 100 candidates (1 positive + 99 negatives) as ONE
# batch, so batch_size must stay 100 and shuffling must stay off.
train_dataset = BPRData(train_data, item_num, train_mat, 4, True)
test_dataset = BPRData(test_data, item_num, train_mat, 0, False)

train_loader = data.DataLoader(
    train_dataset, batch_size=4096, shuffle=True, num_workers=0)
test_loader = data.DataLoader(
    test_dataset, batch_size=100, shuffle=False, num_workers=0)
4、定義BPR模型,完成前向傳播過程
class BPR(nn.Module):
    """Matrix-factorization model scored for BPR.

    Args:
        user_num: number of users.
        item_num: number of items.
        factor_num: dimensionality of the latent factors.
    """

    def __init__(self, user_num, item_num, factor_num):
        super(BPR, self).__init__()
        self.embed_user = nn.Embedding(user_num, factor_num)
        self.embed_item = nn.Embedding(item_num, factor_num)
        # Small random init (std=0.01), as in the reference implementation.
        for emb in (self.embed_user, self.embed_item):
            nn.init.normal_(emb.weight, std=0.01)

    def forward(self, user, item_i, item_j):
        """Return dot-product scores <u, i> and <u, j> for each triple."""
        u_vec = self.embed_user(user)
        score_i = (u_vec * self.embed_item(item_i)).sum(dim=-1)
        score_j = (u_vec * self.embed_item(item_j)).sum(dim=-1)
        return score_i, score_j
# 16 latent factors. Module.cuda() moves the parameters to GPU in place and
# returns the module itself, so chaining it is equivalent to a separate call.
model = BPR(user_num, item_num, 16).cuda()
5、定義優化器
import torch.optim as optim
# lamb: L2 regularization strength (the lambda in the BPR log-posterior);
# lr: SGD learning rate.
lamb = 0.001
lr = 0.01
# weight_decay implements the lamb * ||theta||^2 penalty of the BPR objective.
optimizer = optim.SGD(model.parameters(), lr=lr, weight_decay=lamb)
6、定義一些評價指標
def hit(gt_item, pred_items):
    """Return 1 if the ground-truth item is in the recommended list, else 0."""
    return int(gt_item in pred_items)
def ndcg(gt_item, pred_items):
    """NDCG with a single relevant item: 1/log2(rank+2), or 0 if missing."""
    if gt_item not in pred_items:
        return 0
    rank = pred_items.index(gt_item)
    return np.reciprocal(np.log2(rank + 2))
def metrics(model, test_loader, top_k):
    """Compute mean HR@top_k and NDCG@top_k over the test loader.

    Assumes each test batch contains all candidate items of ONE user
    (batch_size=100: 1 rated item + 99 negatives) and that the ground-truth
    item is the first entry of the batch.
    """
    HR, NDCG = [], []
    # no_grad(): evaluation needs no autograd graph; the original built one
    # for every batch, wasting memory and time.
    with torch.no_grad():
        for user, item_i, item_j in test_loader:
            user = user.cuda()
            item_i = item_i.cuda()
            item_j = item_j.cuda()  # not useful when testing
            prediction_i, _ = model(user, item_i, item_j)
            # Pick the top_k highest-scored candidates of this user.
            _, indices = torch.topk(prediction_i, top_k)
            recommends = torch.take(
                item_i, indices).cpu().numpy().tolist()
            gt_item = item_i[0].item()  # positive item is first in the batch
            HR.append(hit(gt_item, recommends))
            NDCG.append(ndcg(gt_item, recommends))
    return np.mean(HR), np.mean(NDCG)
7、訓練和測試
# Training loop: minimize the negated log posterior of the BPR objective by
# SGD, then evaluate HR@top_k / NDCG@top_k after every epoch.
# (os, time and np are already imported at the top of the file.)
epochs = 50
top_k = 10
# Initialize all three "best" trackers up front: the original only set
# best_hr, so the final summary print raised NameError whenever no epoch
# achieved HR > 0.
best_hr, best_ndcg, best_epoch = 0, 0, -1

for epoch in range(epochs):
    model.train()  # enable training-mode behavior (dropout / batch-norm updates)
    start_time = time.time()
    train_loader.dataset.ng_sample()  # draw this epoch's negative samples

    for user, item_i, item_j in train_loader:
        user = user.cuda()
        item_i = item_i.cuda()
        item_j = item_j.cuda()

        model.zero_grad()
        prediction_i, prediction_j = model(user, item_i, item_j)
        # Negated log posterior: maximizing sum(log sigmoid(x_ui - x_uj))
        # equals minimizing this loss, so plain gradient descent applies.
        loss = -(prediction_i - prediction_j).sigmoid().log().sum()
        loss.backward()
        optimizer.step()

    # Epoch finished -- switch to eval mode and measure ranking quality.
    model.eval()
    HR, NDCG = metrics(model, test_loader, top_k)

    elapsed_time = time.time() - start_time
    print("The time elapse of epoch {:03d}".format(epoch) + " is: " +
            time.strftime("%H: %M: %S", time.gmtime(elapsed_time)))
    print("HR: {:.3f}\tNDCG: {:.3f}".format(np.mean(HR), np.mean(NDCG)))

    if HR > best_hr:
        best_hr, best_ndcg, best_epoch = HR, NDCG, epoch
        # makedirs(exist_ok=True) is race-free, unlike exists()+mkdir().
        os.makedirs(model_path, exist_ok=True)
        # NOTE(review): saves to './models/BPR.pt', while BPR_model_path
        # ('./models/BPR.pth') defined earlier is never used -- path kept
        # as-is for compatibility; consider unifying them.
        torch.save(model, '{}BPR.pt'.format(model_path))

print("End. Best epoch {:03d}: HR = {:.3f}, NDCG = {:.3f}".format(best_epoch, best_hr, best_ndcg))