一、概述
1.主題:整個文本將基於《安娜卡列妮娜》這本書的英文文本作為LSTM模型的訓練數據,輸入為單個字符,通過學習整個英文文檔的字符(包括字母和標點符號等)來進行文本生成。
2.單詞層級和字符層級的區別:
1、基於字符的語言模型的好處在於處理任何單詞,標點和其他文檔結構時僅需要很小的詞匯量,而且更加靈活。
2、語言模型的目標是根據之前的 token 預測下一個 token。標准的做法是最后一層在詞匯表的每個 token 上計算一個 softmax。當詞匯庫較大時,softmax 步驟和相關的梯度計算就會成為訓練語言模型的性能瓶頸。
在開始建模之前,我們首先要明確我們的輸入和輸出。即輸入是字符,輸出是預測出的新字符。
3、相比於基於字符的模型,基於單詞的模型顯示出更高的准確性。這是因為后一種模式需要一個更大的網絡來學習長期相關關系,因為它不僅要記住單詞的順序,還必須學會去預測一個單詞在語法上的正確性。
3.原文
《安娜卡列尼娜》文本生成——利用TensorFlow構建LSTM模型
https://zhuanlan.zhihu.com/p/27087310
二、模型構建
文本生成模型構建主要包括以下四個部分:
-
1.數據預處理:加載數據、轉換數據、分割數據mini-batch
-
2.模型構建:輸入層,LSTM層,輸出層,訓練誤差,loss,optimizer
-
3.模型訓練:設置模型參數對模型進行訓練
-
4.生成新文本:使用訓練好的模型,輸入一個單詞,得到后續輸出。
1. 數據預處理
思考:如何構造RNN中的訓練格式的樣本?
訓練過程,輸入的數據是以batch形式完成的。每個batch包含 batch_size 個樣本,每個樣本長度設置為 n_seqs,即包含n_seqs個字符。現在有一個長度為N個字符的原始文本。可知:訓練集可以划分為m個batch
同時,原始文本先轉換為字符對應的索引。
因為LSTM是用前一個字符預測后一個字符,所以,label 的值為 x 后移一個字符。
import time
import numpy as np
import tensorflow as tf
with open('data/anna.txt','r') as f:
text = f.read()
vocab = set(text)
vocab_to_int = {c:i for i,c in enumerate(vocab)}
int_to_vocab = dict(enumerate(vocab))
encoded = np.array([vocab_to_int[c] for c in text],dtype=np.int32)
def get_batches(arr,batch_size,n_seqs):
# batch_size:一個batch中樣本數
# n_seqs:每個樣本的長度
batch_len = batch_size * n_seqs
n_batches = int(len(arr)/ batch_len)
# 這里我們僅保留完整的batch,對於不能整除的部分進行舍棄
arr = arr[:batch_len * n_batches]
arr = arr.reshape((batch_size,-1))
for n in range(0,arr.shape[1],n_steps):
x = arr[:, n:n+n_steps]
y = np.zeros_like(x)
# y 為當前batch中x向左平移一個單位的結果
y[:,:-1],y[:,-1] = x[:,1:], x[:,0]
yield x,y
2. 輸入層
每次輸入一個batch
def build_inputs(batch_size, n_seqs):
'''
構建輸入層
batch_size: 每個batch中的序列個數
n_seqs: 每個序列包含的字符數
'''
inputs = tf.placeholder(tf.int32, shape=(batch_size, n_seqs), name='inputs')
targets = tf.placeholder(tf.int32, shape=(batch_size, n_seqs), name='targets')
# 加入keep_prob
keep_prob = tf.placeholder(tf.float32, name='keep_prob')
return inputs, targets, keep_prob
3.LSTM層
tf.contrib.rnn有兩個包:
BasicLSTMCell: 平常說的LSTM,
LSTMCell: LSTM升級版,加了clipping,projection layer,peep-hole等操作。
MultiRNNCell:實現了對基本LSTM cell的順序堆疊。
dynamic_rnn:實現循環調用LSTMCell,實現神經網絡前向計算。
def build_lstm(lstm_size,num_layers,batch_size,keep_prob):
def lstm_cell():
lstm = tf.nn.rnn_cell.BasicLSTMCell(lstm_size)
drop = tf.contrib.rnn.DropoutWrapper(lstm, output_keep_prob=keep_prob)
return drop
# 堆疊多個LSTM單元。每層的單元都是重新實例化的,而非同一個。
cell = tf.contrib.rnn.MultiRNNCell([lstm_cell() for _ in range(num_layers)])
initial_state = cell.zero_state(batch_size,tf.float32)
return cell,initial_state
4.輸出層
指整個網絡的的最終輸出。
輸出層采用softmax,它與LSTM進行全連接。對於每一個字符來說,它經過LSTM后的輸出大小是(1,lstm_size),
所以一個batch經過LSTM的輸出為\((batch\_size,n\_seqs,lstm\_size)\)。要將這個輸出與softmax全連接層建立連接,就需要將LSTM的輸出reshape為\((batch\_size*n\_seqs,lstm\_size)\)。
softmax層的結點數應該是vocab的大小(我們要計算概率分布)。因此整個LSTM層到softmax層權重矩陣的大小為\((lstm\_size,vocab\_size)\)。
最終的輸出logits為\((batch\_size*n\_seqs,vocab\_size)\)
out為 \((batch\_size*n\_seqs,vocab\_size)\)
def build_output(lstm_output, in_size, out_size):
'''
構造輸出層
lstm_output: lstm層的輸出結果
in_size: lstm輸出層重塑后的size
out_size: softmax層的size
'''
# 將lstm的輸出按照列concate,例如[[1,2,3],[7,8,9]],
# tf.concat的結果是[1,2,3,7,8,9]
seq_output = tf.concat(lstm_output,1) # tf.concat(values,concat_dim)
# reshape
x = tf.reshape(seq_output, [-1, in_size])
# 將lstm層與softmax層全連接
with tf.variable_scope('softmax'):
softmax_w = tf.Variable(tf.truncated_normal([in_size, out_size], stddev=0.1))
softmax_b = tf.Variable(tf.zeros(out_size))
# 計算logits
logits = tf.matmul(x, softmax_w) + softmax_b
# softmax層返回概率分布
out = tf.nn.softmax(logits, name='predictions')
return out, logits
5.損失函數
采用tf.nn.softmax_cross_entropy_with_logits交叉熵來計算loss。
該函數進行兩步運算:
首先對logits進行softmax計算,
根據softmax計算后的結果和labels來計算交叉熵損失。
計算出的結果是向量形式, shape = (batch_size,),因此需要 reduce_mean來進行求均值。
def build_loss(logits, targets, lstm_size, num_classes):
'''
根據logits和targets計算損失
logits: 全連接層的輸出結果(不經過softmax)
targets: 真實標簽,形狀為(batch_size,n_seqs)
lstm_size
num_classes: vocab_size
'''
# One-hot編碼
y_one_hot = tf.one_hot(targets, num_classes)
y_reshaped = tf.reshape(y_one_hot, logits.get_shape())
# Softmax cross entropy loss
loss = tf.nn.softmax_cross_entropy_with_logits(logits=logits, labels=y_reshaped)
loss = tf.reduce_mean(loss)
return loss
6. 優化器
采用gradient clippling的方式來防止梯度爆炸。即通過設置一個閾值,當gradients超過這個閾值時,就將它重置為閾值大小,這就保證了梯度不會變得很大。
優化器的構造流程:
1、找到網絡中的可訓練參數,因為要對 w,b進行更新,
2、計算梯度:tf.gradients(loss,訓練參數),
3、梯度裁剪:tf.clip_by_global_norm(梯度,閾值),
4、實例化一個優化器:train_op = tf.train.AdadeltaOptimizer(學習率)
5、優化器進行梯度下降更新訓練參數,得到一個op:optimizer = train_op.apply_gradients(zip(grades,tvars))
def build_optimizer(loss, learning_rate, grad_clip):
'''
構造Optimizer
loss: 損失
learning_rate: 學習率
'''
# 使用clipping gradients
tvars = tf.trainable_variables()
grads, _ = tf.clip_by_global_norm(tf.gradients(loss, tvars), grad_clip)
train_op = tf.train.AdamOptimizer(learning_rate)
optimizer = train_op.apply_gradients(zip(grads, tvars))
return optimizer
三、模型訓練
1.經過上面五個步驟,我們完成了所有的模塊設置。將這些部分組合起來,構建一個類。
class CharRNN:
def __init__(self, num_classes, batch_size=64, n_seqs=50,
lstm_size=128, num_layers=2, learning_rate=0.001,
grad_clip=5, sampling=False):
# 預測階段,batch_size=1,文本長度=1;即輸入一個字符,預測下一個字符。
if sampling == True:
batch_size, n_seqs = 1, 1
else:
batch_size, n_seqs = batch_size, n_seqs
tf.reset_default_graph()
# 輸入層
self.inputs, self.targets, self.keep_prob = build_inputs(batch_size, n_seqs)
# LSTM層
cell, self.initial_state = build_lstm(lstm_size, num_layers, batch_size, self.keep_prob)
# 對輸入進行one-hot編碼
x_one_hot = tf.one_hot(self.inputs, num_classes)
# 運行RNN outputs [batch_size,n_seqs,lstm_size]
outputs, state = tf.nn.dynamic_rnn(cell, x_one_hot, initial_state=self.initial_state)
self.final_state = state
# 預測結果
self.prediction, self.logits = build_output(outputs, lstm_size, num_classes)
# Loss 和 optimizer (with gradient clipping)
self.loss = build_loss(self.logits, self.targets, lstm_size, num_classes)
self.optimizer = build_optimizer(self.loss, learning_rate, grad_clip)
2、訓練階段
1、實例化CharRNN
2、給定輸入數據:feed字典
3、對sess.run([op],feed)
batch_size = 100 # Sequences per batch
n_seqs = 100 # 序列長度
lstm_size = 512 # Size of hidden layers in LSTMs
num_layers = 2 # Number of LSTM layers
learning_rate = 0.001 # Learning rate
keep_prob = 0.5 # Dropout keep probability``
config = tf.ConfigProto(
allow_soft_placement=True, # 自動選擇CPU 還是GPU
log_device_placement=False # 是否打印設備日志
)
epochs = 20
save_every_n = 200 # 200個batch保存一次模型
model = CharRNN(len(vocab), batch_size=batch_size, n_seqs=n_seqs,
lstm_size=lstm_size, num_layers=num_layers,
learning_rate=learning_rate)
saver = tf.train.Saver(max_to_keep=100) # Maximum number of recent checkpoints to keep. Defaults to 5。默認保存最近的5個模型的參數。
with tf.Session(config=config) as sess:
sess.run(tf.global_variables_initializer())
counter = 0
for e in range(epochs):
new_state = sess.run(model.initial_state)
loss = 0
for x,y in get_batches(encoded,batch_size,n_seqs):
counter += 1
start = time.time()
feed = {
model.inputs:x,
model.targets:y,
model.keep_prob:keep_prob,
model.initial_state:new_state
}
batch_loss,new_state,_ = sess.run([model.loss,model.final_state,model.optimizer],feed_dict=feed)
end = time.time()
# control the print lines
if counter % 100 == 0:
print('輪數: {}/{}... '.format(e+1, epochs),
'訓練步數: {}... '.format(counter),
'訓練誤差: {:.4f}... '.format(batch_loss),
'{:.4f} sec/batch'.format((end-start)))
if (counter % save_every_n == 0):
saver.save(sess, "checkpoints/i{}_l{}.ckpt".format(counter, lstm_size))
saver.save(sess,"checkpoints/i{}_l{}.ckpt".format(counter,lstm_size))
ps:訓練過程遇到的問題
1、開始使用CPU進行訓練,最后的訓練誤差為3.2,感覺出了問題。換成GPU之后:
輪數: 19/20... 訓練步數: 3600... 訓練誤差: 1.2566... 0.1666 sec/batch
輪數: 19/20... 訓練步數: 3700... 訓練誤差: 1.2511... 0.1556 sec/batch
輪數: 20/20... 訓練步數: 3800... 訓練誤差: 1.1943... 0.1606 sec/batch
輪數: 20/20... 訓練步數: 3900... 訓練誤差: 1.2473... 0.1626 sec/batch
四、文本生成:
現在我們可以基於我們的訓練參數進行文本的生成。當我們輸入一個字符時,LSTM會預測下一個字符,我們再將新的字符進行輸入,這樣能不斷的循環下去生成本文。
為了減少噪音,每次的預測值我會選擇最可能的前5個進行隨機選擇,比如輸入h,預測結果概率最大的前五個為[o,e,i,u,b],我們將隨機從這五個中挑選一個作為新的字符,讓過程加入隨機因素會減少一些噪音的生成。
def pick_top_n(preds, vocab_size, top_n=5):
"""
從預測結果中選取前top_n個最可能的字符
preds: 預測結果
vocab_size
top_n
"""
p = np.squeeze(preds)
# 將除了top_n個預測值的位置都置為0
p[np.argsort(p)[:-top_n]] = 0
# 歸一化概率
p = p / np.sum(p)
# 隨機選取一個字符
c = np.random.choice(vocab_size, 1, p=p)[0]
return c
np.squeeze(),從數組的形狀中刪除單維條目,即把shape中為1的維度去掉。
在預測階段,輸入樣本形狀為(1,1)
preds為輸出層的輸出,即(1,83),表示當前輸入字符為字符表中每個字符的概率。
p = np.array([長度為83的列表]),取top_n個值。
def sample(checkpoint, n_samples, lstm_size, vocab_size, prime="The "):
"""
生成新文本
checkpoint: 某一輪迭代的參數文件
n_sample: 新文本的字符長度
lstm_size: 隱層結點數
vocab_size
prime: 起始文本
"""
# 將輸入的單詞轉換為單個字符組成的list
samples = [c for c in prime]
# sampling=True意味着batch的size=1 x 1
model = CharRNN(len(vocab), lstm_size=lstm_size, sampling=True)
saver = tf.train.Saver()
with tf.Session() as sess:
# 加載模型參數,恢復訓練
saver.restore(sess, checkpoint)
new_state = sess.run(model.initial_state)
for c in prime:
x = np.zeros((1, 1))
# 輸入單個字符
x[0,0] = vocab_to_int[c]
feed = {model.inputs: x,
model.keep_prob: 1.,
model.initial_state: new_state}
preds, new_state = sess.run([model.prediction, model.final_state],
feed_dict=feed)
c = pick_top_n(preds, len(vocab))
# 添加字符到samples中
samples.append(int_to_vocab[c])
# 不斷生成字符,直到達到指定數目
for i in range(n_samples):
x[0,0] = c
feed = {model.inputs: x,
model.keep_prob: 1.,
model.initial_state: new_state}
preds, new_state = sess.run([model.prediction, model.final_state],
feed_dict=feed)
c = pick_top_n(preds, len(vocab))
samples.append(int_to_vocab[c])
return ''.join(samples)
在for循環中,每次輸出的結果包含字符和隱狀態,隱狀態作為下一步網絡的輸入,字符保存到列表作為最后的生成文本。
tf.train_latest_checkpoint()方法,可以選擇最后的訓練的參數作為網絡參數。
# 選用最終的訓練參數作為輸入進行文本生成
checkpoint = tf.train.latest_checkpoint('checkpoints')
samp = sample(checkpoint, 2000, lstm_size, len(vocab), prime="The")
print(samp)
參考資料
1、字符級NLP優劣分析:在某些場景中比詞向量更好用
https://flashgene.com/archives/28609.html
2、《安娜卡列尼娜》文本生成——利用TensorFlow構建LSTM模型
https://zhuanlan.zhihu.com/p/27087310
3、TensorFlow學習筆記(5):交叉熵損失函數實現
https://zhuanlan.zhihu.com/p/44901953