用Python選一個自己的股票池1
來源:https://blog.51cto.com/youerning/2498036
人的精力是有限的,所以將目光聚焦在更小的范圍內,也許能夠得到性價比最高的效果。
股票很多很多,但是我們關心的也許並不多,將自己關心或者符合自己買入股票的前提條件的股票納入一個股票池,然后再一定時間內只關注這些股票,也許是一個不錯的選擇,
入市有風險,投資需謹慎,本文不作為任何投資參考依據。
數據源
這里主要指免費的,收費的本人也沒用過。
比較推薦的數據源有兩個tushare或者baostock.
tushare需要注冊(如果你需要質量更高的數據的話)。對獲取頻率或者小於日線頻率的數據有限制。
其他數據源
baostock
數據不需要注冊,數據有維護者維護
akshare
數據源來源於各類財經網站
pytdx
通過通達信的數據接口獲取的數據
如果大家想注冊tushare的話, 可以使用我的分享鏈接: https://tushare.pro/register?reg=277890 這樣我能多50積分^_^
使用何種方式獲取數據都是可以的,看自己喜歡。
安裝
安裝很簡單
pip install tushare
pip install baostock
pip install akshare
pip install pytdx
數據獲取
這里使用為了接下來的操作需要將一定歷史范圍的股票數據下載下來,這里下載起始時間為20160101,截至時間為運行代碼的時間范圍的歷史日線數據。
這里以tushare為例, tushare獲取歷史數據有兩種方式。
第一種是以迭代歷史交易日的方式獲取所有歷史數據,假設獲取三年的歷史數據,一年一般220個交易日左右,那么3年需要請求660多次左右,如果以這種方式的話,就下載數據的時間只需要1分鍾多點的樣子。
第二種是以迭代所有股票代碼的方式獲取所有歷史數據,股票數量有大概3800多個,需要請求3800多次,但是在積分有限的情況下一分鍾最多請求500次,也就意味着僅下載數據的時間至少需要大概8分鍾時間。
理論上,你獲取的歷史范圍超過17.3年,那么使用第一種方式才比第二種方式快。
這兩種方式我實現了,大家可以參考以下代碼.
首先是一些必要的設置及額外的重試機制。
DATE_FORMAT = "%Y%m%d"
TS_TOKEN = "<你的Tushare Token>"
ts.set_token(TS_TOKEN)
pro = ts.pro_api(TS_TOKEN)
def retry(fn, max_retry=3, sleep_init=14):
"""簡單的retry函數"""
count = 1
while count < max_retry:
count += 1
try:
return fn()
except Exception as e:
# traceback.print_exc("")
# 等待時間遞增
time_sleep = sleep_init * count + 1
print("遇到異常%s, 在%s秒后再次嘗試第%s次" % (e, time_sleep, count))
time.sleep(time_sleep)
def save_to_csv(ret, data_path="stock"):
if not path.exists(data_path):
# 如果父目錄不存在不會報錯
os.makedirs(data_path)
for ts_code, df in ret.items():
fname = "-".join([ts_code, ".csv"])
fp = path.join(data_path, fname)
df.to_csv(fp, index=False)
第一種方式
def download_by_trade_date(start_date, end_date, data_path="by_trade_date", worker_size=2, debug=False):
"""
通過交易日來遍歷時間范圍內的數據,當交易日的個數小於股票的數量時,效率較高.
一年一般220個交易日左右,但是股票卻有3800多個,那么通過交易日來下載數據就高效的多了
"""
now = datetime.now()
start_time = now
try:
start_date_ = datetime.strptime(start_date, DATE_FORMAT)
end_date_ = datetime.strptime(end_date, DATE_FORMAT)
if end_date_ < start_date_:
sys.exit("起始時間應該大於結束時間")
if start_date_ > now:
sys.exit("起始時間應該大於當前時間")
if end_date_ > now:
end_date = now.strftime(DATE_FORMAT)
except Exception:
traceback.print_exc("")
sys.exit("傳入的start_date[%s]或end_date[%s]時間格式不正確, 格式應該像20200101" % (start_date, end_date))
# 獲取交易日歷
try:
trade_cal = pro.trade_cal(exchange="SSE", is_open="1",
start_date=start_date,
end_date=end_date,
fields="cal_date")
except Exception:
sys.exit("獲取交易日歷失敗")
trade_date_lst = trade_cal.cal_date
pool = ThreadPoolExecutor(max_workers=worker_size)
print("准備開始獲取 %s到%s 的股票數據" % (start_date, end_date))
def worker(trade_date):
# 用偏函數包裝一下
# pro = ts.pro_api(TS_TOKEN)
fn = partial(pro.daily, trade_date=trade_date)
return retry(fn)
# 最終保存到一個列表中
ret = defaultdict(list)
# future 列表
fs_lst = []
# 通過線程並發獲取數據
for trade_date in trade_date_lst:
# print(trade_date)
# 這里不使用pool.map的原因是, map返回的future列表會亂序
# submit的位置參數不需要需要放到可迭代對象里面(一般是元組), 卧槽。。。
fs = pool.submit(worker, trade_date)
fs_lst.append(fs)
# break
# *****************************
# 獲取每個交易日的股票數據
# *****************************
for trade_date, fs in zip(trade_date_lst, fs_lst):
if debug:
print("開始獲取交易日[%s]的數據" % trade_date)
# 如果有異常或者結果為空的話
if fs.exception() or not isinstance(fs.result(), pd.DataFrame):
print(fs.exception())
sys.exit("在交易日[%s]超過重試最大的次數也沒有獲取到數據" % trade_date)
day_df = fs.result()
columns = day_df.columns
# 遍歷一個交易日的所有股票數據
# print(datetime.now())
# 遍歷day_df.values 大概2ms
# 2 ms ± 63.2 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
# 遍歷day_df.iterrows() 大概285ms
# 285 ms ± 2.64 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
for row in day_df.values:
ts_code = row[0]
ret[ts_code].append(row)
# print(datetime.now())
merge_start_time = datetime.now()
new_ret = {}
for key, value in ret.items():
new_ret[key] = pd.DataFrame(value, columns=columns)
merge_end_time = datetime.now()
# 組合[series...] 需要142ms
# 142 ms ± 1.12 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
# 組合[array....] 需要6.56ms
# 6.56 ms ± 66.9 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
print("合並共花費時間: %s" % (merge_end_time - merge_start_time))
# *****************************
# 獲取將結果保存到本地的csv文件中
# *****************************
print("數據已經獲取完畢准備保存到本地")
save_to_csv(new_ret, data_path=data_path)
end_time = datetime.now()
print("*****************************")
print("下載完成, 共花費時間%s" % (end_time - start_time))
print("*****************************")
第二種方式
def download_by_ts_code(start_date, end_date, data_path="by_ts_code", debug=False, worker_size=3):
"""因為按股票代碼的方式實在太慢了(如果你寬帶速度比較快的話), 也就沒必要多線程了"""
now = datetime.now()
start_time = now
try:
start_date_ = datetime.strptime(start_date, DATE_FORMAT)
end_date_ = datetime.strptime(end_date, DATE_FORMAT)
if end_date_ < start_date_:
sys.exit("起始時間應該大於結束時間")
if start_date_ > now:
sys.exit("起始時間應該大於當前時間")
if end_date_ > now:
end_date = now.strftime(DATE_FORMAT)
except Exception:
traceback.print_exc("")
sys.exit("傳入的start_date[%s]或end_date[%s]時間格式不正確, 格式應該像20200101" % (start_date, end_date))
def worker(ts_code):
fn = partial(ts.pro_bar, ts_code=ts_code, adj='qfq', start_date=start_date, end_date=end_date)
return retry(fn)
pool = ThreadPoolExecutor(max_workers=worker_size)
print("准備開始獲取 %s到%s 的股票數據" % (start_date, end_date))
# 不指定任何參數會獲取5000條最近的數據,ts_code會重復
day = pro.daily()
# 固定順序,set是無法包裝有序的
all_ts_code = list(set(day.ts_code))
fs_lst = []
for ts_code in all_ts_code:
fs_lst.append(pool.submit(worker, ts_code))
# break
for ts_code, fs in zip(all_ts_code, fs_lst):
# 如果有異常或者結果為空的話
if fs.exception() or not isinstance(fs.result(), pd.DataFrame):
print(fs.exception())
sys.exit("在交易日[%s]超過重試最大的次數也沒有獲取到數據" % trade_date)
df = fs.result()
# %timeit df.sort_index()
# 192 µs ± 3.73 µs per loop (mean ± std. dev. of 7 runs, 10000 loops each)
# %timeit df.sort_index(inplace=True)
# 2.54 µs ± 177 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)
df.sort_index(inplace=True, ascending=False)
if not isinstance(df, pd.DataFrame):
sys.exit("在股票[%s]超過重試最大的次數也沒有獲取到數據" % ts_code)
save_to_csv({ts_code: df}, data_path=data_path)
if debug:
print("股票[%s]歷史數據下載保存完成" % ts_code)
end_time = datetime.now()
print("*****************************")
print("下載完成, 共花費時間%s" % (end_time - start_time))
print("*****************************")
我感覺我寫了好多注釋以及一些方法之間性能對比,關於各個方法之間的性能對比大家還是需要注意的,因為雖然平時感受不出來,但是在較多次數的循環的時候就會發現性能差異了。
至此需要的歷史數據就得到了。
如果對讀寫速度很在意的話,存儲在sqlite數據庫或者其他其他數據會快很多。
股票池的選擇條件
無論是主觀交易還是量化交易,選股的方式個人認為大概分為以下三類。
技術選股
通過技術指標比如MACD, KDJ或者K線形態等技術指標來選股。
基本面選股
通過財務報表或者一些金融指標來選股。
消息選股
新聞消息或者小道消息選股。
本文主要集中在以下幾種方式
形態選股
通過k線的形態找到股票中的十字星等形態。
交易額/流通股本選股
通過對交易額或者流通股本排序選擇前面的股票。
相似度選股
通過選定基准股票趨勢,發現相似的股票。
趨勢選股
挑選最近六個月股票趨勢向上的股票。
形態選股
股票的形態多種多樣,這里以選擇股票中的黃昏(早晨)十字星為例。
黃昏十字星與早晨十字星的區別在於所處趨勢不一樣。
圖示如下:

如果需要程序判斷,那么這些長度需要量化,不能模棱兩可。所以根據十字星的定義,量化如下:
實體長度: 超過2.5%才算長實體,而且上下影線不能超過0.3%
十字星: 實體長度不超過1.5%
趨勢: 包括十字星在內往前數6根k線,其中的第1根k線收盤價均小於它后面4根收盤價為向上趨勢,反之趨勢向下,並且幅度超過2.5%。
這里我只是主觀的量化沒有任何交易經驗的選擇,如果不認同的話,可以自行修改代碼。
代碼如下:
def select_doji():
# fp = "by_ts_code/600249.SH-.csv"
fp = "by_ts_code/300130.SZ-.csv"
df = pd.read_csv(fp, index_col="trade_date", parse_dates=["trade_date"])
df = df[["open", "high", "low", "close"]]
# k線數量
k_size = 6
# 起始幅度大小
treand_threshold = 0.025
# 長實體最小長度
entity_length_threshold = 0.025
# 長實體上下最大影線長度
entity_shadow_line_length_threshold = 0.03
# 十字星實體長度最大長度
doji_entity_length_threshold = 0.015
# 十字星上下影線長度最小長度
# doji_shadow_line_length_threshold = 0.005
trend_map = {1: "向上", -1: "向下"}
def up_or_down_trend(row):
"""1代表向上, -1代表向下, 0代表震盪"""
first = row[0]
last = row[-1]
if all(first > row[1:]) and first > (last * treand_threshold):
return -1
elif all(first < row[1:]) and last > (first * treand_threshold):
return 1
else:
return 0
df["trend"] = df.close.rolling(k_size).apply(up_or_down_trend, raw=True)
df.fillna(value=0, inplace=True)
def k_sharp(k):
"""返回k線的上下影線長度, 實體長度"""
open_, high, low, close = k[:4]
if open_ > close:
upper_line_length = (high - open_) / high
lower_line_length = (close - low) / close
entity_length = (open_ - close) / open_
else:
upper_line_length = (high - close) / high
lower_line_length = (open_ - low) / open_
entity_length = (close - open_) / close
return upper_line_length, lower_line_length, entity_length
def is_up_or_down_doji(k_lst, trend):
# open, high, low, close
if len(k_lst) != 3:
sys.exit("判斷十字星需要三根K線")
is_ok = False
k1, k2, k3 = k_lst
# 判斷是否跳空
# 通過high, close過於嚴格
if trend > 0:
# 趨勢向上時,最低點是否大於兩個實體的最高價
if k2[0] < k1[1] or k2[0] < k3[1]:
# if k2[0] < k1[1] :
return is_ok
else:
# 趨勢向下時,最高點是否小於兩個實體的最高價
if k2[0] > k1[2] or k2[0] > k3[2]:
return is_ok
k1_sharp = k_sharp(k1)
# print("k1 sharp")
# print(k1_sharp)
# 判斷是否為長實體
if (k1_sharp[2] < entity_length_threshold
or k1_sharp[0] > entity_shadow_line_length_threshold
or k1_sharp[1] > entity_shadow_line_length_threshold):
return is_ok
k3_sharp = k_sharp(k3)
# print("k3 sharp")
# print(k3_sharp)
if (k3_sharp[2] < entity_length_threshold
or k3_sharp[0] > entity_shadow_line_length_threshold
or k3_sharp[1] > entity_shadow_line_length_threshold):
return is_ok
# print("ok")
# 判斷是否為十字星
k2_sharp = k_sharp(k2)
# print("k2 sharp")
# print(k2_sharp)
# 實體長度不超過0.2%, 上下影線長度超過0.6%, 如果規定上下影線的長度不太好找
# if (k2_sharp[2] > doji_entity_length_threshold
# or k2_sharp[0] < doji_shadow_line_length_threshold
# or k2_sharp[1] < doji_shadow_line_length_threshold):
if k2_sharp[2] > doji_entity_length_threshold:
return is_ok
return True
df_values = df.values
ret = []
for index in range(len(df_values)):
if index < k_size:
continue
trend = df_values[index - 1][-1]
if trend == 0:
continue
val_slice = slice(index-2, index+1)
k_lst = df_values[val_slice]
if is_up_or_down_doji(k_lst, trend):
ret.append(index-1)
print("在>>%s<<<找到趨勢為[%s]的十字星" % (df.index[index-1], trend_map[trend]))
if not ret:
print("沒有發現任何十字星")
ax = ohlc_plot(df[["open", "high", "low", "close"]])
for i in ret:
# print(i)
mark_x = df.index[i]
# mark_y = df.loc[mark_x].low
# print(mark_x, mark_y)
ax.axvline(mark_x)
plt.show()
return ret
結果如下:

說實話標准的不太好找,所以代碼的條件設置不是很嚴格。
形態選股很有意思,以后有機會單獨出一篇文章。
交易額/流通股本選股
通過交易額過濾,選擇股票最近一百個交易日的成交額平均值,然后進行排序。
交易額排序
def select_by_amount(data_path, top_size=20):
# 或許交易日比較好
day_range = 100
all_df = load_all_local_data(data_path, tail_size=day_range)
ret = []
for ts_code, df in all_df.items():
df["amount_avg"] = df.amount.rolling(day_range).mean()
amount_avg = df["amount_avg"][-1]
# NaN的布爾值為True, 所以需要np.isnan判斷
if np.isnan(amount_avg):
continue
ret.append((ts_code, amount_avg))
ret.sort(key=lambda x:x[1])
print("交易額排名前%s的結果如下" % top_size)
print(ret[-top_size:])
return ret[-top_size:]
結果如下。
[('002714.SZ', 2162871.9387400015), ('000002.SZ', 2294273.739109999), ('002460.SZ', 2372975.2177099977), ('600745.SH', 2461243.4350499995), ('601990.SH', 2551214.2825800003), ('603986.SH', 2679324.4020000002), ('600703.SH', 2759167.96963), ('002456.SZ', 2759314.642220001), ('000651.SZ', 2782179.0211499995), ('000100.SZ', 2853012.22389), ('002185.SZ', 3149773.6836400027), ('000858.SZ', 3168871.2845699983), ('002475.SZ', 3392269.47999), ('300750.SZ', 3511882.7076800014), ('600030.SH', 4039920.83439), ('000725.SZ', 4189700.4488400007), ('300059.SZ', 4447245.855129998), ('600519.SH', 4448082.42745), ('601318.SH', 4892206.13003), ('000063.SZ', 5128169.27109)]
流通市值排序
def select_by_float_market_value(trade_date, top_size=20):
df = pro.daily_basic(ts_code='', trade_date=trade_date, fields="ts_code,close,float_share")
ret = []
for row in df.values:
ts_code = row[0]
float_market_value = row[1] * row[2]
if np.isnan(float_market_value) or not float_market_value:
continue
ret.append((ts_code, float_market_value))
ret.sort(key=lambda x:x[1])
print("流通市值名前%s的結果如下" % top_size)
print(ret[-top_size:])
return ret[-top_size:]
交易額排名前20的結果如下
[('000002.SZ', 24443367.828188002), ('000001.SZ', 25072232.462559998), ('601088.SH', 26237241.386405), ('600000.SH', 28497216.593586), ('601166.SH', 30952865.369478), ('000651.SZ', 33204757.629185997), ('603288.SH', 36720702.433056), ('600900.SH', 36894000.0), ('000333.SZ', 39063915.66100799), ('600028.SH', 40612052.69455), ('600276.SH', 42312945.987192), ('000858.SZ', 54753808.6884), ('601628.SH', 54786707.43), ('600036.SH', 69375140.114727), ('601857.SH', 70759948.006466), ('601988.SH', 72081806.077332), ('601318.SH', 75958643.459976), ('601288.SH', 99096634.04564801), ('601398.SH', 136154167.33219498), ('600519.SH', 166848191.796)]
沈大婦科醫院:http://yyk.39.net/sy/zhuanke/fc843.html