量化回測:backtrader回測封裝代碼


 

main

# -*- coding:utf-8 -*-


#正常顯示畫圖時出現的中文和負號
from pylab import mpl
mpl.rcParams['font.sans-serif']=['SimHei']
import backtrader as bt
import numpy as np
from testpkg import backtest
#策略
class GridStrategy(bt.Strategy):
    params = (
        ("printlog", True),
        ("top", 4.2),
        ("buttom", 3.5),
    )

    def __init__(self):
        self.mid = (self.p.top + self.p.buttom) / 2.0
        # 百分比區間計算
        # 這里多1/2,是因為arange函數是左閉右開區間。
        perc_level = [x for x in np.arange(1 + 0.02 * 5, 1 - 0.02 * 5 - 0.02 / 2, -0.02)]
        # 價格區間
        # print(self.mid)
        self.price_levels = [self.mid * x for x in perc_level]
        # 記錄上一次穿越的網格
        self.last_price_index = None
        # 總手續費
        self.comm = 0.0


    def next(self):
        # print(self.last_price_index)
        # 開倉
        if self.last_price_index == None:
            # print("b", len(self.price_levels))
            for i in range(len(self.price_levels)):
                price = self.data.close[0]
                # print("c", i, price, self.price_levels[i][0])
                if self.data.close[0] > self.price_levels[i]:
                    self.last_price_index = i
                    self.order_target_percent(target=i / (len(self.price_levels) - 1))
                    print("a")
                    return
        # 調倉
        else:
            signal = False
            while True:
                upper = None
                lower = None
                if self.last_price_index > 0:
                    upper = self.price_levels[self.last_price_index - 1]
                if self.last_price_index < len(self.price_levels) - 1:
                    lower = self.price_levels[self.last_price_index + 1]
                # 還不是最輕倉,繼續漲,再賣一檔
                if upper != None and self.data.close > upper:
                    self.last_price_index = self.last_price_index - 1
                    signal = True
                    continue
                # 還不是最重倉,繼續跌,再買一檔
                if lower != None and self.data.close < lower:
                    self.last_price_index = self.last_price_index + 1
                    signal = True
                    continue
                break
            if signal:
                self.long_short = None
                self.order_target_percent(target=self.last_price_index / (len(self.price_levels) - 1))


    # 輸出交易記錄
    def log(self, txt, dt=None, doprint=False):
        if self.params.printlog or doprint:
            dt = dt or self.datas[0].datetime.date(0)
            print('%s, %s' % (dt.isoformat(), txt))


    def notify_order(self, order):
        # 有交易提交/被接受,啥也不做
        if order.status in [order.Submitted, order.Accepted]:
            return
        # 交易完成,報告結果
        if order.status in [order.Completed]:
            if order.isbuy():
                self.log(
                    '執行買入, 價格: %.2f, 成本: %.2f, 手續費 %.2f' %
                    (order.executed.price,
                     order.executed.value,
                     order.executed.comm))
                self.buyprice = order.executed.price


            self.comm += order.executed.comm
        elif order.status in [order.Canceled, order.Margin, order.Rejected]:
            self.log("交易失敗")
            self.order = None
        else:
                self.log(
                    '執行賣出, 價格: %.2f, 成本: %.2f, 手續費 %.2f' %
                    (order.executed.price,
                    order.executed.value,
                    order.executed.comm))
                self.comm += order.executed.comm

    # 輸出手續費
    def stop(self):
        self.log("手續費:%.2f 成本比例:%.5f" % (self.comm, self.comm / self.broker.getvalue()))




if __name__ == "__main__":
    start = "2018-01-01"
    end = "2020-07-05"
    name = ["300etf"]
    code = ["510300"]
    backtest = backtest.BackTest(GridStrategy, start, end, code, name, 100000)
    result = backtest.run()
    # backtest.output()
    print(result)

backtest

# coding:utf-8
# 量化交易回測類


import backtrader as bt
import backtrader.analyzers as btay
import tushare as ts
import os
import pandas as pd
import datetime
import matplotlib.pyplot as plt
import empyrical as ey
import math
import tushare as ts
import numpy as np
from scipy import stats
from backtrader.utils.py3 import map
from backtrader import Analyzer, TimeFrame
from backtrader.mathsupport import average, standarddev
from backtrader.analyzers import AnnualReturn
import operator
from pathlib import Path


# 回測類
class BackTest:
    def __init__(self, strategy, start, end, code, name, cash=0.01, commission=0.0003, benchmarkCode="510300",
                 bDraw=True):
        self.__cerebro = None
        self.__strategy = strategy
        self.__start = start
        self.__end = end
        self.__code = code
        self.__name = name
        self.__result = None
        self.__commission = commission
        self.__initcash = cash
        self.__backtestResult = pd.Series()
        self.__returns = pd.Series()
        self.__benchmarkCode = benchmarkCode
        self.__benchReturns = pd.Series()
        self.__benchFeed = None
        self.__bDraw = bDraw
        self.__start_date = None
        self.__end_date = None
        self._init()

    # 執行回測
    def run(self):
        self.__backtestResult["期初賬戶總值"] = self.getValue()
        self.__results = self.__cerebro.run()
        self.__backtestResult["期末賬戶總值"] = self.getValue()
        self._Result()
        if self.__bDraw == True:
            self._drawResult()
        self.__returns = self._timeReturns(self.__results)
        self.__benchReturns = self._getBenchmarkReturns(self.__results)
        self._riskAnaly(self.__returns, self.__benchReturns, self.__backtestResult)
        return self.getResult()

    # 獲取賬戶總價值
    def getValue(self):
        return self.__cerebro.broker.getvalue()

    # 獲取回測指標
    def getResult(self):
        return self.__backtestResult

    # 獲取策略及基准策略收益率的序列
    def getReturns(self):
        return self.__returns, self.__benchReturns

    # 執行參數優化的回測
    def optRun(self, *args, **kwargs):
        self._optStrategy(*args, **kwargs)
        results = self.__cerebro.run()
        if len(kwargs) == 1:
            testResults = self._optResult(results, **kwargs)
        elif len(kwargs) > 1:
            testResults = self._optResultMore(results, **kwargs)
        self._init()
        return testResults

    # 輸出回測結果
    def output(self):
        print("夏普比例:", self.__results[0].analyzers.sharpe.get_analysis()["sharperatio"])
        print("年化收益率:", self.__results[0].analyzers.AR.get_analysis())
        print("最大回撤:%.2f,最大回撤周期%d" % (self.__results[0].analyzers.DD.get_analysis().max.drawdown,
                                      self.__results[0].analyzers.DD.get_analysis().max.len))
        print("總收益率:%.2f" % (self.__results[0].analyzers.RE.get_analysis()["rtot"]))
        # self.__results[0].analyzers.TA.pprint()

    # 進行參數優化
    def _optStrategy(self, *args, **kwargs):
        self.__cerebro = bt.Cerebro(maxcpus=1)
        self.__cerebro.optstrategy(self.__strategy, *args, **kwargs)
        self._createDataFeeds()
        self._settingCerebro()

    # 真正進行初始化的地方
    def _init(self):
        self.__cerebro = bt.Cerebro()
        self.__cerebro.addstrategy(self.__strategy)
        self._createDataFeeds()
        self._settingCerebro()

    # 設置cerebro
    def _settingCerebro(self):
        # 添加回撤觀察器
        self.__cerebro.addobserver(bt.observers.DrawDown)
        # 添加基准觀察器
        self.__cerebro.addobserver(bt.observers.Benchmark, data=self.__benchFeed, timeframe=bt.TimeFrame.NoTimeFrame)
        # 設置手續費
        self.__cerebro.broker.setcommission(commission=self.__commission)
        # 設置初始資金
        self.__cerebro.broker.setcash(self.__initcash)
        # 添加分析對象
        self.__cerebro.addanalyzer(btay.SharpeRatio, _name="sharpe", riskfreerate=0.02, stddev_sample=True,
                                   annualize=True)
        self.__cerebro.addanalyzer(btay.AnnualReturn, _name="AR")
        self.__cerebro.addanalyzer(btay.DrawDown, _name="DD")
        self.__cerebro.addanalyzer(btay.Returns, _name="RE")
        self.__cerebro.addanalyzer(btay.TradeAnalyzer, _name="TA")
        self.__cerebro.addanalyzer(btay.TimeReturn, _name="TR")
        self.__cerebro.addanalyzer(btay.TimeReturn, _name="TR_Bench", data=self.__benchFeed)
        self.__cerebro.addanalyzer(btay.SQN, _name="SQN")

    # 建立數據源
    def _createDataFeeds(self):
        # 建立回測數據源
        for i in range(len(self.__code)):
            dataFeed = self._createDataFeedsProcess(self.__code[i], self.__name[i])
            self.__cerebro.adddata(dataFeed, name=self.__name[i])
        self.__benchFeed = self._createDataFeedsProcess(self.__benchmarkCode, "benchMark")
        self.__cerebro.adddata(self.__benchFeed, name="benchMark")

    # 建立數據源的具體過程
    def _createDataFeedsProcess(self, code, name):
        df_data = self._getData(code)
        start_date = list(map(int, self.__start.split("-")))
        end_date = list(map(int, self.__end.split("-")))
        self.__start_date = datetime.datetime(start_date[0], start_date[1], start_date[2])
        self.__end_date = datetime.datetime(end_date[0], end_date[1], end_date[2])
        dataFeed = bt.feeds.PandasData(dataname=df_data, name=name,
                                       fromdate=datetime.datetime(start_date[0], start_date[1], start_date[2]),
                                       todate=datetime.datetime(end_date[0], end_date[1], end_date[2]))
        return dataFeed

    # 計算勝率信息
    def _winInfo(self, trade_info, result):
        total_trade_num = trade_info["total"]["total"]
        if total_trade_num > 1:
            win_num = trade_info["won"]["total"]
            lost_num = trade_info["lost"]["total"]
            result["交易次數"] = total_trade_num
            result["勝率"] = win_num / total_trade_num
            result["敗率"] = lost_num / total_trade_num

    # 根據SQN值對策略做出評估
    # 按照backtrader文檔寫的
    def _judgeBySQN(self, sqn):
        result = None
        if sqn >= 1.6 and sqn <= 1.9:
            result = "低於平均"
        elif sqn > 1.9 and sqn <= 2.4:
            result = "平均水平"
        elif sqn > 2.4 and sqn <= 2.9:
            result = "良好"
        elif sqn > 2.9 and sqn <= 5.0:
            result = "優秀"
        elif sqn > 5.0 and sqn <= 6.9:
            result = "卓越"
        elif sqn > 6.9:
            result = "大神?"
        else:
            result = "很差"
        self.__backtestResult["策略評價(根據SQN)"] = result
        return result

    # 計算並保存回測結果指標
    def _Result(self):
        self.__backtestResult["賬戶總額"] = self.getValue()
        self.__backtestResult["總收益率"] = self.__results[0].analyzers.RE.get_analysis()["rtot"]
        self.__backtestResult["年化收益率"] = self.__results[0].analyzers.RE.get_analysis()["rnorm"]
        # self.__backtestResult["交易成本"] = self.__cerebro.strats[0].getCommission()
        self.__backtestResult["夏普比率"] = self.__results[0].analyzers.sharpe.get_analysis()["sharperatio"]
        self.__backtestResult["最大回撤"] = self.__results[0].analyzers.DD.get_analysis().max.drawdown
        self.__backtestResult["最大回撤期間"] = self.__results[0].analyzers.DD.get_analysis().max.len
        self.__backtestResult["SQN"] = self.__results[0].analyzers.SQN.get_analysis()["sqn"]
        self._judgeBySQN(self.__backtestResult["SQN"])

        # 計算勝率信息
        trade_info = self.__results[0].analyzers.TA.get_analysis()
        self._winInfo(trade_info, self.__backtestResult)

    # 取得優化參數時的指標結果
    def _getOptAnalysis(self, result):
        temp = dict()
        temp["總收益率"] = result[0].analyzers.RE.get_analysis()["rtot"]
        temp["年化收益率"] = result[0].analyzers.RE.get_analysis()["rnorm"]
        temp["夏普比率"] = result[0].analyzers.sharpe.get_analysis()["sharperatio"]
        temp["最大回撤"] = result[0].analyzers.DD.get_analysis().max.drawdown
        temp["最大回撤期間"] = result[0].analyzers.DD.get_analysis().max.len
        sqn = result[0].analyzers.SQN.get_analysis()["sqn"]
        temp["SQN"] = sqn
        temp["策略評價(根據SQN)"] = self._judgeBySQN(sqn)
        trade_info = self.__results[0].analyzers.TA.get_analysis()
        self._winInfo(trade_info, temp)
        return temp

    # 在優化多個參數時計算並保存回測結果
    def _optResultMore(self, results, **kwargs):
        testResults = pd.DataFrame()
        i = 0
        for key in kwargs:
            for value in kwargs[key]:
                temp = self._getOptAnalysis(results[i])
                temp["參數名"] = key
                temp["參數值"] = value
                returns = self._timeReturns(results[i])
                benchReturns = self._getBenchmarkReturns(results[i])
                self._riskAnaly(returns, benchReturns, temp)
                testResults = testResults.append(temp, ignore_index=True)
            # testResults.set_index(["參數值"], inplace = True)
        return testResults

    # 在優化參數時計算並保存回測結果
    def _optResult(self, results, **kwargs):
        testResults = pd.DataFrame()
        params = []
        for k, v in kwargs.items():
            for t in v:
                params.append(t)
        i = 0
        for result in results:
            temp = self._getOptAnalysis(result)
            temp["參數名"] = k
            temp["參數值"] = params[i]
            i += 1
            returns = self._timeReturns(result)
            benchReturns = self._getBenchmarkReturns(result)
            self._riskAnaly(returns, benchReturns, temp)
            testResults = testResults.append(temp, ignore_index=True)
        # testResults.set_index(["參數值"], inplace = True)
        return testResults

    # 計算收益率序列
    def _timeReturns(self, result):
        return pd.Series(result[0].analyzers.TR.get_analysis())

    # 運行基准策略,獲取基准收益值
    def _getBenchmarkReturns(self, result):
        return pd.Series(result[0].analyzers.TR_Bench.get_analysis())

    # 分析策略的風險指標
    def _riskAnaly(self, returns, benchReturns, results):
        risk = riskAnalyzer(returns, benchReturns)
        result = risk.run()
        results["阿爾法"] = result["阿爾法"]
        results["貝塔"] = result["貝塔"]
        results["信息比例"] = result["信息比例"]
        results["策略波動率"] = result["策略波動率"]
        results["歐米伽"] = result["歐米伽"]
        # self.__backtestResult["夏普值"] = result["夏普值"]
        results["sortino"] = result["sortino"]
        results["calmar"] = result["calmar"]

    # 回測結果繪圖
    def _drawResult(self):
        self.__cerebro.plot(numfigs=2)
        figname = type(self).__name__ + ".png"
        plt.savefig(figname)

    # 獲取數據
    def _getData(self, code):
        filename = code + ".csv"
        path = "./data/"
        # 如果數據目錄不存在,創建目錄
        if not os.path.exists(path):
            os.makedirs(path)
        # 已有數據文件,直接讀取數據
        if os.path.exists(path + filename):
            df = pd.read_csv(path + filename)
        else:  # 沒有數據文件,用tushare下載
            df = ts.get_k_data(code, autype="qfq", start=self.__start, end=self.__end)
            df.to_csv(path + filename)
        df.index = pd.to_datetime(df.date)
        df['openinterest'] = 0
        df = df[['open', 'high', 'low', 'close', 'volume', 'openinterest']]
        return df


# 用empyrical庫計算風險指標
class riskAnalyzer:
    def __init__(self, returns, benchReturns, riskFreeRate=0.02):
        self.__returns = returns
        self.__benchReturns = benchReturns
        self.__risk_free = riskFreeRate
        self.__alpha = 0.0
        self.__beta = 0.0
        self.__info = 0.0
        self.__vola = 0.0
        self.__omega = 0.0
        self.__sharpe = 0.0
        self.__sortino = 0.0
        self.__calmar = 0.0

    def run(self):
        # 計算各指標
        self._alpha_beta()
        self._info()
        self._vola()
        self._omega()
        self._sharpe()
        self._sortino()
        self._calmar()
        result = pd.Series(dtype="float64")
        result["阿爾法"] = self.__alpha
        result["貝塔"] = self.__beta
        result["信息比例"] = self.__info
        result["策略波動率"] = self.__vola
        result["歐米伽"] = self.__omega
        result["夏普值"] = self.__sharpe
        result["sortino"] = self.__sortino
        result["calmar"] = self.__calmar
        return result

    def _alpha_beta(self):
        self.__alpha, self.__beta = ey.alpha_beta(returns=self.__returns, factor_returns=self.__benchReturns,
                                                  risk_free=self.__risk_free, annualization=1)

    def _info(self):
        self.__info = ey.excess_sharpe(returns=self.__returns, factor_returns=self.__benchReturns)

    def _vola(self):
        self.__vola = ey.annual_volatility(self.__returns, period='daily')

    def _omega(self):
        self.__omega = ey.omega_ratio(returns=self.__returns, risk_free=self.__risk_free)

    def _sharpe(self):
        self.__sharpe = ey.sharpe_ratio(returns=self.__returns, annualization=1)

    def _sortino(self):
        self.__sortino = ey.sortino_ratio(returns=self.__returns)

    def _calmar(self):
        self.__calmar = ey.calmar_ratio(returns=self.__returns)


# 測試函數
def test():
    # 構造測試數據
    returns = pd.Series(
        index=pd.date_range("2017-03-10", "2017-03-19"),
        data=(-0.012143, 0.045350, 0.030957, 0.004902, 0.002341, -0.02103, 0.00148, 0.004820, -0.00023, 0.01201))
    print(returns)
    benchmark_returns = pd.Series(
        index=pd.date_range("2017-03-10", "2017-03-19"),
        data=(-0.031940, 0.025350, -0.020957, -0.000902, 0.007341, -0.01103, 0.00248, 0.008820, -0.00123, 0.01091))
    print(benchmark_returns)
    # 計算累積收益率
    creturns = ey.cum_returns(returns)
    print("累積收益率\n", creturns)
    risk = riskAnalyzer(returns, benchmark_returns, riskFreeRate=0.01)
    results = risk.run()
    print(results)
    # 直接調用empyrical試試
    alpha = ey.alpha(returns=returns, factor_returns=benchmark_returns, risk_free=0.01)
    calmar = ey.calmar_ratio(returns)
    print(alpha, calmar)
    # 自己計算阿爾法值
    annual_return = ey.annual_return(returns)
    annual_bench = ey.annual_return(benchmark_returns)
    print(annual_return, annual_bench)
    alpha2 = (annual_return - 0.01) - results["貝塔"] * (annual_bench - 0.01)
    print(alpha2)

    # 自己計算阿爾法貝塔
    def get_return(code, startdate, endate):
        df = ts.get_k_data(code, ktype="D", autype="qfq", start=startdate, end=endate)
        p1 = np.array(df.close[1:])
        p0 = np.array(df.close[:-1])
        logret = np.log(p1 / p0)
        rate = pd.DataFrame()
        rate[code] = logret
        rate.index = df["date"][1:]
        return rate

    def alpha_beta(code, startdate, endate):
        mkt_ret = get_return("sh", startdate, endate)
        stock_ret = get_return(code, startdate, endate)
        df = pd.merge(mkt_ret, stock_ret, left_index=True, right_index=True)
        x = df.iloc[:, 0]
        y = df.iloc[:, 1]
        beta, alpha, r_value, p_value, std_err = stats.linregress(x, y)
        return (alpha, beta)

    def stocks_alpha_beta(stocks, startdate, endate):
        df = pd.DataFrame()
        alpha = []
        beta = []
        for code in stocks.values():
            a, b = alpha_beta(code, startdate, endate)
            alpha.append(float("%.4f" % a))
            beta.append(float("%.4f" % b))
        df["alpha"] = alpha
        df["beta"] = beta
        df.index = stocks.keys()
        return df

    startdate = "2017-01-01"
    endate = "2018-11-09"
    stocks = {'中國平安': '601318', '格力電器': '000651', '招商銀行': '600036', '恆生電子': '600570', '中信證券': '600030',
              '貴州茅台': '600519'}
    results = stocks_alpha_beta(stocks, startdate, endate)
    print("自己計算結果")
    print(results)

    # 用empyrical計算
    def stocks_alpha_beta2(stocks, startdate, endate):
        df = pd.DataFrame()
        alpha = []
        beta = []
        for code in stocks.values():
            a, b = empyrical_alpha_beta(code, startdate, endate)
            alpha.append(float("%.4f" % a))
            beta.append(float("%.4f" % b))
        df["alpha"] = alpha
        df["beta"] = beta
        df.index = stocks.keys()
        return df

    def empyrical_alpha_beta(code, startdate, endate):
        mkt_ret = get_return("sh", startdate, endate)
        stock_ret = get_return(code, startdate, endate)
        alpha, beta = ey.alpha_beta(returns=stock_ret, factor_returns=mkt_ret, annualization=1)
        return (alpha, beta)

    results2 = stocks_alpha_beta2(stocks, startdate, endate)
    print("empyrical計算結果")
    print(results2)
    print(results2["alpha"] / results["alpha"])


# 測試夏普值的計算
def testSharpe():
    # 讀取數據
    stock_data = pd.read_csv("stock_data.csv", parse_dates=["Date"], index_col=["Date"]).dropna()
    benchmark_data = pd.read_csv("benchmark_data.csv", parse_dates=["Date"], index_col=["Date"]).dropna()
    # 了解數據
    print("Stocks\n")
    print(stock_data.info())
    print(stock_data.head())
    print("\nBenchmarks\n")
    print(benchmark_data.info())
    print(benchmark_data.head())
    # 輸出統計量
    print(stock_data.describe())
    print(benchmark_data.describe())
    # 計算每日回報率
    stock_returns = stock_data.pct_change()
    print(stock_returns.describe())
    sp_returns = benchmark_data.pct_change()
    print(sp_returns.describe())
    # 每日超額回報
    excess_returns = pd.DataFrame()
    risk_free = 0.04 / 252.0
    excess_returns["Amazon"] = stock_returns["Amazon"] - risk_free
    excess_returns["Facebook"] = stock_returns["Facebook"] - risk_free
    print(excess_returns.describe())
    # 超額回報的均值
    avg_excess_return = excess_returns.mean()
    print(avg_excess_return)
    # 超額回報的標准差
    std_excess_return = excess_returns.std()
    print(std_excess_return)
    # 計算夏普比率
    # 日夏普比率
    daily_sharpe_ratio = avg_excess_return.div(std_excess_return)
    # 年化夏普比率
    annual_factor = np.sqrt(252)
    annual_sharpe_ratio = daily_sharpe_ratio.mul(annual_factor)
    print("年化夏普比率\n", annual_sharpe_ratio)

    # 用empyrical算
    sharpe = pd.DataFrame()
    a = ey.sharpe_ratio(stock_returns["Amazon"], risk_free=risk_free)  # , annualization = 252)
    b = ey.sharpe_ratio(stock_returns["Facebook"], risk_free=risk_free)
    print("empyrical計算結果")
    print(a, b)
    print(a / annual_sharpe_ratio["Amazon"], b / annual_sharpe_ratio["Facebook"])


if __name__ == "__main__":
    # testpkg()
    testSharpe()

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM