推薦算法_CIKM-2019-AnalytiCup 冠軍源碼解讀


最近在幫一初創app寫推薦系統,順便學習一波用戶興趣高速檢索的冠軍算法。

寫總結前貼出冠軍代碼的git地址:https://github.com/ChuanyuXue/CIKM-2019-AnalytiCup

該算法分三步:基於Apririo的item_CF、特征提取、排序。

先看第一步,item_CF可以說是很傳統的算法了。該步被作者分為七個部分:

1、generate_user_logs:把用戶分組,並在每個組中統計用戶的行為日志,以方便后續的並行化處理

import multiprocessing as mp
import time
import pandas as pd
import numpy as np

def reduce_mem_usage(df):
    """ iterate through all the columns of a dataframe and modify the data type
        to reduce memory usage.        
    """
    start_mem = df.memory_usage().sum() 
    print('Memory usage of dataframe is {:.2f} MB'.format(start_mem))
    
    for col in df.columns:
        col_type = df[col].dtype
        
        if col_type != object:
            c_min = df[col].min()
            c_max = df[col].max()
            if str(col_type)[:3] == 'int':
                if c_min > np.iinfo(np.int8).min and c_max < np.iinfo(np.int8).max:
                    df[col] = df[col].astype(np.int8)
                elif c_min > np.iinfo(np.int16).min and c_max < np.iinfo(np.int16).max:
                    df[col] = df[col].astype(np.int16)
                elif c_min > np.iinfo(np.int32).min and c_max < np.iinfo(np.int32).max:
                    df[col] = df[col].astype(np.int32)
                elif c_min > np.iinfo(np.int64).min and c_max < np.iinfo(np.int64).max:
                    df[col] = df[col].astype(np.int64)  
            else:
                if c_min > np.finfo(np.float16).min and c_max < np.finfo(np.float16).max:
                    df[col] = df[col].astype(np.float16)
                elif c_min > np.finfo(np.float32).min and c_max < np.finfo(np.float32).max:
                    df[col] = df[col].astype(np.float32)
                else:
                    df[col] = df[col].astype(np.float64)
        else:
            df[col] = df[col].astype('category')

    end_mem = df.memory_usage().sum() 
    print('Memory usage after optimization is: {:.2f} MB'.format(end_mem))
    print('Decreased by {:.1f}%'.format(100 * (start_mem - end_mem) / start_mem))
    
    return df

def generate_logs_for_each_group(matrix, q):
    user_log = dict()
    for row in matrix:
        user_log.setdefault(row[0], [])
        user_log[row[0]].append(row[1])
    print('This batc is finished')
    q.put(user_log)

reduce_mem_usage()使用了機器學習中panda輸入數據通用的內存優化方法,通過memory_usage().sum()統計矩陣的內存使用量,np.iinfo().min()/max()分別用於找到一個列表中的最小/大數,這里np.iinfo(int8).min()/max()的作用類似於找到該數據類型的最小/大值,比較矩陣中的數據是否處於該區間,若處於,則通過astype()的方法改變數據類型,使其運算占用的內存空間最小;在循環遍歷的過程中,要保證數據的類型是由范圍由小到大排序的。

generate_logs_for_each_group用於構建用戶日記字典,其中setdefault()用於在用戶日志字典中添加鍵與對應的默認值;

主函數:

data = reduce_mem_usage(pd.read_csv(path+'user_behavior.csv', header=None))
user = pd.read_csv(path+'user.csv', header=None)
item = pd.read_csv(path+'item.csv', header=None)

data['day'] = data[3] // 86400
data['hour'] = data[3] // 3600 % 24

data = data.drop(3, axis=1)

data.columns = ['userID','itemID','behavoir','day','hour']
user.columns = ['userID', 'sex', 'age', 'ability']
item.columns = ['itemID', 'category', 'shop', 'band']

data = data.drop_duplicates(['userID','itemID'],keep="last")
data = data.sort_values(['day','hour'], ascending=True).reset_index(drop=True)

users = list(set(user['userID']))

user_groups = [users[i: i + len(users) // CPU_NUMS] for i in range(0, len(users), len(users) // CPU_NUMS)]

q = mp.Queue()
for groupID in range(len(user_groups)):
    matrix = data[data['userID'].isin(user_groups[groupID])][['userID','itemID']].values
    task = mp.Process(target=generate_logs_for_each_group, args=(matrix, q, ))
    task.start()
    
start_time = time.time()
print('Waiting for the son processing')
while q.qsize() != len(user_groups):
    pass
end_time = time.time()
print("Over, the time cost is:"  + str(end_time - start_time))
for i in range(len(user_groups)):
   temp = q.get()
   f = open('full_logs/userlogs_group' + str(i) + '.txt','w')
   f.write(str(temp))
   f.close()

主函數執行數據的導入與處理。這里要注意的就是整個用戶組根據CPU的數量被分為幾組,以多進程的方式執行generate_logs_for_each_group,生成行首為userID,后跟itemID的矩陣;

最后將生成的用戶日志寫入文件。

2、generate_hot_table:生成中間文件hot_map(統計的商品的出現次數)、upwardmap與downward_map(商品id映射到實數集[0, m],其中m代表商品總數)與usersActivity_map(用戶活躍度)。

import pandas as pd
import numpy as np

# round2 train的路徑
path = '../ECommAI_EUIR_round2_train_20190816/'
data = pd.read_csv(path + 'user_behavior.csv',header=None)

data.columns = ['userID','itemID','behavior','timestamp']
data['day'] = data['timestamp'] // 86400
data['hour'] = data['timestamp'] // 3600 % 24

user_times = data[['itemID','userID']].groupby('userID', as_index=False).count()

user_times.columns = ['userID','itemCount']

user_times_map = dict(zip(user_times['userID'], user_times['itemCount']))

len(user_times_map)

f = open('usersActivity_map.txt', 'w')
f.write(str(user_times_map))
f.close()

 這段代碼使用groupby().count()對數據按userID進行分類,並對每組數目進行統計,統計所得的數標簽為'itemCount',隨后與userID一起被寫入字典,作為用戶活躍度map寫入文件;

from sklearn import preprocessing
le = preprocessing.LabelEncoder()
item['encoding'] = le.fit_transform(item['itemID'])

upward_map = dict(zip(item['itemID'], item['encoding']))
downward_map = dict(zip(item['encoding'], item['itemID']))

在這里preprocessing.LabelEncoder()被用於給商品編號,fit_transform對商品編號序列進行歸一化處理,最后生成商品id映射。

temp = data[['itemID','behavior']].groupby('itemID',as_index=False).count()
hot_map = dict(zip(temp['itemID'], temp['behavior']))

groupby().count()用法同上。按商品id分組統計,算出商品出現次數。

文件的最后是保存文件函數,不多做解釋。

def save_to_file(trans_map, file_path):
    trans_map = str(trans_map)
    f = open(file_path, 'w')
    f.write(trans_map)
    f.close()

save_to_file(hot_map,'hot_items_map.txt')

save_to_file(upward_map,'upward_map.txt')

save_to_file(downward_map,'downward_map.txt')

 3、generate_original_matrix:統計每個group中的相似度矩陣。

import pandas as pd
import numpy as np
import sys
from scipy.sparse import lil_matrix
import scipy as scp
import time
%load_ext Cython

ITEM_NUM = 4318201

def get_logs_from_hardisk(path):
    f = open(path, 'r')
    a = f.read()
    dict_name = eval(a)
    f.close()
    return dict_name


f = open('usersActivity_map.txt', 'r')
m = f.read()
user_times_map = eval(m)
f.close()

導入數據,不多解釋。

import datetime
import math

cpdef calculate_matrix(mat, list user_logs, dict user_times_map):
    cdef int index, i1, i2, count
    cdef list item_log
    cdef tuple u
    
    count = 0
    for u in user_logs:
        count += 1
        if count % 1000 == 0:
            print('The %d'%count + ' users are finished.')
            print(datetime.datetime.now().strftime('%H:%M:%S'))
            
        item_log = u[1]   

        for index, i1 in enumerate(item_log):
            for i2 in item_log[(index+1): ]:
                weight = 1/(math.log(1+user_times_map[u[0]]))
                mat[i1, i2] += weight
                mat[i2, i1] += weight
    return mat

這部分數據用於計算物品相似度矩陣,在第一個文件生成用戶行為日志中,對每一行進行計算,每個用戶的item序列對應每行中第二個元素開始的列表;循環遍歷物品列表,將物品與所有它之后的物品的相似度加w,w為用戶對推薦系統的貢獻度,由之前的第二個文件生成的用戶活躍度表提供參數,計算公式為:

$w_u=\frac{1}{log(l_u+1)}$

其中l_u為用戶活躍度。下面是計算相似度矩陣的主函數:

user_logs = get_logs_from_hardisk('full_logs/userlogs_group0.txt')
f = open('upward_map.txt','r')
upward_map = eval(f.read())
f.close()
for u in user_logs:
    user_logs[u] = [int(upward_map[x]) for x in user_logs[u]]
user_logs = list(user_logs.items())


for i in range(0, len(user_logs), 10000):
    print('The %d '%i + ' batch is started!')
    print('--------------------------------')
    mat = lil_matrix((ITEM_NUM+1, ITEM_NUM+1), dtype=float)
    mat = calculate_matrix(mat, user_logs[i: i + 10000], user_times_map)
    scp.sparse.save_npz('tmpData/sparse_matrix_%d_batch_group0.npz'%i, mat.tocsr())
    print('save successfully')
    print('--------------------------------')

調入用戶日志,調入itemID-實數的文件(用於規范矩陣、減小矩陣規模),將用戶日志中的itemID用1~m的實數代替;使用lil_matrix()新建矩陣,分組計算物品相似度,並將相似度矩陣文件保存,第三步操作到此結束。

 4、Merge:將多個group中的相似性矩陣合並。

import pandas as pd
import numpy as np
from scipy.sparse import *
import scipy
import os

path = 'tmpData/'

lenth = 359850
for i in range(0, 12):
    mat = None
    start = lenth * i
    end = lenth * (i + 1)
    count = 0
    for name in os.listdir('tmpData/'):
        if name[-3:] == 'npz':
            if mat == None:
                mat = load_npz(path + name)[start: end]
            else:
                mat += load_npz(path + name)[start: end]
        count += 1
    scipy.sparse.save_npz('commonMatrix/common_matrix_from_%d_to_%d.npz'%(start, end), mat)
    print('save success for %d batch'%i)
    print('-------------------------')

功能官方文檔寫的很明顯了。主要思想就是分段合並相加相似度矩陣。

5、Save_sparse_to_dense:將相似性矩陣轉化為可供快速檢索的哈希結構。

import numpy as np
from scipy.sparse import *
import os

os.listdir('commonMatrix/')

for name in os.listdir('commonMatrix/'):
    mat = load_npz('commonMatrix/' + name).tolil()
    l = []
    for i in range(mat.shape[0]):
        _, a, b = find(mat[i])
        index = np.where(b > 1.5)
        #l.append(sorted(list(zip(a[index], b[index])),key= lambda x:x[1], reverse=True))
        
        c = np.array( [round(x,3) for x in b] )
        l.append(sorted(list(zip(a[index], c[index])),key= lambda x:x[1], reverse=True))
        
    l = str(l)
    f = open('common_dense_valued_small/' + name, 'w')
    f.write(l)
    f.close()
    print('finished')

 這段矩陣轉哈希表程序首先讀入相似度矩陣,對矩陣的每一行進行處理,使用mat()將數組轉換成矩陣,使用find()找到非零元素的下標並儲存值,下一步將非零元素中大於1.5的元素排序(我也不知道為什么要取1.5 可能是為了降低運算量而作的剪枝),地址和數值被加入表項中。

 6、6_Sta_for_SparseMatrix:用於對相似度的改進

import pandas as pd
import numpy as np
import time
from scipy.sparse import *
import os
import re

## 統計每個商品的打分次數(用train)
f = open('hot_items_map.txt', 'r')
rating_times_map = eval(f.read())
f.close()


item_dict = {}

for name in os.listdir('common_dense_valued_small/'):
    start_time = time.time()
    f = open('common_dense_valued_small/' + name, 'r')
    l = f.read()
    l = eval(l)
    f.close()
    end_time = time.time()
    print('load file: %d sec'%((end_time - start_time)))    
    
    name = re.findall(r'\d+', name)
    start = int(name[0])
    end = int(name[1])

    
    start_time = time.time()
    
    
    for i in range(start, end):
        tmp_list = []
        [tmp_list.append( (x[0], round(x[1] / rating_times_map[i], 4) ) ) for x in l[i - start] if x[0] != i]
        if len(tmp_list) > 0:
            item_dict[i] = sorted(tmp_list,key=lambda x:x[1], reverse=True)[:500]
            
    end_time = time.time()
    print('This batch is finished, time cost: %d sec'%((end_time - start_time)))

這個文件涉及正則表達式和上一個文件生成的哈希表,首先讀入之前生成的商品熱度表與哈希表,通過正則表達式findall()讀出文件名中的開始item序號與結束item序號(文件命名按分組區間,如:'common_matrix_from_3598500_to_3958350.npz')遍歷這個區間內item的哈希表,每個item i、j之間的相似度都除以i的商品熱度,作為熱度過高的商品的懲罰(若一件商品很熱門,則不管用戶喜不喜歡,用戶對其作出行為的可能性都很大,極有可能出現在不同用戶的行為列表中,這樣會使得相似度偏大)。將懲罰處理過的哈希表裝入item_dict中。

最后,將生成的item_dict寫入文件。

f = open('item_Apriori.txt','w')
f.write(str(item_dict))
f.close()

7、generate_recall:官方沒有寫特別說明,個人理解為這是最后的數據召回。

 

import pandas as pd
import numpy as np

def load_data(path):
    user = pd.read_csv(path + 'user.csv',header=None)
    item = pd.read_csv(path + 'item.csv',header=None)
    data = pd.read_csv(path + 'user_behavior.csv',header=None)

    data.columns = ['userID','itemID','behavior','timestamp']
    data['day'] = data['timestamp'] // 86400
    data['hour'] = data['timestamp'] // 3600 % 24

    ## 生成behavior的加權
    data['day_hour'] = data['day'] + data['hour'] / float(24)
    data.loc[data['behavior']=='pv','behavior'] = 1
    data.loc[data['behavior']=='fav','behavior'] = 2
    data.loc[data['behavior']=='cart','behavior'] = 3
    data.loc[data['behavior']=='buy','behavior'] = 1
    max_day = max(data['day'])
    min_day = min(data['day'])
    data['behavior'] = (1 - (max_day-data['day_hour']+2)/(max_day-min_day+2)) * data['behavior'] 

    item.columns = ['itemID','category','shop','brand']
    user.columns = ['userID','sex','age','ability']

    data = pd.merge(left=data, right=item, on='itemID',how='left')
    data = pd.merge(left=data, right=user, on='userID',how='left')

    return user, item, data

首先還是熟悉的數據導入操作,與前面不同的是這里加入了用戶行為權值,通過.loc方法將數值賦值到數據矩陣相應的行。最后使用merge()對用戶行為數據、用戶數據與商品數據進行拼接,這樣一條信息就包含用戶(ID、性別、年齡etc)、商品(品牌、價格、店家etc)、行為信息了。

 

def get_recall_list(train, targetDay, k=300):
    train_logs = dict()
    
    if targetDay > max(train['day']):
        for row in train[['userID','itemID','behavior']].values:
            train_logs.setdefault(row[0], dict())
            if row[1] in upward_map:
                train_logs[row[0]].setdefault(upward_map[row[1]],0)
                train_logs[row[0]][upward_map[row[1]]] = max(train_logs[row[0]][upward_map[row[1]]],row[2])
    else:
        user_List_test = set(train.loc[train['day']==targetDay,'userID'])
        train = train[train['day'] < targetDay]
        
        for row in train[['userID','itemID','behavior']].values:
            if row[0] in user_List_test:
                train_logs.setdefault(row[0], dict())
                if row[1] in upward_map:
                    train_logs[row[0]].setdefault(upward_map[row[1]],0)
                    train_logs[row[0]][upward_map[row[1]]] = max(train_logs[row[0]][upward_map[row[1]]],row[2])

    for each_user in train_logs:
        sum_value = sum(train_logs[each_user].values())
        if sum_value > 0:
            for each_item in train_logs[each_user]:
                train_logs[each_user][each_item] /= sum_value            

    result_logs = dict()    
    for u in train_logs:
        result_logs.setdefault(u, list())
        for i in set(train_logs[u].keys()):
            if i in item_dict:
                tmp_list = [ (x[0], train_logs[u][i]*x[1]) for x in item_dict[i]]
                result_logs[u] += tmp_list
            
    for u in result_logs:
        result_logs[u] = get_unique_inorder([(downward_map[x[0]], x[1]) for x in sorted(result_logs[u], key=lambda x:x[1], reverse=True)
                          if x[0] not in train_logs[u]], k=300)  
    
    return result_logs

這部分可以說是代碼中的核心部分了,倘若要計算的日期大於最大時間戳,則在訓練文件中加入用戶ID-空字典的鍵值對,如果該條信息中用戶作出行為,則在itemID-編號字典中找出編號作為用戶ID-dict()中,字典的鍵,之前的行為權值取最大值作為值加入。

若預測天數小於最大時間戳,則取數據矩陣中天數是目標天數以前的數據,取目標天數數據作為測試集(代碼示例中並未用到);之后所做的與預測大於最大時間戳的操作相同。

對於訓練集[(uesrID,itemID, value),……]中的每個user,首先將其鍵下所有的值相加,在用各個值除以相加所得的和,這步起到數據歸一化的作用。

然后是生成對於每個用戶物品最終分值的一步,每個用戶所對應的每批產生過行為的商品,將歸一化的行為權值乘以矩陣中每個與每個物品的相似度,所得結果就是對這個用戶而言,每個商品的興趣度,以[用戶,[(商品,興趣度)……]]排列。

接着將生成的序列降序排序,同時調用downward_map將編號映射回itemID,使用get_unique_inorder()去重,最終result_logs就是對每個用戶來說興趣度最大的300個商品列表矩陣。

最后是幾個格式變換函數:

def generate_pairs(recall):
    result = []
    for u in recall:
        for i in recall[u]:
            result.append([u,i[0],i[1]])
    return result

def reshape_recall_to_dataframe(recall):
    result = generate_pairs(recall)
    result = pd.DataFrame(result)
    result.columns = ['userID','itemID','apriori']
    return result

在主函數中,召回矩陣首先從[ [用戶,[(商品,興趣度)……] ] ……]的格式轉換為[ [用戶,商品,興趣度]……],再被加注標簽,最后寫入csv文件:

#path = './'
path = '../ECommAI_EUIR_round2_train_20190816/'

## The target date(16 means online, 15 means underline test, 14 means underline train)
targetday = 15

## The lenth of recall list, the default is 300
lenth = 300

## The name of generated recall file

name = 'recall_list_round2_%dday_%dlenth.csv'%(targetday, lenth)


user, item, data = load_data(path = path)   

#tempory_path = './tempory_file/'
tempory_path = './'
f = open(tempory_path + 'upward_map.txt','r')

upward_map = f.read()
upward_map = eval(upward_map)
f.close()
    
f = open(tempory_path + 'downward_map.txt','r')
downward_map = f.read()
downward_map = eval(downward_map)
f.close()

f = open(tempory_path + 'item_Apriori.txt','r')
tmp = f.read()
item_dict = eval(tmp)
f.close()

recall_logs = get_recall_list(data, targetDay=targetday, k=lenth)

recall_df = reshape_recall_to_dataframe(recall_logs)

temp = pd.merge(left=recall_df, right=data[data['day'] == targetday][['userID','itemID','behavior']], 
         on=['userID','itemID'], how='left').rename(columns={'behavior':'label'})

len(set(recall_df['userID']) & set(data[data['day'] == targetday]['userID']))

len(set(recall_df['userID']))

recall_df.to_csv(name, index=False)

其中還穿插着一些行為引入的操作;至此,基於apriori的itemCF源碼分析就結束了。

給我的感受是整個itemCF中數學統計的原理非常簡單,代碼的精巧體現在對多維度數據的處理和利用上。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM