Python Stock Data Analysis
I have recently been learning stock data analysis with Python, mainly using tushare and seaborn. tushare is a financial data interface package with fairly complete coverage of Chinese A-share data; its documentation is at http://tushare.waditu.com/index.html#id5. seaborn is a plotting library that makes it easy to draw clean, attractive charts and also provides some built-in statistical functionality.
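If the two packages are not installed yet, both are available from PyPI (a minimal setup sketch; the package names are the standard PyPI names and pip is assumed to be available):

pip install tushare seaborn pandas matplotlib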
Imported modules:
import matplotlib.pyplot as plt
import pandas as pd
import seaborn as sns
import seaborn.linearmodels as snsl  # note: this module was removed in later seaborn releases
from datetime import datetime
import tushare as ts
The code:
Closing price curve
sns.set_style("whitegrid")
end = datetime.today()                      # start and end dates: the most recent year of data
start = datetime(end.year-1, end.month, end.day)
end = str(end)[0:10]
start = str(start)[0:10]
stock = ts.get_hist_data('300104', start, end)   # pick one stock
stock['close'].plot(legend=True, figsize=(10,4))
plt.show()
Daily closing price
In the same way we can plot the 5-, 10- and 20-day moving averages:
stock[['close','ma5','ma10','ma20']].plot(legend=True, figsize=(10,4))
Close with 5-, 10- and 20-day moving averages
Daily percentage change (returns)
stock['Daily Return'] = stock['close'].pct_change()
stock['Daily Return'].plot(legend=True,figsize=(10,4))
Daily returns
Kernel density estimate
sns.kdeplot(stock['Daily Return'].dropna())
Kernel density estimate
Kernel density estimate plus histogram
sns.distplot(stock['Daily Return'].dropna(),bins=100)
KDE with histogram
Pearson correlation between two stocks
sns.jointplot(stock['Daily Return'], stock['Daily Return'], alpha=0.2)  # here the same series is passed twice, so the correlation is trivially 1; pass two different stocks' return series for a real comparison
Joint plot with Pearson correlation
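For an actual two-stock comparison, one could fetch a second ticker and plot the two return series against each other. A minimal sketch (the second ticker '300113' is an illustrative choice, and the seaborn jointplot signature is the one used in this post's version):

stock2 = ts.get_hist_data('300113', start, end)
returns = pd.DataFrame({'300104': stock['close'].pct_change(),
                        '300113': stock2['close'].pct_change()}).dropna()
sns.jointplot('300104', '300113', data=returns)
plt.show()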
Correlation among several stocks
stock_lis = ['300113','300343','300295','300315']  # four internet-related stocks, chosen arbitrarily
df = pd.DataFrame()
for stock in stock_lis:
    closing_df = ts.get_hist_data(stock, start, end)['close']
    df = df.join(pd.DataFrame({stock: closing_df}), how='outer')
tech_rets = df.pct_change()
snsl.corrplot(tech_rets.dropna())  # corrplot was removed from later seaborn versions; sns.heatmap(tech_rets.dropna().corr()) is the modern equivalent
Correlation matrix
A simple way to measure a stock's return and risk is to use the mean and the standard deviation of its daily returns: a positive mean indicates a positive average return, while a larger standard deviation indicates larger fluctuations and therefore higher risk.
rets = tech_rets.dropna()
plt.scatter(rets.mean(), rets.std())
plt.xlabel('Expected Return')
plt.ylabel('Risk')
for label, x, y in zip(rets.columns, rets.mean(), rets.std()):   # annotate each stock
    plt.annotate(
        label,
        xy=(x, y), xytext=(15, 15),
        textcoords='offset points',
        arrowprops=dict(arrowstyle='-', connectionstyle='arc3,rad=-0.3'))
Using Python and public data to screen for stocks with high bonus-share ("gao song zhuan") expectations
Judging from past years, a rally driven by high bonus-share expectations tends to appear toward the end of every year. This section walks through how to implement such a screen with tushare.
The focus here is the screening idea; you can adjust the conditions and parameters in the code to suit your own preferences.
1. Screening principle
Generally speaking, stocks with high bonus-share expectations share a few traits: small total market cap, high capital reserve per share, high earnings per share, and a small floating share base. Other factors also matter, such as the current share price, changes in operating results, and the company's dividend history.
Here we only consider four factors: capital reserve per share, earnings per share, floating share capital, and total market cap. The screen targets stocks with capital reserve per share >= 5 yuan, EPS >= 0.5 yuan, floating share capital below 300 million shares, and total market cap under 10 billion yuan (feel free to tune these parameters based on your own experience).
2. Data preparation
First, import tushare:
import tushare as ts
Then fetch the fundamentals and the daily quote data:
# fundamentals
basic = ts.get_stock_basics()
# quotes and market-cap data
hq = ts.get_today_all()
3. Data cleaning
Clean the downloaded data and keep only the fields we need (for the other fields and their meanings, see the documentation at http://tushare.org).
# current price; if the stock is suspended, use the previous trading day's price
hq['trade'] = hq.apply(lambda x: x.settlement if x.trade == 0 else x.trade, axis=1)
# floating share capital, total share capital, capital reserve per share, earnings per share
basedata = basic[['outstanding', 'totals', 'reservedPerShare', 'esp']]
# stock code, name, current price, total market cap, floating market cap
hqdata = hq[['code', 'name', 'trade', 'mktcap', 'nmc']]
# use the stock code as the index of the quote data
hqdata = hqdata.set_index('code')
# merge the two tables
data = basedata.merge(hqdata, left_index=True, right_index=True)
4. Screening conditions
We now process the data further according to the parameters and conditions described above.
Convert total and floating market cap to units of 100 million yuan:
data['mktcap'] = data['mktcap'] / 10000
data['nmc'] = data['nmc'] / 10000
Set the parameters and filter values (adjust them here as you like):
# capital reserve per share >= 5 yuan
res = data.reservedPerShare >= 5
# floating share capital <= 300 million shares
out = data.outstanding <= 30000
# earnings per share >= 0.5 yuan
eps = data.esp >= 0.5
# total market cap < 10 billion yuan
mktcap = data.mktcap <= 100
Take the intersection of all the conditions:
allcrit = res & out & eps & mktcap
selected = data[allcrit]
The resulting stocks with high bonus-share expectations:
The fields above are: stock name, closing price, capital reserve per share, floating share capital, earnings per share (the tushare field is named 'esp', a typo for eps), total market cap and floating market cap.
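To actually inspect or save the screen result, one might sort the selected rows and write them out, for example (a small sketch; the column selection simply follows the fields listed above and the output file name is illustrative):

result = selected[['name', 'trade', 'reservedPerShare', 'outstanding', 'esp', 'mktcap', 'nmc']]
result = result.sort_values('reservedPerShare', ascending=False)
print(result)
result.to_csv('high_songzhuan_candidates.csv', encoding='utf-8')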
https://zhuanlan.zhihu.com/p/23829205
Python: golden cross detection
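As a minimal sketch of one common golden-cross rule (the 5-day moving average crossing above the 10-day moving average), reusing the tushare daily data from the earlier examples; the ticker, date range and column names are illustrative assumptions:

hist = ts.get_hist_data('300104', '2016-01-01', '2016-11-17').sort_index()  # sort ascending by date
ma_fast, ma_slow = hist['ma5'], hist['ma10']
# a golden cross occurs where the fast MA closes above the slow MA
# after having been at or below it on the previous day
golden_cross = (ma_fast > ma_slow) & (ma_fast.shift(1) <= ma_slow.shift(1))
print(hist[golden_cross].index)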

Python: filtering out recently listed, suspended and limit-up/limit-down stocks

# Filter out recently listed stocks, stocks at the price limit, and suspended stocks.
# Note: history, bar_dict, ipo_days etc. are provided by the quant backtesting platform
# (a RiceQuant-style strategy environment) or defined elsewhere in the strategy, not by standard Python.
def filcon(context, bar_dict, tar_list):
    def zdt_trade(stock, context, bar_dict):
        yesterday = history(2, '1d', 'close')[stock].values[-1]
        zt = round(1.10 * yesterday, 2)   # limit-up price
        dt = round(0.99 * yesterday, 2)   # limit-down threshold as written in the original (0.90 would match the usual -10% limit)
        # last is the latest traded price
        return dt < bar_dict[stock].last < zt
    filstock = []
    for stock in tar_list:
        con1 = ipo_days(stock, context.now) > 60     # listed for more than 60 days
        con2 = bar_dict[stock].is_trading            # not suspended
        con3 = zdt_trade(stock, context, bar_dict)   # not at the price limit
        if con1 and con2 and con3:
            filstock.append(stock)
    return filstock
Python: rebalancing to equal position value

# Rebalance so that every holding ends up with the same market value.
# order_target_value and context.portfolio are strategy-environment APIs.
def for_balance(context, bar_dict):
    #mvalues = context.portfolio.market_value
    #avalues = context.portfolio.portfolio_value
    #per = mvalues / avalues
    hlist = []
    for stock in context.portfolio.positions:
        # each holding and its current market value
        hlist.append([stock, bar_dict[stock].last * context.portfolio.positions[stock].quantity])
    if hlist:
        # sort holdings by market value, largest first
        hlist = sorted(hlist, key=lambda x: x[1], reverse=True)
        temp = 0
        for li in hlist:
            # total market value of all holdings
            temp += li[1]
        for li in hlist:
            # set each tradable holding to the average market value
            if bar_dict[li[0]].is_trading:
                order_target_value(li[0], temp/len(hlist))
    return
Python: PCA (principal component analysis)
Principal component analysis extracts the main directions of variation in the samples and can therefore be used for dimensionality reduction.

# -*- coding: utf-8 -*-
"""
Created on Sun Feb 28 10:04:26 2016
PCA source code
@author: liudiwei
"""
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

# Mean of each column. The input is expected to be a numpy matrix with one
# sample per row and one feature per column.
def meanX(dataX):
    return np.mean(dataX, axis=0)   # axis=0 takes the mean of each column

# Variance of each feature; input is a numpy matrix, rows are samples.
def variance(X):
    m, n = np.shape(X)
    mu = meanX(X)
    muAll = np.tile(mu, (m, 1))
    X1 = X - muAll
    variance = 1./m * np.diag(X1.T * X1)
    return variance

# Standardisation; input is a numpy matrix, rows are samples.
def normalize(X):
    m, n = np.shape(X)
    mu = meanX(X)
    muAll = np.tile(mu, (m, 1))
    X1 = X - muAll
    X2 = np.tile(np.diag(X.T * X), (m, 1))
    XNorm = X1/X2
    return XNorm

"""
Parameters:
    - XMat: numpy matrix, one sample per row, one feature per column
    - k:    keep the eigenvectors of the k largest eigenvalues
Returns:
    - finalData: the low-dimensional projection of the data
    - reconData: the data mapped back to the original space (axes shifted back)
"""
def pca(XMat, k):
    average = meanX(XMat)
    m, n = np.shape(XMat)
    avgs = np.tile(average, (m, 1))
    data_adjust = XMat - avgs
    covX = np.cov(data_adjust.T)                 # covariance matrix
    featValue, featVec = np.linalg.eig(covX)     # its eigenvalues and eigenvectors
    index = np.argsort(-featValue)               # sort eigenvalues from large to small
    if k > n:
        print "k must be lower than the number of features"
        return
    else:
        # the eigenvectors are column vectors, while a[i] indexes rows,
        # so featVec is transposed before selecting the top-k vectors
        selectVec = np.matrix(featVec.T[index[:k]])
        finalData = data_adjust * selectVec.T
        reconData = (finalData * selectVec) + average
    return finalData, reconData

def loaddata(datafile):
    return np.array(pd.read_csv(datafile, sep="\t", header=None)).astype(np.float)

def plotBestFit(data1, data2):
    dataArr1 = np.array(data1)
    dataArr2 = np.array(data2)
    m = np.shape(dataArr1)[0]
    axis_x1 = []
    axis_y1 = []
    axis_x2 = []
    axis_y2 = []
    for i in range(m):
        axis_x1.append(dataArr1[i, 0])
        axis_y1.append(dataArr1[i, 1])
        axis_x2.append(dataArr2[i, 0])
        axis_y2.append(dataArr2[i, 1])
    fig = plt.figure()
    ax = fig.add_subplot(111)
    ax.scatter(axis_x1, axis_y1, s=50, c='red', marker='s')
    ax.scatter(axis_x2, axis_y2, s=50, c='blue')
    plt.xlabel('x1')
    plt.ylabel('x2')
    plt.savefig("outfile.png")
    plt.show()

# simple test
# data source: http://www.cnblogs.com/jerrylead/archive/2011/04/18/2020209.html
def test():
    X = [[2.5, 0.5, 2.2, 1.9, 3.1, 2.3, 2, 1, 1.5, 1.1],
         [2.4, 0.7, 2.9, 2.2, 3.0, 2.7, 1.6, 1.1, 1.6, 0.9]]
    XMat = np.matrix(X).T
    k = 2
    return pca(XMat, k)

# run PCA on the data set data.txt
def main():
    datafile = "data.txt"
    XMat = loaddata(datafile)
    k = 2
    return pca(XMat, k)

if __name__ == "__main__":
    finalData, reconMat = main()
    plotBestFit(finalData, reconMat)
In the figure, the red points are the data after projection onto the principal components and the blue points are the reconstructed original data; the differences between samples stand out more clearly after the reduction.
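As a quick cross-check of the from-scratch implementation above, the same projection can be obtained with scikit-learn (a minimal sketch, assuming scikit-learn is installed; the signs of the components may differ):

import numpy as np
from sklearn.decomposition import PCA

X = np.array([[2.5, 2.4], [0.5, 0.7], [2.2, 2.9], [1.9, 2.2], [3.1, 3.0],
              [2.3, 2.7], [2.0, 1.6], [1.0, 1.1], [1.5, 1.6], [1.1, 0.9]])
proj = PCA(n_components=2).fit_transform(X)   # rows: samples, columns: principal components
print(proj)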
Python: KNN (k-nearest-neighbour) classification

# -*- coding: utf-8 -*-
"""
Created on Mon Feb 22 13:21:22 2016
K-NearestNeighbor
"""
import numpy as np
import operator

class KNNClassifier():
    """This is a Nearest Neighbor classifier. """

    # k is the number of neighbours that vote
    def __init__(self, k=3):
        self._k = k

    # distances between a new sample and every known sample, sorted ascending
    def _calEDistance(self, inSample, dataset):
        m = dataset.shape[0]
        diffMat = np.tile(inSample, (m, 1)) - dataset
        sqDiffMat = diffMat**2                # square every element
        sqDistances = sqDiffMat.sum(axis=1)   # sum over features
        distances = sqDistances ** 0.5        # square root
        return distances.argsort()            # indices ordered by increasing distance

    def _classify0(self, inX, dataSet, labels):
        k = self._k
        dataSetSize = dataSet.shape[0]
        diffMat = np.tile(inX, (dataSetSize, 1)) - dataSet
        sqDiffMat = diffMat**2
        sqDistances = sqDiffMat.sum(axis=1)
        distances = sqDistances**0.5
        sortedDistIndicies = distances.argsort()
        classCount = {}
        for i in range(k):
            voteIlabel = labels[sortedDistIndicies[i]]
            classCount[voteIlabel] = classCount.get(voteIlabel, 0) + 1
        sortedClassCount = sorted(classCount.iteritems(),
                                  key=operator.itemgetter(1), reverse=True)
        return sortedClassCount[0][0]

    # classify a single sample
    def _classify(self, sample, train_X, train_y):
        # type checks
        if isinstance(sample, np.ndarray) and isinstance(train_X, np.ndarray) \
                and isinstance(train_y, np.ndarray):
            pass
        else:
            try:
                sample = np.array(sample)
                train_X = np.array(train_X)
                train_y = np.array(train_y)
            except:
                raise TypeError("numpy.ndarray required for train_X and ..")
        sortedDistances = self._calEDistance(sample, train_X)
        classCount = {}
        for i in range(self._k):
            oneVote = train_y[sortedDistances[i]]   # label of the i-th nearest point
            classCount[oneVote] = classCount.get(oneVote, 0) + 1
        sortedClassCount = sorted(classCount.iteritems(),
                                  key=operator.itemgetter(1), reverse=True)
        #print "the sample :", sample, "is classified as", sortedClassCount[0][0]
        return sortedClassCount[0][0]

    def classify(self, test_X, train_X, train_y):
        results = []
        # type checks
        if isinstance(test_X, np.ndarray) and isinstance(train_X, np.ndarray) \
                and isinstance(train_y, np.ndarray):
            pass
        else:
            try:
                test_X = np.array(test_X)
                train_X = np.array(train_X)
                train_y = np.array(train_y)
            except:
                raise TypeError("numpy.ndarray required for train_X and ..")
        d = len(np.shape(test_X))
        if d == 1:
            sample = test_X
            result = self._classify(sample, train_X, train_y)
            results.append(result)
        else:
            for i in range(len(test_X)):
                sample = test_X[i]
                result = self._classify(sample, train_X, train_y)
                results.append(result)
        return results

if __name__ == "__main__":
    train_X = [[1, 2, 0, 1, 0],
               [0, 1, 1, 0, 1],
               [1, 0, 0, 0, 1],
               [2, 1, 1, 0, 1],
               [1, 1, 0, 1, 1]]
    train_y = [1, 1, 0, 0, 0]
    clf = KNNClassifier(k=3)
    sample = [[1, 2, 0, 1, 0], [1, 2, 0, 1, 1]]
    result = clf.classify(sample, train_X, train_y)
    print result
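For comparison, scikit-learn's KNeighborsClassifier performs the same kind of majority vote over the nearest neighbours (a minimal sketch reusing train_X and train_y from the example above, assuming scikit-learn is available):

from sklearn.neighbors import KNeighborsClassifier

knn = KNeighborsClassifier(n_neighbors=3)
knn.fit(train_X, train_y)
print(knn.predict([[1, 2, 0, 1, 0], [1, 2, 0, 1, 1]]))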
Python: decision trees (ID3 & C4.5)
A decision tree classifies samples step by step according to their attributes, making classification faster and more effective. At each step the attribute to split on can be chosen by information gain (IG), which gives the basic ID3 algorithm, or by the information gain ratio (IGR), which gives the improved C4.5 algorithm.
Taking ID3 as an example, the training procedure is:
(1) Read in x and y (x holds the samples, y the labels); rows are samples, columns are features.
(2) Compute the information gain of every feature and pick the feature with the largest IG (see the short sketch after this outline).
(3) Remove the chosen feature from the sample array.
(4) Split the updated samples into groups according to the values of the chosen feature:
value 1 -> (x1, y1), value 2 -> (x2, y2), value 3 -> (x3, y3)
(5) Repeat the steps above recursively on each group until a leaf node is reached.
A node is a leaf when:
(1) all labels y in the node are identical, or
(2) no features are left to split on.
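As an illustration of step (2), entropy and information gain can be computed directly from label counts. A minimal standalone sketch (independent of the class below; the toy data is illustrative):

import numpy as np

def entropy(y):
    # Shannon entropy of a label array: -sum(p_i * log2 p_i)
    _, counts = np.unique(y, return_counts=True)
    p = counts / float(len(y))
    return -np.sum(p * np.log2(p))

def info_gain(x, y):
    # information gain of splitting the labels y on the feature column x
    cond = 0.0
    for v in np.unique(x):
        sub_y = y[x == v]
        cond += len(sub_y) / float(len(y)) * entropy(sub_y)
    return entropy(y) - cond

x = np.array([0, 0, 1, 1, 1])
y = np.array([1, 1, 0, 0, 0])
print(info_gain(x, y))   # 0.971..., this feature separates the two classes perfectly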
The test procedure is:
(1) Load the trained decision tree.
(2) Starting from the root, traverse the tree recursively until a leaf node is reached.
The full code is below. The trained tree is stored as a nested dictionary keyed by feature indices and feature values, with class labels at the leaves.

# -*- coding: utf-8 -*-
"""
Created on Mon Nov 07 09:06:37 2016
@author: yehx

Created on Sun Feb 21 12:17:10 2016
Decision Tree Source Code
@author: liudiwei
"""
import os
import numpy as np

class DecitionTree():
    """This is a decision tree classifier. """

    def __init__(self, criteria='ID3'):
        self._tree = None
        if criteria == 'ID3' or criteria == 'C4.5':
            self._criteria = criteria
        else:
            raise Exception("criterion should be ID3 or C4.5")

    def _calEntropy(self, y):
        '''
        Shannon entropy e = -sum(pi * log pi).
        y is an array of labels; returns the entropy.
        '''
        n = y.shape[0]
        labelCounts = {}
        for label in y:
            if label not in labelCounts.keys():
                labelCounts[label] = 1
            else:
                labelCounts[label] += 1
        entropy = 0.0
        for key in labelCounts:
            prob = float(labelCounts[key])/n
            entropy -= prob * np.log2(prob)
        return entropy

    def _splitData(self, X, y, axis, cutoff):
        """
        X: features, y: labels, axis: index of a feature, cutoff: a value of that feature.
        Returns the subset of the data whose feature `axis` equals `cutoff`,
        with that feature column removed from X.
        """
        ret = []
        featVec = X[:, axis]
        n = X.shape[1]   # number of features
        # sample matrix with column `axis` removed
        X = X[:, [i for i in range(n) if i != axis]]
        for i in range(len(featVec)):
            if featVec[i] == cutoff:
                ret.append(i)
        return X[ret, :], y[ret]

    def _chooseBestSplit(self, X, y):
        """ID3 & C4.5
        Pick the best split feature by information gain (ID3) or
        information gain ratio (C4.5); returns the feature index.
        """
        numFeat = X.shape[1]
        baseEntropy = self._calEntropy(y)
        bestSplit = 0.0
        best_idx = -1
        for i in range(numFeat):
            featlist = X[:, i]        # the i-th feature column
            uniqueVals = set(featlist)
            curEntropy = 0.0
            splitInfo = 0.0
            for value in uniqueVals:
                sub_x, sub_y = self._splitData(X, y, i, value)
                prob = len(sub_y)/float(len(y))               # probability of this feature value
                curEntropy += prob * self._calEntropy(sub_y)  # conditional entropy
                splitInfo -= prob * np.log2(prob)             # split information, used by C4.5
            IG = baseEntropy - curEntropy
            if self._criteria == "ID3":
                if IG > bestSplit:
                    bestSplit = IG
                    best_idx = i
            if self._criteria == "C4.5":
                if splitInfo == 0.0:
                    continue          # avoid dividing by zero
                IGR = IG/splitInfo
                if IGR > bestSplit:
                    bestSplit = IGR
                    best_idx = i
        return best_idx

    def _majorityCnt(self, labellist):
        """Return the most frequent label in the list `labellist`."""
        labelCount = {}
        for vote in labellist:
            if vote not in labelCount.keys():
                labelCount[vote] = 0
            labelCount[vote] += 1
        sortedClassCount = sorted(labelCount.iteritems(), key=lambda x: x[1],
                                  reverse=True)
        return sortedClassCount[0][0]

    def _createTree(self, X, y, featureIndex):
        """
        X: features, y: labels, featureIndex: tuple recording the original
        column index of each remaining feature.
        Builds and returns a complete tree for the current featureIndex.
        """
        labelList = list(y)
        # if all labels are identical this is a leaf: return the label
        if labelList.count(labelList[0]) == len(labelList):
            return labelList[0]
        # if no features are left, return the majority label
        if len(featureIndex) == 0:
            return self._majorityCnt(labelList)
        bestFeatIndex = self._chooseBestSplit(X, y)   # best split feature (current index)
        bestFeatAxis = featureIndex[bestFeatIndex]    # its index in the original data
        featureIndex = list(featureIndex)
        featureIndex.remove(bestFeatAxis)             # drop the chosen feature
        featureIndex = tuple(featureIndex)
        myTree = {bestFeatAxis: {}}
        featValues = X[:, bestFeatIndex]
        uniqueVals = set(featValues)
        for value in uniqueVals:
            # build a subtree for every value of the chosen feature
            sub_X, sub_y = self._splitData(X, y, bestFeatIndex, value)
            myTree[bestFeatAxis][value] = self._createTree(sub_X, sub_y,
                                                           featureIndex)
        return myTree

    def fit(self, X, y):
        """
        X: features, y: class labels. Both are converted to numpy arrays if necessary.
        Returns self, which allows chaining: clf.fit(...).predict(...)
        """
        if isinstance(X, np.ndarray) and isinstance(y, np.ndarray):
            pass
        else:
            try:
                X = np.array(X)
                y = np.array(y)
            except:
                raise TypeError("numpy.ndarray required for X,y")
        featureIndex = tuple(['x'+str(i) for i in range(X.shape[1])])
        self._tree = self._createTree(X, y, featureIndex)
        return self

    def _classify(self, tree, sample):
        """
        Classify one sample with the trained tree. Building the tree is
        recursive, and so is classification; _classify handles a single sample.
        """
        featIndex = tree.keys()[0]     # feature stored at the root of this subtree
        secondDict = tree[featIndex]   # subtrees keyed by that feature's values
        axis = featIndex[1:]           # original column index of the feature
        key = sample[int(axis)]        # the sample's value for that feature
        valueOfKey = secondDict[key]
        if type(valueOfKey).__name__ == 'dict':
            # still an internal node: keep descending
            return self._classify(valueOfKey, sample)
        else:
            return valueOfKey

    def predict(self, X):
        if self._tree == None:
            raise NotImplementedError("Estimator not fitted, call `fit` first")
        # make sure X is an array
        if isinstance(X, np.ndarray):
            pass
        else:
            try:
                X = np.array(X)
            except:
                raise TypeError("numpy.ndarray required for X")
        if len(X.shape) == 1:
            return self._classify(self._tree, X)
        else:
            result = []
            for i in range(X.shape[0]):
                value = self._classify(self._tree, X[i])
                print str(i+1)+"-th sample is classified as:", value
                result.append(value)
            return np.array(result)

    def show(self, outpdf):
        if self._tree == None:
            pass
        # plot the tree using matplotlib
        import treePlotter
        treePlotter.createPlot(self._tree, outpdf)

if __name__ == "__main__":
    trainfile = r"data\train.txt"
    testfile = r"data\test.txt"
    import sys
    sys.path.append(r"F:\CSU\Github\MachineLearning\lib")
    import dataload as dload
    train_x, train_y = dload.loadData(trainfile)
    test_x, test_y = dload.loadData(testfile)
    clf = DecitionTree(criteria="C4.5")
    clf.fit(train_x, train_y)
    result = clf.predict(test_x)
    outpdf = r"tree.pdf"
    clf.show(outpdf)
Python: K-means clustering
K-means clustering is an unsupervised machine-learning algorithm that groups samples automatically.
The algorithm works as follows:
(1) Randomly generate K cluster centers, usually called centroids.
(2) Assign every sample to the nearest centroid (the distance can be Euclidean, Manhattan, cosine, etc.).
(3) Recompute each centroid, typically as the mean of all samples assigned to that cluster.
(4) Repeat steps (2) and (3) until one of the following holds (a scikit-learn equivalent of this standard algorithm is sketched after the bisecting variant below):
1) the assignment no longer changes;
2) the error (e.g. the sum of squared errors) falls within the required range;
3) the number of iterations reaches the configured maximum.
A common variant is bisecting (2-means) K-means, which works as follows:
(1) Start with all samples in a single cluster.
(2) Split that cluster into two with ordinary K-means.
(3) Try splitting each existing cluster into two, compute the error of each candidate split, and keep only the split with the smaller error; that is, one of the two clusters from step (2) is kept and the other is split again.
(4) Keep bisecting the existing clusters, each time keeping the split with the smallest error, until the required number of clusters is reached.
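For comparison, the standard algorithm described above is also available in scikit-learn (a minimal sketch, assuming scikit-learn is installed; the toy data is illustrative):

import numpy as np
from sklearn.cluster import KMeans

X = np.array([[1.0, 2.0], [1.2, 1.9], [0.9, 2.2],
              [8.0, 8.1], [7.9, 8.3], [8.2, 7.9]])
km = KMeans(n_clusters=2, n_init=10).fit(X)
print(km.labels_)            # cluster index of every sample
print(km.cluster_centers_)   # the learned centroids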
The full from-scratch Python code is below:

# -*- coding: utf-8 -*-
"""
Created on Tue Nov 08 14:01:44 2016
K - means cluster
"""
import numpy as np

class KMeansClassifier():
    "this is a k-means classifier"

    def __init__(self, k=3, initCent='random', max_iter=500):
        self._k = k
        self._initCent = initCent
        self._max_iter = max_iter
        self._clusterAssment = None
        self._labels = None
        self._sse = None

    def _calEDist(self, arrA, arrB):
        """Euclidean distance between two 1-D arrays."""
        return np.math.sqrt(sum(np.power(arrA-arrB, 2)))

    def _calMDist(self, arrA, arrB):
        """Manhattan distance between two 1-D arrays."""
        return sum(np.abs(arrA-arrB))

    def _randCent(self, data_X, k):
        """Pick k random centroids; returns a k*n centroid matrix."""
        n = data_X.shape[1]            # number of features
        centroids = np.empty((k, n))   # k*n array holding the centroids
        for j in range(n):
            minJ = min(data_X[:, j])
            rangeJ = float(max(data_X[:, j] - minJ))
            # flatten the (k, 1) array produced by rand before assigning the column
            centroids[:, j] = (minJ + rangeJ * np.random.rand(k, 1)).flatten()
        return centroids

    def fit(self, data_X):
        """data_X is an m*n matrix of samples."""
        if not isinstance(data_X, np.ndarray) or \
                isinstance(data_X, np.matrixlib.defmatrix.matrix):
            try:
                data_X = np.asarray(data_X)
            except:
                raise TypeError("numpy.ndarray resuired for data_X")
        m = data_X.shape[0]   # number of samples
        # an m*2 matrix: column 0 holds the index of the cluster each sample
        # belongs to, column 1 the squared error to that cluster's centroid
        self._clusterAssment = np.zeros((m, 2))
        if self._initCent == 'random':
            self._centroids = self._randCent(data_X, self._k)
        clusterChanged = True
        for _ in range(self._max_iter):   # "_" because the loop counter is unused
            clusterChanged = False
            for i in range(m):            # assign every sample to the nearest centroid
                minDist = np.inf          # start with an infinitely large distance
                minIndex = -1             # and an invalid centroid index
                for j in range(self._k):  # look for the nearest centroid
                    arrA = self._centroids[j, :]
                    arrB = data_X[i, :]
                    distJI = self._calEDist(arrA, arrB)
                    if distJI < minDist:
                        minDist = distJI
                        minIndex = j
                if self._clusterAssment[i, 0] != minIndex:
                    clusterChanged = True
                    self._clusterAssment[i, :] = minIndex, minDist**2
            if not clusterChanged:   # converged: no assignment changed
                break
            for i in range(self._k):  # update each centroid as the mean of its cluster
                index_all = self._clusterAssment[:, 0]   # cluster index of every sample
                value = np.nonzero(index_all == i)       # samples in cluster i
                ptsInClust = data_X[value[0]]
                self._centroids[i, :] = np.mean(ptsInClust, axis=0)
        self._labels = self._clusterAssment[:, 0]
        self._sse = sum(self._clusterAssment[:, 1])

    def predict(self, X):
        """Assign new samples to the nearest learned centroid."""
        if not isinstance(X, np.ndarray):
            try:
                X = np.asarray(X)
            except:
                raise TypeError("numpy.ndarray required for X")
        m = X.shape[0]   # number of samples
        preds = np.empty((m,))
        for i in range(m):
            minDist = np.inf
            for j in range(self._k):
                distJI = self._calEDist(self._centroids[j, :], X[i, :])
                if distJI < minDist:
                    minDist = distJI
                    preds[i] = j
        return preds


class biKMeansClassifier():
    "this is a binary k-means classifier"

    def __init__(self, k=3):
        self._k = k
        self._centroids = None
        self._clusterAssment = None
        self._labels = None
        self._sse = None

    def _calEDist(self, arrA, arrB):
        """Euclidean distance between two 1-D arrays."""
        return np.math.sqrt(sum(np.power(arrA-arrB, 2)))

    def fit(self, X):
        m = X.shape[0]
        self._clusterAssment = np.zeros((m, 2))
        centroid0 = np.mean(X, axis=0).tolist()
        centList = [centroid0]
        for j in range(m):   # initial squared error of every sample to the single centroid
            self._clusterAssment[j, 1] = self._calEDist(np.asarray(centroid0),
                                                        X[j, :])**2
        while (len(centList) < self._k):
            lowestSSE = np.inf
            # try splitting every cluster and keep the split with the smallest error
            for i in range(len(centList)):
                index_all = self._clusterAssment[:, 0]   # cluster index of every sample
                value = np.nonzero(index_all == i)       # samples in cluster i
                ptsInCurrCluster = X[value[0], :]
                clf = KMeansClassifier(k=2)
                clf.fit(ptsInCurrCluster)
                # centroids, assignments and errors after splitting this cluster
                centroidMat, splitClustAss = clf._centroids, clf._clusterAssment
                sseSplit = sum(splitClustAss[:, 1])
                index_all = self._clusterAssment[:, 0]
                value = np.nonzero(index_all == i)
                sseNotSplit = sum(self._clusterAssment[value[0], 1])
                if (sseSplit + sseNotSplit) < lowestSSE:
                    bestCentToSplit = i
                    bestNewCents = centroidMat
                    bestClustAss = splitClustAss.copy()
                    lowestSSE = sseSplit + sseNotSplit
            # after the split, one child keeps the parent's index and the other
            # gets index len(centList); then the new centroid is appended
            bestClustAss[np.nonzero(bestClustAss[:, 0] == 1)[0], 0] = len(centList)
            bestClustAss[np.nonzero(bestClustAss[:, 0] == 0)[0], 0] = bestCentToSplit
            centList[bestCentToSplit] = bestNewCents[0, :].tolist()
            centList.append(bestNewCents[1, :].tolist())
            self._clusterAssment[np.nonzero(self._clusterAssment[:, 0] ==
                                            bestCentToSplit)[0], :] = bestClustAss
        self._labels = self._clusterAssment[:, 0]
        self._sse = sum(self._clusterAssment[:, 1])
        self._centroids = np.asarray(centList)

    def predict(self, X):
        """Assign new samples to the nearest learned centroid."""
        if not isinstance(X, np.ndarray):
            try:
                X = np.asarray(X)
            except:
                raise TypeError("numpy.ndarray required for X")
        m = X.shape[0]   # number of samples
        preds = np.empty((m,))
        for i in range(m):
            minDist = np.inf
            for j in range(self._k):
                distJI = self._calEDist(self._centroids[j, :], X[i, :])
                if distJI < minDist:
                    minDist = distJI
                    preds[i] = j
        return preds
Python: downloading historical stock return data
Historical return (percentage change) data is one of the basic inputs for studying quantitative investing. The Python code below downloads the required history. The main steps are:
(1) get the codes of the N smallest stocks by market cap;
(2) loop over each of these stocks;
(3) fetch their percentage changes from 2016-05-01 to 2016-11-17;
(4) keep only stocks with more than 40 records, which removes recently listed stocks;
(5) save each stock to a file named "<stock code>.csv".
The code is below:

# -*- coding: utf-8 -*-
"""
Created on Thu Nov 17 23:04:33 2016
Fetch historical percentage changes and save each stock to its own csv file.
Note: get_fundamentals, get_price_change_rate and the query/fundamentals objects
are provided by the quant platform's research environment (RiceQuant-style API),
not by a standard Python package.
@author: yehx
"""
import numpy as np
import pandas as pd

# get the 100 smallest stocks by market cap
df = get_fundamentals(
    query(fundamentals.eod_derivative_indicator.market_cap)
    .order_by(fundamentals.eod_derivative_indicator.market_cap.asc())
    .limit(100), '2016-11-17', '1y'
)

# loop over these 100 stocks
# fetch the percentage changes from 2016-05-01 to 2016-11-17
# keep only stocks with more than 40 records (drops recently listed stocks)
# save each stock as "<stock code>.csv"
for stock in range(100):
    priceChangeRate = get_price_change_rate(df['market_cap'].columns[stock],
                                            '20160501', '20161117')
    if priceChangeRate is None:
        openDays = 0
    else:
        openDays = len(priceChangeRate)
    if openDays > 40:
        tempPrice = priceChangeRate[39:(openDays - 1)]
        for rate in range(len(tempPrice)):
            tempPrice[rate] = "%.3f" % tempPrice[rate]
        fileName = ''
        fileName = fileName.join(df['market_cap'].columns[stock].split('.')) + '.csv'
        tempPrice.to_csv(fileName)
Python: logistic regression classification
Logistic regression can be seen as an extension of linear regression: it is trained on samples with known binary labels and then predicts the class of new samples.
Suppose we have an MxN sample matrix X with known classes, where M is the number of samples and N the number of features, and an Mx1 label matrix Y. The idea of logistic regression is:
(1) Apply a weight vector W (Nx1) to the features of X as a linear transform, giving the transformed samples X' (Mx1); the goal is that samples of different classes are separated by a clear one-dimensional boundary.
(2) Apply a further function transform to X' so that values on either side of the boundary are mapped into the corresponding ranges.
(3) Training adjusts W so that as many transformed values as possible fall on the correct side of the boundary, matching the known classes.
(4) For logistic regression, this boundary in the transformed space is x = 0.
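A minimal sketch of the two ingredients used by the code below, the sigmoid function and one gradient-ascent update of the weights (variable names and toy data are illustrative, not from the original):

import numpy as np

def sigmoid(z):
    return 1.0 / (1 + np.exp(-z))

X = np.array([[1.0, 2.0], [2.0, 1.0], [0.5, 0.2]])   # toy samples
y = np.array([[1], [0], [0]])                        # toy labels
w = np.ones((2, 1))                                  # initial weights
alpha = 0.01                                         # learning rate
error = y - sigmoid(X.dot(w))                        # prediction error
w = w + alpha * X.T.dot(error)                       # one gradient step
print(w)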
Typical logistic regression code is below:

# -*- coding: utf-8 -*-
"""
Created on Wed Nov 09 15:21:48 2016
Logistic regression classification
"""
import numpy as np

class LogisticRegressionClassifier():

    def __init__(self):
        self._alpha = None

    # the sigmoid function
    def _sigmoid(self, fx):
        return 1.0/(1 + np.exp(-fx))

    # alpha is the step size (learning rate); maxCycles the maximum number of iterations
    def _gradDescent(self, featData, labelData, alpha, maxCycles):
        dataMat = np.mat(featData)                 # size: m*n
        labelMat = np.mat(labelData).transpose()   # size: m*1
        m, n = np.shape(dataMat)
        weigh = np.ones((n, 1))
        for i in range(maxCycles):
            hx = self._sigmoid(dataMat * weigh)
            error = labelMat - hx                  # size: m*1
            weigh = weigh + alpha * dataMat.transpose() * error   # update the weights from the error
        return weigh

    # train with gradient descent; swap in another optimiser here if needed
    def fit(self, train_x, train_y, alpha=0.01, maxCycles=100):
        return self._gradDescent(train_x, train_y, alpha, maxCycles)

    # classify with the learned weights
    def predict(self, test_X, test_y, weigh):
        dataMat = np.mat(test_X)
        labelMat = np.mat(test_y).transpose()      # transpose to m*1
        hx = self._sigmoid(dataMat*weigh)          # size: m*1
        m = len(hx)
        error = 0.0
        for i in range(m):
            if float(hx[i]) > 0.5:
                print str(i+1)+'-th sample ', int(labelMat[i]), 'is classified as: 1'
                if int(labelMat[i]) != 1:
                    error += 1.0
                    print "classify error."
            else:
                print str(i+1)+'-th sample ', int(labelMat[i]), 'is classified as: 0'
                if int(labelMat[i]) != 0:
                    error += 1.0
                    print "classify error."
        error_rate = error/m
        print "error rate is:", "%.4f" % error_rate
        return error_rate
Python: Naive Bayes classification
The core of Naive Bayes classification is the conditional probability P(y|x), where y is the class and x the feature vector: the probability that a sample with features x belongs to class y. The probabilities of the different classes are computed and the sample is assigned to the class with the largest one.
From the definition of conditional probability,
P(y|x) = P(y) * P(x|y) / P(x).
The denominator P(x) is the same for every class, so only the numerators need to be compared. Moreover, if the features are conditionally independent, P(x|y) is simply the product of the per-feature terms P(xi|y).
Text classification is a typical application of Naive Bayes. The approach is:
(1) Build a vocabulary, i.e. a list of the distinct words.
(2) Compute the probability P(y) of each class label.
(3) For each class, compute the probability P(xi|y) of every word in the vocabulary.
(4) For each class, multiply the probabilities P(xi|y) of the features of the sample to classify, then multiply by the corresponding P(y).
(5) Assign the sample to the class for which the value in (4) is largest.
The Python code for Naive Bayes text classification is below. To simplify the computation and avoid underflow, the program takes logarithms, which turns the products into sums: log(P(y) * prod P(xi|y)) = log P(y) + sum log P(xi|y).

# -*- coding: utf-8 -*-
"""
Created on Mon Nov 14 11:15:47 2016
Naive Bayes Classification
"""
import numpy as np

class NaiveBayes:
    def __init__(self):
        self._creteria = "NB"

    def _createVocabList(self, dataList):
        """Build the vocabulary (the list of distinct words)."""
        vocabSet = set([])
        for line in dataList:
            print set(line)
            vocabSet = vocabSet | set(line)
        return list(vocabSet)

    # set-of-words document model
    def _setOfWords2Vec(self, vocabList, inputSet):
        """
        Map a list of words onto the vocabulary: 1 if the word occurs, 0 otherwise.
        """
        outputVec = [0] * len(vocabList)
        for word in inputSet:
            if word in vocabList:
                outputVec[vocabList.index(word)] = 1
            else:
                print "the word:%s is not in my vocabulary!" % word
        return outputVec

    # bag-of-words document model (counts instead of 0/1)
    def _bagOfWords2VecMN(self, vocabList, inputSet):
        """
        Count how many times each word occurs and map the counts onto the vocabulary.
        Returns an n-dimensional vector, n being the vocabulary size.
        """
        returnVec = [0]*len(vocabList)
        for word in inputSet:
            if word in vocabList:
                returnVec[vocabList.index(word)] += 1
        return returnVec

    def _trainNB(self, trainMatrix, trainLabel):
        """
        Input: training matrix and class labels as numpy arrays.
        Computes the conditional probabilities and the class prior.
        """
        numTrainDocs = len(trainMatrix)     # number of documents
        numWords = len(trainMatrix[0])      # number of features (vocabulary size)
        pNeg = sum(trainLabel)/float(numTrainDocs)   # prior of the negative class
        p0Num = np.ones(numWords)   # start counts at 1 (Laplace smoothing) so that
        p1Num = np.ones(numWords)   # no conditional probability becomes zero
        p0InAll = 2.0               # two classes, hence the denominators start at 2
        p1InAll = 2.0
        # accumulate word counts per class over all documents
        for i in range(numTrainDocs):
            if trainLabel[i] == 1:
                p1Num += trainMatrix[i]
                p1InAll += sum(trainMatrix[i])
            else:
                p0Num += trainMatrix[i]
                p0InAll += sum(trainMatrix[i])
        print p1InAll
        # probability of each word given the class, in log space to avoid
        # underflow when the conditional probabilities are multiplied
        p0Vect = np.log(p0Num/p0InAll)   # word probabilities for class 0
        p1Vect = np.log(p1Num/p1InAll)   # np.log uses base e
        return p0Vect, p1Vect, pNeg

    def _classifyNB(self, vecSample, p0Vec, p1Vec, pNeg):
        """
        Classify one sample with Naive Bayes; returns 0 or 1.
        """
        prob_y0 = sum(vecSample * p0Vec) + np.log(1-pNeg)
        prob_y1 = sum(vecSample * p1Vec) + np.log(pNeg)   # log base e
        if prob_y0 < prob_y1:
            return 1
        else:
            return 0

    # test the NB algorithm
    def testingNB(self, testSample):
        listOPosts, listClasses = loadDataSet()
        myVocabList = self._createVocabList(listOPosts)
        # print myVocabList
        trainMat = []
        for postinDoc in listOPosts:
            trainMat.append(self._bagOfWords2VecMN(myVocabList, postinDoc))
        p0V, p1V, pAb = self._trainNB(np.array(trainMat), np.array(listClasses))
        print trainMat
        thisSample = np.array(self._bagOfWords2VecMN(myVocabList, testSample))
        result = self._classifyNB(thisSample, p0V, p1V, pAb)
        print testSample, 'classified as: ', result
        return result

###############################################################################
def loadDataSet():
    wordsList = [['my', 'dog', 'has', 'flea', 'problems', 'help', 'please'],
                 ['maybe', 'not', 'take', 'him', 'to', 'dog', 'park', 'stupid'],
                 ['my', 'dalmation', 'is', 'so', 'cute', ' and', 'I', 'love', 'him'],
                 ['stop', 'posting', 'stupid', 'worthless', 'garbage'],
                 ['mr', 'licks', 'ate', 'my', 'steak', 'how', 'to', 'stop', 'him'],
                 ['quit', 'buying', 'worthless', 'dog', 'food', 'stupid']]
    classLable = [0, 1, 0, 1, 0, 1]   # 0: good; 1: bad
    return wordsList, classLable

if __name__ == "__main__":
    clf = NaiveBayes()
    testEntry = [['love', 'my', 'girl', 'friend'],
                 ['stupid', 'garbage'],
                 ['Haha', 'I', 'really', "Love", "You"],
                 ['This', 'is', "my", "dog"]]
    clf.testingNB(testEntry[0])
#    for item in testEntry:
#        clf.testingNB(item)
Python: preprocessing historical stock data (part one)
When writing quantitative trading code we need historical stock data as the basis for analysis. This section shows how to download historical data with Python and store the result as a DataFrame. The processed data can be downloaded from http://download.csdn.net/detail/suiyingy/9688505.
The steps are:
- (1) Build a stock pool; here the stocks are selected by market cap.
- (2) Read the historical percentage changes of every stock in the pool.
- (3) Store the changes of all stocks in one DataFrame, one column per stock; for days on which a stock was not yet listed the change is set to 0.
- (4) Set the last row of the DataFrame to the number of trading days of each stock.
- (5) Save the DataFrame to a csv file.
The code is below:

# -*- coding: utf-8 -*-
"""
Created on Thu Nov 17 23:04:33 2016
Fetch historical percentage changes, merge them into one DataFrame and save it as csv.
Note: get_fundamentals and get_price_change_rate are platform (RiceQuant-style) APIs.
@author: yehx
"""
import numpy as np
import pandas as pd

# get the 50 smallest stocks by market cap
df = get_fundamentals(
    query(fundamentals.eod_derivative_indicator.market_cap)
    .order_by(fundamentals.eod_derivative_indicator.market_cap.asc())
    .limit(50), '2016-11-17', '1y'
)

b1 = {}
# use the CSI 300 index as the date reference
priceChangeRate_300 = get_price_change_rate('000300.XSHG', '20060101', '20161118')
df300 = pd.DataFrame(priceChangeRate_300)
lenReference = len(priceChangeRate_300)
dfout = df300
dflen = pd.DataFrame()
dflen['000300.XSHG'] = [lenReference]

# loop over the 50 stocks
# fetch their percentage changes
# store them in the DataFrame
# finally save the DataFrame as a csv file
for stock in range(50):
    priceChangeRate = get_price_change_rate(df['market_cap'].columns[stock],
                                            '20150101', '20161118')
    if priceChangeRate is None:
        openDays = 0
    else:
        openDays = len(priceChangeRate)
    dftempPrice = pd.DataFrame(priceChangeRate)
    tempArr = []
    for i in range(lenReference):
        if df300.index[i] in list(dftempPrice.index):
            # keep 4 significant digits
            tempArr.append("%.4f" % ((dftempPrice.loc[str(df300.index[i])][0])))
        else:
            tempArr.append(float(0.0))
    fileName = ''
    fileName = fileName.join(df['market_cap'].columns[stock].split('.'))
    dfout[fileName] = tempArr
    dflen[fileName] = [len(priceChangeRate)]

dfout = dfout.append(dflen)
dfout.to_csv('00050.csv')
Python: preprocessing historical stock data (part two)
Stock history downloaded from the web usually cannot be used directly and has to be converted into the format you need. The Python code below reads the historical percentage changes stored in a csv file and processes them into a sub-database of 30-day return windows; the result can be downloaded from http://download.csdn.net/detail/suiyingy/9688605.
The main steps (csv reading and writing in Python) are:
- read the historical percentage changes from the csv file;
- randomly sample windows of 30 consecutive daily changes;
- build the new data set from these windows;
- save the result to a new csv file.
The code is below:

# -*- coding: utf-8 -*-
"""
Created on Thu Nov 17 23:04:33 2016
Process csv-format historical percentage-change data.
@author: yehx
"""
import numpy as np
import pandas as pd
import random
import csv
import sys
reload(sys)
sys.setdefaultencoding('utf-8')

'''
- load csv-format data
'''
def loadCSVfile1(datafile):
    filelist = []
    with open(datafile) as file:
        lines = csv.reader(file)
        for oneline in lines:
            filelist.append(oneline)
    filelist = np.array(filelist)
    return filelist

# data processing:
# randomly sample windows of `subLen` consecutive daily changes
# and build the new data set from them
def dataProcess(dataArr, subLen):
    totLen, totWid = np.shape(dataArr)
    print totLen, totWid
    lenArr = dataArr[totLen-1, 2:totWid]   # the last row holds each stock's record count
    columnCnt = 1
    dataOut = []
    for lenData in lenArr:
        columnCnt = columnCnt + 1
        N60 = int(lenData) / (2 * subLen)  # number of windows to draw for this stock
        print N60
        if N60 > 0:
            randIndex = random.sample(range(totLen-int(lenData)-1, totLen-subLen), N60)
            for i in randIndex:
                dataOut.append(dataArr[i:(i+subLen), columnCnt])
    dataOut = np.array(dataOut)
    return dataOut

if __name__ == "__main__":
    datafile = "00100 (3).csv"
    data = loadCSVfile1(datafile)
    df = pd.DataFrame(data)
    m, n = np.shape(data)
    dataOut = dataProcess(data, 30)
    m, n = np.shape(dataOut)
    # save the result
    csvfile = file('csvtest.csv', 'wb')
    writer = csv.writer(csvfile)
    writer.writerows(dataOut)
    csvfile.close()
http://blog.sina.com.cn/s/articlelist_6017673753_0_1.html