python量化之股票分析


Python股票數據分析

最近在學習基於python的股票數據分析,其中主要用到了tushare和seaborn。tushare是一款財經類數據接口包,國內的股票數據還是比較全的

官網地址:http://tushare.waditu.com/index.html#id5。seaborn則是一款繪圖庫,通過seaborn可以輕松地畫出簡潔漂亮的圖表,而且庫本身具有一定的統計功能。

  導入的模塊:

import matplotlib.pyplot as plt

  import seaborn as sns

  import seaborn.linearmodels as snsl

from datetime import datetime

  import tushare as ts

代碼部分:

  股票收盤價走勢曲線

  sns.set_style("whitegrid")

  end = datetime.today() #開始時間結束時間,選取最近一年的數據

  start = datetime(end.year-1,end.month,end.day)

  end = str(end)[0:10]

  start = str(start)[0:10]

stock = ts.get_hist_data('300104',start,end)#選取一支股票

  stock['close'].plot(legend=True ,figsize=(10,4))

  plt.show()

股票日線

同理,可以做出5日均線、10日均線以及20日均線

  stock[['close','ma5','ma10','ma20']].plot(legend=True ,figsize=(10,4))

日線、5日均線、10日均線、20日均線

股票每日漲跌幅度

  stock['Daily Return'] = stock['close'].pct_change()

  stock['Daily Return'].plot(legend=True,figsize=(10,4))

每日漲跌幅

核密度估計

  sns.kdeplot(stock['Daily Return'].dropna())

核密度估計

核密度估計+統計柱狀圖

  sns.distplot(stock['Daily Return'].dropna(),bins=100)

核密度+柱狀圖

兩支股票的皮爾森相關系數

  sns.jointplot(stock['Daily Return'],stock['Daily Return'],alpha=0.2)

皮爾森相關系數

多只股票相關性計算

  stock_lis=['300113','300343','300295','300315`] #隨便選取了四支互聯網相關的股票

  df=pd.DataFrame()

  for stock in stock_lis: closing_df = ts.get_hist_data(stock,start,end)['close'] df = df.join(pd.DataFrame({stock:closing_df}),how='outer')

  tech_rets = df.pct_change()

  snsl.corrplot(tech_rets.dropna())

  相關性

簡單地計算股票的收益與風險,衡量股票收益與風險的數值分別為股票漲跌的平均值以及標准差,平均值為正則說明收益是正的,標准差越大則說明股票波動大,風險也大。

  rets = tech_rets.dropna()

  plt.scatter(rets.mean(),rets.std())

  plt.xlabel('Excepted Return')

  plt.ylabel('Risk')

  for label,x,y in zip(rets.columns,rets.mean(),rets.std()):#添加標注 plt.annotate( label, xy =(x,y),xytext=(15,15), textcoords = 'offset points', arrowprops = dict(arrowstyle = '-',connectionstyle = 'arc3,rad=-0.3'))

聲明:本文由入駐搜狐公眾平台的作者撰寫,除搜狐官方賬號外,觀點僅代表作者本人,不代表搜狐立場。

 

 

用Python分析公開數據選出高送轉預期股票

根據以往的經驗,每年年底都會有一波高送轉預期行情。今天,米哥就帶大家實踐一下如何利用tushare實現高送轉預期選股。

本文主要是講述選股的思路方法,選股條件和參數大家可以根據米哥提供的代碼自行修改。

1. 選股原理

一般來說,具備高送轉預期的個股,都具有總市值低、每股公積金高、每股收益大,流通股本少的特點。當然,也還有其它的因素,比如當前股價、經營收益變動情況、以及以往分紅送股習慣等等。

這里我們暫時只考慮每股公積金、每股收益、流通股本和總市值四個因素,將公積金大於等於5元,每股收益大於等於5毛,流通股本在3億以下,總市值在100億以內作為高送轉預期目標(這些參數大家可根據自己的經驗隨意調整)。

2. 數據准備

首先要導入tushare:

import tushare as ts

調取股票基本面數據和行情數據

# 基本面數據 basic = ts.get_stock_basics() # 行情和市值數據 hq = ts.get_today_all()

3. 數據清洗整理

對獲取到的數據進行清洗和整理,只保留需要的字段。(其它字段及含義,請參考 http:// tushare.org 文檔)

#當前股價,如果停牌則設置當前價格為上一個交易日股價 hq['trade'] = hq.apply(lambda x:x.settlement if x.trade==0 else x.trade, axis=1) #分別選取流通股本,總股本,每股公積金,每股收益 basedata = basic[['outstanding', 'totals', 'reservedPerShare', 'esp']] #選取股票代碼,名稱,當前價格,總市值,流通市值 hqdata = hq[['code', 'name', 'trade', 'mktcap', 'nmc']] #設置行情數據code為index列 hqdata = hqdata.set_index('code') #合並兩個數據表 data = basedata.merge(hqdata, left_index=True, right_index=True)

4. 選股條件

根據上文提到的選股參數和條件,我們對數據進一步處理。

將總市值和流通市值換成億元單位
data['mktcap'] = data['mktcap'] / 10000 data['nmc'] = data['nmc'] / 10000

設置參數和過濾值(此次各自調整)

#每股公積金>=5 res = data.reservedPerShare >= 5 #流通股本<=3億 out = data.outstanding <= 30000 #每股收益>=5毛 eps = data.esp >= 0.5 #總市值<100億 mktcap = data.mktcap <= 100

取並集結果:

allcrit = res & out & eps & mktcap
selected = data[allcrit]

具有高送轉預期股票的結果呈現:

以上字段的含義分別為:股票名稱、收盤價格、每股公積金、流通股本、每股收益(應該為eps,之前發布筆誤)、總市值和流通市值。

https://zhuanlan.zhihu.com/p/23829205

 

 

 

Python 金叉判定


def
jincha(context, bar_dict, his): #站上5日線 def zs5(context, bar_dict, his): ma_n = pd.rolling_mean(his, 5) temp = his - ma_n ​ #temp_s包含了前一天站上五日線得股票代碼 temp_s = list(temp[temp>0].iloc[-1,:].dropna().index) return temp_s #站上10日線 def zs10(context, bar_dict, his): ma_n = pd.rolling_mean(his, 10) temp = his - ma_n temp_s = list(temp[temp>0].iloc[-1,:].dropna().index) return temp_s #金叉突破 def jc(context, bar_dict, his): mas = pd.rolling_mean(his,5) mal = pd.rolling_mean(his, 10) temp = mas - mal #temp_jc昨天大於0股票代碼 #temp_r前天大於0股票代碼​ temp_jc = list(temp[temp>0].iloc[-1,:].dropna().index) temp_r = list(temp[temp>0].iloc[-2,:].dropna().index) temp = [] for stock in temp_jc: if stock not in temp_r: temp.append(stock) return temp #求三種條件下的股票代碼交集 con1 = zs5(context, bar_dict, his) con2 = zs10(context, bar_dict, his) con3 = jc(context, bar_dict, his) tar_list=[con1,con2,con3]​ tarstock = tar_list[0] for i in tar_list: tarstock = list(set(tarstock).intersection(set(i))) return tarstock

 

Python 過濾次新股、停牌、漲跌停


#
過濾次新股、是否漲跌停、是否停牌等條件 def filcon(context,bar_dict,tar_list): def zdt_trade(stock, context, bar_dict): yesterday = history(2,'1d', 'close')[stock].values[-1] zt = round(1.10 * yesterday,2) dt = round(0.99 * yesterday,2) #last最后交易價 return dt < bar_dict[stock].last < zt filstock = [] for stock in tar_list: con1 = ipo_days(stock,context.now) > 60 con2 = bar_dict[stock].is_trading con3 = zdt_trade(stock,context,bar_dict) if con1 & con2 & con3: filstock.append(stock) return filstock

 

Python 按平均持倉市值調倉


#
按平均持倉市值調倉 def for_balance(context, bar_dict): #mvalues = context.portfolio.market_value #avalues = context.portfolio.portfolio_value #per = mvalues / avalues hlist = [] for stock in context.portfolio.positions: #獲取股票及對應持倉市值 hlist.append([stock,bar_dict[stock].last * context.portfolio.positions[stock].quantity]) if hlist: #按持倉市值由大到小排序 hlist = sorted(hlist,key=lambda x:x[1], reverse=True) temp = 0 for li in hlist: #計算持倉總市值 temp += li[1] for li in hlist: #平均各股持倉市值 if bar_dict[li[0]].is_trading: order_target_value(li[0], temp/len(hlist)) return

 

Python PCA主成分分析算法

Python主成分分析算法的作用是提取樣本的主要特征向量,從而實現數據降維的目的。


#
-*- coding: utf-8 -*- """ Created on Sun Feb 28 10:04:26 2016 PCA source code @author: liudiwei """ import numpy as np import pandas as pd import matplotlib.pyplot as plt #計算均值,要求輸入數據為numpy的矩陣格式,行表示樣本數,列表示特征 def meanX(dataX): return np.mean(dataX,axis=0)#axis=0表示按照列來求均值,如果輸入list,則axis=1 #計算方差,傳入的是一個numpy的矩陣格式,行表示樣本數,列表示特征 def variance(X): m, n = np.shape(X) mu = meanX(X) muAll = np.tile(mu, (m, 1)) X1 = X - muAll variance = 1./m * np.diag(X1.T * X1) return variance #標准化,傳入的是一個numpy的矩陣格式,行表示樣本數,列表示特征 def normalize(X): m, n = np.shape(X) mu = meanX(X) muAll = np.tile(mu, (m, 1)) X1 = X - muAll X2 = np.tile(np.diag(X.T * X), (m, 1)) XNorm = X1/X2 return XNorm """ 參數: - XMat:傳入的是一個numpy的矩陣格式,行表示樣本數,列表示特征 - k:表示取前k個特征值對應的特征向量 返回值: - finalData:參數一指的是返回的低維矩陣,對應於輸入參數二 - reconData:參數二對應的是移動坐標軸后的矩陣 """ def pca(XMat, k): average = meanX(XMat) m, n = np.shape(XMat) data_adjust = [] avgs = np.tile(average, (m, 1)) data_adjust = XMat - avgs covX = np.cov(data_adjust.T) #計算協方差矩陣 featValue, featVec= np.linalg.eig(covX) #求解協方差矩陣的特征值和特征向量 index = np.argsort(-featValue) #按照featValue進行從大到小排序 finalData = [] if k > n: print "k must lower than feature number" return else: #注意特征向量時列向量,而numpy的二維矩陣(數組)a[m][n]中,a[1]表示第1行值 selectVec = np.matrix(featVec.T[index[:k]]) #所以這里需要進行轉置 finalData = data_adjust * selectVec.T reconData = (finalData * selectVec) + average return finalData, reconData def loaddata(datafile): return np.array(pd.read_csv(datafile,sep="\t",header=-1)).astype(np.float) def plotBestFit(data1, data2): dataArr1 = np.array(data1) dataArr2 = np.array(data2) m = np.shape(dataArr1)[0] axis_x1 = [] axis_y1 = [] axis_x2 = [] axis_y2 = [] for i in range(m): axis_x1.append(dataArr1[i,0]) axis_y1.append(dataArr1[i,1]) axis_x2.append(dataArr2[i,0]) axis_y2.append(dataArr2[i,1]) fig = plt.figure() ax = fig.add_subplot(111) ax.scatter(axis_x1, axis_y1, s=50, c='red', marker='s') ax.scatter(axis_x2, axis_y2, s=50, c='blue') plt.xlabel('x1'); plt.ylabel('x2'); plt.savefig("outfile.png") plt.show() #簡單測試 #數據來源:http://www.cnblogs.com/jerrylead/archive/2011/04/18/2020209.html def test(): X = [[2.5, 0.5, 2.2, 1.9, 3.1, 2.3, 2, 1, 1.5, 1.1], [2.4, 0.7, 2.9, 2.2, 3.0, 2.7, 1.6, 1.1, 1.6, 0.9]] XMat = np.matrix(X).T k = 2 return pca(XMat, k) #根據數據集data.txt def main(): datafile = "data.txt" XMat = loaddata(datafile) k = 2 return pca(XMat, k) if __name__ == "__main__": finalData, reconMat = main() plotBestFit(finalData, reconMat)

經過主成分降維的數據如紅色圖案所示,藍色的是恢復的原始數據。可以看到經過降維的數據樣本差異更加明顯。

 

Python KNN最近鄰分類算法

KNN最近鄰算法:利用向量之間的距離來分類。
步驟:
第一步:計算新樣本與已知分類樣本之間的距離。
第二步:將所求距離按從小到大排列。
第三步:選取距離最近的k個樣本。
第四步:將新樣本歸為以上k個樣本大多數中的一類。
以下為KNN最近鄰分類算法的python代碼:
第一部分:KNN分類代碼
# -*- coding: utf-8 -*-
"""
Created on Mon Feb 22 13:21:22 2016
K-NearestNeighbor
"""
import numpy as np
import operator

class KNNClassifier():
    """This is a Nearest Neighbor classifier. """

    #定義k的值
    def __init__(self, k=3):
        self._k = k

    #計算新樣本與已知分類樣本的距離並從小到大排列    
    def _calEDistance(self, inSample, dataset):
        m = dataset.shape[0]
        diffMat = np.tile(inSample, (m,1)) - dataset
        sqDiffMat = diffMat**2 #每個元素平方
        sqDistances = sqDiffMat.sum(axis = 1)  #求和
        distances = sqDistances ** 0.5 #開根號
        return distances.argsort()  #按距離的從小到達排列的下標值

    
    def _classify0(self, inX, dataSet, labels):
        k = self._k
        dataSetSize = dataSet.shape[0]                  
        diffMat = np.tile(inX, (dataSetSize,1)) - dataSet      
        sqDiffMat = diffMat**2
        sqDistances = sqDiffMat.sum(axis=1)                  
        distances = sqDistances**0.5
        sortedDistIndicies = distances.argsort()            
        classCount={}                                      
        for i in range(k):
            voteIlabel = labels[sortedDistIndicies[i]]
            classCount[voteIlabel] = classCount.get(voteIlabel,0) + 1
        sortedClassCount = sorted(classCount.iteritems(), key=operator.itemgetter(1), reverse=True)
        return sortedClassCount[0][0]
        
    #對一個樣本進行分類
    def _classify(self, sample, train_X, train_y):
        #數據類型檢測
        if isinstance(sample, np.ndarray) and isinstance(train_X, np.ndarray) \
                and isinstance(train_y, np.ndarray):
            pass
        else:
            try:
                sample = np.array(sample)
                train_X = np.array(train_X)
                train_y = np.array(train_y)
            except:
                raise TypeError("numpy.ndarray required for train_X and ..")
        sortedDistances = self._calEDistance(sample, train_X)
        classCount = {}
        for i in range(self._k):
            oneVote = train_y[sortedDistances[i]] #獲取最近的第i個點的類別
            classCount[oneVote] = classCount.get(oneVote, 0) + 1
        sortedClassCount = sorted(classCount.iteritems(),\
                                    key=operator.itemgetter(1), reverse=True)
        #print "the sample :", sample, "is classified as",sortedClassCount[0][0]    
        return sortedClassCount[0][0]
    
    
    def classify(self, test_X, train_X, train_y):
        results = [] 
        #數據類型檢測
        if isinstance(test_X, np.ndarray) and isinstance(train_X, np.ndarray) \
                and isinstance(train_y, np.ndarray):
            pass
        else:
            try:
                test_X = np.array(test_X)
                train_X = np.array(train_X)
                train_y = np.array(train_y)
            except:
                raise TypeError("numpy.ndarray required for train_X and ..")
        d = len(np.shape(test_X))
        if d == 1:
            sample = test_X
            result = self._classify(sample, train_X, train_y)
            results.append(result)
        else:
            for i in range(len(test_X)):
                sample = test_X[i]
                result = self._classify(sample, train_X, train_y)
                results.append(result)
        return results
        
        
if __name__=="__main__":
    train_X = [[1, 2, 0, 1, 0],
               [0, 1, 1, 0, 1],
               [1, 0, 0, 0, 1],
               [2, 1, 1, 0, 1],
               [1, 1, 0, 1, 1]]
    train_y = [1, 1, 0, 0, 0]
    clf = KNNClassifier(k = 3)
    sample = [[1,2,0,1,0],[1,2,0,1,1]]
    result = clf.classify(sample, train_X, train_y)
View Code

 

第二部分:KNN測試代碼

# -*- coding: utf-8 -*-
"""
Created on Mon Feb 22 13:21:22 2016
K-NearestNeighbor
"""
import numpy as np
import operator

class KNNClassifier():
    """This is a Nearest Neighbor classifier. """

    #定義k的值
    def __init__(self, k=3):
        self._k = k

    #計算新樣本與已知分類樣本的距離並從小到大排列    
    def _calEDistance(self, inSample, dataset):
        m = dataset.shape[0]
        diffMat = np.tile(inSample, (m,1)) - dataset
        sqDiffMat = diffMat**2 #每個元素平方
        sqDistances = sqDiffMat.sum(axis = 1)  #求和
        distances = sqDistances ** 0.5 #開根號
        return distances.argsort()  #按距離的從小到達排列的下標值

    
    def _classify0(self, inX, dataSet, labels):
        k = self._k
        dataSetSize = dataSet.shape[0]                  
        diffMat = np.tile(inX, (dataSetSize,1)) - dataSet      
        sqDiffMat = diffMat**2
        sqDistances = sqDiffMat.sum(axis=1)                  
        distances = sqDistances**0.5
        sortedDistIndicies = distances.argsort()            
        classCount={}                                      
        for i in range(k):
            voteIlabel = labels[sortedDistIndicies[i]]
            classCount[voteIlabel] = classCount.get(voteIlabel,0) + 1
        sortedClassCount = sorted(classCount.iteritems(), key=operator.itemgetter(1), reverse=True)
        return sortedClassCount[0][0]
        
    #對一個樣本進行分類
    def _classify(self, sample, train_X, train_y):
        #數據類型檢測
        if isinstance(sample, np.ndarray) and isinstance(train_X, np.ndarray) \
                and isinstance(train_y, np.ndarray):
            pass
        else:
            try:
                sample = np.array(sample)
                train_X = np.array(train_X)
                train_y = np.array(train_y)
            except:
                raise TypeError("numpy.ndarray required for train_X and ..")
        sortedDistances = self._calEDistance(sample, train_X)
        classCount = {}
        for i in range(self._k):
            oneVote = train_y[sortedDistances[i]] #獲取最近的第i個點的類別
            classCount[oneVote] = classCount.get(oneVote, 0) + 1
        sortedClassCount = sorted(classCount.iteritems(),\
                                    key=operator.itemgetter(1), reverse=True)
        #print "the sample :", sample, "is classified as",sortedClassCount[0][0]    
        return sortedClassCount[0][0]
    
    
    def classify(self, test_X, train_X, train_y):
        results = [] 
        #數據類型檢測
        if isinstance(test_X, np.ndarray) and isinstance(train_X, np.ndarray) \
                and isinstance(train_y, np.ndarray):
            pass
        else:
            try:
                test_X = np.array(test_X)
                train_X = np.array(train_X)
                train_y = np.array(train_y)
            except:
                raise TypeError("numpy.ndarray required for train_X and ..")
        d = len(np.shape(test_X))
        if d == 1:
            sample = test_X
            result = self._classify(sample, train_X, train_y)
            results.append(result)
        else:
            for i in range(len(test_X)):
                sample = test_X[i]
                result = self._classify(sample, train_X, train_y)
                results.append(result)
        return results
        
        
if __name__=="__main__":
    train_X = [[1, 2, 0, 1, 0],
               [0, 1, 1, 0, 1],
               [1, 0, 0, 0, 1],
               [2, 1, 1, 0, 1],
               [1, 1, 0, 1, 1]]
    train_y = [1, 1, 0, 0, 0]
    clf = KNNClassifier(k = 3)
    sample = [[1,2,0,1,0],[1,2,0,1,1]]
    result = clf.classify(sample, train_X, train_y)
View Code

 

Python 決策樹算法(ID3 &C4.5)

決策樹(Decision Tree)算法:按照樣本的屬性逐步進行分類,為了能夠使分類更快、更有效。每一個新分類屬性的選擇依據可以是信息增益IG和信息增益率IGR,前者為最基本的ID3算法,后者為改進后的C4.5算法。

以ID3為例,其訓練過程的編程思路如下:

(1)輸入x、y(x為樣本,y為label),行為樣本,列為樣本特征。

(2)計算信息增益IG,獲取使IG最大的特征。

(3)獲得刪除最佳分類特征后的樣本陣列。

(4)按照最佳分類特征的屬性值將更新后的樣本進行歸類。

屬性值1(x1,y1)    屬性值2(x2,y2)    屬性值(x3,y3)

(5)分別對以上類別重復以上操作直至到達葉節點(遞歸調用)。

葉節點的特征:

(1)所有的標簽值y都一樣。

(2)沒有特征可以繼續划分。

測試過程的編程思路如下:

(1)讀取訓練好的決策樹。

(2)從根節點開始遞歸遍歷整個決策樹直到到達葉節點為止。

以下為具體代碼,訓練后的決策樹結構為遞歸套用的字典,其是由特征值組成的索引加上label組成的。

 


#
-*- coding: utf-8 -*- """ Created on Mon Nov 07 09:06:37 2016 @author: yehx """ # -*- coding: utf-8 -*- """ Created on Sun Feb 21 12:17:10 2016 Decision Tree Source Code @author: liudiwei """ import os import numpy as np class DecitionTree(): """This is a decision tree classifier. """ def __init__(self, criteria='ID3'): self._tree = None if criteria == 'ID3' or criteria == 'C4.5': self._criteria = criteria else: raise Exception("criterion should be ID3 or C4.5") def _calEntropy(slef, y): ''' 功能:_calEntropy用於計算香農熵 e=-sum(pi*log pi) 參數:其中y為數組array 輸出:信息熵entropy ''' n = y.shape[0] labelCounts = {} for label in y: if label not in labelCounts.keys(): labelCounts[label] = 1 else: labelCounts[label] += 1 entropy = 0.0 for key in labelCounts: prob = float(labelCounts[key])/n entropy -= prob * np.log2(prob) return entropy def _splitData(self, X, y, axis, cutoff): """ 參數:X為特征,y為label,axis為某個特征的下標,cutoff是下標為axis特征取值值 輸出:返回數據集中特征下標為axis,特征值等於cutoff的子數據集 先將特征列從樣本矩陣里除去,然后將屬性值為cutoff的數據歸為一類 """ ret = [] featVec = X[:,axis] n = X.shape[1] #特征個數 #除去第axis列特征后的樣本矩陣 X = X[:,[i for i in range(n) if i!=axis]] for i in range(len(featVec)): if featVec[i] == cutoff: ret.append(i) return X[ret, :], y[ret] def _chooseBestSplit(self, X, y): """ID3 & C4.5 參數:X為特征,y為label 功能:根據信息增益或者信息增益率來獲取最好的划分特征 輸出:返回最好划分特征的下標 """ numFeat = X.shape[1] baseEntropy = self._calEntropy(y) bestSplit = 0.0 best_idx = -1 for i in range(numFeat): featlist = X[:,i] #得到第i個特征對應的特征列 uniqueVals = set(featlist) curEntropy = 0.0 splitInfo = 0.0 for value in uniqueVals: sub_x, sub_y = self._splitData(X, y, i, value) prob = len(sub_y)/float(len(y)) #計算某個特征的某個值的概率 curEntropy += prob * self._calEntropy(sub_y) #迭代計算條件熵 splitInfo -= prob * np.log2(prob) #分裂信息,用於計算信息增益率 IG = baseEntropy - curEntropy if self._criteria=="ID3": if IG > bestSplit: bestSplit = IG best_idx = i if self._criteria=="C4.5": if splitInfo == 0.0: pass IGR = IG/splitInfo if IGR > bestSplit: bestSplit = IGR best_idx = i return best_idx def _majorityCnt(self, labellist): """ 參數:labellist是類標簽,序列類型為list 輸出:返回labellist中出現次數最多的label """ labelCount={} for vote in labellist: if vote not in labelCount.keys(): labelCount[vote] = 0 labelCount[vote] += 1 sortedClassCount = sorted(labelCount.iteritems(), key=lambda x:x[1], \ reverse=True) return sortedClassCount[0][0] def _createTree(self, X, y, featureIndex): """ 參數:X為特征,y為label,featureIndex類型是元組,記錄X特征在原始數據中的下標 輸出:根據當前的featureIndex創建一顆完整的樹 """ labelList = list(y) #如果所有的標簽都一樣(葉節點),直接返回標簽 if labelList.count(labelList[0]) == len(labelList): return labelList[0] #如果沒有特征可以繼續划分,那么將所有的label歸為大多數的一類,並返回標簽 if len(featureIndex) == 0: return self._majorityCnt(labelList) #返回最佳分類特征的下標 bestFeatIndex = self._chooseBestSplit(X,y) #返回最佳分類特征的索引 bestFeatAxis = featureIndex[bestFeatIndex] featureIndex = list(featureIndex) #獲得刪除最佳分類特征索引后的列表 featureIndex.remove(bestFeatAxis) featureIndex = tuple(featureIndex) myTree = {bestFeatAxis:{}} featValues = X[:, bestFeatIndex] uniqueVals = set(featValues) for value in uniqueVals: #對每個value遞歸地創建樹 sub_X, sub_y = self._splitData(X,y, bestFeatIndex, value) myTree[bestFeatAxis][value] = self._createTree(sub_X, sub_y, \ featureIndex) return myTree def fit(self, X, y): """ 參數:X是特征,y是類標簽 注意事項:對數據X和y進行類型檢測,保證其為array 輸出:self本身 """ if isinstance(X, np.ndarray) and isinstance(y, np.ndarray): pass else: try: X = np.array(X) y = np.array(y) except: raise TypeError("numpy.ndarray required for X,y") featureIndex = tuple(['x'+str(i) for i in range(X.shape[1])]) self._tree = self._createTree(X,y,featureIndex) return self #allow using: clf.fit().predict() def _classify(self, tree, sample): """ 用訓練好的模型對輸入數據進行分類 注意:決策樹的構建是一個遞歸的過程,用決策樹分類也是一個遞歸的過程 _classify()一次只能對一個樣本(sample)分類 """ featIndex = tree.keys()[0] #得到數的根節點值 secondDict = tree[featIndex] #得到以featIndex為划分特征的結果 axis=featIndex[1:] #得到根節點特征在原始數據中的下標 key = sample[int(axis)] #獲取待分類樣本中下標為axis的值 valueOfKey = secondDict[key] #獲取secondDict中keys為key的value值 if type(valueOfKey).__name__=='dict': #如果value為dict,則繼續遞歸分類 return self._classify(valueOfKey, sample) else: return valueOfKey def predict(self, X): if self._tree==None: raise NotImplementedError("Estimator not fitted, call `fit` first") #對X的類型進行檢測,判斷其是否是數組 if isinstance(X, np.ndarray): pass else: try: X = np.array(X) except: raise TypeError("numpy.ndarray required for X") if len(X.shape) == 1: return self._classify(self._tree, X) else: result = [] for i in range(X.shape[0]): value = self._classify(self._tree, X[i]) print str(i+1)+"-th sample is classfied as:", value result.append(value) return np.array(result) def show(self, outpdf): if self._tree==None: pass #plot the tree using matplotlib import treePlotter treePlotter.createPlot(self._tree, outpdf) if __name__=="__main__": trainfile=r"data\train.txt" testfile=r"data\test.txt" import sys sys.path.append(r"F:\CSU\Github\MachineLearning\lib") import dataload as dload train_x, train_y = dload.loadData(trainfile) test_x, test_y = dload.loadData(testfile) clf = DecitionTree(criteria="C4.5") clf.fit(train_x, train_y) result = clf.predict(test_x) outpdf = r"tree.pdf" clf.show(outpdf)

 

 

 

Python K均值聚類

 

Python K均值聚類是一種無監督的機器學習算法,能夠實現自動歸類的功能。

算法步驟如下:

(1)隨機產生K個分類中心,一般稱為質心。

(2)將所有樣本划分到距離最近的質心代表的分類中。(距離可以是歐氏距離、曼哈頓距離、夾角余弦等)

(3)計算分類后的質心,可以用同一類中所有樣本的平均屬性來代表新的質心。

(4)重復(2)(3)兩步,直到滿足以下其中一個條件:

    1)分類結果沒有發生改變。

    2)最小誤差(如平方誤差)達到所要求的范圍。

    3)迭代總數達到設置的最大值。

常見的K均值聚類算法還有2分K均值聚類算法,其步驟如下:

(1)將所有樣本作為一類。

(2)按照傳統K均值聚類的方法將樣本分為兩類。

(3)對以上兩類分別再分為兩類,且分別計算兩種情況下誤差,僅保留誤差更小的分類;即第(2)步產生的兩類其中一類保留,另一類進行再次分類。

(4)重復對已有類別分別進行二分類,同理保留誤差最小的分類,直到達到所需要的分類數目。

具體Python代碼如下:

 


#
-*- coding: utf-8 -*- """ Created on Tue Nov 08 14:01:44 2016 K - means cluster """ import numpy as np class KMeansClassifier(): "this is a k-means classifier" def __init__(self, k=3, initCent='random', max_iter=500 ): self._k = k self._initCent = initCent self._max_iter = max_iter self._clusterAssment = None self._labels = None self._sse = None def _calEDist(self, arrA, arrB): """ 功能:歐拉距離距離計算 輸入:兩個一維數組 """ return np.math.sqrt(sum(np.power(arrA-arrB, 2))) def _calMDist(self, arrA, arrB): """ 功能:曼哈頓距離距離計算 輸入:兩個一維數組 """ return sum(np.abs(arrA-arrB)) def _randCent(self, data_X, k): """ 功能:隨機選取k個質心 輸出:centroids #返回一個m*n的質心矩陣 """ n = data_X.shape[1] #獲取特征的維數 centroids = np.empty((k,n)) #使用numpy生成一個k*n的矩陣,用於存儲質心 for j in range(n): minJ = min(data_X[:, j]) rangeJ = float(max(data_X[:, j] - minJ)) #使用flatten拉平嵌套列表(nested list) centroids[:, j] = (minJ + rangeJ * np.random.rand(k, 1)).flatten() return centroids def fit(self, data_X): """ 輸入:一個m*n維的矩陣 """ if not isinstance(data_X, np.ndarray) or \ isinstance(data_X, np.matrixlib.defmatrix.matrix): try: data_X = np.asarray(data_X) except: raise TypeError("numpy.ndarray resuired for data_X") m = data_X.shape[0] #獲取樣本的個數 #一個m*2的二維矩陣,矩陣第一列存儲樣本點所屬的族的索引值, #第二列存儲該點與所屬族的質心的平方誤差 self._clusterAssment = np.zeros((m,2)) if self._initCent == 'random': self._centroids = self._randCent(data_X, self._k) clusterChanged = True for _ in range(self._max_iter): #使用"_"主要是因為后面沒有用到這個值 clusterChanged = False for i in range(m): #將每個樣本點分配到離它最近的質心所屬的族 minDist = np.inf #首先將minDist置為一個無窮大的數 minIndex = -1 #將最近質心的下標置為-1 for j in range(self._k): #次迭代用於尋找最近的質心 arrA = self._centroids[j,:] arrB = data_X[i,:] distJI = self._calEDist(arrA, arrB) #計算誤差值 if distJI <</span> minDist: minDist = distJI minIndex = j if self._clusterAssment[i,0] !=minIndex: clusterChanged = True self._clusterAssment[i,:] = minIndex, minDist**2 if not clusterChanged:#若所有樣本點所屬的族都不改變,則已收斂,結束迭代 break for i in range(self._k):#更新質心,將每個族中的點的均值作為質心 index_all = self._clusterAssment[:,0] #取出樣本所屬簇的索引值 value = np.nonzero(index_all==i) #取出所有屬於第i個簇的索引值 ptsInClust = data_X[value[0]] #取出屬於第i個簇的所有樣本點 self._centroids[i,:] = np.mean(ptsInClust, axis=0) #計算均值 self._labels = self._clusterAssment[:,0] self._sse = sum(self._clusterAssment[:,1]) def predict(self, X):#根據聚類結果,預測新輸入數據所屬的族 #類型檢查 if not isinstance(X,np.ndarray): try: X = np.asarray(X) except: raise TypeError("numpy.ndarray required for X") m = X.shape[0]#m代表樣本數量 preds = np.empty((m,)) for i in range(m):#將每個樣本點分配到離它最近的質心所屬的族 minDist = np.inf for j in range(self._k): distJI = self._calEDist(self._centroids[j,:], X[i,:]) if distJI <</span> minDist: minDist = distJI preds[i] = j return preds class biKMeansClassifier(): "this is a binary k-means classifier" def __init__(self, k=3): self._k = k self._centroids = None self._clusterAssment = None self._labels = None self._sse = None def _calEDist(self, arrA, arrB): """ 功能:歐拉距離距離計算 輸入:兩個一維數組 """ return np.math.sqrt(sum(np.power(arrA-arrB, 2))) def fit(self, X): m = X.shape[0] self._clusterAssment = np.zeros((m,2)) centroid0 = np.mean(X, axis=0).tolist() centList =[centroid0] for j in range(m):#計算每個樣本點與質心之間初始的平方誤差 self._clusterAssment[j,1] = self._calEDist(np.asarray(centroid0), \ X[j,:])**2 while (len(centList) <</span> self._k): lowestSSE = np.inf #嘗試划分每一族,選取使得誤差最小的那個族進行划分 for i in range(len(centList)): index_all = self._clusterAssment[:,0] #取出樣本所屬簇的索引值 value = np.nonzero(index_all==i) #取出所有屬於第i個簇的索引值 ptsInCurrCluster = X[value[0],:] #取出屬於第i個簇的所有樣本點 clf = KMeansClassifier(k=2) clf.fit(ptsInCurrCluster) #划分該族后,所得到的質心、分配結果及誤差矩陣 centroidMat, splitClustAss = clf._centroids, clf._clusterAssment sseSplit = sum(splitClustAss[:,1]) index_all = self._clusterAssment[:,0] value = np.nonzero(index_all==i) sseNotSplit = sum(self._clusterAssment[value[0],1]) if (sseSplit + sseNotSplit) <</span> lowestSSE: bestCentToSplit = i bestNewCents = centroidMat bestClustAss = splitClustAss.copy() lowestSSE = sseSplit + sseNotSplit #該族被划分成兩個子族后,其中一個子族的索引變為原族的索引 #另一個子族的索引變為len(centList),然后存入centList bestClustAss[np.nonzero(bestClustAss[:,0]==1)[0],0]=len(centList) bestClustAss[np.nonzero(bestClustAss[:,0]==0)[0],0]=bestCentToSplit centList[bestCentToSplit] = bestNewCents[0,:].tolist() centList.append(bestNewCents[1,:].tolist()) self._clusterAssment[np.nonzero(self._clusterAssment[:,0] == \ bestCentToSplit)[0],:]= bestClustAss self._labels = self._clusterAssment[:,0] self._sse = sum(self._clusterAssment[:,1]) self._centroids = np.asarray(centList) def predict(self, X):#根據聚類結果,預測新輸入數據所屬的族 #類型檢查 if not isinstance(X,np.ndarray): try: X = np.asarray(X) except: raise TypeError("numpy.ndarray required for X") m = X.shape[0]#m代表樣本數量 preds = np.empty((m,)) for i in range(m):#將每個樣本點分配到離它最近的質心所屬的族 minDist = np.inf for j in range(self._k): distJI = self._calEDist(self._centroids[j,:],X[i,:]) if distJI <</span> minDist: minDist = distJI preds[i] = j return preds

 

 

 

Python股票歷史漲跌幅數據獲取

股票漲跌幅數據是量化投資學習的基本數據資料之一,下面以Python代碼編程為工具,獲得所需要的歷史數據。主要步驟有:

(1) #按照市值從小到大的順序活得N支股票的代碼;

(2) #分別對這一百只股票進行100支股票操作;

(3) #獲取從2016.05.01到2016.11.17的漲跌幅數據;

(4) #選取記錄大於40個的數據,去除次新股;

(5) #將文件名名為“股票代碼.csv”。

具體代碼如下:

# -*- coding: utf-8 -*-
"""
Created on Thu Nov 17 23:04:33 2016
獲取股票的歷史漲跌幅,並分別存為csv格式
@author: yehx
"""

import numpy as np
import pandas as pd

#按照市值從小到大的順序活得100支股票的代碼
df = get_fundamentals(
        query(fundamentals.eod_derivative_indicator.market_cap)
        .order_by(fundamentals.eod_derivative_indicator.market_cap.asc())
        .limit(100),'2016-11-17', '1y'
    )

#分別對這一百只股票進行100支股票操作
#獲取從2016.05.01到2016.11.17的漲跌幅數據
#選取記錄大於40個的數據,去除次新股
#將文件名名為“股票代碼.csv”
for stock in range(100):
    priceChangeRate = get_price_change_rate(df['market_cap'].columns[stock], '20160501', '20161117')
    if priceChangeRate is None:
        openDays = 0
    else:
        openDays = len(priceChangeRate)
    if openDays > 40:
        tempPrice = priceChangeRate[39:(openDays - 1)]
        for rate in range(len(tempPrice)):
            tempPrice[rate] = "%.3f" %tempPrice[rate]
    fileName = ''
    fileName = fileName.join(df['market_cap'].columns[i].split('.')) + '.csv'
    fileName
    tempPrice.to_csv(fileName)

 

 

Python Logistic 回歸分類

 Logistic回歸可以認為是線性回歸的延伸,其作用是對二分類樣本進行訓練,從而對達到預測新樣本分類的目的。
假設有一組已知分類的MxN維樣本X,M為樣本數,N為特征維度,其相應的已知分類標簽為Mx1維矩陣Y。那么Logistic回歸的實現思路如下:
(1)用一組權重值W(Nx1)對X的特征進行線性變換,得到變換后的樣本X’(Mx1),其目標是使屬於不同分類的樣本X’存在一個明顯的一維邊界。
(2)然后再對樣本X’進一步做函數變換,從而使處於一維邊界兩測的值變換到相應的范圍之內。
(3)訓練過程就是通過改變W盡可能使得到的值位於一維邊界兩側,並且與已知分類相符。
(4)對於Logistic回歸,就是將原樣本的邊界變換到x=0這個邊界。
下面是Logistic回歸的典型代碼:

# -*- coding: utf-8 -*-
"""
Created on Wed Nov 09 15:21:48 2016
Logistic回歸分類
"""
import numpy as np


class LogisticRegressionClassifier():
    
    def __init__(self):
        self._alpha = None
    

    #定義一個sigmoid函數
    def _sigmoid(self, fx):
        return 1.0/(1 + np.exp(-fx))

    #alpha為步長(學習率);maxCycles最大迭代次數
    def _gradDescent(self, featData, labelData, alpha, maxCycles):
        dataMat = np.mat(featData)                      #size: m*n
        labelMat = np.mat(labelData).transpose()        #size: m*1
        m, n = np.shape(dataMat)
        weigh = np.ones((n, 1)) 
        for i in range(maxCycles):
            hx = self._sigmoid(dataMat * weigh)
            error = labelMat - hx       #size:m*1
            weigh = weigh + alpha * dataMat.transpose() * error#根據誤差修改回歸系數
        return weigh

    #使用梯度下降方法訓練模型,如果使用其它的尋參方法,此處可以做相應修改
    def fit(self, train_x, train_y, alpha=0.01, maxCycles=100):
        return self._gradDescent(train_x, train_y, alpha, maxCycles)

    #使用學習得到的參數進行分類
    def predict(self, test_X, test_y, weigh):
        dataMat = np.mat(test_X)
        labelMat = np.mat(test_y).transpose()  #使用transpose()轉置
        hx = self._sigmoid(dataMat*weigh)  #size:m*1
        m = len(hx)
        error = 0.0
        for i in range(m):
            if int(hx[i]) > 0.5:
                print str(i+1)+'-th sample ', int(labelMat[i]), 'is classfied as: 1' 
                if int(labelMat[i]) != 1:
                    error += 1.0
                    print "classify error."
            else:
                print str(i+1)+'-th sample ', int(labelMat[i]), 'is classfied as: 0' 
                if int(labelMat[i]) != 0:
                    error += 1.0
                    print "classify error."
        error_rate = error/m
        print "error rate is:", "%.4f" %error_rate
        return error_rate
View Code

 

 

Python 朴素貝葉斯(Naive Bayes)分類

Naïve Bayes 分類的核心是計算條件概率P(y|x),其中y為類別,x為特征向量。其意義是在x樣本出現時,它被划分為y類的可能性(概率)。通過計算不同分類下的概率,進而把樣本划分到概率最大的一類。

根據條件概率的計算公式可以得到:

P(y|x) = P(y)*P(x|y)/P(x)。

      由於在計算不同分類概率是等式右邊的分母是相同的,所以只需比較分子的大小。並且,如果各個樣本特征是獨立分布的,那么p(x

|y)等於p(xi|y)相乘。

      下面以文本分類來介紹Naïve Bayes分類的應用。其思路如下:

(1)建立詞庫,即無重復的單詞表。

(2)分別計算詞庫中類別標簽出現的概率P(y)。

(3)分別計算各個類別標簽下不同單詞出現的概率P(xi|y)。

(4)在不同類別下,將待分類樣本各個特征出現概率((xi|y)相乘,然后在乘以對應的P(y)。

(5)比較不同類別下(4)中結果,將待分類樣本分到取值最大的類別。

下面是Naïve Bayes 文本分類的Python代碼,其中為了方便計算,程序中借助log對數函數將乘法轉化為了加法。

# -*- coding: utf-8 -*-
"""
Created on Mon Nov 14 11:15:47 2016
Naive Bayes Clssification
"""
# -*- coding: utf-8 -*-
import numpy as np

class NaiveBayes:
    def __init__(self):
        self._creteria = "NB"
        
    def _createVocabList(self, dataList):
        """
        創建一個詞庫向量
        """
        vocabSet = set([])
        for line in dataList:
            print set(line)
            vocabSet = vocabSet | set(line)
        return list(vocabSet)
        
    #文檔詞集模型
    def _setOfWords2Vec(self, vocabList, inputSet):
        """
        功能:根據給定的一行詞,將每個詞映射到此庫向量中,出現則標記為1,不出現則為0
        """
        outputVec = [0] * len(vocabList)
        for word in inputSet:
            if word in vocabList:
                outputVec[vocabList.index(word)] = 1
            else:
                print "the word:%s is not in my vocabulary!" % word
        return outputVec
    
        
    # 修改 _setOfWordsVec  文檔詞袋模型
    def _bagOfWords2VecMN(self, vocabList, inputSet):
        """
        功能:對每行詞使用第二種統計策略,統計單個詞的個數,然后映射到此庫中
        輸出:一個n維向量,n為詞庫的長度,每個取值為單詞出現的次數
        """
        returnVec = [0]*len(vocabList)
        for word in inputSet:
            if word in vocabList:
                returnVec[vocabList.index(word)] += 1 # 更新此處代碼
        return returnVec
    
    
    def _trainNB(self, trainMatrix, trainLabel):
        """
        輸入:訓練矩陣和類別標簽,格式為numpy矩陣格式
        功能:計算條件概率和類標簽概率
        """
        numTrainDocs = len(trainMatrix) #統計樣本個數
        numWords = len(trainMatrix[0])  #統計特征個數,理論上是詞庫的長度
        pNeg = sum(trainLabel)/float(numTrainDocs) #計算負樣本出現的概率
        
        p0Num = np.ones(numWords) #初始樣本個數為1,防止條件概率為0,影響結果     
        p1Num = np.ones(numWords) #作用同上
       
        p0InAll = 2.0 #詞庫中只有兩類,所以此處初始化為2(use laplace)
        p1InAll = 2.0 
        
        # 再單個文檔和整個詞庫中更新正負樣本數據
        for i in range(numTrainDocs):
            if trainLabel[i] == 1:
                p1Num += trainMatrix[i]
                p1InAll += sum(trainMatrix[i])
            else:
                p0Num += trainMatrix[i]
                p0InAll += sum(trainMatrix[i])
        
        print p1InAll
        #計算給定類別的條件下,詞匯表中單詞出現的概率
        #然后取log對數,解決條件概率乘積下溢
        p0Vect = np.log(p0Num/p0InAll) #計算類標簽為0時的其它屬性發生的條件概率
        p1Vect = np.log(p1Num/p1InAll)  #log函數默認以e為底  #p(ci|w=0)
        return p0Vect, p1Vect, pNeg
        
    def _classifyNB(self, vecSample, p0Vec, p1Vec, pNeg):
        """
        使用朴素貝葉斯進行分類,返回結果為0/1
        """
        prob_y0 = sum(vecSample * p0Vec) + np.log(1-pNeg)
        prob_y1 = sum(vecSample * p1Vec) + np.log(pNeg) #log是以e為底
        if prob_y0 <</span> prob_y1:
            return 1
        else:
            return 0

    
    # 測試NB算法
    def testingNB(self, testSample):
        listOPosts, listClasses = loadDataSet()
        myVocabList = self._createVocabList(listOPosts)
#        print myVocabList
        trainMat=[]
        for postinDoc in listOPosts:
            trainMat.append(self._bagOfWords2VecMN(myVocabList, postinDoc))
        p0V,p1V,pAb = self._trainNB(np.array(trainMat), np.array(listClasses))
        print trainMat
        thisSample = np.array(self._bagOfWords2VecMN(myVocabList, testSample))
        result = self._classifyNB(thisSample, p0V, p1V, pAb)
        print testSample,'classified as: ', result
        return result


###############################################################################
def loadDataSet():
        wordsList=[['my', 'dog', 'has', 'flea', 'problems', 'help', 'please'],
                   ['maybe', 'not', 'take', 'him', 'to', 'dog', 'park', 'stupid'],
                   ['my', 'dalmation', 'is', 'so', 'cute', ' and', 'I', 'love', 'him'],
                   ['stop', 'posting', 'stupid', 'worthless', 'garbage'],
                   ['mr', 'licks','ate','my', 'steak', 'how', 'to', 'stop', 'him'],
                   ['quit', 'buying', 'worthless', 'dog', 'food', 'stupid']]
        classLable = [0,1,0,1,0,1] # 0:good; 1:bad
        return wordsList, classLable
        
if __name__=="__main__":    
    clf = NaiveBayes()
    testEntry = [['love', 'my', 'girl', 'friend'],
                 ['stupid', 'garbage'],
                 ['Haha', 'I', 'really', "Love", "You"],
                 ['This', 'is', "my", "dog"]]
    clf.testingNB(testEntry[0])
#    for item in testEntry:
#        clf.testingNB(item)
View Code

 

Python股票歷史數據預處理(一)

 

在進行量化投資交易編程時,我們需要股票歷史數據作為分析依據,下面介紹如何通過Python獲取股票歷史數據並且將結果存為DataFrame格式。處理后的股票歷史數據下載鏈接為:http://download.csdn.net/detail/suiyingy/9688505。

具體步驟如下:

  • (1) 建立股票池,這里按照股本大小來作為選擇依據。
  • (2) 分別讀取股票池中所有股票的歷史漲跌幅。
  • (3) 將各支股票的歷史漲跌幅存到DataFrame結構變量中,每一列代表一支股票,對於在指定時間內還沒有發行的股票的漲跌幅設置為0。
  • (4) 將DataFrame最后一行的數值設置為各支股票對應的交易天數。
  • (5) 將DataFrame數據存到csv文件中去。

具體代碼如下:

# -*- coding: utf-8 -*-
"""
Created on Thu Nov 17 23:04:33 2016
獲取股票的歷史漲跌幅,先合並為DataFrame后存為csv格式
@author: yehx
"""

import numpy as np
import pandas as pd

#按照市值從小到大的順序獲得50支股票的代碼
df = get_fundamentals(
        query(fundamentals.eod_derivative_indicator.market_cap)
        .order_by(fundamentals.eod_derivative_indicator.market_cap.asc())
        .limit(50),'2016-11-17', '1y'
    )
b1= {}
priceChangeRate_300 = get_price_change_rate('000300.XSHG', '20060101', '20161118')
df300 = pd.DataFrame(priceChangeRate_300)
lenReference =  len(priceChangeRate_300)
dfout = df300

dflen = pd.DataFrame()
dflen['000300.XSHG'] = [lenReference]
#分別對這一百只股票進行50支股票操作
#獲取從2006.01.01到2016.11.17的漲跌幅數據
#將數據存到DataFrame中
#DataFrame存為csv文件
for stock in range(50):
    priceChangeRate = get_price_change_rate(df['market_cap'].columns[stock], '20150101', '20161118')
    if priceChangeRate is None:
        openDays = 0
    else:
        openDays = len(priceChangeRate)
        dftempPrice = pd.DataFrame(priceChangeRate)
        tempArr = []
        for i in range(lenReference):          
            if df300.index[i] in list(dftempPrice.index):
                #保存為4位有效數字
                tempArr.append( "%.4f" %((dftempPrice.loc[str(df300.index[i])][0])))
                pass
            else:
                tempArr.append(float(0.0))
        fileName = ''
        fileName = fileName.join(df['market_cap'].columns[stock].split('.')) 
        dfout[fileName] = tempArr
        dflen[fileName] = [len(priceChangeRate)]

dfout = dfout.append(dflen)
dfout.to_csv('00050.csv')

 

 

Python股票歷史數據預處理(二)

 

從網上下載的股票歷史數據往往不能直接使用,需要轉換為自己所需要的格式。下面以Python代碼編程為工具,將csv文件中存儲的股票歷史數據提取出來並處理。處理的數據結果為是30天漲跌幅子數據庫,下載地址為:http://download.csdn.net/detail/suiyingy/9688605。

主要步驟有(Python csv數據讀寫):

  • #csv文件讀取股票歷史漲跌幅數據;
  • #隨機選取30個歷史漲跌幅數據;
  • #構建自己的數據庫;
  • #將處理結果保存為新的csv文件。

具體代碼如下:

# -*- coding: utf-8 -*-
"""
Created on Thu Nov 17 23:04:33 2016
csv格式股票歷史漲跌幅數據處理
@author: yehx
"""

import numpy as np
import pandas as pd
import random
import csv
import sys
reload(sys)
sys.setdefaultencoding('utf-8')

'''
    - 加載csv格式數據
'''
def loadCSVfile1(datafile):
    filelist = []
    with open(datafile) as file:
        lines = csv.reader(file)
        for oneline in lines:
            filelist.append(oneline)
    filelist = np.array(filelist)
    return filelist

#數據處理
#隨機選取30個歷史漲跌幅數據
#構建自己的數據庫
    
def dataProcess(dataArr, subLen):
    totLen, totWid = np.shape(data)
    print totLen, totWid
    lenArr = dataArr[totLen-1,2:totWid]
    columnCnt = 1
    dataOut = []
    for lenData in lenArr:
        columnCnt = columnCnt + 1
        N60 = int(lenData) / (2 * subLen)
        print N60
        if N60 > 0:
            randIndex = random.sample(range(totLen-int(lenData)-1,totLen-subLen), N60)
            for i in randIndex:
                dataOut.append(dataArr[i:(i+subLen),columnCnt])
    dataOut = np.array(dataOut)
    
    return dataOut




if __name__=="__main__":
    datafile = "00100 (3).csv"
    data = loadCSVfile1(datafile) 
    df = pd.DataFrame(data)
    m, n = np.shape(data)
    dataOut = dataProcess(data, 30)
    m, n = np.shape(dataOut)
    #保存處理結果
    csvfile = file('csvtest.csv', 'wb')
    writer = csv.writer(csvfile)
    writer.writerows(dataOut)
    csvfile.close()

http://blog.sina.com.cn/s/articlelist_6017673753_0_1.html

https://www.cnblogs.com/ttrrpp/

 

 

 

 

 

 

 

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM