從Tushare獲取歷史行情數據


從Tushare獲取歷史行情數據,分為兩種,一種是后復權(daily_hfq)數據,一種是不復權(daily)數據,獲取到的數據存儲在MongoDB數據庫中,每個集合(collection)中,數據字段包含如下:

抓取指數歷史行情

流程圖如下


首先准備好數據庫的連接,可查看python對MongoDB數據庫的操作,這里在database文件中創建了對MongoDB數據的連接及指定存儲的數據庫
datebase.py文件

from pymongo import MongoClient

#指定數據庫的連接,quant_01是數據庫名
DB_CONN = MongoClient('mongodb://127.0.0.1:27017')['quant_01']


在daily_crawler.py文件中完成初始化、數據的獲取、儲存等操作。

import tushare as ts 
from database import DB_CONN
from datetime import datetime
from pymongo import UpdateOne

class DailyCrawler:
    def __init__(self):
        #創建daily數據集(集合)
        self.daily = DB_CONN['daily']
        #創建daily_hfq數據集(集合)
        self.daily_hfq = DB_CONN['daily_hfq']

獲取指數歷史行情數據(index= true)

def crawl_index(self,begin_date = None,end_date=None):
        """
        抓取指數的日k數據
        指數行情的主要作用:
        1、用來生成交易日歷
        2、回測時作為收益的對比基准
        :param begin_date:開始日期
        :param end_date:結束日期
        """
        #指定抓取的指數列表,可以增加和改變列表中的值
        index_codes = ['000001','000300','399001','399006','399006']
        #當前日期
        now = datetime.now().strftime('%Y-%m-%d')
        #如果沒有指定開始日期,則默認當前日期
        if begin_date is None:
            begin_date = now
        #如果沒有指定結束日期,則默認當前日期
        if end_date is None:
            end_date = now
        #按指數的代碼循環,抓取所有指數信息
        for code in index_codes:
            #抓取一個指數在一個時間區間的數據
            df_daily = ts.get_k_data(code,index=True,start=begin_date,end=end_date)
            #保存數據
            self.save_data(code,df_daily,self.daily,{'index':True})

獲取股票歷史數據(index=False)

流程圖如下:

獲取所有股票行情數據
調用tushare中get_stock_basics()獲取所有股票的基本信息,然后將基本信息的索引列表轉化為股票代碼列表,就得到了所有股票代碼
再調用get_k_data()獲取不復權、后復權歷史價格數據

def crawl(self,begin_date=None,end_date=None):
        '''
        抓取股票的日k數據,主要包括不復權和后復權兩種
        :param begin_date:開始日期
        :param end_date:結束日期
        '''
        #通過tushare的基本信息API,獲取股票的基本信息
        stock_df = ts.get_stock_basics()
        #將基本信息的索引列表轉換為股票代碼列表
        codes = list(stock_df.index)

        #當前日期
        now = datetime.now().strftime("%Y-%M-%D")

        #如果沒有指定開始/結束日期,則默認為當前日期
        if begin_date is None:
            begin_date = now
        if end_date is None:
            end_date = now
        
        for code in codes:
            #不復權價格
            df_daily = ts.get_k_data(code,start=begin_date,end=end_date,autype=None)
            self.save_data(code,df_daily,self.daily,{'index':False})
            #后復權價格
            df_daily_hfq = ts.get_k_data(code,start=begin_date,end=end_date,autype='hfq')
            self.save_data(code,df_daily_hfq,self.daily_hfq,{'index':False})

這里曾經很好奇,為何\(\color{purple}{stock\_df.index}\)就可以獲得股票代碼呢?
在get_stock_basics()實現源碼中,作者將\(\color{purple}{code}\)設為了index,因此該語句才能有效的獲取股票代碼

保存數據

流程圖:

隨着數據量的增加,寫入速度會變慢,因此需要創建索引,這里對code、date、index三個字段加上索引
創建索引的命令式如下:

db.daily.createIndex({'code':1,'date':1,'index':1})

可通過db.daily.getIndexes()查看索引

保存數據代碼:

def save_data(self,code,df_daily,collection,extra_fields =None):
        '''
        將從網上抓取的數據保存在本地MongoDB中
        :param code:股票代碼
        :param df_daily:包含日線數據的DataFrame
        :param collection:儲存的數據集
        :param extra_fields:除k線數據中保存的字段,需要額外保存的字段
        '''
        #數據更新的請求列表
        update_requests = []

        #將DataFrame中的行情數據,生成更新數據的請求
        for df_index in df_daily.index:
            #將DataFrame中的一行數據轉換成dict類型:
            doc = dict(df_daily.loc[df_index])
            #設置股票代碼
            doc['code'] = code

            #如果指定了其他字段,則更行dict
            if extra_fields is not None:
                doc.update(extra_fields)
            
            #生成一條數據庫的更新請求
            #注意:
            #需要在code、date、index三個字段上增加索引,否則隨着數據量的增加,寫入速度會變慢
            #創建索引的命令式:
            #db.daily.createIndex({'code':1,'date':1,'index':1})
            update_requests.append(
                UpdateOne(
                    {'code':doc['code'],'date':doc['date'],'index':doc['index']},
                    {'$set':doc},
                    upsert = True
                )
            )
            #如果寫入的請求列表不為空,則都保存在數據庫中
            if len(update_requests)>0:
                #批量寫入到數據庫中,批量寫入可以降低網絡IO,提高速度
                update_result = collection.bulk_write(update_requests,ordered=False)
                print('保存日線數據,代碼:%s ,插入:%4d 條,更新:%4d 條'%(code,update_result.upserted_count,update_result.modified_count),flush=True)

程序入口

if __name__ == "__main__":
    dc = DailyCrawler()
    dc.crawl_index('2015-01-01', '2015-01-06')
    dc.crawl('2015-01-01', '2015-01-06')

運行效果:

查看有多少條數據:


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM