【夢溪筆談】1. Notes on Model Training and Online Deployment


1. Preface

An algorithm engineer not only builds models but also optimizes them and handles online deployment. This involves many aspects: feature processing (one-hot encoding, normalization), custom loss functions, custom evaluation functions, hyperparameter tuning, building a pipeline for online deployment (with a 100 ms response-time requirement), and so on.

 

2. Preparation Before Running the Model

2.1 One-Hot Encoding

For an LR model, one-hot encoding (for categorical features) and normalization (for numeric features) are essential. Suppose a categorical feature has enum values 0 to 9 (categorical features are label-encoded before being fed to the model; e.g., three enum values A, B, C become 0, 1, 2), while a numeric feature ranges from 0 to 1000. Fed into LR as-is, the model treats the two features the same way and will assign the numeric feature a relatively small weight (say 0.001) and the categorical feature a relatively large one, which is clearly not what we want. Therefore both categorical and numeric features should be transformed into numbers between 0 and 1.

  • Before one-hot encoding a categorical feature, the column must first be label-encoded.
    import pandas as pd
    df_raw = pd.DataFrame(['A','B','C','A'], columns=['col_raw'])
    # label-encode the categorical column
    from sklearn import preprocessing
    lbl = preprocessing.LabelEncoder()
    col = 'col_raw'
    df_raw['col_lab'] = lbl.fit_transform(df_raw[col].astype(str))
    # persist the label-encoding mapping
    import pickle
    save_lbl_path = './onehot_model/' + col + '.pkl'
    output = open(save_lbl_path, 'wb')
    pickle.dump(lbl, output)
    output.close()
    # load the mapping back and invert the transform
    pkl_path = './onehot_model/' + col + '.pkl'
    pkl_file = open(pkl_path, 'rb')
    le_departure = pickle.load(pkl_file)
    df_raw['col_t_raw'] = le_departure.inverse_transform(df_raw['col_lab'])
    df_raw

  • One-hot encode the label-encoded column (an alternative using sklearn's OneHotEncoder is sketched below).
    df_cate_tmp=pd.get_dummies(df_raw['col_lab'],prefix='col_lab')
    df_raw=pd.concat([df_raw, df_cate_tmp],axis=1) 
    df_raw
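
For online serving, pd.get_dummies can silently produce a different set of columns when the serving data is missing some category. A minimal alternative sketch (not from the original notes), using sklearn's OneHotEncoder, which pins the category set at fit time and can be pickled like the LabelEncoder above:

from sklearn.preprocessing import OneHotEncoder
import pickle

ohe = OneHotEncoder(handle_unknown='ignore')  # unseen categories encode to all zeros
ohe.fit(df_raw[['col_lab']])                  # fix the category set at training time
onehot_arr = ohe.transform(df_raw[['col_lab']]).toarray()

# persist it next to the LabelEncoder so training and serving share one mapping
with open('./onehot_model/col_lab_ohe.pkl', 'wb') as f:
    pickle.dump(ohe, f)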

2.2 Normalization and Standardization

Normalization maps a numeric feature into the [0, 1] range, while standardization rescales the values to have mean 0 and standard deviation 1.

  • Normalization
    nor_model = []
    col_num = 'num_first'
    # assign a numeric column
    df_raw[col_num] = [0.2, 4, 22, 8]
    tmp_min = df_raw[col_num].min()
    tmp_max = df_raw[col_num].max()
    nor_model.append((col_num, tmp_min, tmp_max))
    # min-max normalization
    df_raw[col_num+'_nor'] = df_raw[col_num].apply(lambda x: (x-tmp_min)/(tmp_max-tmp_min))
    # persist each column name together with its min and max
    with open("./nor_model/col_min_max.txt", "w") as f:
        for i in nor_model:
            result = i[0] + ',' + str(i[1]) + ',' + str(i[2]) + '\n'
            f.write(result)
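
The snippet above covers normalization only. For standardization, a minimal sketch (assuming the same df_raw and col_num) using sklearn's StandardScaler, which stores the fitted mean and scale and can be pickled for online use just like the min/max pairs above:

from sklearn.preprocessing import StandardScaler
import pickle

scaler = StandardScaler()
# z-score transform: (x - mean) / std
df_raw[col_num+'_std'] = scaler.fit_transform(df_raw[[col_num]]).ravel()

# persist the fitted scaler for the serving side
with open('./nor_model/'+col_num+'_std.pkl', 'wb') as f:
    pickle.dump(scaler, f)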

2.3 Custom Loss Functions

In some business scenarios the loss functions shipped with Python packages don't meet the project's requirements, and a custom loss function is needed. (Reference: https://cloud.tencent.com/developer/article/1357671)

  • Define a custom MSE with an asymmetric penalty: with residual = y_true - y_pred, negative residuals (over-predictions) are penalized 10 times more than positive ones.

    loss(residual) = 10 * residual^2   if residual < 0
                   = residual^2        otherwise

As defined, the asymmetric MSE is convenient because its gradient and hessian are easy to compute. Note that the hessian is constant at two different values, 20 where the residual is negative and 2 where it is non-negative, matching the code below.

(Figure omitted: gradient and hessian of the asymmetric MSE.)

LightGBM provides a direct way to plug in custom training and validation losses. Other gradient boosting packages, including XGBoost and CatBoost, offer the same option. The reference above links a Jupyter notebook showing how to implement custom training and validation loss functions; the details are in the notebook, but at a high level the two differ slightly:

1. Training loss: customizing the training loss in LightGBM requires defining a function that takes two arrays, the targets and their predictions, and in turn returns two arrays, the gradient and the hessian for each observation. As noted above, we need calculus to derive the gradient and hessian, then implement them in Python.

2. Validation loss: customizing the validation loss in LightGBM requires defining a function that takes the same two arrays but returns three values: a string naming the metric to print, the loss itself, and a boolean indicating whether higher is better.

# --- sklearn API
import numpy as np

def custom_asymmetric_train(y_true, y_pred):
    residual = (y_true - y_pred).astype("float")
    grad = np.where(residual < 0, -2*10.0*residual, -2*residual)
    hess = np.where(residual < 0, 2*10.0, 2.0)
    return grad, hess

def custom_asymmetric_valid(y_true, y_pred):
    residual = (y_true - y_pred).astype("float")
    loss = np.where(residual < 0, (residual**2)*10.0, residual**2)
    return "custom_asymmetric_eval", np.mean(loss), False


# --- native lgb API (these deliberately reuse the names above; keep whichever
# pair matches the API you call)

def custom_asymmetric_train(preds, dtrain):
    y_true = np.array(dtrain.get_label())
    # preds arrive flattened; reshape to (n_samples, n_outputs) and take the
    # argmax per row so there is one prediction per sample
    y_pred = np.argmax(preds.reshape(len(y_true), -1), axis=1)
    residual = np.array(y_pred - y_true).astype("float")
    p = 20  # penalty parameter
    tmpGrad = []
    tmpHess = []
    for i in residual:
        if i < 0:
            tmpGrad.append(-i*p)
            tmpHess.append(p)
        elif i >= 0 and i <= 12:
            tmpGrad.append(i*(p/10))
            tmpHess.append(p/10)
        else:
            tmpGrad.append(i*p)
            tmpHess.append(p)
    grad = np.array(tmpGrad)
    hess = np.array(tmpHess)
    return grad, hess

def custom_asymmetric_valid(preds, dtrain):
    p = 20  # penalty parameter
    y_true = np.array(dtrain.get_label())
    y_pred = np.argmax(preds.reshape(len(y_true), -1), axis=1)
    residual = np.array(y_pred - y_true).astype("float")
    tmpLoss = []
    for i in residual:
        if i < 0:
            tmpLoss.append(-i*p)
        elif i >= 0 and i <= 12:
            tmpLoss.append(i*(p/10))
        else:
            tmpLoss.append(i*p)
    loss = np.array(tmpLoss)
    return "custom_asymmetric_eval", np.mean(loss), False
The corresponding calling code (note: the native lgb API and the sklearn API pass y_pred and y_true to these callbacks in different formats, so the two function pairs above are not interchangeable):
import lightgbm as lgb

# ********* Sklearn API *********
# default lightgbm model with sklearn api
gbm = lgb.LGBMRegressor()

# updating objective function to custom
# default is "regression"
# also adding metrics to check different scores
gbm.set_params(**{'objective': custom_asymmetric_train}, metrics = ["mse", 'mae'])

# fitting model 
gbm.fit(
    X_train,
    y_train,
    eval_set=[(X_valid, y_valid)],
    eval_metric=custom_asymmetric_valid,
    verbose=False,
)

y_pred = gbm.predict(X_valid)

# ********* Python API *********
# create dataset for lightgbm
# if you want to re-use data, remember to set free_raw_data=False
lgb_train = lgb.Dataset(X_train, y_train, free_raw_data=False)
lgb_eval = lgb.Dataset(X_valid, y_valid, reference=lgb_train, free_raw_data=False)

# specify your configurations as a dict
params = {
    'objective': 'regression',
    'verbose': 0
}

gbm = lgb.train(params,
                lgb_train,
                num_boost_round=10,
                init_model=gbm,
                fobj=custom_asymmetric_train,
                feval=custom_asymmetric_valid,
                valid_sets=lgb_eval)
                
y_pred = gbm.predict(X_valid)

 

2.4 Custom Scoring

When evaluating model predictions, if the Python library lacks the metric you need (e.g., the K-S statistic or the Gini coefficient), you have to define a custom scoring function. (This subsection targets sklearn's metrics.)

  • Scorers built into sklearn:
from sklearn.metrics import *
SCORERS.keys()

# result:
dict_keys(['explained_variance', 'r2', 'neg_median_absolute_error', 'neg_mean_absolute_error', 'neg_mean_squared_error', 'neg_mean_squared_log_error', 'accuracy', 'roc_auc', 'balanced_accuracy', 'average_precision', 'neg_log_loss', 'brier_score_loss', 'adjusted_rand_score', 'homogeneity_score', 'completeness_score', 'v_measure_score', 'mutual_info_score', 'adjusted_mutual_info_score', 'normalized_mutual_info_score', 'fowlkes_mallows_score', 'precision', 'precision_macro', 'precision_micro', 'precision_samples', 'precision_weighted', 'recall', 'recall_macro', 'recall_micro', 'recall_samples', 'recall_weighted', 'f1', 'f1_macro', 'f1_micro', 'f1_samples', 'f1_weighted'])
  • Custom scoring:
import numpy as np
def ginic(actual, pred):
    actual = np.asarray(actual)  # in case someone passes a Series or list
    n = len(actual)
    a_s = actual[np.argsort(pred)]
    a_c = a_s.cumsum()
    giniSum = a_c.sum() / a_s.sum() - (n + 1) / 2.0
    return giniSum / n

def gini_normalizedc(a, p):
    if p.ndim == 2:  # required for the sklearn wrapper
        p = p[:, 1]  # if the proba array contains both classes, pick class 1
    return ginic(a, p) / ginic(a, a)

from sklearn import metrics
gini_sklearn = metrics.make_scorer(gini_normalizedc, greater_is_better=True, needs_proba=True)


import numpy as np
import pandas as pd
from scipy.stats import ks_2samp
from sklearn.metrics import make_scorer, roc_auc_score, log_loss,f1_score
from sklearn.model_selection import GridSearchCV

def ks_stat(y, yhat):
    y = np.array(y)
    yhat = np.array(yhat)
    try:
        ks = ks_2samp(yhat[y == 1], yhat[y != 1]).statistic
    except Exception:
        print(yhat.shape, y.shape)
        kmp = yhat[y == 1]
        kmp2 = yhat[y != 1]
        print(kmp.shape, kmp2.shape)
        ks = 0
    return ks


ks_scorer = make_scorer(ks_stat, needs_proba=True, greater_is_better=True)

log_scorer = make_scorer(log_loss, needs_proba=True, greater_is_better=False)

roc_scorer = make_scorer(roc_auc_score, needs_proba=True)

f1_scorer = make_scorer(f1_score, greater_is_better=True)  # f1_score takes label predictions, so needs_proba must not be set

 

3. Running the Model

3.1 Hyperparameter Tuning with GridSearchCV

Models have many hyperparameters, tuned here via grid search. Note: if you set n_jobs > 1, the custom scoring functions must live in an external .py file. The reason is a multiprocessing pickling constraint: if the worker pool is created before the scorer function is declared, using the scorer in parallel raises an error; declaring the functions in a module imported up front avoids it. A sketch of such a module follows.
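
A minimal sketch (the module name external.py is assumed from the import below) of what that external file might contain, collecting the scorers defined in section 2.4:

# external.py -- custom scorers importable by GridSearchCV worker processes
import numpy as np
from scipy.stats import ks_2samp
from sklearn.metrics import make_scorer, roc_auc_score

def ks_stat(y, yhat):
    y, yhat = np.array(y), np.array(yhat)
    return ks_2samp(yhat[y == 1], yhat[y != 1]).statistic

ks_scorer = make_scorer(ks_stat, needs_proba=True, greater_is_better=True)
roc_scorer = make_scorer(roc_auc_score, needs_proba=True)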

from sklearn.ensemble import GradientBoostingClassifier
# the custom scoring functions must be imported from an external module
import multiprocessing
#multiprocessing.set_start_method('spawn')
from external import *
from sklearn.model_selection import GridSearchCV
tuned_param = {'learning_rate': [0.1, 0.2, 0.5, 0.8, 1],
           'max_depth':[3,5,7,10],
          'min_samples_split':[50,100,200],
          'n_estimators':[50,70,100,150,200],
          'subsample':[0.6,0.8]}

# number of parallel jobs
n = 20
clf_gbdt=GridSearchCV(GradientBoostingClassifier(),tuned_param,cv=2,scoring={'auc': roc_scorer, 'k-s': ks_scorer}, refit='k-s',n_jobs=n, verbose=10)
clf_gbdt.fit(x_train[col_gbdt],y_train)

# retrieve the best hyperparameter combination
def print_best_score(gsearch, param_test):
    # print the best score
    print("Best score: %0.3f" % gsearch.best_score_)
    print("Best parameters set:")
    # print the parameters used by the best estimator
    best_parameters = gsearch.best_estimator_.get_params()
    for param_name in sorted(param_test.keys()):
        print("\t%s: %r" % (param_name, best_parameters[param_name]))

print_best_score(clf_gbdt,tuned_param)     

 

3.2 Verifying Model Stability with k-fold CV

For models in industry, stability comes first!

from sklearn.model_selection import cross_val_score
scores=cross_val_score(estimator=lr,X=x_train[col_lr],y=y_train,cv=5,n_jobs=10,scoring=ks_scorer,verbose=10)
print('CV k-s scores: %s'%scores)
print('CV k-s: %.3f +/- %.3f'%(np.mean(scores),np.std(scores)))

 

3.3 Model Training

import pickle
from sklearn.ensemble import GradientBoostingClassifier
gbdt = GradientBoostingClassifier(learning_rate=0.2, max_depth=10, min_samples_split=200, n_estimators=200,
                                 subsample=0.8, min_samples_leaf=50)
gbdt.fit(x_train[col_gbdt], y_train)

# save the model
pickle.dump(gbdt, open('./ml_model/gbdt_model.pkl', 'wb'))

# predict probabilities on the validation set
y_pred_gbdt = gbdt.predict_proba(x_valid[col_gbdt])

 

3.4 Model Validation

  • AUC
import seaborn as sns
sns.set_style('darkgrid')
import matplotlib.pyplot as plt
from sklearn.metrics import roc_curve, auc
plt.rcParams['font.sans-serif'] = ['SimHei']
plt.rcParams['axes.unicode_minus'] = False
plt.rcParams.update({'font.size': 10})
plt.rcParams['savefig.dpi'] = 300  # saved-figure DPI
plt.rcParams['figure.dpi'] = 300   # display DPI

# compute AUC
fpr_lr, tpr_lr, thresholds = roc_curve(y_valid, y_pred_lr[:, 1], pos_label=1)
roc_auc_lr = auc(fpr_lr, tpr_lr)

# plot the ROC curve
plt.rcParams['figure.figsize'] = (8, 5)
plt.figure()
plt.plot(fpr_lr, tpr_lr, color='darkorange', label='ROC curve (area = %0.2f)' % roc_auc_lr)
plt.plot([0, 1], [0, 1], color='navy', linestyle='--')
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('ROC Curve - LR')
plt.legend(loc="lower right")
  • K-S
# plot the K-S curve
import numpy as np
import pandas as pd
def PlotKS(preds, labels, n, asc):
    
    # preds is score: asc=1
    # preds is prob: asc=0
    
    pred = preds    # predicted values
    bad = labels    # 1 = bad, 0 = good
    ksds = pd.DataFrame({'bad': bad, 'pred': pred})
    ksds['good'] = 1 - ksds.bad
    
    if asc == 1:
        ksds1 = ksds.sort_values(by=['pred', 'bad'], ascending=[True, True])
    elif asc == 0:
        ksds1 = ksds.sort_values(by=['pred', 'bad'], ascending=[False, True])
    ksds1.index = range(len(ksds1.pred))
    ksds1['cumsum_good1'] = 1.0*ksds1.good.cumsum()/sum(ksds1.good)
    ksds1['cumsum_bad1'] = 1.0*ksds1.bad.cumsum()/sum(ksds1.bad)
    
    if asc == 1:
        ksds2 = ksds.sort_values(by=['pred', 'bad'], ascending=[True, False])
    elif asc == 0:
        ksds2 = ksds.sort_values(by=['pred', 'bad'], ascending=[False, False])
    ksds2.index = range(len(ksds2.pred))
    ksds2['cumsum_good2'] = 1.0*ksds2.good.cumsum()/sum(ksds2.good)
    ksds2['cumsum_bad2'] = 1.0*ksds2.bad.cumsum()/sum(ksds2.bad)
    
    # ksds1 ksds2 -> average
    ksds = ksds1[['cumsum_good1', 'cumsum_bad1']]
    ksds['cumsum_good2'] = ksds2['cumsum_good2']
    ksds['cumsum_bad2'] = ksds2['cumsum_bad2']
    ksds['cumsum_good'] = (ksds['cumsum_good1'] + ksds['cumsum_good2'])/2
    ksds['cumsum_bad'] = (ksds['cumsum_bad1'] + ksds['cumsum_bad2'])/2
    
    # ks
    ksds['ks'] = ksds['cumsum_bad'] - ksds['cumsum_good']
    ksds['tile0'] = range(1, len(ksds.ks) + 1)
    ksds['tile'] = 1.0*ksds['tile0']/len(ksds['tile0'])
    
    qe = list(np.arange(0, 1, 1.0/n))
    qe.append(1)
    qe = qe[1:]
    
    ks_index = pd.Series(ksds.index)
    ks_index = ks_index.quantile(q = qe)
    ks_index = np.ceil(ks_index).astype(int)
    ks_index = list(ks_index)
    
    ksds = ksds.loc[ks_index]
    ksds = ksds[['tile', 'cumsum_good', 'cumsum_bad', 'ks']]
    ksds0 = np.array([[0, 0, 0, 0]])
    ksds = np.concatenate([ksds0, ksds], axis=0)
    ksds = pd.DataFrame(ksds, columns=['tile', 'cumsum_good', 'cumsum_bad', 'ks'])
    
    ks_value = ksds.ks.max()
    ks_pop = ksds.tile[ksds.ks.idxmax()]
    tmp_str='ks_value is ' + str(np.round(ks_value, 4)) + ' at pop = ' + str(np.round(ks_pop, 4))
    
    # chart
    plt.plot(ksds.tile, ksds.cumsum_good, label='cum_good',
                         color='blue', linestyle='-', linewidth=2)
                         
    plt.plot(ksds.tile, ksds.cumsum_bad, label='cum_bad',
                        color='red', linestyle='-', linewidth=2)
                        
    plt.plot(ksds.tile, ksds.ks, label='ks',
                   color='green', linestyle='-', linewidth=2)
                       
    plt.axvline(ks_pop, color='gray', linestyle='--')
    plt.axhline(ks_value, color='green', linestyle='--')
    plt.axhline(ksds.loc[ksds.ks.idxmax(), 'cumsum_good'], color='blue', linestyle='--')
    plt.axhline(ksds.loc[ksds.ks.idxmax(),'cumsum_bad'], color='red', linestyle='--')
    plt.title('KS=%s ' %np.round(ks_value, 4) +  
                'at Pop=%s' %np.round(ks_pop, 4), fontsize=15)
    

    return tmp_str

# call it (note the argument order: predictions first, then labels)
PlotKS(y_pred_lr[:, 1], y_valid, n=10, asc=0)
  • Other metrics
# From the K-S plot above, separation is best around 0.2, so treat predictions above 0.2 as 1 and the rest as 0
y_pred_lr_new = [0 if i <= 0.2 else 1 for i in y_pred_lr[:, 1]]

y_pred_gbdt_new = [0 if i <= 0.2 else 1 for i in y_pred_gbdt[:, 1]]

y_pred_lr_gbdt_new = [0 if i <= 0.2 else 1 for i in y_pred_gbdt_lr[:, 1]]

# gbdt
acc=accuracy_score(y_valid, y_pred_gbdt_new)
p = precision_score(y_valid, y_pred_gbdt_new, average='binary')
r = recall_score(y_valid, y_pred_gbdt_new, average='binary')
f1score = f1_score(y_valid, y_pred_gbdt_new, average='binary')
print(acc, p, r, f1score)

4. Online Deployment (heads-up: a killer tool inside)

 

4.1 Serving via Flask

Initially, serve the model by reading the pickle file: a Python script loads the pickle, a Flask app exposes a prediction service on top of it, and the Java application calls Flask to get predictions (a minimal sketch follows the list below).

  • Pros: easy to update the model, and usable for complex models beyond LR (tree models, etc.).
  • Cons: Java calling Flask adds communication latency, which hurts response time; the Flask service and the Java application are deployed in separate Docker containers, so compute efficiency is low.
  • Reference: https://www.cnblogs.com/demodashi/p/8491170.html
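
A minimal sketch of such a Flask service (assuming the gbdt_model.pkl saved in section 3.3; the endpoint name and JSON shape are illustrative, not from the original notes):

# flask_server.py -- minimal model-serving sketch
import pickle
import numpy as np
from flask import Flask, request, jsonify

app = Flask(__name__)
with open('./ml_model/gbdt_model.pkl', 'rb') as f:
    model = pickle.load(f)  # load once at startup, not per request

@app.route('/predict', methods=['POST'])
def predict():
    # expects JSON like {"features": [[0.1, 0.5, ...]]}
    feats = np.array(request.get_json()['features'])
    proba = model.predict_proba(feats)[:, 1]
    return jsonify({'proba': proba.tolist()})

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)

The Java side then POSTs the feature vector to /predict; the HTTP round trip is exactly the latency cost noted above.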

 

4.2 Pure Java Implementation

Implement the LR model as a Java class: build the scoring method in Java (a model trained in Python can be exported as explicit rules, i.e., its coefficients; see the sketch after this list), and have Java applications call that method directly for prediction.

  • Pros: compared with the first approach, no cross-language communication (with Python) is needed, and computation is fast (pure Java).
  • Cons: complex to implement, and limited to LR; not usable for complex models. Reproducing the feature engineering (one-hot encoding, normalization, and GBDT feature generation in particular) is complicated, making development painful.
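
A minimal sketch (assuming a fitted sklearn LogisticRegression named lr and its feature list col_lr) of exporting the "rules" that the Java class would hardcode, namely the coefficients, the intercept, and a sigmoid over their dot product:

import numpy as np

# dump the coefficients so they can be pasted into the Java scoring class
for name, w in zip(col_lr, lr.coef_[0]):
    print('%s,%f' % (name, w))
print('intercept,%f' % lr.intercept_[0])

# the Java method only has to reproduce this scoring logic
def lr_score(x):
    z = np.dot(x, lr.coef_[0]) + lr.intercept_[0]
    return 1.0 / (1.0 + np.exp(-z))  # sigmoid = P(y=1)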

 

4.3 Calling jpmml from Java

Use Java with jpmml: convert the Python-side artifacts (one-hot encoder, LR model, etc.) into a PMML file (essentially a local text file), then evaluate it from Java via the jpmml library.

  • Pros: combines the advantages of approaches 1 and 2; all operations (one-hot encoding, model prediction) are bundled into a single method, and it is fast.
  • Cons: none so far.
  • Reference: https://github.com/jpmml/jpmml-evaluator

In summary, weighing computation speed against compute resources, approach 3 is recommended.

 

4.4 GBDT+LR: Training and Online Deployment (fast, rough, good, fierce)

Reference: https://openscoring.io/blog/2019/06/19/sklearn_gbdt_lr_ensemble/

from lightgbm import LGBMClassifier
from sklearn_pandas import DataFrameMapper
from sklearn.ensemble import GradientBoostingClassifier, RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.preprocessing import LabelBinarizer, LabelEncoder
from sklearn2pmml import sklearn2pmml
from sklearn2pmml.decoration import CategoricalDomain, ContinuousDomain
from sklearn2pmml.ensemble import GBDTLRClassifier
from sklearn2pmml.pipeline import PMMLPipeline
from xgboost import XGBClassifier

import pandas

df = pandas.read_csv("audit.csv")

cat_columns = ["Education", "Employment", "Marital", "Occupation"]
cont_columns = ["Age", "Hours", "Income"]

label_column = "Adjusted"

def make_fit_gbdtlr(gbdt, lr):
    mapper = DataFrameMapper(
        [([cat_column], [CategoricalDomain(), LabelBinarizer()]) for cat_column in cat_columns] +
        [(cont_columns, ContinuousDomain())]
    )
    classifier = GBDTLRClassifier(gbdt, lr)
    pipeline = PMMLPipeline([
        ("mapper", mapper),
        ("classifier", classifier)
    ])
    pipeline.fit(df[cat_columns + cont_columns], df[label_column])
    return pipeline

pipeline = make_fit_gbdtlr(GradientBoostingClassifier(n_estimators = 499, max_depth = 2), LogisticRegression())
sklearn2pmml(pipeline, "GBDT+LR.pmml")

pipeline = make_fit_gbdtlr(RandomForestClassifier(n_estimators = 31, max_depth = 6), LogisticRegression())
sklearn2pmml(pipeline, "RF+LR.pmml")

pipeline = make_fit_gbdtlr(XGBClassifier(n_estimators = 299, max_depth = 3), LogisticRegression())
sklearn2pmml(pipeline, "XGB+LR.pmml")

def make_fit_lgbmlr(gbdt, lr):
    mapper = DataFrameMapper(
        [([cat_column], [CategoricalDomain(), LabelEncoder()]) for cat_column in cat_columns] +
        [(cont_columns, ContinuousDomain())]
    )
    classifier = GBDTLRClassifier(gbdt, lr)
    pipeline = PMMLPipeline([
        ("mapper", mapper),
        ("classifier", classifier)
    ])
    pipeline.fit(df[cat_columns + cont_columns], df[label_column], classifier__gbdt__categorical_feature = range(0, len(cat_columns)))
    return pipeline

pipeline = make_fit_lgbmlr(LGBMClassifier(n_estimators = 71, max_depth = 5), LogisticRegression())
sklearn2pmml(pipeline, "LGBM+LR.pmml")

Note: the JDK must be 1.8 or above, scikit-learn must be 0.21.0 or above, and sklearn2pmml is best installed from its git repository.

import sklearn
import sklearn2pmml
print('The sklearn2pmml version is {}.'.format(sklearn2pmml.__version__))
print('The scikit-learn version is {}.'.format(sklearn.__version__))

# switching between multiple JDK versions
export JAVA_HOME=/software/servers/jdk1.8.0_121/
export PATH=$JAVA_HOME/bin:$PATH
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar

