1 # -*- coding: utf-8 -*- 2 """ 3 Created on Sat Jan 20 13:47:54 2018 4 5 @author: markli 6 """ 7 import numpy as np; 8 import random; 9 10 def tanh(x): 11 return np.tanh(x); 12 13 def tanh_derivative(x): 14 return 1.0 - np.tanh(x)*np.tanh(x); 15 16 def logistic(x): 17 return 1/(1 + np.exp(-x)); 18 19 def logistic_derivative(x): 20 return logistic(x)*(1-logistic(x)); 21 22 def ReLU(x,a=1): 23 return max(0,a * x); 24 25 def ReLU_derivative(x,a=1): 26 return 0 if x < 0 else a; 27 28 29 class NeuralNetwork: 30 ''' 31 Z = W * x + b 32 A = sigmod(Z) 33 Z 净输入 34 x 样本集合 m * n n 个特征 m 个样本数量 35 b 偏移量 36 W 权重 37 A 净输出 38 ''' 39 def __init__(self,layers,active_function=[logistic],active_function_der=[logistic_derivative],learn_rate=0.9): 40 self.weights = [2*np.random.randn(x,y)-1 for x,y in zip(layers[1:],layers[:-1])]; #weight 取值范围(-1,1) 41 self.B = [2*np.random.randn(x,1)-1 for x in layers[1:]]; #b 取值范围(-1,1) 42 self.learnRate = learn_rate; 43 self.size = len(layers); 44 self.sigmoids = []; 45 self.sigmoids_der = []; 46 for i in range(len(layers)-1): 47 if(len(active_function) == self.size-1): 48 self.sigmoids = active_function; 49 else: 50 self.sigmoids.append(active_function[0]); 51 if(len(active_function_der)== self.size-1): 52 self.sigmoids_der = active_function_der; 53 else: 54 self.sigmoids_der.append(active_function_der[0]); 55 56 57 '''后向传播算法''' 58 def BackPropgation(self,X,Y): 59 """ 60 X size*n 维,size大小为Mini_Batch_size 值大小,n 个特征 61 Y size*l 维,size大小为Mini_Batch_sieze 值大小,l 个类标签 62 一次计算size个样本带来的w,b的变化量 63 """ 64 deltb = [np.zeros(b.shape) for b in self.B]; 65 deltw = [np.zeros(w.shape) for w in self.weights]; 66 67 active = np.transpose(X); 68 actives = [active]; 69 zs = []; 70 i=0; 71 #前向传播 72 for w,b in zip(self.weights,self.B): 73 z = np.dot(w,active) + b; 74 zs.append(z); 75 active = self.sigmoids[i](z); 76 actives.append(active); 77 i = i+1; 78 79 Y = np.transpose(Y); #转置 80 cost = self.cost(actives[-1], Y) #成本函数 计算对a的一阶导数 81 z = zs[-1]; 82 delta = np.multiply(cost,self.sigmoids_der[-1](z)); #计算输出层(最后一层)的变化量 83 deltb[-1] = np.sum(delta,axis=1,keepdims=True); #计算输出层(最后一层)b的size次累计变化量 l*1 维 84 deltw[-1] = np.dot(delta, np.transpose(actives[-2]));#计算输出层(最后一层)w的size次累计变化量 x*l 维 85 for i in range(2,self.size): 86 z = zs[-i]; #当前层的z值 87 sp = self.sigmoids_der[-i](z); #对z的偏导数值 88 delta = np.multiply(np.dot(np.transpose(self.weights[-i+1]), delta), sp); #求出当前层的误差 89 #deltb = delta; 90 deltb[-i] = np.sum(delta,axis=1,keepdims=True); #当前层b的size次累计变化量 l*1 维 91 deltw[-i] = np.dot(delta, np.transpose(actives[-i-1])); # 当前层w的size次累计变化量 x*l 92 93 return deltw,deltb; 94 95 def fit(self,X,Y,mini_batch_size,epochs=1000): 96 97 N = len(Y); 98 for i in range(epochs): 99 randomlist = np.random.randint(0,N-mini_batch_size,int(N/mini_batch_size)); 100 batch_X = [X[k:k+mini_batch_size] for k in randomlist]; 101 batch_Y = [Y[k:k+mini_batch_size] for k in randomlist]; 102 for m in range(len(batch_Y)): 103 deltw,deltb = self.BackPropgation(batch_X[m],batch_Y[m]); 104 self.weights = [w - (self.learnRate / mini_batch_size) * dw for w,dw in zip(self.weights,deltw)]; 105 self.B = [b - (self.learnRate / mini_batch_size) * db for b,db in zip(self.B,deltb)]; 106 # path = sys.path[0]; 107 # with open(path,'w',encoding='utf8') as f: 108 # for j in range(len(self.weights)-1): 109 # f.write(self.weights[j+1]); 110 # f.write(self.activeFunction[j+1]); 111 # f.write(self.activeFunctionDer[j+1]); 112 # f.close(); 113 114 115 116 117 def predict(self,x): 118 """前向传播""" 119 i = 0; 120 for b, w in zip(self.B, self.weights): 121 x = self.sigmoids[i](np.dot(w, x)+b); 122 i = i + 1; 123 return x 124 125 def cost(self,a,y): 126 """ 127 损失函数对z的偏导数的除输出层对z的导数的因子部分 128 完整表达式 为 (a - y)* sigmod_derivative(z) 129 由于此处不知道输出层的激活函数故不写出来,在具体调用位置加上 130 """ 131 return a-y; 132 133
该算法按照吴恩达先生讲述的BP神经网络算法编写,实现了一次进行Mini_Batch_size 次的训练。下面给出测试代码和测试结果。
1 import numpy as np 2 from sklearn.datasets import load_digits 3 from sklearn.metrics import confusion_matrix, classification_report 4 from sklearn.preprocessing import LabelBinarizer 5 from FullNeuralNetwork import NeuralNetwork 6 from sklearn.cross_validation import train_test_split 7 8 9 10 digits = load_digits(); 11 X = digits.data; 12 y = digits.target; 13 X -= X.min(); # normalize the values to bring them into the range 0-1 14 X /= X.max(); 15 16 nn = NeuralNetwork([64,100,10]); 17 X_train, X_test, y_train, y_test = train_test_split(X, y); 18 labels_train = LabelBinarizer().fit_transform(y_train); 19 labels_test = LabelBinarizer().fit_transform(y_test); 20 21 22 # X_train.shape (1347,64) 23 #y_train.shape(1347) 24 #labels_train.shape (1347,10) 25 #labels_test.shape(450,10) 26 27 print ("start fitting"); 28 29 #print(Data); 30 nn.fit(X_train,labels_train,epochs=500,mini_batch_size=8); 31 result = nn.predict(X_test.T); 32 predictions = [np.argmax(result[:,y]) for y in range(result.shape[1])]; 33 34 print(predictions); 35 #for i in range(result.shape[1]): 36 # y = result[:,i]; 37 # predictions.append(np.argmax(y)); 38 ##print(np.atleast_2d(predictions).shape); 39 print (confusion_matrix(y_test,predictions)); 40 print (classification_report(y_test,predictions)); 41
测试结果:
总体效果还可以,需要调一调其中的参数。之前发布的代码我后来仔细看了一下,发现算法有误,现在改正过来了。基本没什么错误了,哈哈哈。