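A while ago, while playing with Python, I came across tushare, a module for fetching stock trading data. I am quite interested in stock trading and currently work in data mining, so I decided to first download the stock data into a database for later analysis: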
# Import the required modules
from queue import Queue
import threading
import os
import datetime
import tushare as ts
from sqlalchemy import create_engine
from sqlalchemy import types
# Create the MySQL database engine used for all later connections
mysql_info = {'host': 'localhost', 'port': 3306, 'user': '******', 'passwd': '******', 'db': 'stock', 'charset': 'utf8'}
engine = create_engine(
    'mysql+pymysql://%s:%s@%s:%s/%s?charset=%s' % (mysql_info['user'], mysql_info['passwd'],
                                                   mysql_info['host'], mysql_info['port'],
                                                   mysql_info['db'], mysql_info['charset']),
    echo=False)
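One caveat with formatting the connection URL by hand: a password containing characters such as `@` or `/` would break the string. Below is a minimal sketch, using only the standard library, that escapes the credentials first. `build_mysql_url` is a hypothetical helper name (not part of SQLAlchemy or tushare) and the credentials are made up.

```python
from urllib.parse import quote_plus


def build_mysql_url(info):
    """Build a mysql+pymysql connection URL from a settings dict shaped like mysql_info."""
    return 'mysql+pymysql://%s:%s@%s:%s/%s?charset=%s' % (
        quote_plus(info['user']),
        quote_plus(info['passwd']),  # escapes '@', '/', ':' and similar characters
        info['host'],
        info['port'],
        info['db'],
        info['charset'],
    )


# Made-up settings, only to show the escaping
example_info = {'host': 'localhost', 'port': 3306, 'user': 'demo',
                'passwd': 'p@ss/word', 'db': 'stock', 'charset': 'utf8'}
example_url = build_mysql_url(example_info)
print(example_url)
```

The resulting string could then be passed to `create_engine` in place of the hand-formatted one.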
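# Fetch the basic information for all stocks; the stock codes (the index) are used to fetch the adjusted price data
stock_basics = ts.get_stock_basics()
stock_basics.columns  # inspect the available columns (only displays output in an interactive session)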
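# Get the most recent date already stored in the database
def get_old_date():
    # Note: passing raw SQL strings to execute() works on SQLAlchemy versions before 2.0;
    # on 2.0 and later they must be wrapped in sqlalchemy.text().
    con = engine.connect()
    try:
        tables = [row[0] for row in con.execute('show tables;')]
        if 'fq_day' not in tables:
            # No table yet: download everything from 2001-01-01 onward
            return datetime.date(2001, 1, 1)
        date_old = con.execute('select max(date) from fq_day;').fetchall()[0][0].date()
    finally:
        con.close()  # close the connection on every path, not only before exiting
    if date_old < datetime.date.today() - datetime.timedelta(1):
        return date_old
    print('今天已經獲取過數據,不需重新獲取')  # "Today's data has already been fetched; nothing to do"
    raise SystemExit(0)  # normal exit that still runs cleanup, unlike os._exit(1)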
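The date decision inside `get_old_date` can also be separated from the database access, which makes it easy to check on its own. Here is a minimal sketch of that logic; `resume_date` and `DEFAULT_START` are hypothetical names introduced only for illustration, and the dates in the usage lines are made up.

```python
import datetime

DEFAULT_START = datetime.date(2001, 1, 1)  # same fallback date as get_old_date()


def resume_date(latest, today):
    """Return the last stored date to resume from, or None if the data is current.

    latest: most recent date found in the table, or None when nothing is stored yet.
    today:  the current date, passed in so the function can be tested with fixed dates.
    """
    if latest is None:
        return DEFAULT_START
    if latest < today - datetime.timedelta(days=1):
        return latest
    return None


today = datetime.date(2024, 1, 10)
print(resume_date(None, today))                       # no data yet
print(resume_date(datetime.date(2024, 1, 5), today))  # data is several days old
print(resume_date(datetime.date(2024, 1, 9), today))  # data is already current
```

Returning `None` instead of exiting leaves it to the caller to decide whether to stop the program.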
# Create a queue holding the stock codes for which the adjusted price details will be fetched
stock_code_queue = Queue()
for code in stock_basics.index:
    stock_code_queue.put(code)
# Column types used when writing to the fq_day table
type_fq_day = {'code': types.CHAR(6), 'open': types.FLOAT, 'high': types.FLOAT, 'close': types.FLOAT, 'low': types.FLOAT,
               'amount': types.FLOAT, 'factor': types.FLOAT}
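# Fetch the adjusted price data
from queue import Empty

def process_data(old_date, task_queue):
    # Queue is thread-safe, so no explicit lock is needed around get() and put()
    date_begin = old_date + datetime.timedelta(1)
    date_end = datetime.date.today()
    while True:
        try:
            # get_nowait() avoids blocking forever if another thread takes the last item first
            data = task_queue.get_nowait()
        except Empty:
            break
        print("正在獲取%s;數據還有%s條:" % (data, task_queue.qsize()))  # "Fetching %s; %s items remaining:"
        try:
            qfq_day = ts.get_h_data(data, start=str(date_begin), end=str(date_end), autype='qfq', drop_factor=False)
            qfq_day['code'] = data
            qfq_day.to_sql('fq_day', engine, if_exists='append', dtype=type_fq_day)
        except Exception:
            # On failure, put the code back on the queue so it is retried later.
            # Note that a code which always fails is retried indefinitely.
            task_queue.put(data)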
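Because `process_data` puts a failing code back on the queue without any limit, one code that always fails keeps the loop running forever. The sketch below shows one way to bound the retries. It is an illustration only, not the author's method: `drain`, `fake_fetch`, and `MAX_RETRIES` are hypothetical names, and `fake_fetch` stands in for the real `ts.get_h_data` call so the example runs without a network or database.

```python
import threading
from queue import Queue, Empty

MAX_RETRIES = 3  # assumed limit, chosen only for this example


def drain(task_queue, fetch, results, failed):
    """Process (code, attempt) items until the queue is empty.

    fetch is any callable that takes a stock code and may raise an exception.
    """
    while True:
        try:
            code, attempt = task_queue.get_nowait()  # never blocks on an empty queue
        except Empty:
            return
        try:
            results[code] = fetch(code)
        except Exception:
            if attempt + 1 < MAX_RETRIES:
                task_queue.put((code, attempt + 1))  # retry later
            else:
                failed.append(code)  # give up after MAX_RETRIES attempts


def fake_fetch(code):
    """Stand-in for the real download; always fails for one made-up code."""
    if code == '000002':
        raise IOError('simulated network error')
    return 'data for ' + code


tasks = Queue()
for code in ['000001', '000002', '000003']:
    tasks.put((code, 0))

results, failed = {}, []
workers = [threading.Thread(target=drain, args=(tasks, fake_fetch, results, failed))
           for _ in range(2)]
for w in workers:
    w.start()
for w in workers:
    w.join()
print(sorted(results), failed)
```

The codes that end up in `failed` can be logged and retried in a later run instead of blocking the current one.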
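Yesterday I tried downloading the stock data with a single thread, but because of the amount of data it took a very long time, so this time I am trying multiple threads.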
# Subclass Thread; each worker thread fetches data
class get_qfq(threading.Thread):
    def __init__(self, name, queue, date_begin):
        threading.Thread.__init__(self)
        self.name = name
        self.queue = queue
        self.begin = date_begin

    def run(self):
        process_data(self.begin, self.queue)
        print("Exiting " + self.name)
# Declare a thread lock (left disabled; Queue does its own locking)
#queueLock = threading.Lock()
old_date = get_old_date()
# Start 10 threads
threads = []
for i in range(10):
    thread = get_qfq('thread' + str(i), stock_code_queue, old_date)
    thread.start()
    threads.append(thread)
# Wait for all threads to finish
for thread in threads:
    thread.join()
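Sample output (each line reads "Fetching <code>; <n> items remaining:"; the `[Getting data:]` and `#` fragments are tushare's own progress output, interleaved from the other threads):
正在獲取300239;數據還有718條: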
[Getting data:]##正在獲取002042;數據還有717條:
[Getting data:]正在獲取600999;數據還有716條:
[Getting data:]正在獲取300453;數據還有715條:
[Getting data:]#正在獲取600538;數據還有714條:
[Getting data:]###正在獲取002253;數據還有713條:
[Getting data:]正在獲取600188;數據還有712條:
[Getting data:]正在獲取000948;數據還有711條:
[Getting data:]###正在獲取002586;數據還有710條:
[Getting data:]正在獲取002651;數據還有709條:
[Getting data:]正在獲取600705;數據還有708條:
[Getting data:]##正在獲取300135;數據還有707條:
[Getting data:]##正在獲取600755;數據還有706條:
[Getting data:]正在獲取601890;數據還有705條:
[Getting data:]正在獲取300341;數據還有704條:
[Getting data:]#正在獲取000897;數據還有703條:
[Getting data:]###正在獲取600886;數據還有702條:
[Getting data:]#正在獲取002015;數據還有701條:
[Getting data:]正在獲取600662;數據還有700條:
[Getting data:]#正在獲取000408;數據還有699條:
[Getting data:]#正在獲取000524;數據還有698條:
[Getting data:]#正在獲取300309;數據還有697條:
[Getting data:]#正在獲取600333;數據還有696條:
[Getting data:]##正在獲取002178;數據還有695條:
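This run used 10 threads and the download was much faster. Checking the network traffic, it reached roughly 3M/s, about 6 times the single-threaded rate.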
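Threads help here because each request spends most of its time waiting on the network. The same fan-out can also be written with the standard library's `concurrent.futures`. The sketch below is an illustration under stated assumptions: `simulated_fetch` sleeps instead of calling tushare, the stock codes are made up, and the timings it prints say nothing about the real download speeds reported above.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def simulated_fetch(code):
    """Stand-in for a network request; sleeps instead of calling tushare."""
    time.sleep(0.05)
    return code, len(code)


codes = ['%06d' % n for n in range(1, 21)]  # 20 made-up stock codes


def run(n_threads):
    """Fetch every code with n_threads workers; return the rows and elapsed seconds."""
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=n_threads) as pool:
        rows = list(pool.map(simulated_fetch, codes))  # results keep the input order
    return rows, time.perf_counter() - start


rows_1, seconds_1 = run(1)
rows_10, seconds_10 = run(10)
print('1 thread: %.2fs, 10 threads: %.2fs' % (seconds_1, seconds_10))
```

With a pool, the executor handles starting and joining the threads, and `pool.map` returns the results in the same order as the input codes.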