RollingRegression(滾動回歸分析)之Python實現


# -*- coding: utf-8 -*-
"""
Created on Sat Aug 18 11:08:38 2018

@author: acadsoc
"""

import pandas as pd
import numpy as np
import matplotlib
import matplotlib.pyplot as plt
from pyecharts import Bar, Line, Page, Overlap
import statsmodels.api as sm
from sklearn.preprocessing import StandardScaler
# import pymssql
from dateutil import parser
import copy
import os
import sys
from featureSelection import featureSelection

plt.style.use('ggplot') # 設置ggplot2畫圖風格
# 根據不同平台設置其中文字體路徑
if sys.platform == 'linux':
    zh_font = matplotlib.font_manager.FontProperties(
        fname='path/anaconda3/lib/python3.6/site-packages/matplotlib/mpl-data/fonts/ttf/STZHONGS.TTF')
else:
    zh_font = matplotlib.font_manager.FontProperties(fname='C:\Windows\Fonts\STZHONGS.ttf')  # 設置中文字體

# 根據不同平台設定工作目錄
if sys.platform == 'linux':
    os.chdir(path) # Linux path
else:
    os.chdir(path) # Windows path

class rollingRegression():
    '''
    滾動多元回歸分析類。
    
    參數
    ----
    response : str
        回歸因變量。
    date_begin : datetime
        起始日期。
    date_end : datetime
        截止日期。
    rolling_days : int
        滾動天數。
    intercept : bool
        回歸方程是否帶常數項。
    p_value_threshold : float
        回歸系數按p值顯示閾值。
    normalize : bool
        數據是否標准化。
        
    屬性
    ----
    coef_ : dataframe
        回歸系數。
    coef_pvalue_ : dataframe
        回歸系數pvalue。
    r2_ : dataframe
        回歸模型Rsquared 和 Rsquared_adj。
    echarts_page_ : echarts_page
        echarts_page 文件。       
    '''
    def __init__(self, response='新單數', date_begin='2018-01-01', date_end='2018-07-31', rolling_days=30,
                 intercept=False, p_value_threshold=.1, normalize=False):
        self.response = response  # 回歸因變量
        self.date_begin = date_begin  # 起始日期
        self.date_end = date_end  # 終止日期
        self.rolling_days = rolling_days  # 滾動天數
        self.intercept = intercept  # 回歸方程是否帶常數項
        self.p_value_threshold = p_value_threshold  # p值顯示閾值
        self.normalize = normalize  # 是否將數據標准化后再進行回歸分析
        if self.normalize:  # 如果數據標准化,常數強制設置為0
            self.intercept = False
        # 起始日期間隔必須大於等於滾動天數
        if (parser.parse(self.date_end) - parser.parse(self.date_begin)).days < self.rolling_days:
            raise IOError('起始日期間隔必須大於等於滾動天數,請重新選擇起始日期或者調整滾動日期。')    
            
    def getData(self, file='業績相關數據2018-8-1.xlsx', variabls_in=None, variables_out=None):
        '''
        讀取數據。
        注:該方法只適合讀取特定方式的數據,第一列應是datetime類型,第二列是因變量,其余是自變量。
        
        參數
        ----
        file : str
            文件路徑及文件名。
        variabls_in : list, 默認是None
            需要用到的自變量。
        variables_out : list, 默認是None
            不用的自變量。
            
        return
        ------
        df_ : dataframe
            分析用數據框,response在第一列。
        '''
        if variabls_in:   # 如果有選入模型的自變量,強制將variable_out設置為None
            variables_out = None
        
        if file.split('.')[-1] == 'xlsx':  # 根據不同文件格式讀取數據        
            df = pd.read_excel(file)  
        elif file.split('.')[-1] == 'txt':
            df = pd.read_table(file)
        else:
            df = eval('pd.read_' + file.split('.')[-1] + '(file)')

        df.index = df.iloc[:, 0]  # 將日期變為索引
        df = df.iloc[:, 1:]             
        df[df.isnull()] = 0  # 缺失值填充        
        df = df.astype(float)  # 將數據框object格式轉換為float
        # dateTransfer = np.vectorize(self._dateTransfer)   # 向量化日期轉換函數                
        # df.index = dateTransfer(df.index) # 轉換索引日期格式          
        df.index = pd.DatetimeIndex(df.index)   # 將索引轉換為datetime格式
        
        if variabls_in:
            df = pd.concat([df[df.columns[0]], df[variabls_in]], axis=1)
        
        if variables_out:
            for var in variables_out:
                df.pop(var)            
       
        if self.normalize:   # 數據標准化     
            df_std = StandardScaler().fit_transform(df)
            self.df_ = pd.DataFrame(df_std, index=df.index, columns=df.columns)    
        else:
            self.df_ = df
        
        return self
    
    def rollingOLS(self, df):
        '''
        滾動日期多元線性模型。
        
        參數
        ----
        df : dataframe
            回歸分析用數據框,response在第一列。
            
        return
        ------
        coef : dataframe
            回歸系數。
        coef_pvalue : dataframe
            回歸系數pvalue。
        r2 : dataframe
            回歸模型Rsquared 和 Rsquared_adj。
        '''
        df = df.loc[(df.index>=self.date_begin) & (df.index<=self.date_end), :]   # 按照參數給定起始、截止時間選擇數據
        df = df.sort_index(ascending=True)  # 按日期升序排序
        coef = {}
        coef_pvalue = {}
        r2 = {}

        # 從起始日開始做回歸
        for i in range(df.shape[0] - self.rolling_days):
            date = df.index[i+self.rolling_days]   
            data = df.iloc[i:i+self.rolling_days, :]
            X = data.iloc[:, 1:]
            y = data.iloc[:, 0]        
            # 線性回歸模型擬合    
            model = sm.OLS(y, X, hasconst=self.intercept)
            lr = model.fit()

            # 按字典格式保存系數、pvalue、R2
            coef[date] = lr.params            
            coef_pvalue[date] = lr.pvalues
            r2[date] = []
            r2[date].append(lr.rsquared)
            r2[date].append(lr.rsquared_adj)

        # 系數字典轉化為數據框,並按日期升序排序
        coef = pd.DataFrame.from_dict(coef, orient='index')
        coef = coef.sort_index(ascending=True)

        # 系數pvalue轉化為數據框,並按日期升序排序
        coef_pvalue = pd.DataFrame.from_dict(coef_pvalue, orient='index')
        coef_pvalue = coef_pvalue.sort_index(ascending=True)

        # R2轉化為數據框,並按日期升序排序
        r2 = pd.DataFrame.from_dict(r2, orient='index')
        r2.columns = ['R_squred','R_squred_adj']
        r2 = r2.sort_index(ascending=True)
        return coef, coef_pvalue, r2
    
    def _dateTransfer(self, date):
        '''
        定義日期轉換函數。
        
        參數
        ----
        date : str
            需要轉換的日期數據。
        
        return
        ------
        date : datetime
            日期。
        '''
        return parser.parse(date).strftime('%Y-%m-%d')    
    
    def fit(self, feat_selected=None):
        '''
        多元回歸分析並保存數據。
        
        參數
        ----
        feat_selected : list, 默認是None
            分析用的特征列表。
            
        return
        ------
        coef_ : dataframe
            回歸系數。
        coef_pvalue_ : dataframe
            回歸系數pvalue。
        r2_ : dataframe
            回歸模型Rsquared 和 Rsquared_adj。
        '''
        if feat_selected is not None:
            df = pd.concat([self.df_.iloc[:, 0], self.df_[feat_selected]], axis=1)
        else:
            df = self.df_
        # 滾動回歸分析        
        self.coef_, self.coef_pvalue_, self.r2_ = self.rollingOLS(df)  
        # 存儲分析數據表
        self.coef_.to_excel('coef.xlsx')
        self.coef_pvalue_.to_excel('coef_pvalue.xlsx')
        self.r2_.to_excel('r2.xlsx')        
        return self    
    
    def coefPlots(self, width_subplot=12, height_subplot=5, columns_subplots=3):
        '''
        畫滾動回歸系數及pvalue圖。
        
        參數
        ----
        width_subplot : int
            子圖寬度。
        height_subplot : int
            子圖高度。
        columns_subplots : int
            子圖列數。
        '''
        num_subplots = self.coef_.shape[1] + 1  # 確定子圖個數
        # 確定子圖行數
        if num_subplots % columns_subplots == 0: # 余數為0
            rows_subplots = num_subplots // columns_subplots  # 取整
        else:
            rows_subplots = num_subplots // columns_subplots + 1
        # 確定畫布寬、高
        width_figure = columns_subplots * width_subplot
        height_figure = rows_subplots * height_subplot
        
        # 繪制滾動回歸R2圖
        plt.figure(figsize=(width_figure, height_figure))
        plt.subplot(rows_subplots, columns_subplots, 1)
        plt.plot(self.r2_['R_squred'], color='r', lw=3, label='R_squred')
        plt.plot(self.r2_['R_squred_adj'], color='g', lw=3, label='R_squred_adj')
        plt.title('R2')
        plt.legend()
        # 在子圖中畫系滾動回歸系數及p值圖
        for i, feature in enumerate(self.coef_.columns):  # 系數圖
            plt.subplot(rows_subplots, columns_subplots, i+2)
            plt.plot(self.coef_[feature], color='red', lw=3, label='Beta')

            for t, pvalue in zip(self.coef_pvalue_.index, self.coef_pvalue_[feature]):  # p值圖
                if pvalue <= self.p_value_threshold:
                    plt.vlines(t, ymin=np.min(self.coef_[feature]), ymax=np.max(self.coef_[feature]),
                               color='green', alpha=.3, lw=5, label='p_value')

            #plt.xlabel('日期')
            if ((i + columns_subplots + 1) % columns_subplots) & (i > 0) == 0:
                plt.ylabel('coef')
            plt.title(feature, fontproperties=zh_font)
        # plt.savefig('rollingRegression.jpeg') # 保存圖片
        plt.show()
        return self    
   
    def coefEcharts(self):
        '''
        利用Echarts畫圖。
        注:因為沒有vline方法,故用echarts畫出的圖文件過大,在瀏覽器中打開很慢。
        
        參數
        ----
        
        return
        ------
        echarts_page_ : echarts_page
            echarts_page 文件。
        '''
        self.echarts_page_ = Page(self.response + '回歸分析')
        charts = []
        zeros = np.zeros(self.coef_.shape[0])

        line = Line('R2')  # R2圖
        bar = Bar()
        line.add('R_squred', self.r2_.index, self.r2_['R_squred'], is_more_utils=True)
        line.add('R_squred_adj', self.r2_.index, self.r2_['R_squred_adj'], is_more_utils=True)
        charts.append(line)

        for i, feature in enumerate(self.coef_.columns):  
            min_num = np.min(self.coef_[feature])
            max_num = np.max(self.coef_[feature])
            line = Line(feature)
            bar = Bar()
            ol = Overlap()
            line.add('coef', self.coef_.index, self.coef_[feature], is_more_utils=True) # 系數圖
            #line.on()
            for t, pvalue in zip(self.coef_pvalue_.index, self.coef_pvalue_[feature]):  # p值圖
                if pvalue <= self.p_value_threshold:
                    min_array, max_array = copy.deepcopy(zeros), copy.deepcopy(zeros)
                    min_array[self.coef_.index==t] = min_num
                    max_array[self.coef_.index==t] = max_num
                    bar.add('p-value', self.coef_.index, min_array)
                    bar.add('p-value', self.coef_.index, max_array)

            ol.add(line)
            ol.add(bar)
            charts.append(ol)

        self.echarts_page_.add(charts)
        self.echarts_page_.render()  # 保存格式為HTML, 保存地址為設定的全局path
        return self


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM