PyTorch基礎——使用神經網絡識別文字中的情感信息


一、介紹

知識點

  • 使用 Python 從網絡上爬取信息的基本方法
  • 處理語料“洗數據”的基本方法
  • 詞袋模型搭建方法
  • 簡單 RNN 的搭建方法
  • 簡單 LSTM 的搭建方法

二、從網絡中抓取並處理數據

引入相關包

下載數據 網盤鏈接:https://pan.baidu.com/s/1Jg5NPxc9L-M8Tdgh70Tvig 提取碼:dpqq

# 導入程序所需要的程序包

#PyTorch用的包
import torch
import torch.nn as nn
import torch.optim
from torch.autograd import Variable

# 自然語言處理相關的包
import re #正則表達式的包
import jieba #結巴分詞包
from collections import Counter #搜集器,可以讓統計詞頻更簡單

#繪圖、計算用的程序包
import matplotlib.pyplot as plt
import numpy as np
%matplotlib inline

數據預處理(洗數據)

提供了爬取好的評論文本在 data/good.txt 以及 data/bad.txt 中。

# 將文本中的標點符號過濾掉

def filter_punc(sentence):
    sentence = re.sub("[\s+\.\!\/_,$%^*(+\"\'“”《》?“]+|[+——!,。?、~@#¥%……&*():]+", "", sentence)  
    return(sentence)

在下面的函數代碼中,首先會循環文本文件中的每行評論,過濾標點符號,分詞。

將分詞好的評論分別放置在 pos_sentencesneg_sentences 中。

# 掃描所有的文本,分詞、建立詞典,分出正向還是負向的評論,is_filter可以過濾是否篩選掉標點符號
def Prepare_data(good_file, bad_file, is_filter = True):
    all_words = [] #存儲所有的單詞
    pos_sentences = [] #存儲正向的評論
    neg_sentences = [] #存儲負向的評論
    with open(good_file, 'r') as fr:
        for idx, line in enumerate(fr):
            if is_filter:
                #過濾標點符號
                line = filter_punc(line)
            #分詞
            words = jieba.lcut(line)
            if len(words) > 0:
                all_words += words
                pos_sentences.append(words)
    print('{0} 包含 {1} 行, {2} 個詞.'.format(good_file, idx+1, len(all_words)))

    count = len(all_words)
    with open(bad_file, 'r') as fr:
        for idx, line in enumerate(fr):
            if is_filter:
                line = filter_punc(line)
            words = jieba.lcut(line)
            if len(words) > 0:
                all_words += words
                neg_sentences.append(words)
    print('{0} 包含 {1} 行, {2} 個詞.'.format(bad_file, idx+1, len(all_words)-count))

    #建立詞典,diction的每一項為{w:[id, 單詞出現次數]}
    diction = {}
    cnt = Counter(all_words)
    for word, freq in cnt.items():
        diction[word] = [len(diction), freq]
    print('字典大小:{}'.format(len(diction)))
    return(pos_sentences, neg_sentences, diction)

這兩個函數一個是用“詞”來查“索引號”,一個是用“索引號”來查“詞”。

#根據單詞返還單詞的編碼
def word2index(word, diction):
    if word in diction:
        value = diction[word][0]
    else:
        value = -1
    return(value)

#根據編碼獲得單詞
def index2word(index, diction):
    for w,v in diction.items():
        if v[0] == index:
            return(w)
    return(None)

下面將讀取包含商品評論信息的兩個文本文件。

good_file = 'data/good.txt'
bad_file  = 'data/bad.txt'

pos_sentences, neg_sentences, diction = Prepare_data(good_file, bad_file, True)
st = sorted([(v[1], w) for w, v in diction.items()])
print(st)

三、基於詞袋模型的簡單文本分類器

詞帶模型就是將一句話中的所有單詞都放進一個袋子里(單詞表),而忽略掉語法、語義,甚至單詞之間的順序這些所有的信息。最終只關心每一個單詞的數量,然后根據這個數量來建立對句子進行表征的向量。

訓練數據准備

輸入一個句子和相應的詞典,得到這個句子的向量化表示。

# 輸入一個句子和相應的詞典,得到這個句子的向量化表示。
def sentence2vec(sentence, dictionary):
    vector = np.zeros(len(dictionary))
    for l in sentence:
        vector[l] += 1
    return(1.0 * vector / len(sentence))

以句子為單位,將所有的積極情感的評論文本,全部轉化為句子向量,並保存到數據集 dataset 中。

# 遍歷所有句子,將每一個詞映射成編碼
dataset = [] #數據集
labels = [] #標簽
sentences = [] #原始句子,調試用

# 處理正向評論
for sentence in pos_sentences:
    new_sentence = []
    for l in sentence:
        if l in diction:
            new_sentence.append(word2index(l, diction))
    dataset.append(sentence2vec(new_sentence, diction))
    labels.append(0) #正標簽為0
    sentences.append(sentence)

對於消極情緒的評論如法炮制

# 處理負向評論
for sentence in neg_sentences:
    new_sentence = []
    for l in sentence:
        if l in diction:
            new_sentence.append(word2index(l, diction))
    dataset.append(sentence2vec(new_sentence, diction))
    labels.append(1) #負標簽為1
    sentences.append(sentence)

下面進行數據集的切分。

#打亂所有的數據順序,形成數據集
# indices為所有數據下標的一個全排列
indices = np.random.permutation(len(dataset))

#重新根據打亂的下標生成數據集dataset,標簽集labels,以及對應的原始句子sentences
dataset = [dataset[i] for i in indices]
labels = [labels[i] for i in indices]
sentences = [sentences[i] for i in indices]

#對整個數據集進行划分,分為:訓練集、校准集和測試集,其中校准和測試集合的長度都是整個數據集的10分之一
test_size = len(dataset) // 10
train_data = dataset[2 * test_size :]
train_label = labels[2 * test_size :]

valid_data = dataset[: test_size]
valid_label = labels[: test_size]

test_data = dataset[test_size : 2 * test_size]
test_label = labels[test_size : 2 * test_size]

模型定義

建立多層前饋網絡

# 一個簡單的前饋神經網絡,三層,第一層線性層,加一個非線性ReLU,第二層線性層,中間有10個隱含層神經元
# 輸入維度為詞典的大小:每一段評論的詞袋模型
model = nn.Sequential(
    nn.Linear(len(diction), 10),
    nn.ReLU(),
    nn.Linear(10, 2),
    nn.LogSoftmax(dim=1),
)

編寫的是計算預測錯誤率的函數。

def rightness(predictions, labels):
    """計算預測錯誤率的函數,其中predictions是模型給出的一組預測結果,
       batch_size行num_classes列的矩陣,labels是數據之中的正確答案"""
    
    # 對於任意一行(一個樣本)的輸出值的第1個維度,求最大,得到每一行的最大元素的下標
    pred = torch.max(predictions.data, 1)[1] 
    #將下標與labels中包含的類別進行比較,並累計得到比較正確的數量
    rights = pred.eq(labels.data.view_as(pred)).sum() 
    #返回正確的數量和這一次一共比較了多少元素
    return rights, len(labels) 

訓練模型

建立好神經網絡之后,就可以進行訓練了。

# 損失函數為交叉熵
cost = torch.nn.NLLLoss()
# 優化算法為Adam,可以自動調節學習率
optimizer = torch.optim.Adam(model.parameters(), lr = 0.01)
# 記錄列表,記錄訓練時的各種數據,以用於繪圖
records = []

# loss 列表,用於記錄訓練中的 loss
losses = []

def trainModel(data, label):
    # 需要將輸入的數據進行適當的變形,主要是要多出一個batch_size的維度,也即第一個為1的維度
    # 這樣做是為了適應 PyTorch 函數的特殊用法,具體可以參考 PyTorch 官方文檔
    x = Variable(torch.FloatTensor(data).view(1,-1))
    # x的尺寸:batch_size=1, len_dictionary
    # 標簽也要加一層外衣以變成1*1的張量
    y = Variable(torch.LongTensor(np.array([label])))
    # y的尺寸:batch_size=1, 1

    # 清空梯度
    optimizer.zero_grad()
    # 模型預測
    predict = model(x)
    # 計算損失函數
    loss = cost(predict, y)
    # 將損失函數數值加入到列表中
    losses.append(loss.data.numpy())
    # 開始進行梯度反傳
    loss.backward()
    # 開始對參數進行一步優化
    optimizer.step()

驗證模型的函數

def evaluateModel(data, label):
    x = Variable(torch.FloatTensor(data).view(1, -1))
    y = Variable(torch.LongTensor(np.array([label])))
    # 模型預測
    predict = model(x)
    # 調用rightness函數計算准確度
    right = rightness(predict, y)
    # 計算loss
    loss = cost(predict, y)

    return predict, right, loss

訓練循環部分

#循環10個Epoch
for epoch in range(10):
    for i, data in enumerate(zip(train_data, train_label)):
        x, y = data
        # 調用上面編寫的訓練函數
        # x 即句子向量,y 即標簽(0 or 1)
        trainModel(x, y)

        # 每隔3000步,跑一下校驗數據集的數據,輸出臨時結果
        if i % 3000 == 0:
            val_losses = []
            rights = []
            # 在所有校驗數據集上實驗
            for j, val in enumerate(zip(valid_data, valid_label)):
                x, y = val
                # 調用模型測試函數
                predict, right, loss = evaluateModel(x, y)
                rights.append(right)
                val_losses.append(loss.data.numpy())

            # 將校驗集合上面的平均准確度計算出來
            right_ratio = 1.0 * np.sum([i[0] for i in rights]) / np.sum([i[1] for i in rights])
            print('第{}輪,訓練損失:{:.2f}, 校驗損失:{:.2f}, 校驗准確率: {:.2f}'.format(epoch, np.mean(losses),np.mean(val_losses), right_ratio))
            records.append([np.mean(losses), np.mean(val_losses), right_ratio])

繪制圖像觀察模型訓練情況

損失函數和准確度的曲線畫出來

# 繪制誤差曲線
a = [i[0] for i in records]
b = [i[1] for i in records]
c = [i[2] for i in records]
plt.plot(a, label = 'Train Loss')
plt.plot(b, label = 'Valid Loss')
plt.plot(c, label = 'Valid Accuracy')
plt.xlabel('Steps')
plt.ylabel('Loss & Accuracy')
plt.legend()

在整個測試集上運行,記錄預測結果,並計算總的正確率。

vals = [] #記錄准確率所用列表

#對測試數據集進行循環
for data, target in zip(test_data, test_label):
    data, target = Variable(torch.FloatTensor(data).view(1,-1)), Variable(torch.LongTensor(np.array([target])))
    output = model(data) #將特征數據喂入網絡,得到分類的輸出
    val = rightness(output, target) #獲得正確樣本數以及總樣本數
    vals.append(val) #記錄結果

#計算准確率
rights = (sum([tup[0] for tup in vals]), sum([tup[1] for tup in vals]))
right_rate = 1.0 * rights[0].data.numpy() / rights[1]
print(right_rate)

四、基於 RNN 的簡單文本分類器

了解 RNN 模型如何實現,以及考察它們在測試數據集上的分類准確度。

普通RNN模型

正如詞袋模型那樣,首先加載並預處理數據。

與詞袋模型不同,在這次 RNN 的實驗中,不去除評論語料中的標點符號。

# 需要重新數據預處理,主要是要加上標點符號,它對於RNN起到重要作用
# 數據來源文件
good_file = 'data/good.txt'
bad_file  = 'data/bad.txt'

pos_sentences, neg_sentences, diction = Prepare_data(good_file, bad_file, False)
dataset = []
labels = []
sentences = []

# 正例集合
for sentence in pos_sentences:
    new_sentence = []
    for l in sentence:
        if l in diction:
            # 注意將每個詞編碼
            new_sentence.append(word2index(l, diction))
    #每一個句子都是一個不等長的整數序列
    dataset.append(new_sentence)
    labels.append(0)
    sentences.append(sentence)

# 反例集合
for sentence in neg_sentences:
    new_sentence = []
    for l in sentence:
        if l in diction:
            new_sentence.append(word2index(l, diction))
    dataset.append(new_sentence)
    labels.append(1)
    sentences.append(sentence)

# 重新對數據洗牌,構造數據集合
indices = np.random.permutation(len(dataset))
dataset = [dataset[i] for i in indices]
labels = [labels[i] for i in indices]
sentences = [sentences[i] for i in indices]

test_size = len(dataset) // 10

# 訓練集
train_data = dataset[2 * test_size :]
train_label = labels[2 * test_size :]

# 校驗集
valid_data = dataset[: test_size]
valid_label = labels[: test_size]

# 測試集
test_data = dataset[test_size : 2 * test_size]
test_label = labels[test_size : 2 * test_size]

采用手動編寫模型的方式,以熟悉 RNN 的結構與工作流程。本次要搭建的 RNN 模型結構如下:

此處輸入圖片的描述

# 一個手動實現的RNN模型
class RNN(nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super(RNN, self).__init__()

        self.hidden_size = hidden_size
        # 一個embedding層
        self.embed = nn.Embedding(input_size, hidden_size)
        # 隱含層內部的相互鏈接
        self.i2h = nn.Linear(2 * hidden_size, hidden_size)
        # 隱含層到輸出層的鏈接
        self.i2o = nn.Linear(hidden_size, output_size)
        self.softmax = nn.LogSoftmax(dim=1)

    def forward(self, input, hidden):

        # 先進行embedding層的計算,它可以把一個數或者數列,映射成一個向量或一組向量
        # input尺寸:seq_length, 1
        x = self.embed(input)
        # x尺寸:hidden_size

        # 將輸入和隱含層的輸出(hidden)耦合在一起構成了后續的輸入
        combined = torch.cat((x.view(1, -1), hidden), 1)
        # combined尺寸:2*hidden_size
        #
        # 從輸入到隱含層的計算
        hidden = self.i2h(combined)
        # combined尺寸:hidden_size

        # 從隱含層到輸出層的運算
        output = self.i2o(hidden)
        # output尺寸:output_size

        # softmax函數
        output = self.softmax(output)
        return output, hidden

    def initHidden(self):
        # 對隱含單元的初始化
        # 注意尺寸是:batch_size, hidden_size
        return Variable(torch.zeros(1, self.hidden_size))

訓練這個 RNN 網絡。

# 開始訓練這個RNN,10個隱含層單元
rnn = RNN(len(diction), 10, 2)

# 交叉熵評價函數
cost = torch.nn.NLLLoss()

# Adam優化器
optimizer = torch.optim.Adam(rnn.parameters(), lr = 0.001)
records = []

# 學習周期10次
losses = []
for epoch in range(10):

    for i, data in enumerate(zip(train_data, train_label)):
        x, y = data
        x = Variable(torch.LongTensor(x))
        #x尺寸:seq_length(序列的長度)
        y = Variable(torch.LongTensor([y]))
        #x尺寸:batch_size = 1,1
        optimizer.zero_grad()

        #初始化隱含層單元全為0
        hidden = rnn.initHidden()
        # hidden尺寸:batch_size = 1, hidden_size

        #手動實現RNN的時間步循環,x的長度就是總的循環時間步,因為要把x中的輸入句子全部讀取完畢
        for s in range(x.size()[0]):
            output, hidden = rnn(x[s], hidden)

        #校驗函數
        loss = cost(output, y)
        losses.append(loss.data.numpy())
        loss.backward()
        # 開始優化
        optimizer.step()
        if i % 3000 == 0:
            # 每間隔3000步來一次校驗集上面的計算
            val_losses = []
            rights = []
            for j, val in enumerate(zip(valid_data, valid_label)):
                x, y = val
                x = Variable(torch.LongTensor(x))
                y = Variable(torch.LongTensor(np.array([y])))
                hidden = rnn.initHidden()
                for s in range(x.size()[0]):
                    output, hidden = rnn(x[s], hidden)
                right = rightness(output, y)
                rights.append(right)
                loss = cost(output, y)
                val_losses.append(loss.data.numpy())
            # 計算准確度
            right_ratio = 1.0 * np.sum([i[0] for i in rights]) / np.sum([i[1] for i in rights])
            print('第{}輪,訓練損失:{:.2f}, 測試損失:{:.2f}, 測試准確率: {:.2f}'.format(epoch, np.mean(losses),np.mean(val_losses), right_ratio))
            records.append([np.mean(losses), np.mean(val_losses), right_ratio])

繪制 RNN 模型的誤差曲線

# 繪制誤差曲線
a = [i[0] for i in records]
b = [i[1] for i in records]
c = [i[2] for i in records]
plt.plot(a, label = 'Train Loss')
plt.plot(b, label = 'Valid Loss')
plt.plot(c, label = 'Valid Accuracy')
plt.xlabel('Steps')
plt.ylabel('Loss & Accuracy')
plt.legend()

在測試集上運行,並計算准確率。

vals = [] #記錄准確率所用列表
rights = list(rights)
#對測試數據集進行循環
for j, test in enumerate(zip(test_data, test_label)):
    x, y = test
    x = Variable(torch.LongTensor(x))
    y = Variable(torch.LongTensor(np.array([y])))
    hidden = rnn.initHidden()
    for s in range(x.size()[0]):
        output, hidden = rnn(x[s], hidden)
    right = rightness(output, y)
    rights.append(right)
    val = rightness(output, y) #獲得正確樣本數以及總樣本數
    vals.append(val) #記錄結果

#計算准確率
rights = (sum([tup[0] for tup in vals]), sum([tup[1] for tup in vals]))
right_rate = 1.0 * rights[0].data.numpy() / rights[1]
right_rate

將模型保存下來。

# 保存、加載模型(為講解用)
torch.save(rnn, 'rnn.mdl')
rnn = torch.load('rnn.mdl')

五、基於 LSTM 的簡單文本分類器

普通 RNN 的效果並不好,可以嘗試利用改進型的 RNN,即 LSTM。

LSTM 與 RNN 最大的區別就是在於每個神經元中多增加了3個控制門:遺忘門、輸入門和輸出門。

另外,在每個隱含層神經元中,LSTM 多了一個 cell 的狀態,起到了記憶的作用。

這就使得 LSTM 可以記憶更長時間的 Pattern。

class LSTMNetwork(nn.Module):
    def __init__(self, input_size, hidden_size, n_layers=1):
        super(LSTMNetwork, self).__init__()
        self.n_layers = n_layers
        self.hidden_size = hidden_size

        # LSTM的構造如下:一個embedding層,將輸入的任意一個單詞映射為一個向量
        # 一個LSTM隱含層,共有hidden_size個LSTM神經元
        # 一個全鏈接層,外接一個softmax輸出
        self.embedding = nn.Embedding(input_size, hidden_size)
        self.lstm = nn.LSTM(hidden_size, hidden_size, n_layers)
        self.fc = nn.Linear(hidden_size, 2)
        self.logsoftmax = nn.LogSoftmax(dim=1)

    def forward(self, input, hidden=None):

        #input尺寸: seq_length
        #詞向量嵌入
        embedded = self.embedding(input)
        #embedded尺寸: seq_length, hidden_size

        #PyTorch設計的LSTM層有一個特別別扭的地方是,輸入張量的第一個維度需要是時間步,
        #第二個維度才是batch_size,所以需要對embedded變形
        embedded = embedded.view(input.data.size()[0], 1, self.hidden_size)
        #embedded尺寸: seq_length, batch_size = 1, hidden_size

        #調用PyTorch自帶的LSTM層函數,注意有兩個輸入,一個是輸入層的輸入,另一個是隱含層自身的輸入
        # 輸出output是所有步的隱含神經元的輸出結果,hidden是隱含層在最后一個時間步的狀態。
        # 注意hidden是一個tuple,包含了最后時間步的隱含層神經元的輸出,以及每一個隱含層神經元的cell的狀態

        output, hidden = self.lstm(embedded, hidden)
        #output尺寸: seq_length, batch_size = 1, hidden_size
        #hidden尺寸: 二元組(n_layer = 1 * batch_size = 1 * hidden_size, n_layer = 1 * batch_size = 1 * hidden_size)

        #我們要把最后一個時間步的隱含神經元輸出結果拿出來,送給全連接層
        output = output[-1,...]
        #output尺寸: batch_size = 1, hidden_size

        #全鏈接層
        out = self.fc(output)
        #out尺寸: batch_size = 1, output_size
        # softmax
        out = self.logsoftmax(out)
        return out

    def initHidden(self):
        # 對隱單元的初始化

        # 對隱單元輸出的初始化,全0.
        # 注意hidden和cell的維度都是layers,batch_size,hidden_size
        hidden = Variable(torch.zeros(self.n_layers, 1, self.hidden_size))
        # 對隱單元內部的狀態cell的初始化,全0
        cell = Variable(torch.zeros(self.n_layers, 1, self.hidden_size))
        return (hidden, cell)

訓練 LSTM 模型。

# 開始訓練LSTM網絡

# 構造一個LSTM網絡的實例
lstm = LSTMNetwork(len(diction), 10, 2)

#定義損失函數
cost = torch.nn.NLLLoss()

#定義優化器
optimizer = torch.optim.Adam(lstm.parameters(), lr = 0.001)
records = []

# 開始訓練,一共15個epoch
losses = []
for epoch in range(15):
    for i, data in enumerate(zip(train_data, train_label)):
        x, y = data
        x = Variable(torch.LongTensor(x))
        #x尺寸:seq_length,序列的長度
        y = Variable(torch.LongTensor([y]))
        #y尺寸:batch_size = 1, 1
        optimizer.zero_grad()

        #初始化LSTM隱含層單元的狀態
        hidden = lstm.initHidden()
        #hidden: 二元組(n_layer = 1 * batch_size = 1 * hidden_size, n_layer = 1 * batch_size = 1 * hidden_size)

        #讓LSTM開始做運算,注意,不需要手工編寫對時間步的循環,而是直接交給PyTorch的LSTM層。
        #它自動會根據數據的維度計算若干時間步
        output = lstm(x, hidden)
        #output尺寸: batch_size = 1, output_size

        #損失函數
        loss = cost(output, y)
        losses.append(loss.data.numpy())

        #反向傳播
        loss.backward()
        optimizer.step()

        #每隔3000步,跑一次校驗集,並打印結果
        if i % 3000 == 0:
            val_losses = []
            rights = []
            for j, val in enumerate(zip(valid_data, valid_label)):
                x, y = val
                x = Variable(torch.LongTensor(x))
                y = Variable(torch.LongTensor(np.array([y])))
                hidden = lstm.initHidden()
                output = lstm(x, hidden)
                #計算校驗數據集上的分類准確度
                right = rightness(output, y)
                rights.append(right)
                loss = cost(output, y)
                val_losses.append(loss.data.numpy())
            right_ratio = 1.0 * np.sum([i[0] for i in rights]) / np.sum([i[1] for i in rights])
            print('第{}輪,訓練損失:{:.2f}, 測試損失:{:.2f}, 測試准確率: {:.2f}'.format(epoch, np.mean(losses),np.mean(val_losses), right_ratio))
            records.append([np.mean(losses), np.mean(val_losses), right_ratio])        

在測試集上計算總的正確率。

vals = [] #記錄准確率所用列表
rights = list(rights)
#對測試數據集進行循環
for j, test in enumerate(zip(test_data, test_label)):
    x, y = test
    x = Variable(torch.LongTensor(x))
    y = Variable(torch.LongTensor(np.array([y])))
    hidden = lstm.initHidden()
    output = lstm(x, hidden)
    right = rightness(output, y)
    rights.append(right)
    val = rightness(output, y) #獲得正確樣本數以及總樣本數
    vals.append(val) #記錄結果

#計算准確率
rights = (sum([tup[0] for tup in vals]), sum([tup[1] for tup in vals]))
right_rate = 1.0 * rights[0].data.numpy() / rights[1]
right_rate

保存模型。

#保存、加載模型(為講解用)
torch.save(lstm, 'lstm.mdl')
lstm = torch.load('lstm.mdl')

六、總結

嘗試了兩種 RNN 網絡,一種是普通的 RNN,另一種是 LSTM。展示了它們在處理同樣的文本分類問題上的用法。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM