如何用時序數據庫處理Tushare金融數據?


本文將介紹如何把Tushare的滬深股票2008年到2017年的日線行情數據和每日指標數據導入到 DolphinDB database,並使用DolphinDB進行金融分析。Tushare是金融大數據開放社區,擁有豐富的金融數據,如股票、基金、期貨、數字貨幣等行情數據,為量化從業人員和金融相關研究人員免費提供金融數據。

DolphinDB  是新一代的時序數據庫,不僅可以作為分布式數據倉庫或者內存數據庫來使用,而且自帶豐富的計算工具,可以作為研究工具或研究平台來使用,非常適用於量化金融、物聯網等領域的海量數據分析。量化金融領域的不少問題,如交易信號研究、策略回測、交易成本分析、股票相關性研究、市場風險控制等,都可以用DolphinDB來解決。

1. 數據概況

Tushare提供的滬深股票日線行情數據包含以下字段:

名稱	        描述
ts_code	        股票代碼
trade_date	交易日期
open	        開盤價
high	        最高價
low	        最低價
close	        收盤價
pre_close	昨收價
change	        漲跌額
pct_change	漲跌幅
vol	        成交量(手)
amount	        成交額(千元)

每日指標數據包含以下字段:

名稱	        描述
ts_code	        股票代碼
trade_date	交易日期
close	        收盤價
turnover_rate	換手率
turnover_rate_f	換手率(自由流通股)
volume_ratio	量比
pe	        市盈率(總市值/凈利潤)
pe_ttm	        市盈率(TTM)
pb	        市凈率(總市值/凈資產)
ps	        市銷率
ps_ttm	        市銷率(TTM)
total_share	總股本(萬)
float_share	流通股本(萬)
free_share	自由流通股本(萬)
total_mv	總市值(萬元)
cric_mv	        流通市值(萬元)

2. 創建DolphinDB數據庫

2.1 安裝DolphinDB

從官網下載DolphinDB安裝包和DolphinDB GUI.

DolphinDB單節點部署請參考單節點部署。

DolphinDB單服務器集群部署請參考單服務器集群部署。

DolphinDB多物理服務器部署請參考多服務器集群部署。

2.2 創建數據庫

我們可以使用database函數創建分區數據庫。

語法:database(directory, [partitionType], [partitionScheme], [locations])

參數

directory:數據庫保存的目錄。DolphinDB有三種類型的數據庫,分別是內存數據庫、磁盤上的數據庫和分布式文件系統上的數據庫。創建內存數據庫,directory為空;創建本地數據庫,directory應該是本地文件系統目錄;創建分布式文件系統上的數據庫,directory應該以“dfs://”開頭。本教程使用分布式文件系統上的數據庫。

partitionType:分區方式,有6種方式: 順序分區(SEQ),范圍分區(RANGE),哈希分區(HASH),值分區(VALUE),列表分區(LIST),復合分區(COMPO)。

partitionScheme:分區方案。各種分區方式對應的分區方案如下:

導入數據前,要做好數據的分區規划,主要考慮兩個因素:分區字段和分區粒度。

在日常的查詢分析中,按照日期查詢的頻率最高,所以分區字段為日期trade_date。如果一天一個分區,每個分區的數據量過少,只有3000多條數據,不到1兆大小,而且分區數量非常多。分布式系統在執行查詢時,會把查詢語句分成多個子任務發送到不同的分區。這樣的分區方式會導致子任務數量非常多,而每個任務執行的時間極短,系統在管理任務上耗費的時間反而大於任務本身的執行時間,明顯這樣的分區方式是不合理。這種情況下,我們按日期范圍進行分區,每年的1月1日到次年的1月1日為一個分區,這樣既能提升查詢的效率,也不會造成分區粒度過小。

現有數據的時間跨度是2008-2017年,但是為了給未來的數據留出足夠的空間,我們把時間范圍設置為2008-2030年。執行以下代碼:

yearRange=date(2008.01M + 12*0..22)

由於日線行情和每日指標數據的分區方案相同,因此把它們存放在同一個數據庫dfs://tushare的兩個表中,hushen_daily_line用於存放日線行情數據,hushen_daily_indicator用於存放每日指標數據。如果需要使用內存數據庫,創建數據庫時把directory設為空;如果需要使用磁盤上的數據庫,把directory設置為磁盤目錄即可。創建數據庫的代碼如下:

login("admin","123456")
dbPath="dfs://tushare"
yearRange=date(2008.01M + 12*0..22)
if(existsDatabase(dbPath)){
	dropDatabase(dbPath)
}
columns1=`ts_code`trade_date`open`high`low`close`pre_close`change`pct_change`vol`amount
type1=`SYMBOL`NANOTIMESTAMP`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE
db=database(dbPath,RANGE,yearRange)
hushen_daily_line=db.createPartitionedTable(table(100000000:0,columns1,type1),`hushen_daily_line,`trade_date)

columns2=`ts_code`trade_date`close`turnover_rate`turnover_rate_f`volume_ratio`pe`pr_ttm`pb`ps`ps_ttm`total_share`float_share`free_share`total_mv`circ_mv
type2=`SYMBOL`NANOTIMESTAMP`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE
hushen_daily_indicator=db.createPartitionedTable(table(100000000:0,columns2,type2),`hushen_daily_indicator,`trade_date)

3. 使用Python API把數據導入到DolphinDB

Tushare提供了兩種常用的數據調取方式:

  • 通過Tushare Python包,返回的是python dataframe類型數據。
  • 通過http協議直接獲取,返回的是Json格式數據。

本教程使用了第一種方法調取滬深股票2008年到2017年10年的日線行情數據和每日指標數據。

3.1 下載安裝Python3.X和Tushare

具體教程請參考Tushare官網。

3.2 安裝DolphinDB的Python3 API

從官網下載Python3 API,把Python3 API的安裝包解壓至任意目錄。在console中進入該目錄,執行以下命令:

python setup.py install

使用以下命令更新Python API:

python setup.py install --force

3.3 數據導入

我們分別使用Tushare Python包的daily和daily_basic接口調取日線行情和每日指標數據,返回的是Python Dataframe類型數據。注意,需要注冊Tushare賬號才能獲取token。接着,通過Python API,連接到IP為localhost,端口號為8941的DolphinDB數據節點(這里的數據節點IP與端口根據自己集群的情況進行修改),把Tushare返回的Dataframe數據分別追加到之前創建的DolphinDB DFS Table中。

具體的Python代碼如下:

import datetime
import tushare as ts
import pandas as pd
import numpy as np
import dolphindb as ddb
pro=ts.pro_api('YOUR_TOKEN')
s=ddb.session()
s.connect("localhost",8941,"admin","123456")
t1=s.loadTable(tableName="hushen_daily_line",dbPath="dfs://tushare")
t2=s.loadTable(tableName="hushen_daily_indicator",dbPath="dfs://tushare")
def dateRange(beginDate,endDate): dates=[] dt=datetime.datetime.strptime(beginDate,"%Y%m%d") date=beginDate[:] while date <= endDate: dates.append(date) dt=dt + datetime.timedelta(1) date=dt.strftime("%Y%m%d") return dates for dates in dateRange('20080101','20171231'): df=pro.daily(trade_date=dates) df['trade_date']=pd.to_datetime(df['trade_date']) if len(df): t1.append(s.table(data=df)) print(t1.rows) for dates in dateRange('20080101','20171231'): ds=pro.daily_basic(trade_date=dates) ds['trade_date']=pd.to_datetime(ds['trade_date']) ds['volume_ratio']=np.float64(ds['volume_ratio']) if len(ds): t2.append(s.table(data=ds)) print(t2.rows)

數據導入成功后,我們可以從DolphinDB GUI右下角的變量瀏覽器中看到兩個表的分區情況:

查看數據量:

select count(*) from hushen_daily_line
5,332,932

select count(*) from hushen_daily_indicator
5,333,321

至此,我們已經把滬深股票2008年-2017年的日線行情和每日指標數據全部導入到DolphinDB中。

4. 金融分析

DolphinDB將數據庫、編程語言和分布式計算融合在一起,不僅可以用作數據倉庫,還可以用作計算和分析工具。DolphinDB內置了許多經過優化的時間序列函數,特別適用於投資銀行、對沖基金和交易所的定量查詢和分析,可以用於構建基於歷史數據的策略測試。下面介紹如何使用Tushare的數據進行金融分析。

4.1 計算每只股票滾動波動率

daily_line= loadTable("dfs://daily_line","hushen_daily_line")
t=select ts_code,trade_date,mstd(pct_change/100,21) as mvol from daily_line context by ts_code
select * from t where trade_date=2008.11.14
ts_code	        trade_date	mvol
000001.SZ	2008.11.14	0.048551
000002.SZ	2008.11.14	0.04565
000004.SZ	2008.11.14	0.030721
000005.SZ	2008.11.14	0.046655
000006.SZ	2008.11.14	0.043092
000008.SZ	2008.11.14	0.035764
000009.SZ	2008.11.14	0.051113
000010.SZ	2008.11.14	0.027254
...

計算每只股票一個月的滾動波動率,僅需一行代碼。DolphinDB自帶金融基因,內置了大量與金融相關的函數,可以用簡單的代碼計算金融指標。

4.2 找到最相關的股票

使用滬深股票日線行情數據,計算股票的兩兩相關性。首先,生成股票回報矩陣:

retMatrix=exec pct_change/100 as ret from loadTable("dfs://daily_line","hushen_daily_line") pivot by trade_date,ts_code

exec和pivot by是DolphinDB編程語言的特點之一。exec與select的用法相同,但是select語句生成的是表,exec語句生成的是向量。pivot by用於整理維度,與exec一起使用時會生成一個矩陣。

接着,生成股票相關性矩陣:

corrMatrix=cross(corr,retMatrix,retMatrix)

上面使用到的cross是DolphinDB中的高階函數,它以函數和對象作為輸入內容,把函數應用到每個對象上。模板函數在復雜的批量計算中非常有用。

然后,找到每只股票相關性最高的10只股票:

mostCorrelated = select * from table(corrMatrix).rename!(`ts_code`corr_ts_code`corr) context by ts_code having rank(corr,false) between 1:10

查找與000001.SZ相關性最高的10只股票:

select * from mostCorrelated where ts_code="000001.SZ" order by cor desc
ts_code	        corr_ts_code	corr
000001.SZ	601166.SH	0.859
000001.SZ	600000.SH	0.8406
000001.SZ	002920.SZ	0.8175
000001.SZ	600015.SH	0.8153
000001.SZ	600036.SH	0.8129
000001.SZ	600016.SH	0.8022
000001.SZ	002142.SZ	0.7956
000001.SZ	601169.SH	0.7882
000001.SZ	601009.SH	0.7778
000001.SZ	601328.SH	0.7736

上面兩個示例都比較簡單,下面我們進行復雜的計算。

4.3 構建World Quant Alpha #001和#98

WorldQuant LLC發表的論文101 Formulaic Alphas中給出了101個Alpha因子公式。很多個人和機構嘗試用不同的語言來實現這101個Alpha因子。本文中,我們例舉了較為簡單的Alpha #001和較為復雜的Alpha #098兩個因子的實現。

Alpha#001公式:rank(Ts_ArgMax(SignedPower((returns<0?stddev(returns,20):close), 2), 5))-0.5

Alpha #001的詳細解讀可以參考【史上最詳細】WorldQuant Alpha 101因子系列#001研究。

Alpha#98公式:(rank(decay_linear(correlation(vwap, sum(adv5,26.4719), 4.58418), 7.18088))- rank(decay_linear(Ts_Rank(Ts_ArgMin(correlation(rank(open), rank(adv15), 20.8187), 8.62571), 6.95668) ,8.07206)))

這兩個因子在計算時候既用到了cross sectional的信息,也用到了大量時間序列的計算。也即在計算某個股票某一天的因子時,既要用到該股票的歷史數據,也要用到當天所有股票的信息,所以計算量很大。

構建這兩個因子,需要包含以下字段:股票代碼、日期、成交量、成交量的加權平均價格、開盤價和收盤價。其中,成交量的加權平均價格可以通過收盤價和成交量計算得出。因此,日線行情的數據可以用於構建這兩個因子。

構建因子的代碼如下:

def alpha1(stock){ t= select trade_date,ts_code,mimax(pow(iif(ratios(close) < 1.0, mstd(ratios(close) - 1, 20),close), 2.0), 5) as maxIndex from stock context by ts_code return select trade_date,ts_code,rank(maxIndex) - 0.5 as A1 from t context by trade_date } def alpha98(stock){ t = select ts_code,trade_date, wavg(close,vol) as vwap, open, mavg(vol, 5) as adv5, mavg(vol,15) as adv15 from stock context by ts_code update t set rank_open = rank(open), rank_adv15 = rank(adv15) context by trade_date update t set decay7 = mavg(mcorr(vwap, msum(adv5, 26), 5), 1..7), decay8 = mavg(mrank(9 - mimin(mcorr(rank_open, rank_adv15, 21), 9), true, 7), 1..8) context by ts_code return select ts_code,trade_date, rank(decay7)-rank(decay8) as A98 from t context by trade_date }

構建Alpha #001僅用了2行核心代碼,Alpha #98僅用了4行核心代碼,並且所有核心代碼都是用SQL實現,可讀性非常好。SQL中最關鍵的功能是context by子句實現的分組計算功能。context by是DolphinDB對標准SQL的擴展。與group by每個組產生一行記錄不同,context by會輸出跟輸入相同行數的記錄,所以我們可以方便的進行多個函數嵌套。cross sectional計算時,我們用trade_date分組。時間序列計算時,我們用ts_code分組。與傳統的分析語言Matlab、SAS不同,DolphinDB腳本語言與分布式數據庫和分布式計算緊密集成,表達能力強,高性能易擴展,能夠滿足快速開發和建模的需要。

查看結果:

select * from alpha1(stock1) where trade_date=2017.07.06
trade_date	ts_code	    A1
2017.07.06	000001.SZ	252.5
2017.07.06	000002.SZ	1,103.5
2017.07.06	000004.SZ	252.5
2017.07.06	000005.SZ	252.5
2017.07.06	000006.SZ	1,103.5
2017.07.06	000008.SZ	1,972.5
2017.07.06	000009.SZ	1,103.5
2017.07.06	000010.SZ	1,103.5
2017.07.06	000011.SZ	1,103.5
...

select * from alpha98(stock1) where trade_date=2017.07.19
ts_code	trade_date	A98
000001.SZ	2017.07.19	(1,073)
000002.SZ	2017.07.19	(1,270)
000004.SZ	2017.07.19	(1,805)
000005.SZ	2017.07.19	224
000006.SZ	2017.07.19	791
000007.SZ	2017.07.19	(609)
000008.SZ	2017.07.19	444
000009.SZ	2017.07.19	(1,411)
000010.SZ	2017.07.19	(1,333)
...

使用單線程計算,Alpha #001耗時僅4秒,復雜的Alpha #98耗時僅5秒,性能極佳。

4.4 動量交易策略

動量策略是投資界最流行的策略之一。通俗地講,動量策略就是“追漲殺跌”,買漲得厲害的,賣跌得厲害的。下面將介紹如何在DolphinDB中測試動量交易策略。

最常用的動量因素是過去一年扣除最近一個月的收益率。動量策略通常是一個月調整一次,並且持有期也是一個月。本教程中,每天調整1/21的投資組合,並持有新的投資組合21天。

要測試動量交易策略,需要包含以下字段的數據:股票代碼、日期、每股價格(收盤價格)、流通市值、股票日收益和每日交易量。

顯然,只有日線行情的數據是不夠的,我們需要連接hushen_daily_line和hushen_daily_indicator兩個表。

通過equal join,從兩個表中選擇需要的字段:

daily_line=loadTable(“dfs://daily_line”,http://www.jintianxuesha.com/”hushen_daily_line”)
daily_indicator=loadTable(“dfs://daily_indicator”,”hushen_daily_indicator”)
s=select ts_code,trade_date,close,change,pre_close,vol,amount,turnover_rate,total_share,float_share,free_share,total_mv,circ_mv from ej(daily_line,daily_indicator,`ts_code`trade_date)

(1)對數據進行清洗和過濾,為每只股票構建過去一年扣除最近一個月收益率的動量信號。

def loadPriceData(inData){ stocks = select ts_code, trade_date, close,change/pre_close as ret, circ_mv from inData where weekday(trade_date) between 1:5, isValid(close), isValid(vol) order by ts_code, trade_date stocks = select ts_code, trade_date,close,ret,circ_mv, cumprod(1+ret) as cumretIndex from stocks context by ts_code return select ts_code, trade_date, close, ret, circ_mv, move(cumretIndex,21)\move(cumretIndex,252)-1 as signal from stocks context by ts_code } priceData = loadPriceData(s)

(2)生成投資組合

選擇滿足以下條件的流通股:動量信號無缺失、當天的交易量為正、市值超過1億元以及每股價格超過5元。

def genTradables(indata){ return select trade_date, ts_code, circ_mv, signal from indata where close>5, circ_mv>10000, vol>0, isValid(signal) order by trade_date } tradables = genTradables(priceData)

根據每天的動量信號,產生10組流通股票。只保留兩個最極端的群體(贏家和輸家)。假設在21天內,每天總是多頭1元和空頭1元,所以我們每天在贏家組多頭1/21,在輸家組每天空頭1/21。在每組中,我們可以使用等權重或值權重,來計算投資組合形成日期上每個股票的權重。

//WtScheme=1表示等權重;WtScheme=2表示值權重
def formPortfolio(startDate, endDate, tradables, holdingDays, groups, WtScheme){ ports = select date(trade_date) as trade_date, ts_code, circ_mv, rank(signal,,groups) as rank, count(ts_code) as symCount, 0.0 as wt from tradables where date(trade_date) between startDate:endDate context by trade_date having count(ts_code)>=100 if (WtScheme==1){ update ports set wt = -1.0\count(ts_code)\holdingDays where rank=0 context by trade_date update ports set wt = 1.0\count(ts_code)\holdingDays where rank=groups-1 context by trade_date } else if (WtScheme==2){ update ports set wt = -circ_mv\sum(circ_mv)\holdingDays where rank=0 context by trade_date update ports set wt = circ_mv\sum(circ_mv)\holdingDays where rank=groups-1 context by trade_date } return select ts_code, trade_date as tranche, wt from ports where wt != 0 order by ts_code, trade_date } startDate=2008.01.01 endDate=2018.01.01 holdingDays=21 groups=10 ports = formPortfolio(startDate, endDate, tradables, holdingDays, groups, 2) dailyRtn = select date(trade_date) as trade_date, ts_code, ret as dailyRet from priceData where date(trade_date) between startDate:endDate

(3)計算投資組合中每只股票接下來21天的利潤或損失。在投資組合形成后的21天關停投資組合。

def calcStockPnL(ports, dailyRtn, holdingDays, endDate, lastDays){ ages = table(1..holdingDays as age) dates = sort distinct ports.tranche dictDateIndex = dict(dates, 1..dates.size()) dictIndexDate = dict(1..dates.size(), dates) pos = select dictIndexDate[dictDateIndex[tranche]+age] as date, ts_code, tranche, age, take(0.0,size age) as ret, wt as expr, take(0.0,size age) as pnl from cj(ports,ages) where isValid(dictIndexDate[dictDateIndex[tranche]+age]), dictIndexDate[dictDateIndex[tranche]+age]<=min(lastDays[ts_code], endDate) update pos set ret = dailyRet from ej(pos, dailyRtn,`date`ts_code,`trade_date`ts_code) update pos set expr = expr*cumprod(1+ret) from pos context by ts_code, tranche update pos set pnl = expr*ret/(1+ret) return pos } lastDaysTable = select max(date(trade_date)) as date from priceData group by ts_code lastDays = dict(lastDaysTable.ts_code, lastDaysTable.date) stockPnL = calcStockPnL(ports, dailyRtn, holdingDays, endDate, lastDays)

(4)計算投資組合的利潤或損失,並繪制動量策略累計回報走勢圖。

portPnL = select sum(pnl) as pnl from stockPnL group by date order by date
plot(cumsum(portPnL.pnl) as cumulativeReturn,portPnL.date, "Cumulative Returns of the Momentum Strategy")

下面是滬深股票2008年到2017年的回測結果。回測時,每天產生一個新的tranche,持有21天。使用單線程計算,耗時僅6秒。

如果使用Pandas來處理金融數據,對內存的要求較高,內存使用峰值一般是數據的3-4倍,隨着數據的積累,pandas的內存占用問題會越來越明顯。在性能上,pandas在多線程處理方面比較弱,不能充分利用多核CPU的計算能力,並且pandas不能根據業務字段對數據進行分區,也不支持列式存儲,查詢數據時必須全表掃描,效率不高。

5. 總結

作為數據庫,DolphinDB支持單表PB級存儲和靈活的分區方式;作為研究平台,DolphinDB不僅功能豐富,支持快速的數據清洗、高效的數據導入、交互式分析、庫內分析,流計算框架和離線計算支持生產環境代碼重用,而且性能極佳,即使面對龐大數據集,仍可以輕松實現秒級毫秒級的低延時交互分析。另外,DolphinDB對用戶十分友好,提供了豐富的編程接口,如Python、C++、Java、C#、R等編程API和Excel的add-in插件、ODBC、JDBC插件,還提供了功能強大的集成開發工具,支持圖形化數據顯示,讓實驗結果更加直觀易於理解。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM