z-score 的基礎概念
這種方法基於原始數據的均值(mean)和標准差(standard deviation)進行數據的標准化。
將A的原始值x使用z-score標准化到x’,
x′=x−μδ ,μ為數據的均值, δ為方差。
z-score標准化方法適用於屬性A的最大值和最小值未知的情況,或有超出取值范圍的離群數據的情況。
將數據按其屬性(按列進行)減去其均值,然后除以其方差。
最后得到的結果是,對每個屬性/每列來說所有數據都聚集在0附近,方差值為1。
Python使用平滑移動+z-score進行時序數據的異常值檢測 (調參總結見代碼注釋及后續結果輸出)
# -*- coding: utf-8 -*-
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import math
def smoothed_z_score_test(data):
""" 一點調參總結
平滑z-score 測試
:param y: 原DataFrame的數據列
:param lag: 滯后數(初始滑動窗口大小) , 建議設置為業務線的循環周期需要的天數, 看業務線的周期規律——估算出回歸周期,乘上系數; 按天變化的設置為7*4天, 按周的設置為7*4*4天, 等
:param threshold: 閾值 = 當前值超出前面所有的值的平均水平的絕對值 除以 前面所有的值的標准差的倍數 的上限, 建議2倍
:param influence: 平滑系數,發生異常點時使用的平滑系數,(0,1),值越大越受當前值的影響,及異常值的折算系數,建議0.5左右
:return:
"""
input_id = 157 # 輸入數據的id
output_id = 161 # 聚合並輸出的數據的id
start_date = '2017-01-01' # 開始轉換的時間
end_date = '2020-01-31' # 結束轉換的時間
y = data[input_id]['data_value']
# 設置z-score參數
lag, threshold, influence = 8 * 2, 2, 0.5
signals = np.zeros(len(y))
filteredY = np.array(y)
avgFilter = [0] * len(y)
stdFilter = [0] * len(y)
avgFilter[lag - 1] = np.mean(y[0:lag])
stdFilter[lag - 1] = np.std(y[0:lag])
for i in range(lag, len(y)):
if abs(y[i] - avgFilter[i - 1]) > threshold * stdFilter[i - 1]:
if y[i] > avgFilter[i - 1]:
signals[i] = 1
else:
signals[i] = -1
filteredY[i] = influence * y[i] + (1 - influence) * filteredY[i - 1]
avgFilter[i] = np.mean(filteredY[(i - lag + 1):i + 1])
stdFilter[i] = np.std(filteredY[(i - lag + 1):i + 1])
else:
signals[i] = 0
filteredY[i] = y[i]
avgFilter[i] = np.mean(filteredY[(i - lag + 1):i + 1])
stdFilter[i] = np.std(filteredY[(i - lag + 1):i + 1])
series_dict = dict(signals=np.asarray(signals),
avgFilter=np.asarray(avgFilter),
stdFilter=np.asarray(stdFilter)
)
data[output_id] = data[input_id].copy()
data[output_id]['data_value'] = np.asarray(series_dict['signals'])
def paint(dfs=[], labels=[], title='暫無'):
assert len(dfs) == len(labels)
plt.rcParams['font.sans-serif'] = ['SimHei'] # 顯示中文標簽
plt.rcParams['axes.unicode_minus'] = False
plt.figure(figsize=(16, 8))
for i in range(0, len(dfs)):
plt.plot(dfs[i]['data_value'], label=labels[i])
plt.legend(loc='best')
plt.title(title)
plt.show()
if __name__ == '__main__':
idx = pd.date_range('2020-01-01', periods=90, freq='D')
cos_arr = np.arange(len(idx)) * np.pi / 8
for i in range(0, len(cos_arr)):
cos_arr[i] = math.cos(cos_arr[i])
# 人為制造兩個異常點
cos_arr[24] -= 1
cos_arr[25] -= 2
cos_arr[32] += 1
data_value = pd.Series(cos_arr, index=idx)
# 設置z-score參數
df = pd.DataFrame({
'data_time': idx, # 時間列
'data_value': data_value # 數據列
})
data = {}
data[157] = df
data[161] = pd.DataFrame()
smoothed_z_score_test(data)
paint(dfs=[data[157], data[161]], labels=['origin', 'signal'])
輸出繪圖
正弦函數模擬測試01 (上面的代碼的直接輸出)
- 輸入數據特征: 回歸周期為8*2天,閾值2倍, 異常值折算為0.5倍
- 對應使用的參數為lag, threshold, influence = 8 * 2, 2, 0.5
歷史某業務線數據測試 (多次調整后)
- 輸入數據特征: 回歸周期為24小時(再74后就是一個月),閾值3倍, 異常值折算為0.5倍
- 使用參數為: lag, threshold, influence = 2474, 3, 0.5
- 藍線為業務數據, 紅線為 z-score計算出的脈沖數據乘以100倍
參考鏈接:
https://www.imooc.com/article/39419 python 數據標准化常用方法,z-score\min-max標准化
https://zhuanlan.zhihu.com/p/39453139 Smoothed z-score algorithm簡介配圖
https://stackoverflow.com/questions/22583391/peak-signal-detection-in-realtime-timeseries-data/43512887#43512887 源代碼、Smoothed z-score algorithm實踐代碼