在(1)中介紹了FM模型的理論和python實現二分類模型。作為用於CTR預估的模型之一,FM重點在於實現ctr。
一、數據集
電商數據中的用戶行為日志數據。召回完成,在排序階段,需要考慮用戶特征和物品特征,用戶特征來源於用戶畫像,物品特征來源於物品自身固有屬性;用戶畫像一部分是通過物品畫像得到。
1、物品畫像
在電商領域,以臍橙為例,物品畫像通常包含如下維度:
1、關鍵詞:商品標題和詳情頁的文字部分提取關鍵詞(topN),其數據格式為 keyword = [(光滑,0.32),(飽滿,0.34),...],
2、實體詞:商品標題和詳情頁的文字部分提起實體詞(topN),其數據格式為:entity=["雲南省","贛南",...],
3、價格:商品本身的價格是一個連續型特征,進行分桶處理為類別特征,如果划分10個區間,則商品的價格為10個特征之一:price=[0,0,1,0,...],
4、分類:臍橙本身又分為了多個種類,每個臍橙屬於一個分類。如果區分大分類和小分類,則可得到兩個分類特征。分類為類別特征:category=[0,1,0,...],
5、產地: 商品產地為類別特征:area=[0,1,0,...],
如果商品有和時間或者節氣、節日等強相關的特征,可以將其加入物品畫像。
2、用戶畫像
很明顯,用戶畫像是基於物品畫像的。用戶購買、收藏、點擊等行為日志通過ios端或者android客戶端埋點獲得,過濾清洗之后存入hdfs,供后續推薦算法使用。
1、關於日志
問題1:需要考慮用戶哪些行為是有價值的。很明顯,用戶購買、收藏了某個商品,他是喜歡這個商品的。那么構建用戶畫像只使用這類日志可以么?答案是No,因為對於一個日活有限的電商平台而言,這類日志很少。一般要加上瀏覽日志。
問題2:用戶瀏覽、購買、收藏了一個商品,產生了2+N(代表多次瀏覽該商品)條日志,日志如何處理。一個方案,只保留購買行為的日志,因為購買和用戶喜歡是最強相關的。
問題3:日志時間。如果喜歡瀏覽了商品A2秒,商品B20秒,如何確定其喜好。設定閾值,用戶點進去一個商品然后快速返回,不能表示其喜好該商品。
2、用戶畫像字段
這里,用戶畫像直接使用物品畫像進行構建,放棄了用戶自身屬性(年齡、性別等)。因為這些屬性大部分用戶都為空。實際場景中,很多屬性是app不能獲得的。
1、關鍵詞:用戶操作過的商品的關鍵詞,按照權重加權求和。數據格式為 keyword = [(光滑,1.32),(飽滿,2.34),...],
2、實體詞:用戶操作過的商品的實體詞,按照實體詞總數取分數。數據格式為:entity=[("雲南省,0.1"),("贛南",0.2),...],
3、價格:用戶的價格畫像為每個價格區間的比例:price=[("0-100",0.2),("100-500",0.4),...],
4、分類:用戶分類為操作過的商品的每個分類的比例:category=[(1,0.2),(2,0.1),...],
5、產地: 用戶操作過的商品的產地的分布:area=[("雲南省",0.1),("安徽省",0.3),...]。
3、模型輸入向量生成
用戶向量+物品向量
假設有2000個關鍵詞,1000個實體詞,10種價格區間,10個分類,50個產地,則最終的用戶向量維度為:\(2000+1000+10+10+50=3070\),物品向量維度為3070。
ps:這里沒有使用物品之外的特征,比如時間信息、app相關信息,行為信息等數據。
故,最終的模型輸入特征向量:input_vector = np.zeros(6140+1,dtype=np.float),然后在對應特征位置賦值。
生成的numpy數組保存為xxx.npy
二、torch實現FM
用於CTR時,模型輸出為sigmoid之后的概率值:[0,1]。
分為幾個模塊
1、數據集加載
import torch
from torch.utils.data import Dataset
import numpy as np
from dataprocess import DataLoad # 自定義的npy數據讀取類
class CtrDataset(Dataset):
"""
Custom dataset class for dataset in order to use efficient
dataloader tool provided by PyTorch.
"""
def __init__(self, train=True,split_=0.8):
"""
Initialize file path and train/test mode.
Inputs:
- train: bool.是否為訓練階段
- split_: 訓練數據比例。
"""
self.train = train
train_data,test_data = DataLoad().split_sample(split_)
if self.train:
self.train_x = train_data[:, :-1]
self.train_y = train_data[:, -1]
else:
self.test_x = test_data[:,:-1]
self.test_y = test_data[:,-1]
def __getitem__(self, idx):
'''
self.train_data的值:[[0,1,...],[],...],y要修改為:[[1],[0],...]的格式。
'''
if self.train:
dataI, targetI = self.train_x[idx, :], self.train_y[idx]
targetI = np.array(targetI)
targetI = torch.from_numpy(targetI)
targetI = torch.unsqueeze(targetI,-1)
return dataI,targetI
else:
dataI, targetI = self.test_x[idx, :], self.test_y[idx]
targetI = np.array(targetI)
targetI = torch.from_numpy(targetI)
targetI = torch.unsqueeze(targetI, -1)
return dataI, targetI
def __len__(self):
if self.train:
return len(self.train_x)
else:
return len(self.test_x)
2、 DataLoader加載數據
train_data = CtrDataset( train=True,split_=split_)
test_data = CtrDataset( train=True,split_=split_)
loader_train = DataLoader(train_data, batch_size=50,
shuffle=True)
常用操作有:batch_size(每個batch的大小), shuffle(是否進行shuffle操作), num_workers(加載數據的時候使用幾個子進程)。
3、選擇使用CPU還是GPU進行訓練
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
4、FM模型
class FMLayer(nn.Module):
def __init__(self, n=10, k=5):
"""
:param n: 特征維度
:param k: 隱向量維度
"""
super(FMLayer, self).__init__()
self.dtype = torch.float
self.n = n
self.k = k
self.linear = nn.Linear(self.n, 1) # 前兩項線性層
'''
torch.nn.Parameter是繼承自torch.Tensor的子類,其主要作用是作為nn.Module中的可訓練參數使用。它與torch.Tensor的區別就是nn.Parameter會自動被認為是module的可訓練參數,即加入到parameter()這個迭代器中去;而module中非nn.Parameter()的普通tensor是不在parameter中的。
注意到,nn.Parameter的對象的requires_grad屬性的默認值是True,即是可被訓練的,這與torth.Tensor對象的默認值相反。
在nn.Module類中,pytorch也是使用nn.Parameter來對每一個module的參數進行初始化的。'''
self.v = nn.Parameter(torch.randn(self.n, self.k)) # 交互矩陣
nn.init.uniform_(self.v, -0.1, 0.1)
def fm_layer(self, x):
# x 屬於 R^{batch*n}
linear_part = self.linear(x)
#print("linear_part",linear_part.shape)
# linear_part = torch.unsqueeze(linear_part, 1)
# print(linear_part.shape)
# 矩陣相乘 (batch*p) * (p*k)
inter_part1 = torch.mm(x, self.v) # out_size = (batch, k) # 矩陣a和b矩陣相乘。 vi,f * xi
# 矩陣相乘 (batch*p)^2 * (p*k)^2
inter_part2 = torch.mm(torch.pow(x, 2), torch.pow(self.v, 2)) # out_size = (batch, k)
# 這里torch求和一定要用sum
inter = 0.5 * torch.sum(torch.sub(torch.pow(inter_part1, 2), inter_part2),1,keepdim=True)
#print("inter",inter.shape)
output = linear_part + inter
output = torch.sigmoid(output)
#print(output.shape) # out_size = (batch, 1)
return output
def forward(self, x):
return self.fm_layer(x)
上述為FM公式的torch版本。作為網絡模型,還需要定義損失函數和訓練過程。
模型輸出已經是經過sigmoid的概率值,直接使用交叉熵作為損失函數。
def fit(self,data,optimizer,epochs=100):
"""
Training a model and valid accuracy.
Inputs:
- loader_train: I
- optimizer: Abstraction of optimizer used in training process, e.g., "torch.optim.Adam()""torch.optim.SGD()".
- epochs: Integer, number of epochs.
"""
criterion = F.binary_cross_entropy
for epoch in range(epochs):
for t, (batch_x, batch_y) in enumerate(data):
batch_x = batch_x.to(device)
batch_y = batch_y.to(device)
total = self.forward(batch_x)
loss = criterion(total, batch_y)
optimizer.zero_grad()
loss.backward()
optimizer.step()
loader_test = DataLoader(test_data, batch_size=50,
shuffle=True)
r = self.test(loader_test)
print('Epoch %d , loss = %.4f' % (epoch, r))
def test(self,data):
'''
測試集測試
:return:
'''
criterion = F.binary_cross_entropy
all_loss = 0
i = 0
for t, (batch_x, batch_y) in enumerate(data):
batch_x = batch_x.to(device)
batch_y = batch_y.to(device)
total = self.forward(batch_x)
loss = criterion(total, batch_y)
all_loss += loss.item()
i += 1
return all_loss/i
三、模型訓練和保存
使用flask作為web服務框架。
為了線上部署,使用torchscript進行模型的保存。
https://pytorch.org/tutorials/beginner/Intro_to_TorchScript_tutorial.html
https://discuss.pytorch.org/t/infer-torch-model-via-gunicorn-wsgi/60437
fm = FMLayer(n=features,k=30)
fm = fm.to(device)
optimizer = optim.Adam(fm.parameters(), lr=1e-4, weight_decay=0.0)
fm.fit(loader_train, optimizer, epochs=100)
fm = fm.to("cpu")
temp = torch.zeros((1,6140))
traced_model = torch.jit.trace(fm,temp)
torch.jit.save(traced_model, 'model.pt')
使用torch.save(model,path)進行保存的模型,在加載的使用,要求可以找到原始的FMLayer類,直接xx.py沒有問題。但是,如果web服務使用gunicorn進行啟動,就會報錯:
AttributerError:Can't get attribute 'FMLayer' on <module '__main__' from '/usr/local/bin/gunicorn'
因為:torch.load(model_path)的時候,需要在當前位置有模型類。而使用gunicorn的時候,它會在gunicorn那里尋找模型類。
使用torch.jit.load(model_path, map_location='cpu')可以不用在當前位置有對應的模型類。
四、線上部署
對外提供api接口,接收輸入數據:用戶id和召回算法得到的物品id,返回排序后的物品id列表。
使用docker部署注意事項:
1、完整的requirements.txt
2、gunicorn 的配置 daemon = "false"
3、時區改變:RUN cp /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && echo 'Asia/Shanghai' >/etc/timezone
4、主機和容器數據同步,日志和新的模型文件.
volumes: #映射的數據卷
- ./app:/www/web
- ./nginx/conf:/etc/nginx
- ./nginx/logs:/www/web_logs
五、模型更新和線上服務更新
使用每天的日志訓練模型並實時更新線上模型,通過flask_apscheduler模塊在web服務中執行定時任務。
