main
# -*- coding:utf-8 -*- #正常顯示畫圖時出現的中文和負號 from pylab import mpl mpl.rcParams['font.sans-serif']=['SimHei'] import backtrader as bt import numpy as np from testpkg import backtest #策略 class GridStrategy(bt.Strategy): params = ( ("printlog", True), ("top", 4.2), ("buttom", 3.5), ) def __init__(self): self.mid = (self.p.top + self.p.buttom) / 2.0 # 百分比區間計算 # 這里多1/2,是因為arange函數是左閉右開區間。 perc_level = [x for x in np.arange(1 + 0.02 * 5, 1 - 0.02 * 5 - 0.02 / 2, -0.02)] # 價格區間 # print(self.mid) self.price_levels = [self.mid * x for x in perc_level] # 記錄上一次穿越的網格 self.last_price_index = None # 總手續費 self.comm = 0.0 def next(self): # print(self.last_price_index) # 開倉 if self.last_price_index == None: # print("b", len(self.price_levels)) for i in range(len(self.price_levels)): price = self.data.close[0] # print("c", i, price, self.price_levels[i][0]) if self.data.close[0] > self.price_levels[i]: self.last_price_index = i self.order_target_percent(target=i / (len(self.price_levels) - 1)) print("a") return # 調倉 else: signal = False while True: upper = None lower = None if self.last_price_index > 0: upper = self.price_levels[self.last_price_index - 1] if self.last_price_index < len(self.price_levels) - 1: lower = self.price_levels[self.last_price_index + 1] # 還不是最輕倉,繼續漲,再賣一檔 if upper != None and self.data.close > upper: self.last_price_index = self.last_price_index - 1 signal = True continue # 還不是最重倉,繼續跌,再買一檔 if lower != None and self.data.close < lower: self.last_price_index = self.last_price_index + 1 signal = True continue break if signal: self.long_short = None self.order_target_percent(target=self.last_price_index / (len(self.price_levels) - 1)) # 輸出交易記錄 def log(self, txt, dt=None, doprint=False): if self.params.printlog or doprint: dt = dt or self.datas[0].datetime.date(0) print('%s, %s' % (dt.isoformat(), txt)) def notify_order(self, order): # 有交易提交/被接受,啥也不做 if order.status in [order.Submitted, order.Accepted]: return # 交易完成,報告結果 if order.status in [order.Completed]: if order.isbuy(): self.log( '執行買入, 價格: %.2f, 成本: %.2f, 手續費 %.2f' % (order.executed.price, order.executed.value, order.executed.comm)) self.buyprice = order.executed.price self.comm += order.executed.comm elif order.status in [order.Canceled, order.Margin, order.Rejected]: self.log("交易失敗") self.order = None else: self.log( '執行賣出, 價格: %.2f, 成本: %.2f, 手續費 %.2f' % (order.executed.price, order.executed.value, order.executed.comm)) self.comm += order.executed.comm # 輸出手續費 def stop(self): self.log("手續費:%.2f 成本比例:%.5f" % (self.comm, self.comm / self.broker.getvalue())) if __name__ == "__main__": start = "2018-01-01" end = "2020-07-05" name = ["300etf"] code = ["510300"] backtest = backtest.BackTest(GridStrategy, start, end, code, name, 100000) result = backtest.run() # backtest.output() print(result)
backtest
# coding:utf-8 # 量化交易回測類 import backtrader as bt import backtrader.analyzers as btay import tushare as ts import os import pandas as pd import datetime import matplotlib.pyplot as plt import empyrical as ey import math import tushare as ts import numpy as np from scipy import stats from backtrader.utils.py3 import map from backtrader import Analyzer, TimeFrame from backtrader.mathsupport import average, standarddev from backtrader.analyzers import AnnualReturn import operator from pathlib import Path # 回測類 class BackTest: def __init__(self, strategy, start, end, code, name, cash=0.01, commission=0.0003, benchmarkCode="510300", bDraw=True): self.__cerebro = None self.__strategy = strategy self.__start = start self.__end = end self.__code = code self.__name = name self.__result = None self.__commission = commission self.__initcash = cash self.__backtestResult = pd.Series() self.__returns = pd.Series() self.__benchmarkCode = benchmarkCode self.__benchReturns = pd.Series() self.__benchFeed = None self.__bDraw = bDraw self.__start_date = None self.__end_date = None self._init() # 執行回測 def run(self): self.__backtestResult["期初賬戶總值"] = self.getValue() self.__results = self.__cerebro.run() self.__backtestResult["期末賬戶總值"] = self.getValue() self._Result() if self.__bDraw == True: self._drawResult() self.__returns = self._timeReturns(self.__results) self.__benchReturns = self._getBenchmarkReturns(self.__results) self._riskAnaly(self.__returns, self.__benchReturns, self.__backtestResult) return self.getResult() # 獲取賬戶總價值 def getValue(self): return self.__cerebro.broker.getvalue() # 獲取回測指標 def getResult(self): return self.__backtestResult # 獲取策略及基准策略收益率的序列 def getReturns(self): return self.__returns, self.__benchReturns # 執行參數優化的回測 def optRun(self, *args, **kwargs): self._optStrategy(*args, **kwargs) results = self.__cerebro.run() if len(kwargs) == 1: testResults = self._optResult(results, **kwargs) elif len(kwargs) > 1: testResults = self._optResultMore(results, **kwargs) self._init() return testResults # 輸出回測結果 def output(self): print("夏普比例:", self.__results[0].analyzers.sharpe.get_analysis()["sharperatio"]) print("年化收益率:", self.__results[0].analyzers.AR.get_analysis()) print("最大回撤:%.2f,最大回撤周期%d" % (self.__results[0].analyzers.DD.get_analysis().max.drawdown, self.__results[0].analyzers.DD.get_analysis().max.len)) print("總收益率:%.2f" % (self.__results[0].analyzers.RE.get_analysis()["rtot"])) # self.__results[0].analyzers.TA.pprint() # 進行參數優化 def _optStrategy(self, *args, **kwargs): self.__cerebro = bt.Cerebro(maxcpus=1) self.__cerebro.optstrategy(self.__strategy, *args, **kwargs) self._createDataFeeds() self._settingCerebro() # 真正進行初始化的地方 def _init(self): self.__cerebro = bt.Cerebro() self.__cerebro.addstrategy(self.__strategy) self._createDataFeeds() self._settingCerebro() # 設置cerebro def _settingCerebro(self): # 添加回撤觀察器 self.__cerebro.addobserver(bt.observers.DrawDown) # 添加基准觀察器 self.__cerebro.addobserver(bt.observers.Benchmark, data=self.__benchFeed, timeframe=bt.TimeFrame.NoTimeFrame) # 設置手續費 self.__cerebro.broker.setcommission(commission=self.__commission) # 設置初始資金 self.__cerebro.broker.setcash(self.__initcash) # 添加分析對象 self.__cerebro.addanalyzer(btay.SharpeRatio, _name="sharpe", riskfreerate=0.02, stddev_sample=True, annualize=True) self.__cerebro.addanalyzer(btay.AnnualReturn, _name="AR") self.__cerebro.addanalyzer(btay.DrawDown, _name="DD") self.__cerebro.addanalyzer(btay.Returns, _name="RE") self.__cerebro.addanalyzer(btay.TradeAnalyzer, _name="TA") self.__cerebro.addanalyzer(btay.TimeReturn, _name="TR") self.__cerebro.addanalyzer(btay.TimeReturn, _name="TR_Bench", data=self.__benchFeed) self.__cerebro.addanalyzer(btay.SQN, _name="SQN") # 建立數據源 def _createDataFeeds(self): # 建立回測數據源 for i in range(len(self.__code)): dataFeed = self._createDataFeedsProcess(self.__code[i], self.__name[i]) self.__cerebro.adddata(dataFeed, name=self.__name[i]) self.__benchFeed = self._createDataFeedsProcess(self.__benchmarkCode, "benchMark") self.__cerebro.adddata(self.__benchFeed, name="benchMark") # 建立數據源的具體過程 def _createDataFeedsProcess(self, code, name): df_data = self._getData(code) start_date = list(map(int, self.__start.split("-"))) end_date = list(map(int, self.__end.split("-"))) self.__start_date = datetime.datetime(start_date[0], start_date[1], start_date[2]) self.__end_date = datetime.datetime(end_date[0], end_date[1], end_date[2]) dataFeed = bt.feeds.PandasData(dataname=df_data, name=name, fromdate=datetime.datetime(start_date[0], start_date[1], start_date[2]), todate=datetime.datetime(end_date[0], end_date[1], end_date[2])) return dataFeed # 計算勝率信息 def _winInfo(self, trade_info, result): total_trade_num = trade_info["total"]["total"] if total_trade_num > 1: win_num = trade_info["won"]["total"] lost_num = trade_info["lost"]["total"] result["交易次數"] = total_trade_num result["勝率"] = win_num / total_trade_num result["敗率"] = lost_num / total_trade_num # 根據SQN值對策略做出評估 # 按照backtrader文檔寫的 def _judgeBySQN(self, sqn): result = None if sqn >= 1.6 and sqn <= 1.9: result = "低於平均" elif sqn > 1.9 and sqn <= 2.4: result = "平均水平" elif sqn > 2.4 and sqn <= 2.9: result = "良好" elif sqn > 2.9 and sqn <= 5.0: result = "優秀" elif sqn > 5.0 and sqn <= 6.9: result = "卓越" elif sqn > 6.9: result = "大神?" else: result = "很差" self.__backtestResult["策略評價(根據SQN)"] = result return result # 計算並保存回測結果指標 def _Result(self): self.__backtestResult["賬戶總額"] = self.getValue() self.__backtestResult["總收益率"] = self.__results[0].analyzers.RE.get_analysis()["rtot"] self.__backtestResult["年化收益率"] = self.__results[0].analyzers.RE.get_analysis()["rnorm"] # self.__backtestResult["交易成本"] = self.__cerebro.strats[0].getCommission() self.__backtestResult["夏普比率"] = self.__results[0].analyzers.sharpe.get_analysis()["sharperatio"] self.__backtestResult["最大回撤"] = self.__results[0].analyzers.DD.get_analysis().max.drawdown self.__backtestResult["最大回撤期間"] = self.__results[0].analyzers.DD.get_analysis().max.len self.__backtestResult["SQN"] = self.__results[0].analyzers.SQN.get_analysis()["sqn"] self._judgeBySQN(self.__backtestResult["SQN"]) # 計算勝率信息 trade_info = self.__results[0].analyzers.TA.get_analysis() self._winInfo(trade_info, self.__backtestResult) # 取得優化參數時的指標結果 def _getOptAnalysis(self, result): temp = dict() temp["總收益率"] = result[0].analyzers.RE.get_analysis()["rtot"] temp["年化收益率"] = result[0].analyzers.RE.get_analysis()["rnorm"] temp["夏普比率"] = result[0].analyzers.sharpe.get_analysis()["sharperatio"] temp["最大回撤"] = result[0].analyzers.DD.get_analysis().max.drawdown temp["最大回撤期間"] = result[0].analyzers.DD.get_analysis().max.len sqn = result[0].analyzers.SQN.get_analysis()["sqn"] temp["SQN"] = sqn temp["策略評價(根據SQN)"] = self._judgeBySQN(sqn) trade_info = self.__results[0].analyzers.TA.get_analysis() self._winInfo(trade_info, temp) return temp # 在優化多個參數時計算並保存回測結果 def _optResultMore(self, results, **kwargs): testResults = pd.DataFrame() i = 0 for key in kwargs: for value in kwargs[key]: temp = self._getOptAnalysis(results[i]) temp["參數名"] = key temp["參數值"] = value returns = self._timeReturns(results[i]) benchReturns = self._getBenchmarkReturns(results[i]) self._riskAnaly(returns, benchReturns, temp) testResults = testResults.append(temp, ignore_index=True) # testResults.set_index(["參數值"], inplace = True) return testResults # 在優化參數時計算並保存回測結果 def _optResult(self, results, **kwargs): testResults = pd.DataFrame() params = [] for k, v in kwargs.items(): for t in v: params.append(t) i = 0 for result in results: temp = self._getOptAnalysis(result) temp["參數名"] = k temp["參數值"] = params[i] i += 1 returns = self._timeReturns(result) benchReturns = self._getBenchmarkReturns(result) self._riskAnaly(returns, benchReturns, temp) testResults = testResults.append(temp, ignore_index=True) # testResults.set_index(["參數值"], inplace = True) return testResults # 計算收益率序列 def _timeReturns(self, result): return pd.Series(result[0].analyzers.TR.get_analysis()) # 運行基准策略,獲取基准收益值 def _getBenchmarkReturns(self, result): return pd.Series(result[0].analyzers.TR_Bench.get_analysis()) # 分析策略的風險指標 def _riskAnaly(self, returns, benchReturns, results): risk = riskAnalyzer(returns, benchReturns) result = risk.run() results["阿爾法"] = result["阿爾法"] results["貝塔"] = result["貝塔"] results["信息比例"] = result["信息比例"] results["策略波動率"] = result["策略波動率"] results["歐米伽"] = result["歐米伽"] # self.__backtestResult["夏普值"] = result["夏普值"] results["sortino"] = result["sortino"] results["calmar"] = result["calmar"] # 回測結果繪圖 def _drawResult(self): self.__cerebro.plot(numfigs=2) figname = type(self).__name__ + ".png" plt.savefig(figname) # 獲取數據 def _getData(self, code): filename = code + ".csv" path = "./data/" # 如果數據目錄不存在,創建目錄 if not os.path.exists(path): os.makedirs(path) # 已有數據文件,直接讀取數據 if os.path.exists(path + filename): df = pd.read_csv(path + filename) else: # 沒有數據文件,用tushare下載 df = ts.get_k_data(code, autype="qfq", start=self.__start, end=self.__end) df.to_csv(path + filename) df.index = pd.to_datetime(df.date) df['openinterest'] = 0 df = df[['open', 'high', 'low', 'close', 'volume', 'openinterest']] return df # 用empyrical庫計算風險指標 class riskAnalyzer: def __init__(self, returns, benchReturns, riskFreeRate=0.02): self.__returns = returns self.__benchReturns = benchReturns self.__risk_free = riskFreeRate self.__alpha = 0.0 self.__beta = 0.0 self.__info = 0.0 self.__vola = 0.0 self.__omega = 0.0 self.__sharpe = 0.0 self.__sortino = 0.0 self.__calmar = 0.0 def run(self): # 計算各指標 self._alpha_beta() self._info() self._vola() self._omega() self._sharpe() self._sortino() self._calmar() result = pd.Series(dtype="float64") result["阿爾法"] = self.__alpha result["貝塔"] = self.__beta result["信息比例"] = self.__info result["策略波動率"] = self.__vola result["歐米伽"] = self.__omega result["夏普值"] = self.__sharpe result["sortino"] = self.__sortino result["calmar"] = self.__calmar return result def _alpha_beta(self): self.__alpha, self.__beta = ey.alpha_beta(returns=self.__returns, factor_returns=self.__benchReturns, risk_free=self.__risk_free, annualization=1) def _info(self): self.__info = ey.excess_sharpe(returns=self.__returns, factor_returns=self.__benchReturns) def _vola(self): self.__vola = ey.annual_volatility(self.__returns, period='daily') def _omega(self): self.__omega = ey.omega_ratio(returns=self.__returns, risk_free=self.__risk_free) def _sharpe(self): self.__sharpe = ey.sharpe_ratio(returns=self.__returns, annualization=1) def _sortino(self): self.__sortino = ey.sortino_ratio(returns=self.__returns) def _calmar(self): self.__calmar = ey.calmar_ratio(returns=self.__returns) # 測試函數 def test(): # 構造測試數據 returns = pd.Series( index=pd.date_range("2017-03-10", "2017-03-19"), data=(-0.012143, 0.045350, 0.030957, 0.004902, 0.002341, -0.02103, 0.00148, 0.004820, -0.00023, 0.01201)) print(returns) benchmark_returns = pd.Series( index=pd.date_range("2017-03-10", "2017-03-19"), data=(-0.031940, 0.025350, -0.020957, -0.000902, 0.007341, -0.01103, 0.00248, 0.008820, -0.00123, 0.01091)) print(benchmark_returns) # 計算累積收益率 creturns = ey.cum_returns(returns) print("累積收益率\n", creturns) risk = riskAnalyzer(returns, benchmark_returns, riskFreeRate=0.01) results = risk.run() print(results) # 直接調用empyrical試試 alpha = ey.alpha(returns=returns, factor_returns=benchmark_returns, risk_free=0.01) calmar = ey.calmar_ratio(returns) print(alpha, calmar) # 自己計算阿爾法值 annual_return = ey.annual_return(returns) annual_bench = ey.annual_return(benchmark_returns) print(annual_return, annual_bench) alpha2 = (annual_return - 0.01) - results["貝塔"] * (annual_bench - 0.01) print(alpha2) # 自己計算阿爾法貝塔 def get_return(code, startdate, endate): df = ts.get_k_data(code, ktype="D", autype="qfq", start=startdate, end=endate) p1 = np.array(df.close[1:]) p0 = np.array(df.close[:-1]) logret = np.log(p1 / p0) rate = pd.DataFrame() rate[code] = logret rate.index = df["date"][1:] return rate def alpha_beta(code, startdate, endate): mkt_ret = get_return("sh", startdate, endate) stock_ret = get_return(code, startdate, endate) df = pd.merge(mkt_ret, stock_ret, left_index=True, right_index=True) x = df.iloc[:, 0] y = df.iloc[:, 1] beta, alpha, r_value, p_value, std_err = stats.linregress(x, y) return (alpha, beta) def stocks_alpha_beta(stocks, startdate, endate): df = pd.DataFrame() alpha = [] beta = [] for code in stocks.values(): a, b = alpha_beta(code, startdate, endate) alpha.append(float("%.4f" % a)) beta.append(float("%.4f" % b)) df["alpha"] = alpha df["beta"] = beta df.index = stocks.keys() return df startdate = "2017-01-01" endate = "2018-11-09" stocks = {'中國平安': '601318', '格力電器': '000651', '招商銀行': '600036', '恆生電子': '600570', '中信證券': '600030', '貴州茅台': '600519'} results = stocks_alpha_beta(stocks, startdate, endate) print("自己計算結果") print(results) # 用empyrical計算 def stocks_alpha_beta2(stocks, startdate, endate): df = pd.DataFrame() alpha = [] beta = [] for code in stocks.values(): a, b = empyrical_alpha_beta(code, startdate, endate) alpha.append(float("%.4f" % a)) beta.append(float("%.4f" % b)) df["alpha"] = alpha df["beta"] = beta df.index = stocks.keys() return df def empyrical_alpha_beta(code, startdate, endate): mkt_ret = get_return("sh", startdate, endate) stock_ret = get_return(code, startdate, endate) alpha, beta = ey.alpha_beta(returns=stock_ret, factor_returns=mkt_ret, annualization=1) return (alpha, beta) results2 = stocks_alpha_beta2(stocks, startdate, endate) print("empyrical計算結果") print(results2) print(results2["alpha"] / results["alpha"]) # 測試夏普值的計算 def testSharpe(): # 讀取數據 stock_data = pd.read_csv("stock_data.csv", parse_dates=["Date"], index_col=["Date"]).dropna() benchmark_data = pd.read_csv("benchmark_data.csv", parse_dates=["Date"], index_col=["Date"]).dropna() # 了解數據 print("Stocks\n") print(stock_data.info()) print(stock_data.head()) print("\nBenchmarks\n") print(benchmark_data.info()) print(benchmark_data.head()) # 輸出統計量 print(stock_data.describe()) print(benchmark_data.describe()) # 計算每日回報率 stock_returns = stock_data.pct_change() print(stock_returns.describe()) sp_returns = benchmark_data.pct_change() print(sp_returns.describe()) # 每日超額回報 excess_returns = pd.DataFrame() risk_free = 0.04 / 252.0 excess_returns["Amazon"] = stock_returns["Amazon"] - risk_free excess_returns["Facebook"] = stock_returns["Facebook"] - risk_free print(excess_returns.describe()) # 超額回報的均值 avg_excess_return = excess_returns.mean() print(avg_excess_return) # 超額回報的標准差 std_excess_return = excess_returns.std() print(std_excess_return) # 計算夏普比率 # 日夏普比率 daily_sharpe_ratio = avg_excess_return.div(std_excess_return) # 年化夏普比率 annual_factor = np.sqrt(252) annual_sharpe_ratio = daily_sharpe_ratio.mul(annual_factor) print("年化夏普比率\n", annual_sharpe_ratio) # 用empyrical算 sharpe = pd.DataFrame() a = ey.sharpe_ratio(stock_returns["Amazon"], risk_free=risk_free) # , annualization = 252) b = ey.sharpe_ratio(stock_returns["Facebook"], risk_free=risk_free) print("empyrical計算結果") print(a, b) print(a / annual_sharpe_ratio["Amazon"], b / annual_sharpe_ratio["Facebook"]) if __name__ == "__main__": # testpkg() testSharpe()