一、介紹
知識點
- 使用 Python 從網絡上爬取信息的基本方法
- 處理語料“洗數據”的基本方法
- 詞袋模型搭建方法
- 簡單 RNN 的搭建方法
- 簡單 LSTM 的搭建方法
二、從網絡中抓取並處理數據
引入相關包
下載數據 網盤鏈接:https://pan.baidu.com/s/1Jg5NPxc9L-M8Tdgh70Tvig 提取碼:dpqq
# 導入程序所需要的程序包
#PyTorch用的包
import torch
import torch.nn as nn
import torch.optim
from torch.autograd import Variable
# 自然語言處理相關的包
import re #正則表達式的包
import jieba #結巴分詞包
from collections import Counter #搜集器,可以讓統計詞頻更簡單
#繪圖、計算用的程序包
import matplotlib.pyplot as plt
import numpy as np
%matplotlib inline
數據預處理(洗數據)
提供了爬取好的評論文本在 data/good.txt 以及 data/bad.txt 中。
# 將文本中的標點符號過濾掉
def filter_punc(sentence):
sentence = re.sub("[\s+\.\!\/_,$%^*(+\"\'“”《》?“]+|[+——!,。?、~@#¥%……&*():]+", "", sentence)
return(sentence)
在下面的函數代碼中,首先會循環文本文件中的每行評論,過濾標點符號,分詞。
將分詞好的評論分別放置在 pos_sentences 與 neg_sentences 中。
# 掃描所有的文本,分詞、建立詞典,分出正向還是負向的評論,is_filter可以過濾是否篩選掉標點符號
def Prepare_data(good_file, bad_file, is_filter = True):
all_words = [] #存儲所有的單詞
pos_sentences = [] #存儲正向的評論
neg_sentences = [] #存儲負向的評論
with open(good_file, 'r') as fr:
for idx, line in enumerate(fr):
if is_filter:
#過濾標點符號
line = filter_punc(line)
#分詞
words = jieba.lcut(line)
if len(words) > 0:
all_words += words
pos_sentences.append(words)
print('{0} 包含 {1} 行, {2} 個詞.'.format(good_file, idx+1, len(all_words)))
count = len(all_words)
with open(bad_file, 'r') as fr:
for idx, line in enumerate(fr):
if is_filter:
line = filter_punc(line)
words = jieba.lcut(line)
if len(words) > 0:
all_words += words
neg_sentences.append(words)
print('{0} 包含 {1} 行, {2} 個詞.'.format(bad_file, idx+1, len(all_words)-count))
#建立詞典,diction的每一項為{w:[id, 單詞出現次數]}
diction = {}
cnt = Counter(all_words)
for word, freq in cnt.items():
diction[word] = [len(diction), freq]
print('字典大小:{}'.format(len(diction)))
return(pos_sentences, neg_sentences, diction)
這兩個函數一個是用“詞”來查“索引號”,一個是用“索引號”來查“詞”。
#根據單詞返還單詞的編碼
def word2index(word, diction):
if word in diction:
value = diction[word][0]
else:
value = -1
return(value)
#根據編碼獲得單詞
def index2word(index, diction):
for w,v in diction.items():
if v[0] == index:
return(w)
return(None)
下面將讀取包含商品評論信息的兩個文本文件。
good_file = 'data/good.txt'
bad_file = 'data/bad.txt'
pos_sentences, neg_sentences, diction = Prepare_data(good_file, bad_file, True)
st = sorted([(v[1], w) for w, v in diction.items()])
print(st)
三、基於詞袋模型的簡單文本分類器
詞帶模型就是將一句話中的所有單詞都放進一個袋子里(單詞表),而忽略掉語法、語義,甚至單詞之間的順序這些所有的信息。最終只關心每一個單詞的數量,然后根據這個數量來建立對句子進行表征的向量。
訓練數據准備
輸入一個句子和相應的詞典,得到這個句子的向量化表示。
# 輸入一個句子和相應的詞典,得到這個句子的向量化表示。
def sentence2vec(sentence, dictionary):
vector = np.zeros(len(dictionary))
for l in sentence:
vector[l] += 1
return(1.0 * vector / len(sentence))
以句子為單位,將所有的積極情感的評論文本,全部轉化為句子向量,並保存到數據集 dataset 中。
# 遍歷所有句子,將每一個詞映射成編碼
dataset = [] #數據集
labels = [] #標簽
sentences = [] #原始句子,調試用
# 處理正向評論
for sentence in pos_sentences:
new_sentence = []
for l in sentence:
if l in diction:
new_sentence.append(word2index(l, diction))
dataset.append(sentence2vec(new_sentence, diction))
labels.append(0) #正標簽為0
sentences.append(sentence)
對於消極情緒的評論如法炮制
# 處理負向評論
for sentence in neg_sentences:
new_sentence = []
for l in sentence:
if l in diction:
new_sentence.append(word2index(l, diction))
dataset.append(sentence2vec(new_sentence, diction))
labels.append(1) #負標簽為1
sentences.append(sentence)
下面進行數據集的切分。
#打亂所有的數據順序,形成數據集
# indices為所有數據下標的一個全排列
indices = np.random.permutation(len(dataset))
#重新根據打亂的下標生成數據集dataset,標簽集labels,以及對應的原始句子sentences
dataset = [dataset[i] for i in indices]
labels = [labels[i] for i in indices]
sentences = [sentences[i] for i in indices]
#對整個數據集進行划分,分為:訓練集、校准集和測試集,其中校准和測試集合的長度都是整個數據集的10分之一
test_size = len(dataset) // 10
train_data = dataset[2 * test_size :]
train_label = labels[2 * test_size :]
valid_data = dataset[: test_size]
valid_label = labels[: test_size]
test_data = dataset[test_size : 2 * test_size]
test_label = labels[test_size : 2 * test_size]
模型定義
建立多層前饋網絡
# 一個簡單的前饋神經網絡,三層,第一層線性層,加一個非線性ReLU,第二層線性層,中間有10個隱含層神經元
# 輸入維度為詞典的大小:每一段評論的詞袋模型
model = nn.Sequential(
nn.Linear(len(diction), 10),
nn.ReLU(),
nn.Linear(10, 2),
nn.LogSoftmax(dim=1),
)
編寫的是計算預測錯誤率的函數。
def rightness(predictions, labels):
"""計算預測錯誤率的函數,其中predictions是模型給出的一組預測結果,
batch_size行num_classes列的矩陣,labels是數據之中的正確答案"""
# 對於任意一行(一個樣本)的輸出值的第1個維度,求最大,得到每一行的最大元素的下標
pred = torch.max(predictions.data, 1)[1]
#將下標與labels中包含的類別進行比較,並累計得到比較正確的數量
rights = pred.eq(labels.data.view_as(pred)).sum()
#返回正確的數量和這一次一共比較了多少元素
return rights, len(labels)
訓練模型
建立好神經網絡之后,就可以進行訓練了。
# 損失函數為交叉熵
cost = torch.nn.NLLLoss()
# 優化算法為Adam,可以自動調節學習率
optimizer = torch.optim.Adam(model.parameters(), lr = 0.01)
# 記錄列表,記錄訓練時的各種數據,以用於繪圖
records = []
# loss 列表,用於記錄訓練中的 loss
losses = []
def trainModel(data, label):
# 需要將輸入的數據進行適當的變形,主要是要多出一個batch_size的維度,也即第一個為1的維度
# 這樣做是為了適應 PyTorch 函數的特殊用法,具體可以參考 PyTorch 官方文檔
x = Variable(torch.FloatTensor(data).view(1,-1))
# x的尺寸:batch_size=1, len_dictionary
# 標簽也要加一層外衣以變成1*1的張量
y = Variable(torch.LongTensor(np.array([label])))
# y的尺寸:batch_size=1, 1
# 清空梯度
optimizer.zero_grad()
# 模型預測
predict = model(x)
# 計算損失函數
loss = cost(predict, y)
# 將損失函數數值加入到列表中
losses.append(loss.data.numpy())
# 開始進行梯度反傳
loss.backward()
# 開始對參數進行一步優化
optimizer.step()
驗證模型的函數
def evaluateModel(data, label):
x = Variable(torch.FloatTensor(data).view(1, -1))
y = Variable(torch.LongTensor(np.array([label])))
# 模型預測
predict = model(x)
# 調用rightness函數計算准確度
right = rightness(predict, y)
# 計算loss
loss = cost(predict, y)
return predict, right, loss
訓練循環部分
#循環10個Epoch
for epoch in range(10):
for i, data in enumerate(zip(train_data, train_label)):
x, y = data
# 調用上面編寫的訓練函數
# x 即句子向量,y 即標簽(0 or 1)
trainModel(x, y)
# 每隔3000步,跑一下校驗數據集的數據,輸出臨時結果
if i % 3000 == 0:
val_losses = []
rights = []
# 在所有校驗數據集上實驗
for j, val in enumerate(zip(valid_data, valid_label)):
x, y = val
# 調用模型測試函數
predict, right, loss = evaluateModel(x, y)
rights.append(right)
val_losses.append(loss.data.numpy())
# 將校驗集合上面的平均准確度計算出來
right_ratio = 1.0 * np.sum([i[0] for i in rights]) / np.sum([i[1] for i in rights])
print('第{}輪,訓練損失:{:.2f}, 校驗損失:{:.2f}, 校驗准確率: {:.2f}'.format(epoch, np.mean(losses),np.mean(val_losses), right_ratio))
records.append([np.mean(losses), np.mean(val_losses), right_ratio])
繪制圖像觀察模型訓練情況
損失函數和准確度的曲線畫出來
# 繪制誤差曲線
a = [i[0] for i in records]
b = [i[1] for i in records]
c = [i[2] for i in records]
plt.plot(a, label = 'Train Loss')
plt.plot(b, label = 'Valid Loss')
plt.plot(c, label = 'Valid Accuracy')
plt.xlabel('Steps')
plt.ylabel('Loss & Accuracy')
plt.legend()
在整個測試集上運行,記錄預測結果,並計算總的正確率。
vals = [] #記錄准確率所用列表
#對測試數據集進行循環
for data, target in zip(test_data, test_label):
data, target = Variable(torch.FloatTensor(data).view(1,-1)), Variable(torch.LongTensor(np.array([target])))
output = model(data) #將特征數據喂入網絡,得到分類的輸出
val = rightness(output, target) #獲得正確樣本數以及總樣本數
vals.append(val) #記錄結果
#計算准確率
rights = (sum([tup[0] for tup in vals]), sum([tup[1] for tup in vals]))
right_rate = 1.0 * rights[0].data.numpy() / rights[1]
print(right_rate)
四、基於 RNN 的簡單文本分類器
了解 RNN 模型如何實現,以及考察它們在測試數據集上的分類准確度。
普通RNN模型
正如詞袋模型那樣,首先加載並預處理數據。
與詞袋模型不同,在這次 RNN 的實驗中,不去除評論語料中的標點符號。
# 需要重新數據預處理,主要是要加上標點符號,它對於RNN起到重要作用
# 數據來源文件
good_file = 'data/good.txt'
bad_file = 'data/bad.txt'
pos_sentences, neg_sentences, diction = Prepare_data(good_file, bad_file, False)
dataset = []
labels = []
sentences = []
# 正例集合
for sentence in pos_sentences:
new_sentence = []
for l in sentence:
if l in diction:
# 注意將每個詞編碼
new_sentence.append(word2index(l, diction))
#每一個句子都是一個不等長的整數序列
dataset.append(new_sentence)
labels.append(0)
sentences.append(sentence)
# 反例集合
for sentence in neg_sentences:
new_sentence = []
for l in sentence:
if l in diction:
new_sentence.append(word2index(l, diction))
dataset.append(new_sentence)
labels.append(1)
sentences.append(sentence)
# 重新對數據洗牌,構造數據集合
indices = np.random.permutation(len(dataset))
dataset = [dataset[i] for i in indices]
labels = [labels[i] for i in indices]
sentences = [sentences[i] for i in indices]
test_size = len(dataset) // 10
# 訓練集
train_data = dataset[2 * test_size :]
train_label = labels[2 * test_size :]
# 校驗集
valid_data = dataset[: test_size]
valid_label = labels[: test_size]
# 測試集
test_data = dataset[test_size : 2 * test_size]
test_label = labels[test_size : 2 * test_size]
采用手動編寫模型的方式,以熟悉 RNN 的結構與工作流程。本次要搭建的 RNN 模型結構如下:

# 一個手動實現的RNN模型
class RNN(nn.Module):
def __init__(self, input_size, hidden_size, output_size):
super(RNN, self).__init__()
self.hidden_size = hidden_size
# 一個embedding層
self.embed = nn.Embedding(input_size, hidden_size)
# 隱含層內部的相互鏈接
self.i2h = nn.Linear(2 * hidden_size, hidden_size)
# 隱含層到輸出層的鏈接
self.i2o = nn.Linear(hidden_size, output_size)
self.softmax = nn.LogSoftmax(dim=1)
def forward(self, input, hidden):
# 先進行embedding層的計算,它可以把一個數或者數列,映射成一個向量或一組向量
# input尺寸:seq_length, 1
x = self.embed(input)
# x尺寸:hidden_size
# 將輸入和隱含層的輸出(hidden)耦合在一起構成了后續的輸入
combined = torch.cat((x.view(1, -1), hidden), 1)
# combined尺寸:2*hidden_size
#
# 從輸入到隱含層的計算
hidden = self.i2h(combined)
# combined尺寸:hidden_size
# 從隱含層到輸出層的運算
output = self.i2o(hidden)
# output尺寸:output_size
# softmax函數
output = self.softmax(output)
return output, hidden
def initHidden(self):
# 對隱含單元的初始化
# 注意尺寸是:batch_size, hidden_size
return Variable(torch.zeros(1, self.hidden_size))
訓練這個 RNN 網絡。
# 開始訓練這個RNN,10個隱含層單元
rnn = RNN(len(diction), 10, 2)
# 交叉熵評價函數
cost = torch.nn.NLLLoss()
# Adam優化器
optimizer = torch.optim.Adam(rnn.parameters(), lr = 0.001)
records = []
# 學習周期10次
losses = []
for epoch in range(10):
for i, data in enumerate(zip(train_data, train_label)):
x, y = data
x = Variable(torch.LongTensor(x))
#x尺寸:seq_length(序列的長度)
y = Variable(torch.LongTensor([y]))
#x尺寸:batch_size = 1,1
optimizer.zero_grad()
#初始化隱含層單元全為0
hidden = rnn.initHidden()
# hidden尺寸:batch_size = 1, hidden_size
#手動實現RNN的時間步循環,x的長度就是總的循環時間步,因為要把x中的輸入句子全部讀取完畢
for s in range(x.size()[0]):
output, hidden = rnn(x[s], hidden)
#校驗函數
loss = cost(output, y)
losses.append(loss.data.numpy())
loss.backward()
# 開始優化
optimizer.step()
if i % 3000 == 0:
# 每間隔3000步來一次校驗集上面的計算
val_losses = []
rights = []
for j, val in enumerate(zip(valid_data, valid_label)):
x, y = val
x = Variable(torch.LongTensor(x))
y = Variable(torch.LongTensor(np.array([y])))
hidden = rnn.initHidden()
for s in range(x.size()[0]):
output, hidden = rnn(x[s], hidden)
right = rightness(output, y)
rights.append(right)
loss = cost(output, y)
val_losses.append(loss.data.numpy())
# 計算准確度
right_ratio = 1.0 * np.sum([i[0] for i in rights]) / np.sum([i[1] for i in rights])
print('第{}輪,訓練損失:{:.2f}, 測試損失:{:.2f}, 測試准確率: {:.2f}'.format(epoch, np.mean(losses),np.mean(val_losses), right_ratio))
records.append([np.mean(losses), np.mean(val_losses), right_ratio])
繪制 RNN 模型的誤差曲線
# 繪制誤差曲線
a = [i[0] for i in records]
b = [i[1] for i in records]
c = [i[2] for i in records]
plt.plot(a, label = 'Train Loss')
plt.plot(b, label = 'Valid Loss')
plt.plot(c, label = 'Valid Accuracy')
plt.xlabel('Steps')
plt.ylabel('Loss & Accuracy')
plt.legend()
在測試集上運行,並計算准確率。
vals = [] #記錄准確率所用列表
rights = list(rights)
#對測試數據集進行循環
for j, test in enumerate(zip(test_data, test_label)):
x, y = test
x = Variable(torch.LongTensor(x))
y = Variable(torch.LongTensor(np.array([y])))
hidden = rnn.initHidden()
for s in range(x.size()[0]):
output, hidden = rnn(x[s], hidden)
right = rightness(output, y)
rights.append(right)
val = rightness(output, y) #獲得正確樣本數以及總樣本數
vals.append(val) #記錄結果
#計算准確率
rights = (sum([tup[0] for tup in vals]), sum([tup[1] for tup in vals]))
right_rate = 1.0 * rights[0].data.numpy() / rights[1]
right_rate
將模型保存下來。
# 保存、加載模型(為講解用)
torch.save(rnn, 'rnn.mdl')
rnn = torch.load('rnn.mdl')
五、基於 LSTM 的簡單文本分類器
普通 RNN 的效果並不好,可以嘗試利用改進型的 RNN,即 LSTM。
LSTM 與 RNN 最大的區別就是在於每個神經元中多增加了3個控制門:遺忘門、輸入門和輸出門。
另外,在每個隱含層神經元中,LSTM 多了一個 cell 的狀態,起到了記憶的作用。
這就使得 LSTM 可以記憶更長時間的 Pattern。
class LSTMNetwork(nn.Module):
def __init__(self, input_size, hidden_size, n_layers=1):
super(LSTMNetwork, self).__init__()
self.n_layers = n_layers
self.hidden_size = hidden_size
# LSTM的構造如下:一個embedding層,將輸入的任意一個單詞映射為一個向量
# 一個LSTM隱含層,共有hidden_size個LSTM神經元
# 一個全鏈接層,外接一個softmax輸出
self.embedding = nn.Embedding(input_size, hidden_size)
self.lstm = nn.LSTM(hidden_size, hidden_size, n_layers)
self.fc = nn.Linear(hidden_size, 2)
self.logsoftmax = nn.LogSoftmax(dim=1)
def forward(self, input, hidden=None):
#input尺寸: seq_length
#詞向量嵌入
embedded = self.embedding(input)
#embedded尺寸: seq_length, hidden_size
#PyTorch設計的LSTM層有一個特別別扭的地方是,輸入張量的第一個維度需要是時間步,
#第二個維度才是batch_size,所以需要對embedded變形
embedded = embedded.view(input.data.size()[0], 1, self.hidden_size)
#embedded尺寸: seq_length, batch_size = 1, hidden_size
#調用PyTorch自帶的LSTM層函數,注意有兩個輸入,一個是輸入層的輸入,另一個是隱含層自身的輸入
# 輸出output是所有步的隱含神經元的輸出結果,hidden是隱含層在最后一個時間步的狀態。
# 注意hidden是一個tuple,包含了最后時間步的隱含層神經元的輸出,以及每一個隱含層神經元的cell的狀態
output, hidden = self.lstm(embedded, hidden)
#output尺寸: seq_length, batch_size = 1, hidden_size
#hidden尺寸: 二元組(n_layer = 1 * batch_size = 1 * hidden_size, n_layer = 1 * batch_size = 1 * hidden_size)
#我們要把最后一個時間步的隱含神經元輸出結果拿出來,送給全連接層
output = output[-1,...]
#output尺寸: batch_size = 1, hidden_size
#全鏈接層
out = self.fc(output)
#out尺寸: batch_size = 1, output_size
# softmax
out = self.logsoftmax(out)
return out
def initHidden(self):
# 對隱單元的初始化
# 對隱單元輸出的初始化,全0.
# 注意hidden和cell的維度都是layers,batch_size,hidden_size
hidden = Variable(torch.zeros(self.n_layers, 1, self.hidden_size))
# 對隱單元內部的狀態cell的初始化,全0
cell = Variable(torch.zeros(self.n_layers, 1, self.hidden_size))
return (hidden, cell)
訓練 LSTM 模型。
# 開始訓練LSTM網絡
# 構造一個LSTM網絡的實例
lstm = LSTMNetwork(len(diction), 10, 2)
#定義損失函數
cost = torch.nn.NLLLoss()
#定義優化器
optimizer = torch.optim.Adam(lstm.parameters(), lr = 0.001)
records = []
# 開始訓練,一共15個epoch
losses = []
for epoch in range(15):
for i, data in enumerate(zip(train_data, train_label)):
x, y = data
x = Variable(torch.LongTensor(x))
#x尺寸:seq_length,序列的長度
y = Variable(torch.LongTensor([y]))
#y尺寸:batch_size = 1, 1
optimizer.zero_grad()
#初始化LSTM隱含層單元的狀態
hidden = lstm.initHidden()
#hidden: 二元組(n_layer = 1 * batch_size = 1 * hidden_size, n_layer = 1 * batch_size = 1 * hidden_size)
#讓LSTM開始做運算,注意,不需要手工編寫對時間步的循環,而是直接交給PyTorch的LSTM層。
#它自動會根據數據的維度計算若干時間步
output = lstm(x, hidden)
#output尺寸: batch_size = 1, output_size
#損失函數
loss = cost(output, y)
losses.append(loss.data.numpy())
#反向傳播
loss.backward()
optimizer.step()
#每隔3000步,跑一次校驗集,並打印結果
if i % 3000 == 0:
val_losses = []
rights = []
for j, val in enumerate(zip(valid_data, valid_label)):
x, y = val
x = Variable(torch.LongTensor(x))
y = Variable(torch.LongTensor(np.array([y])))
hidden = lstm.initHidden()
output = lstm(x, hidden)
#計算校驗數據集上的分類准確度
right = rightness(output, y)
rights.append(right)
loss = cost(output, y)
val_losses.append(loss.data.numpy())
right_ratio = 1.0 * np.sum([i[0] for i in rights]) / np.sum([i[1] for i in rights])
print('第{}輪,訓練損失:{:.2f}, 測試損失:{:.2f}, 測試准確率: {:.2f}'.format(epoch, np.mean(losses),np.mean(val_losses), right_ratio))
records.append([np.mean(losses), np.mean(val_losses), right_ratio])
在測試集上計算總的正確率。
vals = [] #記錄准確率所用列表
rights = list(rights)
#對測試數據集進行循環
for j, test in enumerate(zip(test_data, test_label)):
x, y = test
x = Variable(torch.LongTensor(x))
y = Variable(torch.LongTensor(np.array([y])))
hidden = lstm.initHidden()
output = lstm(x, hidden)
right = rightness(output, y)
rights.append(right)
val = rightness(output, y) #獲得正確樣本數以及總樣本數
vals.append(val) #記錄結果
#計算准確率
rights = (sum([tup[0] for tup in vals]), sum([tup[1] for tup in vals]))
right_rate = 1.0 * rights[0].data.numpy() / rights[1]
right_rate
保存模型。
#保存、加載模型(為講解用)
torch.save(lstm, 'lstm.mdl')
lstm = torch.load('lstm.mdl')
六、總結
嘗試了兩種 RNN 網絡,一種是普通的 RNN,另一種是 LSTM。展示了它們在處理同樣的文本分類問題上的用法。
