Pytorch基礎——使用 RNN 生成簡單序列


一、介紹

內容

使用 RNN 進行序列預測

今天我們就從一個基本的使用 RNN 生成簡單序列的例子中,來窺探神經網絡生成符號序列的秘密。

我們首先讓神經網絡模型學習形如 0^n 1^n 形式的上下文無關語法。然后再讓模型嘗試去生成這樣的字符串。在流程中將演示 RNN 及 LSTM 相關函數的使用方法。

實驗知識點

  • 什么是上下文無關文法
  • 使用 RNN 或 LSTM 模型生成簡單序列的方法
  • 探究 RNN 記憶功能的內部原理

二、什么是上下文無關語法

上下文無關語法

首先讓我們觀察以下序列:

  • 01
  • 0011
  • 000111
  • 00001111
  • ……

它們有什么特點和規律呢?

它們都只含有 0 和 1 並連續地出現,序列長度並不相等,但在每條序列中 0 和 1 的個數是相等的。我們可以用一個簡單的數學表達式來表述所有這些 01 序列的通用規律,其實就是 0^n 1^n,其中 n 就是序列中 0 或者 1 的個數。這樣的序列看似簡單,但其實它在計算機科學中有一個非常響亮的名字,叫做“上下文無關文法”(Context-free grammar)。所謂上下文無關文法,簡單來說,就是可以被一組替代規則所生成,而與本身所處的上下文(前后出現的字符)無關。

上下文無關語法序列的生成

針對上面這種 0^n 1^n 形式的上下文無關語法序列,我們人類要學會數出 0 的個數 n,這樣也就自然知道了 1 的個數。可問題的難點是,對於一個機器來說,它必須自己學習出如何數 0 的個數,而不能從任何其它的途徑獲取 n。這個問題對於人類來說很容易,並且對於一個特定編寫的程序來說也很簡單。但是對於一個通用的神經網絡模型來說,這就並不容易了,因為它自身並不會長出來一個計數器。它必須通過觀察數據歸納總結,發明一種記憶系統從而能夠看出 0 和 1 之間的長程規律,並實現等價的計數功能。尤其是當 n 很大的時候,這個問題將非常困難。因為序列越長,模型對記憶系統的要求就越高。大致了解了思路和關鍵問題后,下面就讓我們來看看如何用 RNN 來解決這個問題。

三、使用 RNN 模型進行序列生成

引入相關包

值得注意的是本次使用了 Counter 搜集器,它可以讓統計詞頻變得更簡單。

# 導入程序所需要的程序包

#PyTorch用的包
import torch
import torch.nn as nn
import torch.optim
from torch.autograd import Variable

from collections import Counter #搜集器,可以讓統計詞頻更簡單

#繪圖、計算用的程序包
import matplotlib
import matplotlib.pyplot as plt
from matplotlib import rc
import numpy as np
#將圖形直接顯示出來
%matplotlib inline

生成訓練數據

為了讓訓練能夠有更好的效果,生成時故意將字符串的長度控制的比較短。為了讓模型可以意識到每個字符串的起始與結束,每個序列中除了 0、1 以外,還有 3、2。其中 3 代表字符串的起始,2 代表字符串的結束。所有字符串都是如下的形式:30001112,300112,3012...

那么下面首先設定控制生成字符串長度的概率。

# 生成的樣本數量
samples = 2000

# 訓練樣本中n的最大值
sz = 10
# 定義不同n的權重,我們按照10:6:4:3:1:1...來配置字符串生成中的n=1,2,3,4,5,...
probability = 1.0 * np.array([10, 6, 4, 3, 1, 1, 1, 1, 1, 1])
# 保證n的最大值為sz
probability = probability[ : sz]
# 歸一化,將權重變成概率
probability = probability / sum(probability)
train_set = []

# 開始生成samples這么多個樣本
for m in range(samples):
    # 對於每一個生成的字符串,隨機選擇一個n,n被選擇的權重被記錄在probability中
    n = np.random.choice(range(1, sz + 1), p = probability)
    # 生成這個字符串,用list的形式完成記錄
    inputs = [0] * n + [1] * n
    # 在最前面插入3表示起始字符,2插入尾端表示終止字符
    inputs.insert(0, 3)
    inputs.append(2)
    train_set.append(inputs) #將生成的字符串加入到train_set訓練集中

在生成訓練數據的同時,也將校驗數據集生成,並保存到 valid_set 中。

valid_set = []

# 再生成samples/10的校驗樣本
for m in range(samples // 10):
    n = np.random.choice(range(1, sz + 1), p = probability)
    inputs = [0] * n + [1] * n
    inputs.insert(0, 3)
    inputs.append(2)
    valid_set.append(inputs)

與訓練數據集不同的是,我們會生成少量的超長序列,也就是 n 超大的序列在校驗數據集中,用以考驗模型的能力極限。

# 再生成若干n超大的校驗樣本
for m in range(2):
    n = sz + m
    inputs = [0] * n + [1] * n
    inputs.insert(0, 3)
    inputs.append(2)
    valid_set.append(inputs)
np.random.shuffle(valid_set)

定義 RNN 模型

PyTorch 提供了豐富的常用模型調用,所以我們無需去實現 RNN 模型的結構,直接調用函數即可。

正因為有了 RNN 函數,定義本次實驗中 RNN 模型的方法與之前定義模型一樣簡單。

# 實現一個簡單的RNN模型
class SimpleRNN(nn.Module):
    def __init__(self, input_size, hidden_size, output_size, num_layers = 1):
        # 定義
        super(SimpleRNN, self).__init__()

        self.hidden_size = hidden_size
        self.num_layers = num_layers
        # 一個embedding層
        self.embedding = nn.Embedding(input_size, hidden_size)
        # PyTorch的RNN層,batch_first標志可以讓輸入的張量的第一個維度表示batch指標
        self.rnn = nn.RNN(hidden_size, hidden_size, num_layers, batch_first = True)
        # 輸出的全鏈接層
        self.fc = nn.Linear(hidden_size, output_size)
        # 最后的logsoftmax層
        self.softmax = nn.LogSoftmax(dim=1)

    def forward(self, input, hidden):
        # 運算過程
        # 先進行embedding層的計算,它可以把一個數值先轉化為one-hot向量,再把這個向量轉化為一個hidden_size維的向量
        # input的尺寸為:batch_size, num_step, data_dim
        x = self.embedding(input)
        # 從輸入到隱含層的計算
        # x的尺寸為:batch_size, num_step, hidden_size
        output, hidden = self.rnn(x, hidden)
        # 從輸出output中取出最后一個時間步的數值,注意output輸出包含了所有時間步的結果,
        # output輸出尺寸為:batch_size, num_step, hidden_size
        output = output[:,-1,:]
        # output尺寸為:batch_size, hidden_size
        # 喂入最后一層全鏈接網絡
        output = self.fc(output)
        # output尺寸為:batch_size, output_size
        # softmax函數
        output = self.softmax(output)
        return output, hidden

    def initHidden(self):
        # 對隱含單元的初始化
        # 注意尺寸是: layer_size, batch_size, hidden_size
        return Variable(torch.zeros(self.num_layers, 1, self.hidden_size))

我們將上面代碼的某些語句單獨拿出來講一下,首先:

self.embedding = nn.Embedding(input_size, hidden_size)

即輸入首先會經過嵌入層被“壓縮”至 hidden_size 的尺寸。這里嵌入層起到的作用與之前的實驗相同,所以就不細講了。值得一提的是 nn.RNN 這個部件,也就是所謂的 RNN 函數。

self.rnn = nn.RNN(hidden_size, hidden_size, num_layers, batch_first = True)

在定義這個部件的時候,需要指定輸入給 RNN 層的向量尺寸 input_size(這里為輸入經過嵌入后的 hidder_size)。以及 RNN 層隱含節點的數量 hidden_size,還包括 RNN 層的層數 num_layers

最后的參數 batch_first 管理了一個與用戶編程習慣有關系的小細節。當把它設置為 True 的時候,RNN 輸入變量的第一個維度就是批數據(batch)的維度,這與我們使用其它函數的習慣是一樣的。

否則的話,按照 nn.RNN 的默認處理情況,批的維度在第二個位置上,而把第一個維度留給了時間。

訓練 RNN 模型

首先實例化模型,定義模型的損失函數與優化算法

# 生成一個最簡化的RNN,輸入size為4,可能值為0,1,2,3,輸出size為3,可能值為0,1,2
rnn = SimpleRNN(input_size = 4, hidden_size = 2, output_size = 3)
criterion = torch.nn.NLLLoss() #交叉熵損失函數
optimizer = torch.optim.Adam(rnn.parameters(), lr = 0.001) #Adam優化算法

然后是訓練函數。

train_loss = 0

def trainRNN(epoch):
    global train_loss
    train_loss = 0
    # 對train_set中的數據進行隨機洗牌,以保證每個epoch得到的訓練順序都不一樣。
    np.random.shuffle(train_set)
    # 對train_set中的數據進行循環
    for i, seq in enumerate(train_set):
        loss = 0
        hidden = rnn.initHidden()  #初始化隱含層神經元
        # 對每一個序列的所有字符進行循環
        for t in range(len(seq) - 1):
            #當前字符作為輸入,下一個字符作為標簽
            x = Variable(torch.LongTensor([seq[t]]).unsqueeze(0))
            # x尺寸:batch_size = 1, time_steps = 1, data_dimension = 1
            y = Variable(torch.LongTensor([seq[t + 1]]))
            # y尺寸:batch_size = 1, data_dimension = 1
            output, hidden = rnn(x, hidden) #RNN輸出
            # output尺寸:batch_size, output_size = 3
            # hidden尺寸:layer_size =1, batch_size=1, hidden_size
            loss += criterion(output, y) #計算損失函數
        loss = 1.0 * loss / len(seq) #計算每字符的損失數值
        optimizer.zero_grad() # 梯度清空
        loss.backward() #反向傳播,設置retain_variables
        optimizer.step() #一步梯度下降
        train_loss += loss #累積損失函數值
        # 把結果打印出來
        if i > 0 and i % 500 == 0:
            print('第{}輪, 第{}個,訓練Loss:{:.2f}'.format(epoch,
                                                    i,
                                                    train_loss.data.numpy() / i
                                                   ))

驗證函數

valid_loss = 0
errors = 0
show_out = ''

def evaluateRNN():
    global valid_loss
    global errors
    global show_out
    valid_loss = 0
    errors = 0
    show_out = ''
    for i, seq in enumerate(valid_set):
        # 對每一個valid_set中的字符串做循環
        loss = 0
        outstring = ''
        targets = ''
        diff = 0
        hidden = rnn.initHidden() #初始化隱含層神經元
        for t in range(len(seq) - 1):
            # 對每一個字符做循環
            x = Variable(torch.LongTensor([seq[t]]).unsqueeze(0))
            # x尺寸:batch_size = 1, time_steps = 1, data_dimension = 1
            y = Variable(torch.LongTensor([seq[t + 1]]))
            # y尺寸:batch_size = 1, data_dimension = 1
            output, hidden = rnn(x, hidden)
            # output尺寸:batch_size, output_size = 3
            # hidden尺寸:layer_size =1, batch_size=1, hidden_size
            mm = torch.max(output, 1)[1][0] #以概率最大的元素作為輸出
            outstring += str(mm.data.numpy()) #合成預測的字符串
            targets += str(y.data.numpy()[0]) #合成目標字符串
            loss += criterion(output, y) #計算損失函數

            diff += 1 - mm.eq(y).data.numpy()[0] #計算模型輸出字符串與目標字符串之間差異的字符數量
        loss = 1.0 * loss / len(seq)
        valid_loss += loss #累積損失函數值
        errors += diff #計算累積錯誤數
        if np.random.rand() < 0.1:
            #以0.1概率記錄一個輸出字符串
            show_out = outstring + '\n' + targets
    # 打印結果
    print(output[0][2].data.numpy())

在下面的訓練代碼中實際上進行了三重循環,Epoch 作為第一重循環,然后在 trainRNN 中對每個 train_set 中的字符串做第二重循環,最后是對每一個字符串中的每一個字符做循環。

#重復進行20次試驗
num_epoch = 20
results = []
for epoch in range(num_epoch):
    # 調用訓練函數
    trainRNN(epoch)

    # 在校驗集上測試
    evaluateRNN()

    # 打印結果
    print('第{}輪, 訓練Loss:{:.2f}, 校驗Loss:{:.2f}, 錯誤率:{:.2f}'.format(epoch, 
                                                               train_loss.data.numpy() / len(train_set),
                                                               valid_loss.data.numpy() / len(valid_set),
                                                               1.0 * errors / len(valid_set)
                                                              ))
    print(show_out)
    # 將結果保存起來
    results.append([train_loss.data.numpy() / len(train_set), 
                    valid_loss.data.numpy() / len(train_set),
                   1.0 * errors / len(valid_set)
                   ])
# 保存、提取模型(為展示用)
torch.save(rnn,'rnn.mdl')
rnn = torch.load('rnn.mdl')

觀察 RNN 模型的學習結果

下面讓 n 從 0 循環到 20,考察隨着序列的增強,模型的預測效果會有怎樣的變化。只有當模型能夠預測出最后一個 1 以及后面應該是跟 2(字串結束字符)才算預測正確,也就意味着模型記憶住了 n 這個數字。

# 讓n取0到20,看RNN是否能夠成功預測下一個字符
for n in range(20):

    inputs = [0] * n + [1] * n
    inputs.insert(0, 3)
    inputs.append(2)
    outstring = ''
    targets = ''
    diff = 0
    hiddens = []
    hidden = rnn.initHidden()
    for t in range(len(inputs) - 1):
        x = Variable(torch.LongTensor([inputs[t]]).unsqueeze(0))
        y = Variable(torch.LongTensor([inputs[t + 1]]))
        output, hidden = rnn(x, hidden)

        mm = torch.max(output, 1)[1][0]
        outstring += str(mm.data.numpy())
        targets += str(y.data.numpy()[0])

        diff += 1 - mm.eq(y).data.numpy()[0]
    print(n)
    print(outstring)
    print(targets)
    print('Diff:{}'.format(diff))

可以看到,對於大部分的預測序列來說,經過長時間訓練的 RNN 僅僅犯少量的錯誤,就是當輸入從0變為1的那個瞬間。當 n 等於 14 時,開始出現大量錯誤,所以可以認為這個簡單的 RNN 神經網絡模型的記憶容量差不多就是 13。

四、使用 LSTM 模型進行序列生成

實現一個LSTM

那么下面就開始實現這個 LSTM 模型,因為 PyTorch 同樣將 LSTM 結構封裝的如此簡潔,以至於 LSTM 模型代碼幾乎和 RNN 模型代碼沒有什么區別。

唯一不同的就是模型中調用 RNN 的位置現在改為了調用 LSTM 結構,即:

self.lstm = nn.LSTM(hidden_size, hidden_size, num_layers, batch_first = True)

LSTM 函數的各個參數意義也是與 RNN 相同的。

# 一個手動實現的LSTM模型,除了初始化隱含但願部分,所有代碼基本與SimpleRNN相同

class SimpleLSTM(nn.Module):
    def __init__(self, input_size, hidden_size, output_size, num_layers = 1):
        super(SimpleLSTM, self).__init__()

        self.hidden_size = hidden_size
        self.num_layers = num_layers
        # 一個embedding層
        self.embedding = nn.Embedding(input_size, hidden_size)
        # 隱含層內部的相互鏈接
        self.lstm = nn.LSTM(hidden_size, hidden_size, num_layers, batch_first = True)
        self.fc = nn.Linear(hidden_size, output_size)
        self.softmax = nn.LogSoftmax(dim=1)

    def forward(self, input, hidden):

        # 先進行embedding層的計算,它可以把一個
        # x的尺寸:batch_size, len_seq, input_size
        x = self.embedding(input)
        # x的尺寸:batch_size, len_seq, hidden_size
        # 從輸入到隱含層的計算
        output, hidden = self.lstm(x, hidden)
        # output的尺寸:batch_size, len_seq, hidden_size
        # hidden: (layer_size, batch_size, hidden_size),(layer_size, batch_size,hidden_size)
        output = output[:,-1,:]
        # output的尺寸:batch_size, hidden_size
        output = self.fc(output)
        # output的尺寸:batch_size, output_size
        # softmax函數
        output = self.softmax(output)
        return output, hidden

    def initHidden(self):
        # 對隱含單元的初始化
        # 注意尺寸是: layer_size, batch_size, hidden_size
        # 對隱單元的初始化
        # 對引單元輸出的初始化,全0.
        # 注意hidden和cell的維度都是layers,batch_size,hidden_size
        hidden = Variable(torch.zeros(self.num_layers, 1, self.hidden_size))
        # 對隱單元內部的狀態cell的初始化,全0
        cell = Variable(torch.zeros(self.num_layers, 1, self.hidden_size))
        return (hidden, cell)

雖然說 LSTM 模型的代碼與 RNN 幾乎相同,但有一個地方需要注意一下。就是在初始化隱藏層狀態的時候,LSTM 除了初始化隱藏層的狀態,還初始化了隱含層內部細胞的狀態,也就是各個“門控單元”的狀態。

訓練簡單 LSTM 模型

與 RNN 模型相同,下面進行 LSTM 模型的訓練。首先實例化模型,定義模型的損失函數與優化算法:

lstm = SimpleLSTM(input_size = 4, hidden_size = 1, output_size = 3, num_layers = 1)
criterion = torch.nn.NLLLoss()
optimizer = torch.optim.Adam(lstm.parameters(), lr = 0.001)

然后是定義訓練函數:

train_loss = 0

def trainLSTM(epoch):
    global train_loss
    train_loss = 0
    np.random.shuffle(train_set)
    # 開始所有訓練數據的循環
    for i, seq in enumerate(train_set):
        loss = 0
        hidden = lstm.initHidden()
        # 開始每一個字符的循環
        for t in range(len(seq) - 1):
            x = Variable(torch.LongTensor([seq[t]]).unsqueeze(0))
            # x的尺寸:batch_size, len_seq, hidden_size
            y = Variable(torch.LongTensor([seq[t + 1]]))
            # y的尺寸:batch_size, data_dimension
            output, hidden = lstm(x, hidden)
            # output的尺寸:batch_size, data_dimension
            # hidden: (layer_size, batch_size, hidden_size),(layer_size, batch_size,hidden_size)
            loss += criterion(output, y)
        loss = 1.0 * loss / len(seq)
        optimizer.zero_grad()
        loss.backward(retain_graph = True)
        optimizer.step()
        train_loss += loss
        if i > 0 and i % 500 == 0:
            print('第{}輪, 第{}個,訓練Loss:{:.2f}'.format(epoch,
                                                    i,
                                                    train_loss.data.numpy() / i
                                                   ))

然后是驗證函數

valid_loss = 0
errors = 0
show_out = ''

def evaluateRNN():
    global valid_loss
    global errors
    global show_out
    valid_loss = 0
    errors = 0
    show_out = ''
    for i, seq in enumerate(valid_set):
        loss = 0
        outstring = ''
        targets = ''
        diff = 0
        hidden = lstm.initHidden()
        for t in range(len(seq) - 1):
            x = Variable(torch.LongTensor([seq[t]]).unsqueeze(0))
            # x的尺寸:batch_size, len_seq, hidden_size
            y = Variable(torch.LongTensor([seq[t + 1]]))
            # y的尺寸:batch_size, data_dimension
            output, hidden = lstm(x, hidden)
            # output的尺寸:batch_size, data_dimension
            # hidden: (layer_size, batch_size, hidden_size),(layer_size, batch_size,hidden_size)
            mm = torch.max(output, 1)[1][0]
            outstring += str(mm.data.numpy())
            targets += str(y.data.numpy()[0])
            loss += criterion(output, y)

            diff += 1 - mm.eq(y).data.numpy()[0]
        loss = 1.0 * loss / len(seq)
        valid_loss += loss
        errors += diff
        if np.random.rand() < 0.1:
            show_out = outstring + '\n' + targets
    print(output[0][2].data.numpy())

下面正式進行 LSTM 模型的訓練。LSTM 模型的訓練流程與 RNN 模型是一樣的。

num_epoch = 20
results = []

# 開始訓練循環
for epoch in range(num_epoch):
    trainLSTM(epoch)        
    # 在校驗集上跑結果
    evaluateRNN()
    print('第{}輪, 訓練Loss:{:.2f}, 校驗Loss:{:.2f}, 錯誤率:{:.2f}'.format(epoch, 
                                                               train_loss.data.numpy() / len(train_set),
                                                               valid_loss.data.numpy() / len(valid_set),
                                                               1.0 * errors / len(valid_set)
                                                              ))
    print(show_out)
    results.append([train_loss.data.numpy() / len(train_set), 
                    valid_loss.data.numpy() / len(train_set),
                   1.0 * errors / len(valid_set)
                   ])
# 保存、提取模型(為展示用)
torch.save(lstm,'lstm.mdl')
lstm = torch.load('lstm.mdl')

再來看看這個# 讓n取0到20,看SimpleLSTM是否能夠成功預測下一個字符
for n in range(20):

inputs = [0] * n + [1] * n
inputs.insert(0, 3)
inputs.append(2)
outstring = ''
targets = ''
diff = 0
hiddens = []
hidden = lstm.initHidden()
for t in range(len(inputs) - 1):
    x = Variable(torch.LongTensor([inputs[t]]).unsqueeze(0))
    y = Variable(torch.LongTensor([inputs[t + 1]]))
    output, hidden = lstm(x, hidden)

    mm = torch.max(output, 1)[1][0]
    outstring += str(mm.data.numpy())
    targets += str(y.data.numpy()[0])

    diff += 1 - mm.eq(y).data.numpy()[0]
print(n)
print(outstring)
print(targets)
print('Diff:{}'.format(diff))LSTM網絡在測試集上的表現如何


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM