前段時間玩Python時無意看到了獲取股票交易數據的tushare模塊,由於自己對股票交易挺有興趣,加上現在又在做數據挖掘工作,故想先將股票數據下載到數據庫中,以便日后分析:
# 導入需要用到的模塊 from queue import Queue import threading import os import datetime import tushare as ts from sqlalchemy import create_engine from sqlalchemy import types
# 創建myql數據庫引擎,便於后期鏈接數據庫 mysql_info = {'host':'localhost','port':3306,'user':'******','passwd':'******','db':'stock','charset':'utf8'} engine = create_engine('mysql+pymysql://%s:%s@%s:%s/%s?charset=%s' %(mysql_info['user'],mysql_info['passwd'], mysql_info['host'],mysql_info['port'], mysql_info['db'],mysql_info['charset']), echo=False)
# 獲取所有股票數據,利用股票代碼獲取復權數據 stock_basics = ts.get_stock_basics() stock_basics.columns
# 獲取數據庫現有數據的時間日期 def get_old_date(): con = engine.connect() sql1 = 'show tables;' tables = con.execute(sql1) if ('fq_day',) not in tables: date_old = datetime.date(2001,1,1) return date_old sql2 = 'select max(date) from fq_day;' date_old = con.execute(sql2).fetchall()[0][0].date() if date_old < datetime.date.today() - datetime.timedelta(1): return date_old else: con.close() print('今天已經獲取過數據,不需重新獲取') os._exit(1)
# 聲明隊列,用於存取股票以代碼數據,以便獲取復權明細 stock_code_queue = Queue() for code in stock_basics.index: stock_code_queue.put(code) type_fq_day = {'code':types.CHAR(6),'open':types.FLOAT,'hige':types.FLOAT,'close':types.FLOAT,'low':types.FLOAT, 'amount':types.FLOAT,'factor':types.FLOAT}
# 獲取復權數據 def process_data(old_date,task_qeue): #queueLock.acquire() while not task_qeue.empty(): data = task_qeue.get() print("正在獲取%s;數據還有%s條:" %(data,task_qeue.qsize())) #queueLock.release() date_begin = old_date + datetime.timedelta(1) date_end = datetime.date.today() try: qfq_day = ts.get_h_data(data,start = str(date_begin),end=str(date_end),autype='qfq',drop_factor=False) qfq_day['code'] = data qfq_day.to_sql('fq_day',engine,if_exists='append',dtype=type_fq_day) except: task_qeue.put(data) # 如果數據獲取失敗,將該數據重新存入到隊列,便於后期繼續執行 #else: #queueLock.release()
昨天試了下單線程下載股票數據,但由於獲取數據量較大,所需時間特別長,這次用多線程實試試
# 重寫線程類,用戶獲取數據 class get_qfq(threading.Thread): def __init__(self,name,queue,date_begin): threading.Thread.__init__(self) self.name = name self.queue = queue self.begin = date_begin def run(self): process_data(self.begin,self.queue) print("Exiting " + self.name)
# 聲明線程鎖 #queueLock = threading.Lock() old_date = get_old_date()
# 生成10個線程 threads = [] for i in range(7): thread = get_qfq('thread'+ str(i), stock_code_queue,old_date) thread.start() threads.append(thread) for thread in threads: thread.join()
正在獲取300239;數據還有718條: [Getting data:]##正在獲取002042;數據還有717條: [Getting data:]正在獲取600999;數據還有716條: [Getting data:]正在獲取300453;數據還有715條: [Getting data:]#正在獲取600538;數據還有714條: [Getting data:]###正在獲取002253;數據還有713條: [Getting data:]正在獲取600188;數據還有712條: [Getting data:]正在獲取000948;數據還有711條: [Getting data:]###正在獲取002586;數據還有710條: [Getting data:]正在獲取002651;數據還有709條: [Getting data:]正在獲取600705;數據還有708條: [Getting data:]##正在獲取300135;數據還有707條: [Getting data:]##正在獲取600755;數據還有706條: [Getting data:]正在獲取601890;數據還有705條: [Getting data:]正在獲取300341;數據還有704條: [Getting data:]#正在獲取000897;數據還有703條: [Getting data:]###正在獲取600886;數據還有702條: [Getting data:]#正在獲取002015;數據還有701條: [Getting data:]正在獲取600662;數據還有700條: [Getting data:]#正在獲取000408;數據還有699條: [Getting data:]#正在獲取000524;數據還有698條: [Getting data:]#正在獲取300309;數據還有697條: [Getting data:]#正在獲取600333;數據還有696條: [Getting data:]##正在獲取002178;數據還有695條:
本次采用了10個線程,下載速度快了許多,查看了下流量,基本可以達到3M/S,是單線程的6倍左右。
標簽:
股票