# -*- coding: utf-8 -*-
"""
Created on Sat Aug 18 11:08:38 2018
@author: acadsoc
"""
import pandas as pd
import numpy as np
import matplotlib
import matplotlib.pyplot as plt
from pyecharts import Bar, Line, Page, Overlap
import statsmodels.api as sm
from sklearn.preprocessing import StandardScaler
# import pymssql
from dateutil import parser
import copy
import os
import sys
from featureSelection import featureSelection
plt.style.use('ggplot') # 設置ggplot2畫圖風格
# 根據不同平台設置其中文字體路徑
if sys.platform == 'linux':
zh_font = matplotlib.font_manager.FontProperties(
fname='path/anaconda3/lib/python3.6/site-packages/matplotlib/mpl-data/fonts/ttf/STZHONGS.TTF')
else:
zh_font = matplotlib.font_manager.FontProperties(fname='C:\Windows\Fonts\STZHONGS.ttf') # 設置中文字體
# 根據不同平台設定工作目錄
if sys.platform == 'linux':
os.chdir(path) # Linux path
else:
os.chdir(path) # Windows path
class rollingRegression():
'''
滾動多元回歸分析類。
參數
----
response : str
回歸因變量。
date_begin : datetime
起始日期。
date_end : datetime
截止日期。
rolling_days : int
滾動天數。
intercept : bool
回歸方程是否帶常數項。
p_value_threshold : float
回歸系數按p值顯示閾值。
normalize : bool
數據是否標准化。
屬性
----
coef_ : dataframe
回歸系數。
coef_pvalue_ : dataframe
回歸系數pvalue。
r2_ : dataframe
回歸模型Rsquared 和 Rsquared_adj。
echarts_page_ : echarts_page
echarts_page 文件。
'''
def __init__(self, response='新單數', date_begin='2018-01-01', date_end='2018-07-31', rolling_days=30,
intercept=False, p_value_threshold=.1, normalize=False):
self.response = response # 回歸因變量
self.date_begin = date_begin # 起始日期
self.date_end = date_end # 終止日期
self.rolling_days = rolling_days # 滾動天數
self.intercept = intercept # 回歸方程是否帶常數項
self.p_value_threshold = p_value_threshold # p值顯示閾值
self.normalize = normalize # 是否將數據標准化后再進行回歸分析
if self.normalize: # 如果數據標准化,常數強制設置為0
self.intercept = False
# 起始日期間隔必須大於等於滾動天數
if (parser.parse(self.date_end) - parser.parse(self.date_begin)).days < self.rolling_days:
raise IOError('起始日期間隔必須大於等於滾動天數,請重新選擇起始日期或者調整滾動日期。')
def getData(self, file='業績相關數據2018-8-1.xlsx', variabls_in=None, variables_out=None):
'''
讀取數據。
注:該方法只適合讀取特定方式的數據,第一列應是datetime類型,第二列是因變量,其余是自變量。
參數
----
file : str
文件路徑及文件名。
variabls_in : list, 默認是None
需要用到的自變量。
variables_out : list, 默認是None
不用的自變量。
return
------
df_ : dataframe
分析用數據框,response在第一列。
'''
if variabls_in: # 如果有選入模型的自變量,強制將variable_out設置為None
variables_out = None
if file.split('.')[-1] == 'xlsx': # 根據不同文件格式讀取數據
df = pd.read_excel(file)
elif file.split('.')[-1] == 'txt':
df = pd.read_table(file)
else:
df = eval('pd.read_' + file.split('.')[-1] + '(file)')
df.index = df.iloc[:, 0] # 將日期變為索引
df = df.iloc[:, 1:]
df[df.isnull()] = 0 # 缺失值填充
df = df.astype(float) # 將數據框object格式轉換為float
# dateTransfer = np.vectorize(self._dateTransfer) # 向量化日期轉換函數
# df.index = dateTransfer(df.index) # 轉換索引日期格式
df.index = pd.DatetimeIndex(df.index) # 將索引轉換為datetime格式
if variabls_in:
df = pd.concat([df[df.columns[0]], df[variabls_in]], axis=1)
if variables_out:
for var in variables_out:
df.pop(var)
if self.normalize: # 數據標准化
df_std = StandardScaler().fit_transform(df)
self.df_ = pd.DataFrame(df_std, index=df.index, columns=df.columns)
else:
self.df_ = df
return self
def rollingOLS(self, df):
'''
滾動日期多元線性模型。
參數
----
df : dataframe
回歸分析用數據框,response在第一列。
return
------
coef : dataframe
回歸系數。
coef_pvalue : dataframe
回歸系數pvalue。
r2 : dataframe
回歸模型Rsquared 和 Rsquared_adj。
'''
df = df.loc[(df.index>=self.date_begin) & (df.index<=self.date_end), :] # 按照參數給定起始、截止時間選擇數據
df = df.sort_index(ascending=True) # 按日期升序排序
coef = {}
coef_pvalue = {}
r2 = {}
# 從起始日開始做回歸
for i in range(df.shape[0] - self.rolling_days):
date = df.index[i+self.rolling_days]
data = df.iloc[i:i+self.rolling_days, :]
X = data.iloc[:, 1:]
y = data.iloc[:, 0]
# 線性回歸模型擬合
model = sm.OLS(y, X, hasconst=self.intercept)
lr = model.fit()
# 按字典格式保存系數、pvalue、R2
coef[date] = lr.params
coef_pvalue[date] = lr.pvalues
r2[date] = []
r2[date].append(lr.rsquared)
r2[date].append(lr.rsquared_adj)
# 系數字典轉化為數據框,並按日期升序排序
coef = pd.DataFrame.from_dict(coef, orient='index')
coef = coef.sort_index(ascending=True)
# 系數pvalue轉化為數據框,並按日期升序排序
coef_pvalue = pd.DataFrame.from_dict(coef_pvalue, orient='index')
coef_pvalue = coef_pvalue.sort_index(ascending=True)
# R2轉化為數據框,並按日期升序排序
r2 = pd.DataFrame.from_dict(r2, orient='index')
r2.columns = ['R_squred','R_squred_adj']
r2 = r2.sort_index(ascending=True)
return coef, coef_pvalue, r2
def _dateTransfer(self, date):
'''
定義日期轉換函數。
參數
----
date : str
需要轉換的日期數據。
return
------
date : datetime
日期。
'''
return parser.parse(date).strftime('%Y-%m-%d')
def fit(self, feat_selected=None):
'''
多元回歸分析並保存數據。
參數
----
feat_selected : list, 默認是None
分析用的特征列表。
return
------
coef_ : dataframe
回歸系數。
coef_pvalue_ : dataframe
回歸系數pvalue。
r2_ : dataframe
回歸模型Rsquared 和 Rsquared_adj。
'''
if feat_selected is not None:
df = pd.concat([self.df_.iloc[:, 0], self.df_[feat_selected]], axis=1)
else:
df = self.df_
# 滾動回歸分析
self.coef_, self.coef_pvalue_, self.r2_ = self.rollingOLS(df)
# 存儲分析數據表
self.coef_.to_excel('coef.xlsx')
self.coef_pvalue_.to_excel('coef_pvalue.xlsx')
self.r2_.to_excel('r2.xlsx')
return self
def coefPlots(self, width_subplot=12, height_subplot=5, columns_subplots=3):
'''
畫滾動回歸系數及pvalue圖。
參數
----
width_subplot : int
子圖寬度。
height_subplot : int
子圖高度。
columns_subplots : int
子圖列數。
'''
num_subplots = self.coef_.shape[1] + 1 # 確定子圖個數
# 確定子圖行數
if num_subplots % columns_subplots == 0: # 余數為0
rows_subplots = num_subplots // columns_subplots # 取整
else:
rows_subplots = num_subplots // columns_subplots + 1
# 確定畫布寬、高
width_figure = columns_subplots * width_subplot
height_figure = rows_subplots * height_subplot
# 繪制滾動回歸R2圖
plt.figure(figsize=(width_figure, height_figure))
plt.subplot(rows_subplots, columns_subplots, 1)
plt.plot(self.r2_['R_squred'], color='r', lw=3, label='R_squred')
plt.plot(self.r2_['R_squred_adj'], color='g', lw=3, label='R_squred_adj')
plt.title('R2')
plt.legend()
# 在子圖中畫系滾動回歸系數及p值圖
for i, feature in enumerate(self.coef_.columns): # 系數圖
plt.subplot(rows_subplots, columns_subplots, i+2)
plt.plot(self.coef_[feature], color='red', lw=3, label='Beta')
for t, pvalue in zip(self.coef_pvalue_.index, self.coef_pvalue_[feature]): # p值圖
if pvalue <= self.p_value_threshold:
plt.vlines(t, ymin=np.min(self.coef_[feature]), ymax=np.max(self.coef_[feature]),
color='green', alpha=.3, lw=5, label='p_value')
#plt.xlabel('日期')
if ((i + columns_subplots + 1) % columns_subplots) & (i > 0) == 0:
plt.ylabel('coef')
plt.title(feature, fontproperties=zh_font)
# plt.savefig('rollingRegression.jpeg') # 保存圖片
plt.show()
return self
def coefEcharts(self):
'''
利用Echarts畫圖。
注:因為沒有vline方法,故用echarts畫出的圖文件過大,在瀏覽器中打開很慢。
參數
----
return
------
echarts_page_ : echarts_page
echarts_page 文件。
'''
self.echarts_page_ = Page(self.response + '回歸分析')
charts = []
zeros = np.zeros(self.coef_.shape[0])
line = Line('R2') # R2圖
bar = Bar()
line.add('R_squred', self.r2_.index, self.r2_['R_squred'], is_more_utils=True)
line.add('R_squred_adj', self.r2_.index, self.r2_['R_squred_adj'], is_more_utils=True)
charts.append(line)
for i, feature in enumerate(self.coef_.columns):
min_num = np.min(self.coef_[feature])
max_num = np.max(self.coef_[feature])
line = Line(feature)
bar = Bar()
ol = Overlap()
line.add('coef', self.coef_.index, self.coef_[feature], is_more_utils=True) # 系數圖
#line.on()
for t, pvalue in zip(self.coef_pvalue_.index, self.coef_pvalue_[feature]): # p值圖
if pvalue <= self.p_value_threshold:
min_array, max_array = copy.deepcopy(zeros), copy.deepcopy(zeros)
min_array[self.coef_.index==t] = min_num
max_array[self.coef_.index==t] = max_num
bar.add('p-value', self.coef_.index, min_array)
bar.add('p-value', self.coef_.index, max_array)
ol.add(line)
ol.add(bar)
charts.append(ol)
self.echarts_page_.add(charts)
self.echarts_page_.render() # 保存格式為HTML, 保存地址為設定的全局path
return self