Wrapping TensorFlow Neural Networks


To compete in this year's Software Cup design contest, I spent the past few months learning a lot of new material. Now that the second-round optimization of our entry has been submitted, I am starting to summarize and record what I learned over these four months.

This post is about building neural networks with TensorFlow. TensorFlow wraps up the internals of neural networks, making deep learning simple and approachable: even someone who does not understand the underlying algorithms can easily assemble all kinds of network models. To make building networks even more convenient, I wrote my own wrapper on top of TensorFlow.

The neural network wrapper code:

import tensorflow as tf  # the wrapper targets the TensorFlow 1.x API

class NN:
    def __init__(self,shape):
        '''
            shape: dimensions of the training data; shape[0] is the feature
            dimension, shape[1] is the label dimension
        '''
        with tf.name_scope("inputs"):
            self._xs = tf.placeholder(tf.float32,[None,shape[0]],name="features")
            self._ys = tf.placeholder(tf.float32,[None,shape[1]],name="labels")
            self._keep_prob = tf.placeholder(tf.float32,name="keep_prob")
        # Output of every layer (the input placeholder counts as layer 0)
        self._layers_out_lst = [self._xs]
        # Loss tensor
        self._loss = None
        # Prediction tensor
        self._prediction = None
        # The running session
        self._sess = None
        # Training op
        self._train_step = None
        self._saver = None
        # Timer for progress reporting (a project-local helper, sketched below)
        self._timer = None
        pass
    
    def add_layer(self,in_size,out_size,activation_function=None,name=None):
        '''
            Append a fully connected layer.
        '''
        with tf.name_scope(name):
            with tf.name_scope("{}_weight".format(name)):
                # Weight matrix
                weights = tf.Variable(tf.random_normal([in_size,out_size],dtype=tf.float32),name="weight")
            with tf.name_scope("{}_biases".format(name)):
                # Bias vector, initialized slightly positive
                biases = tf.Variable(tf.zeros([1,out_size],dtype=tf.float32)+0.1)
            with tf.name_scope("{}_w_plus_b".format(name)):
                # Linear transform of the previous layer's output
                wx_plus_b = tf.matmul(self._layers_out_lst[-1],weights) + biases
            with tf.name_scope("{}_output".format(name)):
                if activation_function is None:
                    outputs = wx_plus_b
                else:
                    outputs = activation_function(wx_plus_b)
                tf.summary.histogram("{}_output".format(name),outputs)
        self._prediction = outputs
        self._layers_out_lst.append(outputs)
        pass
    
    def add_loss_layer(self,cost_function,name=None):
        with tf.name_scope("loss"):
            self._loss = cost_function(
                onehot_labels=self._ys,logits=self._layers_out_lst[-1])
            tf.summary.scalar("loss",self._loss)
    
    def add_optimizer(self,optimizer,lr=0.01):
        '''
            Attach an optimizer; optimizer is an optimizer class provided by
            TensorFlow, lr is the learning rate.
        '''
        with tf.name_scope("train_step"):
            self._train_step = optimizer(lr).minimize(self._loss)
    
    def add_dropout(self,name=None):
        with tf.name_scope(name):
            x = self._layers_out_lst[-1]
            x = tf.nn.dropout(x,keep_prob=self._keep_prob,name=name)
            self._layers_out_lst.append(x)

    def fit(self,inputs,labels,epochs=100,keep_prob=0.5,savepath=None,logdir='logs',step=10):
        '''
            inputs: training features
            labels: training labels
            epochs: number of training iterations
        '''
        # If savepath is not None, the trained model will be saved
        if savepath is not None:
            self._saver = tf.train.Saver()
        # Variable initializer
        init = tf.global_variables_initializer()
        # If the session is None, no model has been loaded yet, so open one
        if self._sess is None:
            self._sess = tf.Session()
        # Collect the training summaries
        merge = tf.summary.merge_all()
        # Writer for TensorBoard visualization
        writer = tf.summary.FileWriter(logdir,self._sess.graph)
        # Run the initializer
        self._sess.run(init)
        self.initProgressbar()
        for i in range(1,epochs+1):
            feed_dict = {self._xs:inputs,self._ys:labels,self._keep_prob:keep_prob}
            self._sess.run(self._train_step,feed_dict=feed_dict)
            if i % step == 0:
                g = self._sess.run(merge,feed_dict=feed_dict)
                writer.add_summary(g,global_step=i)
                cost = self._sess.run(self._loss,feed_dict=feed_dict)
                print("current loss: %s"%(str(cost)),end='')
                self.showProgressBar(i/epochs)

                if savepath:
                    self._saver.save(self._sess,savepath,global_step=i)
        pass

    def predict(self,inputs):
        prediction = tf.argmax(self._prediction,1)
        res = self._sess.run(prediction,feed_dict={self._xs:inputs,self._keep_prob:1.0})
        return res

    def evaluate(self,inputs,labels):
        correct_prediction = tf.equal(tf.argmax(self._prediction,1),tf.argmax(labels,1))
        accuracy = tf.reduce_mean(tf.cast(correct_prediction,tf.float32))
        result = self._sess.run(accuracy,feed_dict={self._xs:inputs,self._ys:labels,self._keep_prob:1.0})
        return result

    def restore(self,savepath):
        self._saver = tf.train.Saver()
        self._sess = tf.Session()
        self._saver.restore(self._sess,savepath)
    
    def initProgressbar(self):
        self._timer = Timer()
    
    def showProgressBar(self,rate):
        s = self._timer.format("%dh %dm %ds %dms")
        print(s,"%4.2f%%"%(rate*100))

    def __del__(self):
        if self._sess is not None:
            self._sess.close()
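
The Timer used by initProgressbar (and as network.Timer in the CNN below) is a project-local helper that this post does not show. A minimal sketch of what it might look like, assuming it only needs to format and print elapsed wall-clock time:

import time

class Timer:
    '''Hypothetical stand-in for the project-local Timer helper.'''
    def __init__(self):
        # Remember when the timer was created
        self._start = time.time()

    def format(self,fmt):
        # Split the elapsed seconds into h/m/s/ms to fill "%dh %dm %ds %dms"
        elapsed = time.time() - self._start
        h,rem = divmod(int(elapsed),3600)
        m,s = divmod(rem,60)
        ms = int((elapsed - int(elapsed)) * 1000)
        return fmt % (h,m,s,ms)

    def log(self):
        # fitbatch() calls timer.log() after each cost report
        print(self.format("%dh %dm %ds %dms"))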

With the wrapper in place, the code to build a network:

def getNetWork(initshape=(205,2),path=None):
    # Create a neural network object
    nn = NN(initshape)
    # Build the network structure
    nn.add_layer(initshape[0],128,tf.nn.relu,name="input_layer")
    nn.add_layer(128,64,tf.nn.sigmoid,name="hidden_layer_1")
    nn.add_dropout(name="dropout_1")
    nn.add_layer(64,32,tf.nn.tanh,name="hidden_layer_2")
    nn.add_dropout(name="dropout_2")
    nn.add_layer(32,16,tf.nn.softplus,name="hidden_layer_3")
    nn.add_layer(16,2,tf.nn.elu,name="hidden_layer_4")
    nn.add_dropout(name="dropout_3")
    nn.add_loss_layer(tf.losses.softmax_cross_entropy,name="cost_layer")
    nn.add_optimizer(tf.train.AdamOptimizer,lr=0.001)
    return nn
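
A quick usage sketch (with synthetic NumPy data, purely illustrative) of how the wrapper is driven end to end:

import numpy as np

nn = getNetWork(initshape=(205,2))
# 100 random samples with 205 features each, plus random one-hot labels
features = np.random.rand(100,205).astype(np.float32)
labels = np.eye(2)[np.random.randint(0,2,size=100)].astype(np.float32)
nn.fit(features,labels,epochs=200,keep_prob=0.5,step=20)
print("accuracy:",nn.evaluate(features,labels))
print("predicted classes:",nn.predict(features[:5]))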


Wrapping a convolutional neural network:

class CNN:
    def __init__(self,**args):
        '''
            x_shape: shape of the training feature tensor
            y_shape: shape of the label tensor
            savepath: where to save the trained model (optional)
            logdir: directory for the TensorBoard logs
        '''
        x_shape = args.get("x_shape")
        y_shape = args.get("y_shape")
        self._savepath = args.get("savepath")
        self._logdir = args.get("logdir","log")
        with tf.name_scope("inputs_placeholders"):
            self._xs = tf.placeholder(tf.float32,x_shape)
            self._ys = tf.placeholder(tf.float32,y_shape)
            self._keep_prob = tf.placeholder(tf.float32)
        self._layer_outs = [self._xs]
        self._loss = None
        self._train_step = None
        self._sess = None
        self._saver = None
        pass
    def __del__(self):
        if self._sess is not None:
            if self._saver is not None:
                self._saver.save(self._sess,self._savepath)
            self._sess.close()
        pass

    def fit(self,features,labels,keep_prob=0.8,train_rate=1,epochs=100,step=10):
        '''
            features: training features
            labels: training labels
            keep_prob: dropout keep probability
            train_rate: fraction of the data used for training
            epochs: number of training iterations
            step: number of iterations between progress reports
        '''
        # Size of the training slice
        length = int(train_rate * len(features))
        def b(i):
            # fitbatch passes a batch index; this provider ignores it and
            # always returns the same slice
            return features[:length],labels[:length]
        self.fitbatch(b,keep_prob,epochs=epochs,step=step)
        pass

    def fitbatch(self,get_batch=None,keep_prob=0.8,epochs=100,step=10):
        if self._savepath:
            self._saver = tf.train.Saver()
        # Variable initializer
        init = tf.global_variables_initializer()
        # Open a session
        self._sess = tf.Session()
        # Collect the training summaries
        merge = tf.summary.merge_all()
        # Writer for TensorBoard visualization
        writer = tf.summary.FileWriter(self._logdir,self._sess.graph)
        # Run the initializer
        self._sess.run(init)
        timer = network.Timer()
        # Start training; batch indices cycle through 0-5
        for counter in range(1,epochs+1):
            features,labels = get_batch(counter%6)
            # Build the feed dictionary
            feed_dict = {self._xs:features,self._ys:labels,self._keep_prob:keep_prob}
            if counter == 1:
                # On the first step, print every layer's output shape as a sanity check
                for out in self._layer_outs:
                    tmp = self._sess.run(out,feed_dict=feed_dict)
                    print(tmp.shape)
            # One training step
            self._sess.run(self._train_step,feed_dict=feed_dict)
            # Periodically report the loss
            if counter % step == 0:
                cost = self._sess.run(self._loss,feed_dict=feed_dict)
                print(counter,'\tcost',cost,end='\t')
                timer.log()
                # Record the summaries for this step
                g = self._sess.run(merge,feed_dict=feed_dict)
                writer.add_summary(g,global_step=counter)

    def restore(self):
        self._saver = tf.train.Saver()
        self._sess = tf.Session()
        self._saver.restore(self._sess,self._savepath)
        pass

    def predict(self,features):
        prediction = tf.argmax(self._layer_outs[-1],1)
        return self._sess.run(prediction,feed_dict={self._xs:features,self._keep_prob:1.0})

    def evaluate(self,features,labels):
        correct_prediction = tf.equal(tf.argmax(self._layer_outs[-1],1),tf.argmax(self._ys,1))
        accuracy = tf.reduce_mean(tf.cast(correct_prediction,tf.float32))
        feed_dict = {self._xs:features,self._ys:labels,self._keep_prob:1.0}
        return self._sess.run(accuracy,feed_dict=feed_dict)
        
    def _get_w_b(self,shape,stddev=0.01,name=None):
        with tf.name_scope("{}_weights".format(name)):
            initial = tf.truncated_normal(shape=shape,stddev=stddev)
            w = tf.Variable(initial,name="{}_weights".format(name))
        with tf.name_scope("{}_bias".format(name)):
            b = tf.Variable(tf.constant(0.1,shape=[shape[-1]]),name="{}_bias".format(name))
        return w,b

    def add_layer(self,shape,stddev=0.01,strides=[1,10,10,1],padding='SAME',activate_function=tf.nn.relu,name=None):
        '''
            shape: the filter shape - height, width, input channels, output channels
        '''
        if name is None:
            name = "hidden_layer_%d"%len(self._layer_outs)
        with tf.name_scope(name):
            # Pass stddev through to the weight initializer
            w,b = self._get_w_b(shape,stddev=stddev,name=name)
            outputs = tf.nn.conv2d(self._layer_outs[-1],w,strides=strides,padding=padding,name=name) + b
            if activate_function:
                outputs = activate_function(outputs)
            self._layer_outs.append(outputs)
            tf.summary.histogram("{}_output".format(name),outputs)
        pass
    def add_max_pool(self,ksize=[1,2,2,1],strides=[1,2,2,1],padding="SAME",name=None):
        '''
            ksize: pooling window size
            strides: stride of the window
            padding: padding scheme
        '''
        if name is None:
            name = "max_pool_layer_%d"%len(self._layer_outs)
        with tf.name_scope(name):
            outputs = tf.nn.max_pool(self._layer_outs[-1],ksize=ksize,strides=strides,padding=padding,name=name)
            self._layer_outs.append(outputs)
            tf.summary.histogram("{}_output".format(name),outputs)
        pass

    def add_dropout(self,name=None):
        if name is None:
            name = "dropout_%d"%len(self._layer_outs)
        with tf.name_scope(name):
            outputs = tf.nn.dropout(self._layer_outs[-1],self._keep_prob,name=name)
            self._layer_outs.append(outputs)
        pass

    def add_cost_layer(self,activate_function,name=None):
        if name is None:
            name = "cost_layer"
        with tf.name_scope(name):
            self._loss = activate_function(self._ys,self._layer_outs[-1])
            tf.summary.scalar("loss",self._loss)
        pass

    def addOptimizer(self,optimizer,lr=0.01):
        self._train_step = optimizer(lr).minimize(self._loss)
        pass
    def add_full_layer(self,shape,activate_function=tf.nn.relu,first=False,name=None):
        '''
            shape[0]: number of inputs this layer receives (the previous
                layer's output size)
            shape[1]: number of neurons in this layer (its output size)
            first: if True, flatten the previous layer's output before the matmul
        '''
        if name is None:
            name = "full_layer_%d"%len(self._layer_outs)
        with tf.name_scope(name):
            w,b = self._get_w_b(shape,name=name)
            if first:
                x = tf.reshape(self._layer_outs[-1],[-1,shape[0]])
            else:
                x = self._layer_outs[-1]
            outputs = activate_function(tf.matmul(x,w)+b)
            self._layer_outs.append(outputs)
            tf.summary.histogram("{}_output".format(name),outputs)
        pass
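
fitbatch expects a callable that maps a batch index (counter % 6, so indices 0-5) to a (features, labels) pair. The DataUtil.getbatch used in main below is not shown in this post; a hypothetical provider with the same contract might pre-split the dataset into seven chunks, training on the first six and holding out the seventh for evaluation:

import numpy as np

def make_batch_provider(features,labels,n_chunks=7):
    '''Hypothetical stand-in for DataUtil.getbatch: chunks 0-5 feed training
    (fitbatch draws counter % 6), chunk 6 is held out for evaluation.'''
    f_chunks = np.array_split(features,n_chunks)
    l_chunks = np.array_split(labels,n_chunks)
    def get_batch(i):
        return f_chunks[i],l_chunks[i]
    return get_batch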

The code to build the convolutional network:

def main(isTrain=True):
    cnn = CNN(x_shape=[None,102,102,2],
        y_shape=[None,2],
        logdir=r"E:\iqaa\logs",
        savepath=r"E:\iqaa\model\model.ckpt")
    cnn.add_layer([5,5,2,4],strides=[1,1,1,1])
    cnn.add_max_pool(ksize=[1,3,3,1],strides=[1,3,3,1])
    cnn.add_layer([5,5,4,8],strides=[1,1,1,1])
    cnn.add_dropout()
    cnn.add_layer([5,5,8,16],strides=[1,1,1,1])
    cnn.add_max_pool([1,3,3,1],[1,3,3,1])
    cnn.add_full_layer([12*12*16,64],first=True)
    cnn.add_dropout()
    cnn.add_full_layer([64,2],first=False) 
    cnn.add_cost_layer(lambda ys,y:tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(labels=ys,logits=y)))
    cnn.addOptimizer(tf.train.AdamOptimizer,0.001)
    if isTrain:
        cnn.fitbatch(lambda i: DataUtil.getbatch(i),keep_prob=0.8,epochs=1000,step=20)
        features,labels = DataUtil.getbatch(6)
        print(cnn.evaluate(features,labels))
    else:
        cnn.restore()
    return cnn
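
The 12*12*16 fed to the first full layer comes from tracing the 102x102 input through the stack: with SAME padding, each layer's spatial output size is ceil(input / stride). A short illustrative check that reproduces the shapes printed on the first training step:

import math

def same_out(size,stride):
    # SAME padding: output size is ceil(input / stride)
    return math.ceil(size / stride)

s = 102                 # the input images are 102x102x2
s = same_out(s,1)       # conv 5x5, stride 1    -> 102
s = same_out(s,3)       # max pool 3x3, stride 3 -> 34
s = same_out(s,1)       # conv 5x5, stride 1    -> 34 (dropout keeps the shape)
s = same_out(s,1)       # conv 5x5, stride 1    -> 34
s = same_out(s,3)       # max pool 3x3, stride 3 -> 12
print(s * s * 16)       # 12 * 12 * 16 = 2304 inputs to the first full layer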
