BiLSTM中文微博多情感分析


BiLSTM中文微博多情感分析

数据

我的数据来自 GitHub 上的一个项目:ChineseNlpCorpus,里面收集了不少用于自然语言处理的中文数据集/语料。

下载地址: 百度网盘
数据概览: 36 万多条带情感标注的新浪微博,包含 4 种情感,其中喜悦约 20 万条,愤怒、厌恶、低落各约 5 万条
数据来源: 新浪微博
原数据集: 微博情感分析数据集,网上搜集,具体作者、来源不详

预处理

划分训练集和测试集

将表中的各类情感的数据各抽取前10000条做测试集,剩余的用作训练集

import pandas as pd
import openpyxl
from openpyxl.cell.cell import ILLEGAL_CHARACTERS_RE

# Number of leading rows of each emotion label that are held out for testing.
TEST_ROWS_PER_LABEL = 10000

# Emotion labels that are kept; rows carrying any other label are ignored.
KNOWN_LABELS = (0, 1, 2, 3)


def _save_split(path, labels, reviews):
    """Write one split to an .xlsx workbook.

    The first row is the header ``label`` / ``review``; every following row
    holds one sample.

    :param path: destination file name
    :param labels: list of int labels
    :param reviews: list of review strings, parallel to ``labels``
    :return: None
    """
    workbook = openpyxl.Workbook()
    # Insert the sheet at position 0 so it is the first sheet in the file.
    sheet = workbook.create_sheet(index=0)
    sheet.cell(1, 1, "label")
    sheet.cell(1, 2, "review")
    for row, (lab, text) in enumerate(zip(labels, reviews), start=2):
        sheet.cell(row, 1, lab)
        sheet.cell(row, 2, text)
    workbook.save(path)


def test():
    """Split ``simplifyweibo_4_moods.csv`` into a train and a test workbook.

    For each of the four labels the first ``TEST_ROWS_PER_LABEL`` rows go to
    the test split and the remaining rows go to the train split. Characters
    that openpyxl refuses to store are stripped from every review.

    The original version collected the test split but only saved
    ``train.xlsx``; both ``train.xlsx`` and ``test.xlsx`` are written now.

    NOTE(review): the files are written to the current directory, while the
    dataset class reads ``data/train.xlsx`` / ``data/test.xlsx`` -- move them
    there (or run this script from inside ``data/``).

    :return: None
    """
    file = 'simplifyweibo_4_moods.csv'
    data = pd.read_csv(file)
    label = data.get('label')
    review = data.get('review')

    train_review = []
    train_label = []
    test_review = []
    test_label = []
    # How many rows of each label have already been sent to the test split.
    held_out = {lab: 0 for lab in KNOWN_LABELS}

    for raw_label, raw_review in zip(label, review):
        lab = int(raw_label)
        if lab not in held_out:
            continue
        line = ILLEGAL_CHARACTERS_RE.sub(r'', str(raw_review))
        if held_out[lab] < TEST_ROWS_PER_LABEL:
            held_out[lab] += 1
            test_label.append(lab)
            test_review.append(line)
        else:
            train_label.append(lab)
            train_review.append(line)

    _save_split("train.xlsx", train_label, train_review)
    _save_split("test.xlsx", test_label, test_review)


if __name__ == '__main__':
    test()

得到对应的文件

 

 

生成字典

数据中的一些标点符号、特殊符号、英文字母、数字等对于我们的实验都是没有用处的,所以我们需要将它们过滤掉。(下面的代码目前只过滤了常见的 ASCII 标点符号和停用词。)

去除停用词

# Characters removed from a sentence before word segmentation.
# NOTE(review): the original filter list ended with two empty strings, which
# presumably were characters lost when the code was copied -- confirm.
_FILTER_TABLE = str.maketrans(
    '', '', '!"#$%&()*+,-./:;<=>?@[\\]^_`{|}~\t\n\x97\x96')

# Stop-word sets keyed by file path, so each file is read only once.
_STOPWORD_CACHE = {}


def tokenlize(sentence):
    """Strip punctuation, segment the text with jieba and drop stop words.

    The original implementation joined the filter characters into one regular
    expression without escaping all of them, so ``$`` and ``^`` acted as
    anchors and the lone backslash escaped the following ``|``; as a result
    ``$``, ``^``, ``\\`` and a lone ``]`` were never removed. A translation
    table removes every listed character literally.

    :param sentence: str
    :return: [str,str,str]
    """
    cleaned = sentence.translate(_FILTER_TABLE)
    joined = ' '.join(jieba.cut(cleaned, cut_all=False))
    # Splitting on a single space and dropping empties removes the
    # whitespace-only tokens that jieba emits.
    words = [word for word in joined.split(" ") if len(word) > 0]
    return movestopwords(words)


def stopwordslist(filepath):
    """Read a stop-word file (UTF-8, one word per line).

    :param filepath: path of the stop-word file
    :return: list of the lines with surrounding whitespace stripped
    """
    with open(filepath, 'r', encoding='utf-8') as handle:
        return [line.strip() for line in handle]


def movestopwords(sentence):
    """Remove stop words (and bare tab / newline tokens) from a token list.

    The stop words come from ``data/stopwords.txt``; the file is loaded on the
    first call and cached as a set for constant-time membership tests.

    :param sentence: [str,str,str]
    :return: [str,str,str] without stop words
    """
    path = 'data/stopwords.txt'  # location of the stop-word list
    stopwords = _STOPWORD_CACHE.get(path)
    if stopwords is None:
        stopwords = _STOPWORD_CACHE[path] = frozenset(stopwordslist(path))
    # The original test ``word != '\t' and '\n'`` never compared against
    # '\n' because a non-empty string literal is always truthy.
    return [word for word in sentence
            if word not in stopwords and word not in ('\t', '\n')]

设计字典类

""" 文本序列化 """


class Vocab:
    """Word <-> integer id mapping built from token frequencies.

    Typical use::

        ws = Vocab()
        for sentence in [["今天", "天气", "很", "好"], ["今天", "去", "吃", "什么"]]:
            ws.fit(sentence)             # count word frequencies
        ws.build_vocab(min_count=1)      # assign ids
        ids = ws.transform(["好", "热", "呀"], max_len=5)
        words = ws.inverse_transform(ids)
    """

    UNK_TAG = "<UNK>"  # token standing in for out-of-vocabulary words
    PAD_TAG = "<PAD>"  # token used to pad short sentences
    PAD = 0
    UNK = 1

    def __init__(self):
        # word -> id
        self.dict = {
            self.UNK_TAG: self.UNK,
            self.PAD_TAG: self.PAD,
        }
        # word -> frequency, filled by fit()
        self.count = {}
        # id -> word; rebuilt by build_vocab()
        self.inverse_dict = {
            self.UNK: self.UNK_TAG,
            self.PAD: self.PAD_TAG,
        }

    def fit(self, sentence):
        """Add the words of one sentence to the frequency table.

        :param sentence: [str,str,str]
        :return: None
        """
        for word in sentence:
            self.count[word] = self.count.get(word, 0) + 1

    def build_vocab(self, min_count=1, max_count=None, max_features=None):
        """Assign an id to every counted word that passes the filters.

        :param min_count: drop words seen fewer times than this (None = off)
        :param max_count: drop words seen more times than this (None = off)
        :param max_features: keep only this many most frequent words
            (None = keep all)
        :return: None
        """
        if min_count is not None:
            self.count = {word: count for word, count in self.count.items()
                          if count >= min_count}
        if max_count is not None:
            self.count = {word: count for word, count in self.count.items()
                          if count <= max_count}
        if max_features is not None:
            # ``key`` is keyword-only in Python 3; passing the lambda
            # positionally, as the original did, raises TypeError.
            ranked = sorted(self.count.items(),
                            key=lambda item: item[-1], reverse=True)
            self.count = dict(ranked[:max_features])

        for word in self.count:
            # Do not renumber words that already have an id (this also keeps
            # the reserved tags intact if they occur in the corpus).
            if word not in self.dict:
                self.dict[word] = len(self.dict)

        # Reverse mapping: id -> word.
        self.inverse_dict = dict(zip(self.dict.values(), self.dict.keys()))

    def transform(self, sentence, max_len=None):
        """Convert a token list into a list of ids.

        :param sentence: [str,str,str]
        :param max_len: if given, truncate or right-pad with PAD to exactly
            this length; if None the sentence length is left unchanged
        :return: [int,int,int]
        """
        tokens = list(sentence)
        if max_len is not None:
            if len(tokens) > max_len:
                tokens = tokens[:max_len]
            else:
                tokens = tokens + [self.PAD_TAG] * (max_len - len(tokens))
        return [self.dict.get(token, self.UNK) for token in tokens]

    def inverse_transform(self, incides):
        """Convert a list of ids back into words.

        :param incides: [int,int,int]
        :return: [str,str,str]; unknown ids map to the UNK tag
        """
        return [self.inverse_dict.get(index, self.UNK_TAG) for index in incides]

    def __len__(self):
        return len(self.dict)

dataset

# -*-coding:utf-8-*-
import os
import pickle
import re
import zipfile

import jieba
import pandas as pd
from torch.utils.data import Dataset, DataLoader
from tqdm import tqdm

# Characters removed from a sentence before word segmentation.
# NOTE(review): the original filter list ended with two empty strings, which
# presumably were characters lost when the code was copied -- confirm.
_FILTER_TABLE = str.maketrans(
    '', '', '!"#$%&()*+,-./:;<=>?@[\\]^_`{|}~\t\n\x97\x96')


class ImdbDataset(Dataset):
    """Weibo emotion dataset backed by an .xlsx file.

    Reads ``data/train.xlsx`` or ``data/test.xlsx`` and uses its ``review``
    and ``label`` columns.
    """

    def __init__(self, train=True):
        """
        :param train: load the training file when true, else the test file
        """
        if train:
            url = 'data/train.xlsx'
        else:
            url = "data/test.xlsx"
        data = pd.read_excel(url)
        self.sentence_list = data.get('review')
        self.label_list = data.get('label')

    def __getitem__(self, idx):
        """Return ``(tokens, label)`` for the sample at ``idx``."""
        line_text = self.sentence_list[idx]
        # Tokenise the raw review text.
        review = tokenlize(str(line_text))
        # Label that belongs to this review.
        label = int(self.label_list[idx])
        return review, label

    def __len__(self):
        return len(self.sentence_list)


def tokenlize(sentence):
    """Strip punctuation, segment the text with jieba and drop stop words.

    The original implementation joined the filter characters into one regular
    expression without escaping all of them, so ``$`` and ``^`` acted as
    anchors and the lone backslash escaped the following ``|``; as a result
    ``$``, ``^``, ``\\`` and a lone ``]`` were never removed. A translation
    table removes every listed character literally.

    :param sentence: str
    :return: [str,str,str]
    """
    cleaned = sentence.translate(_FILTER_TABLE)
    joined = ' '.join(jieba.cut(cleaned, cut_all=False))
    # Splitting on a single space and dropping empties removes the
    # whitespace-only tokens that jieba emits.
    words = [word for word in joined.split(" ") if len(word) > 0]
    return movestopwords(words)


def stopwordslist(filepath):
    """Read a stop-word file (UTF-8, one word per line).

    :param filepath: path of the stop-word file
    :return: list of the lines with surrounding whitespace stripped
    """
    with open(filepath, 'r', encoding='utf-8') as handle:
        return [line.strip() for line in handle]


# Remove stop words from a tokenised sentence.
# Stop-word sets keyed by file path, so each file is read only once.
_STOPWORD_SETS = {}


def movestopwords(sentence):
    """Remove stop words (and bare tab / newline tokens) from a token list.

    The stop words come from ``data/stopwords.txt``; the file is loaded via
    ``stopwordslist`` on the first call and cached as a set, instead of being
    re-read and scanned as a list for every sentence.

    :param sentence: [str,str,str]
    :return: [str,str,str] without stop words
    """
    path = 'data/stopwords.txt'  # location of the stop-word list
    stopwords = _STOPWORD_SETS.get(path)
    if stopwords is None:
        stopwords = _STOPWORD_SETS[path] = frozenset(stopwordslist(path))
    # The original test ``word != '\t' and '\n'`` never compared against
    # '\n' because a non-empty string literal is always truthy.
    return [word for word in sentence
            if word not in stopwords and word not in ('\t', '\n')]


# The code below is for debugging only.
def collate_fn(batch):
    """Regroup a batch of samples into parallel tuples.

    :param batch: [result of one __getitem__, ...], each a (review, label) pair
    :return: tuple ``(reviews, labels)``, each itself a tuple
    """
    reviews, labels = zip(*batch)
    return reviews, labels


if __name__ == "__main__":
    # Presumably imported so that pickle can resolve the Vocab class when the
    # saved vocabulary is loaded below -- verify against how it was pickled.
    from 情感分析.imdb_sentiment.vocab import Vocab

    imdb_dataset = ImdbDataset(True)
    my_dataloader = DataLoader(imdb_dataset, batch_size=2, shuffle=True, collate_fn=collate_fn)

    # Load the vocabulary once, before the loop, and close the file afterwards.
    with open("./models/vocab.pkl", "rb") as vocab_file:
        vocab_model = pickle.load(vocab_file)

    # Show the first sample of the first batch as tokens and as ids.
    for review, label in my_dataloader:
        print(review[0])
        result = vocab_model.transform(review[0], 30)
        print(result)
        break

构建字典

# -*-coding:utf-8-*-
import pickle from tqdm import tqdm from 情感分析.weibo_many_emotion import dataset # from 情感分析.imdb_sentiment.vocab import Vocab
from torch.utils.data import DataLoader


class Vocab:
    """Word <-> integer id mapping built from token frequencies."""

    UNK_TAG = "<UNK>"  # token standing in for out-of-vocabulary words
    PAD_TAG = "<PAD>"  # token used to pad short sentences
    PAD = 0
    UNK = 1

    def __init__(self):
        # word -> id
        self.dict = {
            self.UNK_TAG: self.UNK,
            self.PAD_TAG: self.PAD,
        }
        # word -> frequency, filled by fit()
        self.count = {}
        # id -> word; rebuilt by build_vocab()
        self.inverse_dict = {
            self.UNK: self.UNK_TAG,
            self.PAD: self.PAD_TAG,
        }

    def fit(self, sentence):
        """Add the words of one sentence to the frequency table.

        :param sentence: [str,str,str]
        :return: None
        """
        for word in sentence:
            self.count[word] = self.count.get(word, 0) + 1

    def build_vocab(self, min_count=1, max_count=None, max_features=None):
        """Assign an id to every counted word that passes the filters.

        :param min_count: drop words seen fewer times than this (None = off)
        :param max_count: drop words seen more times than this (None = off)
        :param max_features: keep only this many most frequent words
            (None = keep all)
        :return: None
        """
        if min_count is not None:
            self.count = {word: count for word, count in self.count.items()
                          if count >= min_count}
        if max_count is not None:
            self.count = {word: count for word, count in self.count.items()
                          if count <= max_count}
        if max_features is not None:
            # ``key`` is keyword-only in Python 3; passing the lambda
            # positionally, as the original did, raises TypeError.
            ranked = sorted(self.count.items(),
                            key=lambda item: item[-1], reverse=True)
            self.count = dict(ranked[:max_features])

        for word in self.count:
            # Do not renumber words that already have an id (this also keeps
            # the reserved tags intact if they occur in the corpus).
            if word not in self.dict:
                self.dict[word] = len(self.dict)

        # Reverse mapping: id -> word.
        self.inverse_dict = dict(zip(self.dict.values(), self.dict.keys()))

    def transform(self, sentence, max_len=None):
        """Convert a token list into a list of ids.

        :param sentence: [str,str,str]
        :param max_len: if given, truncate or right-pad with PAD to exactly
            this length; if None the sentence length is left unchanged
        :return: [int,int,int]
        """
        tokens = list(sentence)
        if max_len is not None:
            if len(tokens) > max_len:
                tokens = tokens[:max_len]
            else:
                tokens = tokens + [self.PAD_TAG] * (max_len - len(tokens))
        return [self.dict.get(token, self.UNK) for token in tokens]

    def inverse_transform(self, incides):
        """Convert a list of ids back into words.

        :param incides: [int,int,int]
        :return: [str,str,str]; unknown ids map to the UNK tag
        """
        return [self.inverse_dict.get(index, self.UNK_TAG) for index in incides]

    def __len__(self):
        return len(self.dict)


def collate_fn(batch):
    """Regroup a batch of samples into parallel tuples.

    :param batch: [result of one __getitem__, ...], each a (review, label) pair
    :return: tuple ``(reviews, labels)``, each itself a tuple
    """
    reviews, labels = zip(*batch)
    return reviews, labels


def get_dataloader(train=True):
    """Build a shuffled DataLoader (batch size 200) over the chosen split.

    :param train: passed to ``dataset.ImdbDataset`` to pick the split
    :return: DataLoader yielding ``(reviews, labels)`` tuples
    """
    imdb_dataset = dataset.ImdbDataset(train)
    return DataLoader(imdb_dataset, batch_size=200, shuffle=True, collate_fn=collate_fn)


if __name__ == '__main__':
    ws = Vocab()
    dl_train = get_dataloader(True)
    dl_test = get_dataloader(False)
    # Word frequencies are counted over both the train and the test split.
    for loader in (dl_train, dl_test):
        for reviews, label in tqdm(loader, total=len(loader)):
            for sentence in reviews:
                ws.fit(sentence)
    ws.build_vocab()
    print(len(ws))
    # Use a context manager so the file is flushed and closed.
    with open("./models/vocab.pkl", "wb") as vocab_file:
        pickle.dump(ws, vocab_file)

模型训练

# -*-coding:utf-8-*-
import pickle

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.optim import Adam
from torch.utils.data import DataLoader
from tqdm import tqdm

from 情感分析.weibo_many_emotion import dataset
# Presumably imported so that pickle can resolve the Vocab class when the
# saved vocabulary is loaded below -- verify against how it was pickled.
from 情感分析.中文情感分类.vocab import Vocab

train_batch_size = 512
test_batch_size = 128
# Every sentence is truncated or padded to this many tokens.
sequence_max_len = 100

# NOTE: pickle executes code from the file it loads; only load a vocabulary
# file that you created yourself.
with open("./models/vocab.pkl", "rb") as _vocab_file:
    voc_model = pickle.load(_vocab_file)


def collate_fn(batch): """ 对batch数据进行处理 :param batch: [一个getitem的结果,getitem的结果,getitem的结果] :return: 元组 """ reviews, labels = zip(*batch) reviews = torch.LongTensor([voc_model.transform(i, max_len=sequence_max_len) for i in reviews]) labels = torch.LongTensor(labels) return reviews, labels def get_dataloader(train=True): imdb_dataset = dataset.ImdbDataset(train) batch_size = train_batch_size if train else test_batch_size return DataLoader(imdb_dataset, batch_size=batch_size, shuffle=True, collate_fn=collate_fn) class ImdbModel(nn.Module): def __init__(self): super(ImdbModel, self).__init__() self.embedding = nn.Embedding(num_embeddings=len(voc_model), embedding_dim=200, padding_idx=voc_model.PAD).to() self.lstm = nn.LSTM(input_size=200, hidden_size=64, num_layers=2, batch_first=True, bidirectional=True, dropout=0.1) self.fc1 = nn.Linear(64 * 2, 64) self.fc2 = nn.Linear(64, 4) def forward(self, input): """ :param input:[batch_size,max_len] :return: """ input_embeded = self.embedding(input)  # input embeded :[batch_size,max_len,200]
 output, (h_n, c_n) = self.lstm(input_embeded)  # h_n :[4,batch_size,hidden_size]
        # out :[batch_size,hidden_size*2]
        out = torch.cat([h_n[-1, :, :], h_n[-2, :, :]], dim=-1)  # 拼接正向最后一个输出和反向最后一个输出

        # 进行全连接
        out_fc1 = self.fc1(out) # 进行relu
        out_fc1_relu = F.relu(out_fc1) # 全连接
        out_fc2 = self.fc2(out_fc1_relu)  # out :[batch_size,2]
        return F.log_softmax(out_fc2, dim=-1) def device(): if torch.cuda.is_available(): return torch.device('cuda') else: return torch.device('cpu') def train(imdb_model, epoch): """ :param imdb_model: :param epoch: :return: """ train_dataloader = get_dataloader(train=True) optimizer = Adam(imdb_model.parameters()) for i in range(epoch): bar = tqdm(train_dataloader, total=len(train_dataloader)) for idx, (data, target) in enumerate(bar): optimizer.zero_grad() data = data.to(device()) target = target.to(device()) output = imdb_model(data) loss = F.nll_loss(output, target) loss.backward() optimizer.step() bar.set_description("epcoh:{} idx:{} loss:{:.6f}".format(i, idx, loss.item())) torch.save(imdb_model, 'lstm_model.pkl') def test(imdb_model): """ 验证模型 :param imdb_model: :return: """ test_loss = 0 correct = 0 imdb_model.eval() test_dataloader = get_dataloader(train=False) with torch.no_grad(): for data, target in tqdm(test_dataloader): data = data.to(device()) target = target.to(device()) output = imdb_model(data) test_loss += F.nll_loss(output, target, reduction='sum').item() pred = output.data.max(1, keepdim=True)[1]  # 获取最大值的位置,[batch_size,1]
            correct += pred.eq(target.data.view_as(pred)).sum() test_loss /= len(test_dataloader.dataset) print('\nTest set: Avg. loss: {:.4f}, Accuracy: {}/{} ({:.2f}%)\n'.format( test_loss, correct, len(test_dataloader.dataset), 100. * correct / len(test_dataloader.dataset))) def xlftest(): import numpy as np model = torch.load('lstm_model.pkl') model.to(device()) from 情感分析.weibo_many_emotion.dataset import tokenlize lines=['哈哈哈开心','真是无语,你们怎么搞的','小姐姐,祝你生日快乐','你他妈的有病'] for line in lines: print(line) review = tokenlize(line) # review=tokenlize(line)
        vocab_model = pickle.load(open("./models/vocab.pkl", "rb")) result = vocab_model.transform(review,sequence_max_len) # print(result)
        data = torch.LongTensor(result).to(device()) data=torch.reshape(data,(1,sequence_max_len)).to(device()) # print(data.shape)
        output = model(data) print(output.data) pred = output.data.max(1, keepdim=True)[1]  # 获取最大值的位置,[batch_size,1]
        print(pred.item()) if pred.item() == 0: print("喜悦") elif pred.item() == 1: print("愤怒") elif pred.item() == 2: print("厌恶") elif pred.item() == 3: print("低落") if __name__ == '__main__': # imdb_model = ImdbModel().to(device())
    # train(imdb_model,20)
    # test(imdb_model)
    xlftest()

测试结果:

对四种情感分类的准确率只有 40% 左右,一部分原因是数据集不规范,另外还需要进行调参优化

 

 

 


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM