【風控算法】二、SQL->Python->PySpark計算KS,AUC及PSI


KS,AUC 和 PSI 是風控算法中最常計算的幾個指標,本文記錄了多種工具計算這些指標的方法。

生成本文的測試數據:

import pandas as pd 
import numpy as np
import pyspark.sql.functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import StringType, DoubleType
from pyspark.sql import SparkSession, functions
from sklearn.metrics import roc_auc_score,roc_curve


tmptable = pd.DataFrame({'y':[np.random.randint(2) for i in range(1000000)]})
tmptable['y'] = tmptable['score'].apply(lambda x:1 if np.random.rand()+x>0.8 else 0)
tmp_sparkdf = spark.createDataFrame(tmptable)
tmp_sparkdf.craeteOrReplaceTempView('tmpview')

一、KS

​KS 指標來源於 Kolmogorov-Smirnov 檢驗,通常用於比較兩組樣本是否來源於同一分布。在建模中划分訓練集與測試集后,通常運用 KS 檢驗來檢驗訓練集與測試集的分布差異,如果分布差異過大,那可能就會因為訓練集、測試集划分不合理而降低模型的泛化性。(關於 KS 檢驗的更多細節

在風控中,KS 指標通過來衡量模型對於好壞樣本的區分能力,其具體的算法為:

  1. 按模型分從小到大排序,並分為 n 組(等頻分組或每個不同的分值作為一組)
  2. 計算截至每一組的累積好樣本(y=0)占比與累積壞樣本(y=1)占比,記為 \(cumgoodratio_i\)\(cumbadratio_i\)
    如第 k 組:
    累積好樣本占比=第 k 組前包括第 k 組 y=0 樣本數量 / 全部 y=0 樣本的數量
    累積壞樣本占比=第 k 組前包括第 k 組 y=1 樣本數量 / 全部 y=1 樣本的數量
  3. \(KS=max(abs(cumgoodratio_i-cumbadratio_i))\)

1. SQL 計算 KS

select max(abs(cumgood/totalgood-cumbad/totalbad)) as ks
from (
    select score,
    sum(totalbad)over(order by score) as cumbad,
    sum(totalgood)over(order by score) as cumgood,
    sum(totalbad) over() as totalbad,
    sum(totalgood) over() as totalgood
    from (
        select 
        score,
        sum(y) as totalbad,
        sum(1-y) as totalgood
        from tmpview
        group by score
    )
)

2. Python 計算 KS

def get_ks(y_true:pd.Series,y_pred:pd.Series):
    '''
    A staticmethod to caculate the KS of the model.
    Args:
        y_true: true value of the sample
        y_pred: pred value of the sample
  
    Returns:
        max(tpr-fpr): KS of the model
    '''
    fpr,tpr,_ = roc_curve(y_true,y_pred)
    return str(max(abs(tpr-fpr)))
ksdata = spark.sql('select * from tmpview').toPandas()
print(get_ks(ksdata['y'],ksdata['score']))

3. Pyspark 計算 KS

有兩種方法,1 是對用 pyspark 的語法把 SQL 的邏輯給寫出來,可以算出來 KS;2 就是包裝成 UDF 函數,這樣當需要 groupby 后計算 KS 時,可以直接調用 UDF 函數分組計算 KS

a. SQL 邏輯改寫

ksdata = spark.sql('select * from tmpview')

def calks(df,ycol='y',scorecol='score'):
    return df.withColumn(ycol,F.col(ycol).cast('int')).withColumn(scorecol,F.col(scorecol).cast('float'))\
        .withColumn('totalbad',F.sum(F.col(ycol)).over(Window.orderBy(F.lit(1))))\
        .withColumn('totalgood',F.sum(1-F.col(ycol)).over(Window.orderBy(F.lit(1))))\
        .withColumn('cumgood',F.sum(1-F.col(ycol)).over(Window.orderBy(F.col(scorecol).asc())))\
        .withColumn('cumbad',F.sum(F.col(ycol)).over(Window.orderBy(F.col(scorecol).asc())))\
        .select(F.max(F.abs(F.col('cumgood')/F.col('totalgood')-F.col('cumbad')/F.col('totalbad'))).alias('KS'))
calks(ksdata).show()

b. python 轉 UDF 函數

def get_ks(y_true:pd.Series,y_pred:pd.Series):
    '''
    A staticmethod to caculate the KS of the model.
    Args:
        y_true: true value of the sample
        y_pred: pred value of the sample
  
    Returns:
        max(tpr-fpr): KS of the model
    '''
    fpr,tpr,_ = roc_curve(y_true,y_pred)
    return str(max(abs(tpr-fpr)))
get_ks_udfs = F.udf(get_ks, returnType=StringType())
ksdata = spark.sql('select * from tmpview')
print(ksdata.withColumn('eval metrics',F.lit('KS'))\
    .groupby('eval metrics')\
    .agg(get_ks_udfs(F.collect_list(F.col('y')),F.collect_list(F.col('score'))).alias('KS'))\
    .select('KS').toPandas())

二、AUC

AUC(Area Under Curve)被定義為 ROC 曲線下與坐標軸圍成的面積,通常用來衡量二分類模型全局的區分能力。在 python 和 pyspark 中可以直接調包計算,在 SQL 中可以根據公式計算獲得,其計算方法如下:

  1. 對 score 從小到大排序

  2. 根據公式計算:

    \[AUC=\frac{\sum_{i\in{positiveClass}}rank_i-\frac{M(1+M)}{2}}{M\times N} \]

    其中,\(rank_i\) 代表第 i 個正樣本的排序序號,M 和 N 分別代表正樣本和負樣本的總個數。

關於該公式的詳細理解,可參考 AUC 的計算方法(及評論)

1. SQL 計算 AUC

select (sumpositivernk-totalbad*(1+totalbad)/2)/(totalbad*totalgood) as auc
from
(
    select sum(if(y=1,rnk,0)) as sumpositivernk,
    sum(y) as totalbad,
    sum(1-y) as totalgood
    from
    (
        select y,row_number() over (order by score) as rnk
        from tmpview
    )
)

2. Python 計算 AUC

ksdata = spark.sql('select * from tmpview').toPandas()
print(roc_auc_score(ksdata['y'],ksdata['score']))

3. Pyspark 計算 AUC

同 KS 的計算,除了提到的兩種方式,還可以調用 pyspark 的 ML 包下二分類評價,來計算 AUC

a. SQL 邏輯改寫

aucdata = spark.sql('select * from tmpview')

def calauc(df,ycol='y',scorecol='score'):
    return df.withColumn(ycol,F.col(ycol).cast('int')).withColumn(scorecol,F.col(scorecol).cast('float'))\
        .withColumn('totalbad',F.sum(F.col(ycol)).over(Window.orderBy(F.lit(1))))\
        .withColumn('totalgood',F.sum(1-F.col(ycol)).over(Window.orderBy(F.lit(1))))\
        .withColumn('rnk2',F.row_number().over(Window.orderBy(F.col(scorecol).asc())))\
        .filter(F.col(ycol)==1)\
        .select(((F.sum(F.col('rnk2'))-0.5*(F.max(F.col('totalbad')))*(1+F.max(F.col('totalbad'))))/(F.max(F.col('totalbad'))*F.max(F.col('totalgood')))).alias('AUC'))\

calauc(aucdata).show()

b. UDF 函數

def auc(ytrue,ypred):
    return str(roc_auc_score(ytrue,ypred))
get_auc_udfs = F.udf(auc, returnType=StringType())
aucdata = spark.sql('select * from tmpview')
aucdata.withColumn('eval metrics',F.lit('AUC'))\
    .groupby('eval metrics')\
    .agg(get_auc_udfs(F.collect_list(F.col('y')),F.collect_list(F.col('score'))).alias('AUC'))\
      .select('AUC').show()

c. 調包

from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator = BinaryClassificationEvaluator(rawPredictionCol='score',labelCol='y')
aucdata = spark.sql('select * from tmpview')
evaluator.evaluate(aucdata)

三、PSI

PSI(Population Stability Index:群體穩定性指標),通常被用於衡量兩個樣本模型分分布的差異,在風控建模中通常有兩個作用:

  1. 用於建模時篩選掉不穩定的特征
  2. 用於建模后及上線后評估和監控模型分值的穩定程度

個人認為該指標無一個比較明確的標准,在樣本量較大的條件下,篩選特征時盡量控制特征 PSI<0.1,或更嚴格。

計算 PSI 首先需要一個分箱基准,假定本文隨機生成的模型分的分箱切分點為\([0,0.1,0.2,0.3,0.4,0.5,0.6,0.7,0.8,0.9,1]\)

1. SQL 計算 PSI

select 
 sum(grouppsi) as psi
 from (
        select g
        ,log(count(1) / sum(count(1))over() / 0.1)*(count(1) / sum(count(1))over() - 0.1) as grouppsi
        from (
            select 
            case when score<cutpoint[1] then 1
            when score<cutpoint[2] then 2
            when score<cutpoint[3] then 3
            when score<cutpoint[4] then 4
            when score<cutpoint[5] then 5
            when score<cutpoint[6] then 6
            when score<cutpoint[7] then 7
            when score<cutpoint[8] then 8
            when score<cutpoint[9] then 9
            when score<cutpoint[10] then 10 else 'error' end as g
            from (
                select *
                ,array(0,0.1,0.2,0.3,0.4,0.5,0.6,0.7,0.8,0.9,1) as cutpoint 
                from tmpview 
                )
            )  
        group by g  
    )

2. Python 計算 PSI

psidata = spark.sql('select * from tmpview').toPandas()
psidata['g'] = pd.cut(psidata['score'],cut_point)
psitable = psidata.groupby('g')['y'].count()
psitable /= psitable.sum()
standratio = 1/(len(cut_point)-1)
psi = sum((psitable-standratio)*np.log(psitable/standratio))

3. Pyspark 計算 PSI

參考 Pyspark 實現連續分桶映射並自定義標簽,調包分箱后按公式計算 PSI

from pyspark.ml.feature import Bucketizer

def psi(df, splits, inputCol, outputCol):
    if len(splits) < 2:
        raise RuntimeError("splits's length must grater then 2.")

    standratio = 1 / (len(splits)-1)
    bucketizer = Bucketizer(
        splits=splits, inputCol=inputCol, outputCol='split')
    with_split = bucketizer.transform(df)
    with_split = with_split.groupby('split')\
                            .agg((F.count(F.col(inputCol))/F.sum(F.count(F.col(inputCol))).over(Window.orderBy(F.lit(1)))).alias('groupratio'))\
                            .select(F.sum((F.col('groupratio')-standratio)*F.log(F.col('groupratio')/standratio)).alias('PSI'))
  
    return with_split
psi(aucdata,cut_point,'score','group').show()

參考資料

深入理解 AUC​

SQL 計算多模型分的 PSI

Pyspark 實現連續分桶映射並自定義標簽

使用 pyspark dataframe 的 groupby 計算 AUC


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM