自己動手實現深度學習框架-2 核心實現


目標

        完成框架設計文檔中列出的基礎類和需要在基礎類中實現的接口。使用最簡的單多層感知機(Multi-Layer Perceptron)模型對框架進行初步驗證, 因此, 除了框架的核心部分外, 還要實現一個全連接層,一個激活函數,一個優化器和一個損失函數。

框架代碼簡介

        我把這個框架命名為cute-dl, 已經上傳到github上: https://github.com/brandonlyg/cute-dl.
        目錄結構為:
        -- cutedl: 框架實現代碼
        -- example: 示例
        -- test: 單元測試

        MLP示例位於 example/mlp目錄下。

層(Layer)和層參數(LayerParam)

        相關代碼在model.py中.
        LayerParam只有屬性的定義, 沒什么邏輯在里面:

lass LayerParam(object):

    '''
    layer_name: 所屬層的的名字
    name: 參數名
    value: 參數值
    '''
    def __init__(self, layer_name, name, value):
        self.__name = layer_name+"/"+name
        self.value = value

        #梯度
        self.gradient = None
        #更新次數
        self.udt = 0

    @property
    def name(self):
        return self.__name

    def reset(self):
        self.gradient = None
        self.udt = 0

        其中參數名字是使用樹形結構, 例如: "1-MyLayer/W", 是"1-MyLayer"層的"W"參數的名字。其中"1"是層在模型中的唯一ID, "MyLayer"是層的標簽(tag), "W"是參數在這個層中的唯一名字。

        Layer需要實現兩個方法: 一個是__init__方法,一個是join方法. 其他方法不需要實現,只需按設計文檔中的描述給出定義即可。
        先來看看__init__方法:

    '''
    outshape: 輸出形狀 2 或者 (2,3)
    kargs:
        activation: 激活函數的名字
        inshape: 輸入形狀
    '''
    def __init__(self, *outshape, **kargs)
        #輸出形狀
        if len(outshape) == 1 and type(outshape[0]) == type(()):
            self.__outshape = outshape[0]
        else:
            self.__outshape = outshape

        #輸入形狀
        self.__inshape = None

        #得到激活函數
        self.__activation = activations.get('linear')

        #層在模型中的id, 是層在模型中的索引
        self.__id = 0
        #層的名字
        self.__name = '/%d-%s'%(self.__id, self.tag)

        #得到可選參數
        #print("Layer kargs:", kargs)
        if 'inshape' in kargs:
            self.__inshape = kargs['inshape']
            if type(self.__inshape) != type(()):
                self.__inshape = (self.__inshape,)
            #print("------inshape:", self.__inshape)

        if 'activation' in kargs:
            self.__activation = activations.get(kargs['activation'])


        if self.__inshape is not None:
            self.init_params()

        實現的時主要處理這么幾個問題:

  1. 輸入輸出形狀的處理. 保證輸入輸出形狀不論是int還是tuple, 最后都要轉換成tuple。
  2. 必須要有激活函數。默認使用的線性激活函數。
  3. 自動生成層的名字。

        join方法實現:

    '''
    加入到模型中
    pre_layer: 前一個層
    *inshape: 輸入形狀
    '''
    def join(self, pre_layer, *inshape):
        if self.__outshape == (-1,):
            self.__inshape = pre_layer.inshape
            self.__outshape = pre_layer.outshape
        else:
            self.__inshape = pre_layer.outshape
            if len(inshape) != 0:
                self.__inshape = inshape

        self.__id = pre_layer.layer_id + 1
        self.__name = '/%d-%s'%(self.__id, self.tag)

        self.init_params()

        這個方法主要功能是把當前層和另一層連接在一起, 讓另一個層成為當前層的(在模型中的)前一層。這里的"連接"主要體現在: 把另一個層的輸出作為輸入。對層ID的處理上, 使用簡單的累加保證層ID在模型中是唯一的, 同時還能通過ID的值知道層位於模型中的什么位置。 有了輸入輸出形狀, 就可以調用子類實現的init_params方法初始化參數了。

激活函數(Activation)

        激活函數代碼在activation.py中。
        接口定義:

'''
激活函數
'''
class Activation(object):
    name=''

    def __call__(self, in_batch):
        raise Exception("__call__ not implement")

    '''
    求梯度
    gradient: 該函數輸出值的梯度
    '''
    def grad(self, gradient):
        raise Exception("gradient not implement")

        其中類屬性name作為激活函數的名字。
        實現線性激活函數, 作為默認激活函數:

'''
線性激活函數, 沒有激活
'''
class Linear(Activation):
    name='linear'

    def __call__(self, in_batch):
        return in_batch

    def grad(self, gradient):
        return gradient

        實現最常用的relu激活函數:

'''
relu 激活函數
'''
class Relu(Activation):
    name='relu'

    def __init__(self):
        self.__grad = None

    def __call__(self, in_batch):
        #得到 <= 0的數據的索引
        indices =  in_batch <= 0

        in_batch[indices] = 0
        self.__grad = indices

        return in_batch

    def grad(self, gradient):
        gradient[self.__grad] = 0
        self.__grad = None
        return gradient

        實現用名字(name)獲取激活函數:

act_dict = {
    Linear.name: Linear,
    Relu.name: Relu
}

#創建激活函數
def get(name):
    #print(act_dict)
    #print('name:', name)
    ACT = act_dict[name]
    return ACT()

模型(Model)

        首先需要向模型中添加層

'''
    layers: Layer list
    '''
    def __init__(self, layers=None):
        self.__layers = layers

    '''
    添加層
    layer: Layer類型的對象
    '''
    def add(self, layer):
        if self.__layers is None:
            self.__layers = []

        self.__layers.append(layer)

        return self

        __init__和add方法都能實現這個功能。

        然后是層的的訪問能力:

'''
    得到一個Layer對象
    idx: Layer對象的索引
    '''
    def get_layer(self, index):
        self.__check()
        if len(self.__layers) <= index:
            raise Exception("index out of range %d"%len(self.__layers))

        return self.__layers[index]

    @property
    def layer_count(self):
        return len(self.__layers)

    '''
    得到層的迭代器
    '''
    def layer_iterator(self):
        self.__check()

        for ly in self.__layers:
            yield ly

        接下來是組裝模型:

    '''
    組裝模型
    '''
    def assemble(self):
        self.__check()
        count = len(self.__layers)

        #輸入層必須要有輸入形狀
        ly_0 = self.__layers[0]
        if ly_0.inshape is None or len(ly_0.inshape) == 0:
            raise Exception("input layer miss inshape")

        #把每一層的輸入形狀設置為上一層的輸出形狀,
        #設置輸入形狀的同時, 要求該層自動初始化參數(如果有參數的話)
        pre_ly = ly_0
        for ly in self.__layers[1:]:
            ly.join(pre_ly)
            pre_ly = ly

        向前傳播:

    '''
    使用模型預測
    in_batch: 一批輸入數據
    '''
    def predict(self, in_batch, training=False):
        self.__check()

        out = in_batch
        for ly in self.__layers:
            out = ly.forward(out, training)

        return out

        反向傳播:

    '''
    反向傳播梯度
    '''
    def backward(self, gradient):
        g = gradient
        #pdb.set_trace()
        count = len(self.__layers)
        for i in range(count-1, -1, -1):
            ly = self.__layers[i]
            g = ly.backward(g)

訓練上下文會話(Session)

        Session代碼在session.py中。
        初始化__init__:

    '''
    model: Model對象
    loss: Loss對象
    optimizer: Optimizer對象
    '''
    def __init__(self, model, loss, optimizer):
        self.__model = model
        self.__loss = loss
        self.__optimizer = optimizer

會話主要維護模型, 損失函數和優化器。這些對一個簡單的MLP模型來說已經足夠,至於genoptimizer以后再添加。

        訓練模型:

    '''
    分批訓練
    '''
    def batch_train(self, data, label):
        #使用模型預測
        out = self.__model.predict(data, training=True)
        #使用損失函數評估誤差
        loss = self.__loss(out, label)
        grad = self.__loss.gradient
        #pdb.set_trace()
        #反向傳播梯度
        self.__model.backward(self.__loss.gradient)

        #更新模型參數
        self.__optimizer(self.__model)

        return loss

        保存會話:

    '''
    保存session
    fpath: 保存的文件路徑
        fpath+'.s.pkl' 是保存session的文件
        fpath+'.m.pkl' 是保存model的文件
    '''
    def save(self, fpath):
        model = self.__model
        self.__model = None

        model.save(fpath)

        realfp = fpath + ".s.pkl"
        with open(realfp, 'wb') as f:
            pickle.dump(self, f)

        這里把模型和會話分開保存, 是為了以后可以靈活地選擇只加載模型或加載整個會話。下面是模型的保存方法, 在Model中實現:

    '''
    保存模型
    '''
    def save(self, fpath):
        dir = os.path.dirname(fpath)
        if not os.path.exists(dir):
            os.mkdir(dir)

        self.reset()
        realfp = fpath + ".m.pkl"
        with open(realfp, 'wb') as f:
            pickle.dump(self, f)

        加載會話:

    '''
    加載session
    '''
    @classmethod
    def load(cls, fpath):
        realfp = fpath + ".s.pkl"
        if not os.path.exists(realfp):
            return None

        sess = None
        with open(realfp, 'rb') as f:
            sess = pickle.load(f)

        model = Model.load(fpath)
        sess.set_model(model)

        return sess

損失函數(Loss)

        損失函數代碼在loss.py中。首先定義接口:

'''
損失函數
'''
class Loss(object):

    '''
    梯度屬性
    '''
    @property
    def gradient(self):
        raise Exception("gradient not impliment")


    '''
    計算誤差和梯度
    y_true 數據的真實標簽
    y_pred 模型預測的標簽

    return 誤差值
    '''
    def __call__(self, y_true, y_pred):
        raise Exception("__call__ not impliment")

        接下來給出均方誤差損失函數實現:

'''
均方誤差損失函數
'''
class Mse(Loss):

    def __init__(self):
        self.__grad = None

    def __call__(self, y_true, y_pred):
        err = y_true - y_pred
        loss =  (err**2).mean(axis=0)/2

        n = y_true.shape[0]
        self.__grad = err/n
        #pdb.set_trace()
        return loss.sum()

    @property
    def gradient(self):
        return self.__grad

學習率優化器(Optimizer)

        優化器代碼在optimizer.py中。
        定義接口:

'''
學習率優化器
'''
class Optimizer(object):

    '''
    更新參數
    '''
    def __call__(self, model):
        raise Exception('not implement')

        實現一個固定學習率優化器, 沒有用任何參數優化算法。

'''
固定學習率優化器
'''
class Fixed(Optimizer):

    '''
    lt: 學習率
    '''
    def __init__(self, lt=0.01):
        self.__lt = lt

    def __call__(self, model):
        #pdb.set_trace()
        for ly in model.layer_iterator():
            for p in ly.params:
                p.value -= self.__lt * p.gradient
                p.udt += 1

        到目前為止,一個能夠支持最簡單MLP模型的框架已經完成。接下來用一個MLP示例來驗證一下。

MLPS示例

        使用MLP模型完成一個廣義線性回歸的任務, 代碼在examples/mlp/linear-regression.py中。
        假設這個任務是擬合一個二次多項式函數:

'''
任務目標函數
'''
def target_func(x):
    ##加入服從參數(0, 0.25^2)正態分布噪聲
    y = (x - 2)**2 + 0.25 * np.random.randn(len(x))
    return y

看一下這個函數的圖像:

        從使用這個函數采樣得到數據集:

'''
生成數據集
返回: train_x, train_y, test_x, test_y
train_x, train_y 訓練數據集的數據和標簽
test_x, test_y 驗證數據解的數據和標簽
'''
def generate_dataset():
    '''
    生成200條數據, 隨機取出80%條作為訓練數據集, 剩余數據為測試數據集
    '''
    fpath = "./ds.pkl"
    if os.path.exists(fpath):
        with open(fpath, 'rb') as f:
            ds = pickle.load(f)
            return ds

    count = 200
    x = np.linspace(-1, 5, count)
    y = target_func(x)

    #打亂順序
    indices = np.arange(count)
    np.random.shuffle(indices)
    #訓練數據集
    split = int(count*0.8)
    idxs = indices[:split]
    train_x = x[idxs].reshape((-1,1))
    train_y = y[idxs].reshape((-1,1))

    #測試數據集
    idxs = sorted(indices[split:])
    test_x = x[idxs].reshape((-1, 1))
    shape = test_x.shape
    test_y = y[idxs].reshape((-1, 1))

    ds = {
        'train_x': train_x,
        'train_y': train_y,
        'test_x': test_x,
        'test_y': test_y
    }
    with open(fpath, 'wb') as f:
        pickle.dump(ds, f)

    return ds

#得到數據集
ds_0 = generate_dataset()
print("train shape:", ds_0['train_x'].shape)
print("test shape:", ds_0['test_x'].shape)

#訓練集只取一部分
count = 100
ds_1 = {
    'train_x': ds_0['train_x'][:16],
    'train_y': ds_0['train_y'][:16],
    'test_x': ds_0['test_x'],
    'test_y': ds_0['test_y']
}

        這里得到兩個數據集, 一個數據集中有160條訓練數據, 40條驗證數據。另一個中有16條訓練數據和40條驗證數據。

        分批訓練模型:

'''
訓練模型
'''
def train(epochs, ds, model=None, batch_size=64, record_epochs=1):
    #加載/構建session
    sess = None
    if model is None:
        sess = Session.load(model_path)
    else:
        sess = Session(model,
                    loss=losses.Mse(),
                    optimizer = optimizers.Fixed()
                )

    train_x = ds['train_x']
    train_y = ds['train_y']
    test_x = ds['test_x']
    test_y = ds['test_y']

    batchs = int(train_x.shape[0]/batch_size)
    print("epochs:%d, batchs=%d"%(epochs, batchs))

    #記錄訓練歷史
    history = {
        'loss': [],
        'val_loss': [],
        'epochs': [],
        'val_x': test_x,
        'val_y': test_y,
        'val_pred': None
    }

    print("start training ")
    t_start = time.time()
    steps = epochs * batchs

    epoch = 1
    #循環訓練
    for step in range(steps):
        start = (step % batchs) * batch_size
        end = start + batch_size
        batch_x = train_x[start:end]
        batch_y = train_y[start:end]

        loss = sess.batch_train(batch_x, batch_y)

        cur_epoch = int(step/batchs) + 1

        #每輪打印一次
        if step > 0 and  step % batchs == 0:
            print((('epoch:%05d/%d loss=%f'%(cur_epoch, epochs, loss))+' '*50)[:50], end='\r')

        #記錄
        if step % batchs == 0 and (cur_epoch - epoch == record_epochs or cur_epoch == epochs):
            epoch = cur_epoch

            y_pred = sess.model.predict(test_x)
            val_loss = sess.loss(test_y, y_pred)

            history['loss'].append(loss)
            history['val_loss'].append(val_loss)
            history['epochs'].append(epoch)
            history['val_pred']  = y_pred

            print((('epoch:%05d/%d loss=%f, val_loss=%f'%(cur_epoch, epochs, loss, val_loss))+' '*50)[:50], end='\r')
            print("")

    sess.save(model_path)
    print("training finished cost:%f" % (time.time() - t_start))

    return history

        通過這段代碼可以看出,框架雖然看起來可用, 但訓練模型是仍然需要不少代碼,不夠友好。不過沒關系,目前先通過示例積累經驗,以后在把分批訓練的功能加入到Session中。要牢記現階段的主要任務: 對框架進行初步驗證。

驗證1: 使用線性模型擬合目標函數

#欠擬合示例
def fit_1():
    model = Model([
        nnlys.Dense(32, inshape=1),
        nnlys.Dense(1)
    ])
    model.assemble()
    #這個模型是一個線性模型, 用來擬合非線性函數, 模型復雜度不夠,一定會表現出欠擬合
    history = train(20000, ds_0, model, record_epochs=100)
    fit_report(history, report_path+'01.png')

        擬合報告:

可以看到不論是訓練誤差還是驗證誤差都很大, 下面的擬合圖形更是慘不忍睹。模型呈欠擬合。

驗證2:使用同樣多參數的非線性模型擬合目標函數

#使用增加模型復雜度解決欠擬合問題
def fit_2():
    model = Model([
        nnlys.Dense(32, inshape=1, activation='relu'),
        nnlys.Dense(1)
    ])
    model.assemble()
    #使用了relu激活函數模型變成了非線性的, 增加了模型的復雜度
    history = train(30000, ds_0, model, record_epochs=300)
    history['loss'] = history['loss'][5:]
    history['val_loss'] = history['val_loss'][5:]
    history['epochs'] = history['epochs'][5:]
    fit_report(history, report_path+'02.png')

        擬合報告:

擬合情況比較理想。

驗證3: 增加模型復雜度減少訓練數據

#過擬合
def fit_3():
    model = Model([
        nnlys.Dense(512, inshape=1, activation='relu'),
        nnlys.Dense(128, activation='relu'),
        nnlys.Dense(1)
    ])
    model.assemble()
    #使用數據集ds_1, 只有16條訓練數據
    history = train(30000, ds_1, model, batch_size=16, record_epochs=300)
    history['loss'] = history['loss'][20:]
    history['val_loss'] = history['val_loss'][20:]
    history['epochs'] = history['epochs'][20:]
    fit_report(history, report_path+'03.png')

        擬合報告:

可以看到訓練誤差持續降低, 而驗證誤差先低后高, 說明隨着訓練輪次的增加,模型過多地學習到了訓練數據的模式, 導致泛化誤差增大,呈現過過擬合。

驗證3: 仍然使用較少的訓練數據但降低模型的復雜度

#減少參數數量緩解過擬合
def fit_4():
    model = Model([
        nnlys.Dense(128, inshape=1, activation='relu'),
        nnlys.Dense(64, activation='relu'),
        nnlys.Dense(1)
    ])
    model.assemble()

    history = train(30000, ds_1, model, batch_size=16, record_epochs=300)
    history['loss'] = history['loss'][20:]
    history['val_loss'] = history['val_loss'][20:]
    history['epochs'] = history['epochs'][20:]
    fit_report(history, report_path+'04.png')

        擬合報告:

可以看到過擬合現象有所緩解,到25000左右才出現過擬合現象,擬合圖形變得稍微好一點, 過擬合只是略有緩解。

總結

        目前已經實現了一個最簡單可運行的深度學習框架。從驗證情況看,它已經達到預期,能夠支持簡單的MLP模型, 但很直接地暴露出兩個問題:

  1. 訓練模型需要比較多的代碼,不夠友好。
  2. 框架本身沒有提供處理過擬合現象的方案。
    下一步的主要目標就是解決這兩個問題。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM