時序預測 01 - 異常檢測 Smoothed z-score algorithm 標准化的一些實踐、調參總結 -Python/pandas/numpy


z-score 的基礎概念

這種方法基於原始數據的均值(mean)和標准差(standard deviation)進行數據的標准化。
將A的原始值x使用z-score標准化到x’,
x′=x−μδ ,μ為數據的均值, δ為方差。
z-score標准化方法適用於屬性A的最大值和最小值未知的情況,或有超出取值范圍的離群數據的情況。
將數據按其屬性(按列進行)減去其均值,然后除以其方差。
最后得到的結果是,對每個屬性/每列來說所有數據都聚集在0附近,方差值為1。

Python使用平滑移動+z-score進行時序數據的異常值檢測 (調參總結見代碼注釋及后續結果輸出)

# -*- coding: utf-8 -*-

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import math


def smoothed_z_score_test(data):

    """  一點調參總結
    平滑z-score 測試
    :param y: 原DataFrame的數據列
    :param lag: 滯后數(初始滑動窗口大小) , 建議設置為業務線的循環周期需要的天數, 看業務線的周期規律——估算出回歸周期,乘上系數; 按天變化的設置為7*4天, 按周的設置為7*4*4天, 等
    :param threshold: 閾值 = 當前值超出前面所有的值的平均水平的絕對值 除以 前面所有的值的標准差的倍數 的上限, 建議2倍
    :param influence: 平滑系數,發生異常點時使用的平滑系數,(0,1),值越大越受當前值的影響,及異常值的折算系數,建議0.5左右
    :return:
    """

    input_id = 157  # 輸入數據的id
    output_id = 161  # 聚合並輸出的數據的id

    start_date = '2017-01-01'  # 開始轉換的時間
    end_date = '2020-01-31'  # 結束轉換的時間

    y = data[input_id]['data_value']
    # 設置z-score參數
    lag, threshold, influence = 8 * 2, 2, 0.5

    signals = np.zeros(len(y))
    filteredY = np.array(y)
    avgFilter = [0] * len(y)
    stdFilter = [0] * len(y)
    avgFilter[lag - 1] = np.mean(y[0:lag])
    stdFilter[lag - 1] = np.std(y[0:lag])
    for i in range(lag, len(y)):
        if abs(y[i] - avgFilter[i - 1]) > threshold * stdFilter[i - 1]:
            if y[i] > avgFilter[i - 1]:
                signals[i] = 1
            else:
                signals[i] = -1

            filteredY[i] = influence * y[i] + (1 - influence) * filteredY[i - 1]
            avgFilter[i] = np.mean(filteredY[(i - lag + 1):i + 1])
            stdFilter[i] = np.std(filteredY[(i - lag + 1):i + 1])
        else:
            signals[i] = 0
            filteredY[i] = y[i]
            avgFilter[i] = np.mean(filteredY[(i - lag + 1):i + 1])
            stdFilter[i] = np.std(filteredY[(i - lag + 1):i + 1])

    series_dict = dict(signals=np.asarray(signals),
                       avgFilter=np.asarray(avgFilter),
                       stdFilter=np.asarray(stdFilter)
                       )

    data[output_id] = data[input_id].copy()
    data[output_id]['data_value'] = np.asarray(series_dict['signals'])


def paint(dfs=[], labels=[], title='暫無'):
    assert len(dfs) == len(labels)
    plt.rcParams['font.sans-serif'] = ['SimHei']  # 顯示中文標簽
    plt.rcParams['axes.unicode_minus'] = False

    plt.figure(figsize=(16, 8))
    for i in range(0, len(dfs)):
        plt.plot(dfs[i]['data_value'], label=labels[i])
    plt.legend(loc='best')
    plt.title(title)
    plt.show()

if __name__ == '__main__':
    idx = pd.date_range('2020-01-01', periods=90, freq='D')

    cos_arr = np.arange(len(idx)) * np.pi / 8
    for i in range(0, len(cos_arr)):
        cos_arr[i] = math.cos(cos_arr[i])
    # 人為制造兩個異常點
    cos_arr[24] -= 1
    cos_arr[25] -= 2
    cos_arr[32] += 1
    data_value = pd.Series(cos_arr, index=idx)

    # 設置z-score參數

    df = pd.DataFrame({
        'data_time': idx,  # 時間列
        'data_value': data_value  # 數據列
    })

    data = {}
    data[157] = df
    data[161] = pd.DataFrame()
    smoothed_z_score_test(data)

    paint(dfs=[data[157], data[161]], labels=['origin', 'signal'])

輸出繪圖

正弦函數模擬測試01 (上面的代碼的直接輸出)

  • 輸入數據特征: 回歸周期為8*2天,閾值2倍, 異常值折算為0.5倍
  • 對應使用的參數為lag, threshold, influence = 8 * 2, 2, 0.5
    粗糙的測試結果

歷史某業務線數據測試 (多次調整后)

  • 輸入數據特征: 回歸周期為24小時(再74后就是一個月),閾值3倍, 異常值折算為0.5倍
  • 使用參數為: lag, threshold, influence = 2474, 3, 0.5
  • 藍線為業務數據, 紅線為 z-score計算出的脈沖數據乘以100倍

參考鏈接:

https://www.imooc.com/article/39419 python 數據標准化常用方法,z-score\min-max標准化
https://zhuanlan.zhihu.com/p/39453139 Smoothed z-score algorithm簡介配圖
https://stackoverflow.com/questions/22583391/peak-signal-detection-in-realtime-timeseries-data/43512887#43512887 源代碼、Smoothed z-score algorithm實踐代碼


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM