Bilstm中文微博多情感分析


Bilstm中文微博多情感分析

數據

我的數據是來自github的一個項目:ChineseNlpCorpus 里面收集了蠻多用於自然語言處理的中文數據集/語料。

下載地址: 百度網盤
數據概覽: 36 萬多條,帶情感標注 新浪微博,包含 4 種情感,其中喜悅約 20 萬條,憤怒、厭惡、低落各約 5 萬條
數據來源: 新浪微博
原數據集: 微博情感分析數據集,網上搜集,具體作者、來源不詳

預處理

划分訓練集和測試集

將表中的各類情感的數據各抽取前10000條做測試集,剩余的用作訓練集

import pandas as pd import openpyxl from openpyxl.cell.cell import ILLEGAL_CHARACTERS_RE def test(): file='simplifyweibo_4_moods.csv' data=pd.read_csv(file) label=data.get('label') review = data.get('review') train_review=[] train_label=[] test_review=[] test_label=[] n1=n2=n3=n4=0 for i in range(len(review)): lab=int(label[i]) line=str(review[i]) line=ILLEGAL_CHARACTERS_RE.sub(r'', line) if int(lab)==0: if n1<10000: n1+=1 test_label.append(lab) test_review.append(line) else: train_label.append(lab) train_review.append(line) elif int(lab)==1: if n2<10000: n2+=1 test_label.append(lab) test_review.append(line) else: train_label.append(lab) train_review.append(line) elif int(lab)==2: if n3<10000: n3+=1 test_label.append(lab) test_review.append(line) else: train_label.append(lab) train_review.append(line) elif int(lab)==3: if n4<10000: n4+=1 test_label.append(lab) test_review.append(line) else: train_label.append(lab) train_review.append(line) import openpyxl import xlsxwriter xl = openpyxl.Workbook() # 調用對象的add_sheet方法
    sheet1 = xl.create_sheet(index=0) sheet1.cell(1, 1, "label") sheet1.cell(1, 2, "review") for i in range(0, len(train_review)): sheet1.cell(i + 2, 1, train_label[i]) sheet1.cell(i + 2, 2, train_review[i]) xl.save("train.xlsx") if __name__ == '__main__': test()

得到對應的文件

 

 

生成字典

數據中的一些標點符號、特殊符號、英文字母、數字等對於我們的實驗都是沒有用處的,所以我們需要將他們過濾掉。

去除停用詞

def tokenlize(sentence): """ 進行文本分詞 :param sentence: str :return: [str,str,str] """ fileters = ['!', '"', '#', '$', '%', '&', '\(', '\)', '\*', '\+', ',', '-', '\.', '/', ':', ';', '<', '=', '>', '\?', '@', '\[', '\\', '\]', '^', '_', '`', '\{', '\|', '\}', '~', '\t', '\n', '\x97', '\x96', '', '', ] sentence = re.sub("|".join(fileters), "", sentence) sentence=jieba.cut(sentence,cut_all=False) sentence=' '.join(sentence) result = [i for i in sentence.split(" ") if len(i) > 0] result=movestopwords(result) return result def stopwordslist(filepath): stopwords = [line.strip() for line in open(filepath, 'r', encoding='utf-8').readlines()] return stopwords # 對句子去除停用詞
def movestopwords(sentence): stopwords = stopwordslist('data/stopwords.txt')  # 這里加載停用詞的路徑
    outstr = [] for word in sentence: if word not in stopwords: if word != '\t' and '\n': outstr.append(word) # outstr += " "
    return outstr

設計字典類

""" 文本序列化 """


class Vocab: UNK_TAG = "<UNK>"  # 表示未知字符
    PAD_TAG = "<PAD>"  # 填充符
    PAD = 0 UNK = 1

    def __init__(self): self.dict = {  # 保存詞語和對應的數字
 self.UNK_TAG: self.UNK, self.PAD_TAG: self.PAD } self.count = {}  # 統計詞頻的

    def fit(self, sentence): """ 接受句子,統計詞頻 :param sentence:[str,str,str] :return:None """
        for word in sentence: self.count[word] = self.count.get(word, 0) + 1  # 所有的句子fit之后,self.count就有了所有詞語的詞頻

    def build_vocab(self, min_count=1, max_count=None, max_features=None): """ 根據條件構造 詞典 :param min_count:最小詞頻 :param max_count: 最大詞頻 :param max_features: 最大詞語數 :return: """
        if min_count is not None: self.count = {word: count for word, count in self.count.items() if count >= min_count} if max_count is not None: self.count = {word: count for word, count in self.count.items() if count <= max_count} if max_features is not None: # [(k,v),(k,v)....] --->{k:v,k:v}
            self.count = dict(sorted(self.count.items(), lambda x: x[-1], reverse=True)[:max_features]) for word in self.count: self.dict[word] = len(self.dict)  # 每次word對應一個數字

        # 把dict進行翻轉
        self.inverse_dict = dict(zip(self.dict.values(), self.dict.keys())) def transform(self, sentence, max_len=None): """ 把句子轉化為數字序列 :param sentence:[str,str,str] :return: [int,int,int] """
        if len(sentence) > max_len: sentence = sentence[:max_len] else: sentence = sentence + [self.PAD_TAG] * (max_len - len(sentence))  # 填充PAD

        return [self.dict.get(i, 1) for i in sentence] def inverse_transform(self, incides): """ 把數字序列轉化為字符 :param incides: [int,int,int] :return: [str,str,str] """
        return [self.inverse_dict.get(i, "<UNK>") for i in incides] def __len__(self): return len(self.dict) # # 以下是調試代碼 # if __name__ == '__main__': # sentences = [["今天", "天氣", "很", "好"], # ["今天", "去", "吃", "什么"]] # ws = Vocab() # for sentence in sentences: # # 統計詞頻 # ws.fit(sentence) # # 構造詞典 # ws.build_vocab(min_count=1) # print(ws.dict) # # 把句子轉換成數字序列 # ret = ws.transform(["好", "好", "好", "好", "好", "好", "好", "熱", "呀"], max_len=13) # print(ret) # # 把數字序列轉換成句子 # ret = ws.inverse_transform(ret) # print(ret) # pass

dataset

# -*-coding:utf-8-*-
import os import pickle import re import zipfile import jieba from torch.utils.data import Dataset, DataLoader from tqdm import tqdm import pandas as pd class ImdbDataset(Dataset): def __init__(self, train=True): if train == True: url = 'data/train.xlsx'
        else: url = "data/test.xlsx" data = pd.read_excel(url) sentence = data.get('review') label = data.get('label') self.sentence_list=sentence self.label_list=label def __getitem__(self, idx): line_text=self.sentence_list[idx] # 從txt獲取評論並分詞
        review = tokenlize(str(line_text)) # 獲取評論對應的label
        label = int(self.label_list[idx]) return review, label def __len__(self): return len(self.sentence_list) def tokenlize(sentence): """ 進行文本分詞 :param sentence: str :return: [str,str,str] """ fileters = ['!', '"', '#', '$', '%', '&', '\(', '\)', '\*', '\+', ',', '-', '\.', '/', ':', ';', '<', '=', '>', '\?', '@', '\[', '\\', '\]', '^', '_', '`', '\{', '\|', '\}', '~', '\t', '\n', '\x97', '\x96', '', '', ] sentence = re.sub("|".join(fileters), "", sentence) sentence=jieba.cut(sentence,cut_all=False) sentence=' '.join(sentence) result = [i for i in sentence.split(" ") if len(i) > 0] result=movestopwords(result) return result def stopwordslist(filepath): stopwords = [line.strip() for line in open(filepath, 'r', encoding='utf-8').readlines()] return stopwords # 對句子去除停用詞
def movestopwords(sentence): stopwords = stopwordslist('data/stopwords.txt')  # 這里加載停用詞的路徑
    outstr = [] for word in sentence: if word not in stopwords: if word != '\t' and '\n': outstr.append(word) # outstr += " "
    return outstr # 以下為調試代碼
def collate_fn(batch): """ 對batch數據進行處理 :param batch: [一個getitem的結果,getitem的結果,getitem的結果] :return: 元組 """ reviews, labels = zip(*batch) return reviews, labels if __name__ == "__main__": from 情感分析.imdb_sentiment.vocab import Vocab imdb_dataset = ImdbDataset(True) my_dataloader = DataLoader(imdb_dataset, batch_size=2, shuffle=True, collate_fn=collate_fn) for review,label in my_dataloader: vocab_model = pickle.load(open("./models/vocab.pkl", "rb")) print(review[0]) result = vocab_model.transform(review[0], 30) print(result) break

構建字典

# -*-coding:utf-8-*-
import pickle from tqdm import tqdm from 情感分析.weibo_many_emotion import dataset # from 情感分析.imdb_sentiment.vocab import Vocab
from torch.utils.data import DataLoader class Vocab: UNK_TAG = "<UNK>"  # 表示未知字符
    PAD_TAG = "<PAD>"  # 填充符
    PAD = 0 UNK = 1

    def __init__(self): self.dict = {  # 保存詞語和對應的數字
 self.UNK_TAG: self.UNK, self.PAD_TAG: self.PAD } self.count = {}  # 統計詞頻的

    def fit(self, sentence): """ 接受句子,統計詞頻 :param sentence:[str,str,str] :return:None """
        for word in sentence: self.count[word] = self.count.get(word, 0) + 1  # 所有的句子fit之后,self.count就有了所有詞語的詞頻

    def build_vocab(self, min_count=1, max_count=None, max_features=None): """ 根據條件構造 詞典 :param min_count:最小詞頻 :param max_count: 最大詞頻 :param max_features: 最大詞語數 :return: """
        if min_count is not None: self.count = {word: count for word, count in self.count.items() if count >= min_count} if max_count is not None: self.count = {word: count for word, count in self.count.items() if count <= max_count} if max_features is not None: # [(k,v),(k,v)....] --->{k:v,k:v}
            self.count = dict(sorted(self.count.items(), lambda x: x[-1], reverse=True)[:max_features]) for word in self.count: self.dict[word] = len(self.dict)  # 每次word對應一個數字

        # 把dict進行翻轉
        self.inverse_dict = dict(zip(self.dict.values(), self.dict.keys())) def transform(self, sentence, max_len=None): """ 把句子轉化為數字序列 :param sentence:[str,str,str] :return: [int,int,int] """
        if len(sentence) > max_len: sentence = sentence[:max_len] else: sentence = sentence + [self.PAD_TAG] * (max_len - len(sentence))  # 填充PAD

        return [self.dict.get(i, 1) for i in sentence] def inverse_transform(self, incides): """ 把數字序列轉化為字符 :param incides: [int,int,int] :return: [str,str,str] """
        return [self.inverse_dict.get(i, "<UNK>") for i in incides] def __len__(self): return len(self.dict) def collate_fn(batch): """ 對batch數據進行處理 :param batch: [一個getitem的結果,getitem的結果,getitem的結果] :return: 元組 """ reviews, labels = zip(*batch) return reviews, labels def get_dataloader(train=True): imdb_dataset = dataset.ImdbDataset(train) my_dataloader = DataLoader(imdb_dataset, batch_size=200, shuffle=True, collate_fn=collate_fn) return my_dataloader if __name__ == '__main__': ws = Vocab() dl_train = get_dataloader(True) dl_test = get_dataloader(False) for reviews, label in tqdm(dl_train, total=len(dl_train)): for sentence in reviews: ws.fit(sentence) for reviews, label in tqdm(dl_test, total=len(dl_test)): for sentence in reviews: ws.fit(sentence) ws.build_vocab() print(len(ws)) pickle.dump(ws, open("./models/vocab.pkl", "wb"))

模型訓練

# -*-coding:utf-8-*-
import pickle import torch import torch.nn as nn import torch.nn.functional as F from torch.optim import Adam from torch.utils.data import DataLoader from tqdm import tqdm from 情感分析.weibo_many_emotion import dataset from 情感分析.中文情感分類.vocab import Vocab train_batch_size = 512 test_batch_size = 128 voc_model = pickle.load(open("./models/vocab.pkl", "rb")) sequence_max_len = 100


def collate_fn(batch): """ 對batch數據進行處理 :param batch: [一個getitem的結果,getitem的結果,getitem的結果] :return: 元組 """ reviews, labels = zip(*batch) reviews = torch.LongTensor([voc_model.transform(i, max_len=sequence_max_len) for i in reviews]) labels = torch.LongTensor(labels) return reviews, labels def get_dataloader(train=True): imdb_dataset = dataset.ImdbDataset(train) batch_size = train_batch_size if train else test_batch_size return DataLoader(imdb_dataset, batch_size=batch_size, shuffle=True, collate_fn=collate_fn) class ImdbModel(nn.Module): def __init__(self): super(ImdbModel, self).__init__() self.embedding = nn.Embedding(num_embeddings=len(voc_model), embedding_dim=200, padding_idx=voc_model.PAD).to() self.lstm = nn.LSTM(input_size=200, hidden_size=64, num_layers=2, batch_first=True, bidirectional=True, dropout=0.1) self.fc1 = nn.Linear(64 * 2, 64) self.fc2 = nn.Linear(64, 4) def forward(self, input): """ :param input:[batch_size,max_len] :return: """ input_embeded = self.embedding(input)  # input embeded :[batch_size,max_len,200]
 output, (h_n, c_n) = self.lstm(input_embeded)  # h_n :[4,batch_size,hidden_size]
        # out :[batch_size,hidden_size*2]
        out = torch.cat([h_n[-1, :, :], h_n[-2, :, :]], dim=-1)  # 拼接正向最后一個輸出和反向最后一個輸出

        # 進行全連接
        out_fc1 = self.fc1(out) # 進行relu
        out_fc1_relu = F.relu(out_fc1) # 全連接
        out_fc2 = self.fc2(out_fc1_relu)  # out :[batch_size,2]
        return F.log_softmax(out_fc2, dim=-1) def device(): if torch.cuda.is_available(): return torch.device('cuda') else: return torch.device('cpu') def train(imdb_model, epoch): """ :param imdb_model: :param epoch: :return: """ train_dataloader = get_dataloader(train=True) optimizer = Adam(imdb_model.parameters()) for i in range(epoch): bar = tqdm(train_dataloader, total=len(train_dataloader)) for idx, (data, target) in enumerate(bar): optimizer.zero_grad() data = data.to(device()) target = target.to(device()) output = imdb_model(data) loss = F.nll_loss(output, target) loss.backward() optimizer.step() bar.set_description("epcoh:{} idx:{} loss:{:.6f}".format(i, idx, loss.item())) torch.save(imdb_model, 'lstm_model.pkl') def test(imdb_model): """ 驗證模型 :param imdb_model: :return: """ test_loss = 0 correct = 0 imdb_model.eval() test_dataloader = get_dataloader(train=False) with torch.no_grad(): for data, target in tqdm(test_dataloader): data = data.to(device()) target = target.to(device()) output = imdb_model(data) test_loss += F.nll_loss(output, target, reduction='sum').item() pred = output.data.max(1, keepdim=True)[1]  # 獲取最大值的位置,[batch_size,1]
            correct += pred.eq(target.data.view_as(pred)).sum() test_loss /= len(test_dataloader.dataset) print('\nTest set: Avg. loss: {:.4f}, Accuracy: {}/{} ({:.2f}%)\n'.format( test_loss, correct, len(test_dataloader.dataset), 100. * correct / len(test_dataloader.dataset))) def xlftest(): import numpy as np model = torch.load('lstm_model.pkl') model.to(device()) from 情感分析.weibo_many_emotion.dataset import tokenlize lines=['哈哈哈開心','真是無語,你們怎么搞的','小姐姐,祝你生日快樂','你他媽的有病'] for line in lines: print(line) review = tokenlize(line) # review=tokenlize(line)
        vocab_model = pickle.load(open("./models/vocab.pkl", "rb")) result = vocab_model.transform(review,sequence_max_len) # print(result)
        data = torch.LongTensor(result).to(device()) data=torch.reshape(data,(1,sequence_max_len)).to(device()) # print(data.shape)
        output = model(data) print(output.data) pred = output.data.max(1, keepdim=True)[1]  # 獲取最大值的位置,[batch_size,1]
        print(pred.item()) if pred.item() == 0: print("喜悅") elif pred.item() == 1: print("憤怒") elif pred.item() == 2: print("厭惡") elif pred.item() == 3: print("低落") if __name__ == '__main__': # imdb_model = ImdbModel().to(device())
    # train(imdb_model,20)
    # test(imdb_model)
    xlftest()

測試結果:

對四種分類的准確度只有40左右,一部分原因是數據集不規范,還需要進行調參優化

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM