Transformer Encoder for Intent Recognition in Question Answering


1. Use a transformer encoder for text classification, applied here to intent recognition in question answering.

2. Architecture diagram (not reproduced here; as the code below shows, the model is token embedding + position embedding, N encoder layers of multi-head self-attention and a feedforward sub-layer with residual connections and layer norm, sum pooling over the sequence, and a linear classifier).

3. Code (full program: https://github.com/jiangnanboy/intent_classification/tree/master/transformer_encoder)

import os
import torch
# NOTE: this code uses torchtext's legacy Field/Example API (torchtext <= 0.8;
# on torchtext 0.9-0.11 import these from torchtext.legacy instead)
from torchtext import data
from torchtext.data import BucketIterator
from torchtext.vocab import Vectors
from torch import nn
import pandas as pd
import pickle

DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

intent_classification_path = os.path.abspath(os.path.join(os.getcwd(), '../..'))
# path to the training data
train_data = os.path.join(intent_classification_path, 'classification_data/knowledge_point_qa_data.csv')
# load the data
train_data = pd.read_csv(train_data)
# character-level tokenization (the text column is pre-segmented with spaces between characters)
tokenize = lambda x: x.split(' ')

TEXT = data.Field(
                    sequential=True,
                    tokenize=tokenize,
                    lower=True,
                    use_vocab=True,
                    pad_token='<pad>',
                    unk_token='<unk>',
                    batch_first=True,
                    fix_length=20)

LABEL = data.Field(
                    sequential=False,
                    use_vocab=False)

# build torchtext examples and fields for a training or test set
def get_dataset(csv_data, text_field, label_field, test=False):
    fields = [('id', None), ('text', text_field), ('label', label_field)]
    examples = []
    if test:  # test set: labels are not loaded
        for text in csv_data['text']:
            examples.append(data.Example.fromlist([None, text, None], fields))
    else:  # training set
        for text, label in zip(csv_data['text'], csv_data['label']):
            examples.append(data.Example.fromlist([None, text, label], fields))
    return examples, fields

train_examples,train_fields = get_dataset(train_data, TEXT, LABEL)

train = data.Dataset(train_examples, train_fields)
# pre-trained embeddings (optional)
#pretrained_embedding = os.path.join(os.getcwd(), 'sgns.sogou.char')
#vectors = Vectors(name=pretrained_embedding)
# build the vocabulary (uncomment the line below to attach the pre-trained vectors)
#TEXT.build_vocab(train, min_freq=1, vectors = vectors)

TEXT.build_vocab(train, min_freq=1)
# persist the vocabulary for inference-time use
words_path = os.path.join(os.getcwd(), 'words.pkl')
with open(words_path, 'wb') as f_words:
    pickle.dump(TEXT.vocab, f_words)
    
BATCH_SIZE = 163
# build the training iterator
train_iter = BucketIterator(
                            dataset=train,
                            batch_size=BATCH_SIZE,
                            shuffle=True,
                            sort_within_batch=False)


'''
    1. The input is the token embeddings of the sequence plus position embeddings.
    2. Each token embedding is added to its position embedding, giving a vector that fuses token and
       position information.
    3. Before step 2, the token embedding is multiplied by a scale of sqrt(emb_dim). This is meant to
       reduce variance in the embeddings (keeping dot products from growing large and gradients from
       vanishing); without this scale the model is hard to train stably.
    4. Dropout is applied.
    5. The input passes through N encoder layers to give Z, which is fed to a fully connected layer
       for classification.
    src_mask is 1 for non-<pad> tokens and 0 for <pad>, so that attention ignores the meaningless
    <pad> token; its shape is [batch_size, 1, 1, src_len], derived from the source sentence.
    (A small demo of this mask follows below.)
'''
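
# A minimal sketch of the padding mask described above. The pad index of 1 is an
# assumption for illustration; the real index comes from the vocabulary built earlier.
src_demo = torch.tensor([[5, 8, 3, 1, 1]])               # one sentence; the last two tokens are <pad>
mask_demo = (src_demo != 1).unsqueeze(1).unsqueeze(2)    # -> [batch_size, 1, 1, src_len]
print(mask_demo.shape)  # torch.Size([1, 1, 1, 5])
print(mask_demo)        # tensor([[[[ True,  True,  True, False, False]]]])
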
class TransformerEncoder(nn.Module):
    def __init__(self, input_dim, output_dim, emb_dim, n_layers, n_heads, pf_dim, dropout, position_length, pad_idx):
        super(TransformerEncoder, self).__init__()
        
        self.pad_idx = pad_idx
        self.scale = torch.sqrt(torch.FloatTensor([emb_dim])).to(DEVICE)

        # token embedding
        self.token_embedding = nn.Embedding(input_dim, emb_dim)
        # learned position embedding
        self.position_embedding = nn.Embedding(position_length, emb_dim)
        # stack of n_layers encoder layers, each with n_heads attention heads
        self.layers = nn.ModuleList([EncoderLayer(emb_dim, n_heads, pf_dim, dropout) for _ in range(n_layers)])
        self.dropout = nn.Dropout(dropout)
        self.fc = nn.Linear(emb_dim, output_dim)

    def make_src_mask(self, src):
        # src=[batch_size, src_len]

        # src_mask=[batch_size, 1, 1, src_len]
        src_mask = (src != self.pad_idx).unsqueeze(1).unsqueeze(2)
        return src_mask
    
    def forward(self, src):
        # src=[batch_size, seq_len]
        # src_mask=[batch_size, 1, 1, seq_len]
        src_mask = self.make_src_mask(src)
        
        batch_size = src.shape[0]
        src_len = src.shape[1]

        # build the position tensor -> [batch_size, seq_len]; positions run from 0 to src_len-1
        position = torch.arange(0, src_len).unsqueeze(0).repeat(batch_size, 1).to(DEVICE)

        # embed the tokens (scaled by sqrt(emb_dim)) and their positions -> [batch_size, seq_len, emb_dim]
        token_embeded = self.token_embedding(src) * self.scale
        position_embeded = self.position_embedding(position)

        # element-wise sum of token and position embeddings -> [batch_size, seq_len, emb_dim]
        src = self.dropout(token_embeded + position_embeded)

        for layer in self.layers:
            src = layer(src, src_mask)

        # sum-pool over the sequence dimension, then classify:
        # [batch_size, seq_len, emb_dim] -> [batch_size, emb_dim] -> [batch_size, output_dim]
        src = src.permute(0, 2, 1)
        src = torch.sum(src, dim=-1)
        src = self.fc(src)
        return src

'''
Encoder layer:
    1. Pass src and src_mask into the multi-head attention layer.
    2. Dropout.
    3. Residual connection, then layer norm (input + output fed to the norm).
    4. Pass the output through the feedforward layer.
    5. Dropout.
    6. Another residual connection and layer norm; the output is fed to the next layer.
    Notes:
        Layers do not share parameters.
        The multi-head attention layer is made up of multiple self-attention heads.
'''
class EncoderLayer(nn.Module):
    def __init__(self, emb_dim, n_heads, pf_dim, dropout):
        super(EncoderLayer, self).__init__()
        # layer norm after the attention sub-layer
        self.self_attn_layer_norm = nn.LayerNorm(emb_dim)
        # layer norm after the feedforward sub-layer
        self.ff_layer_norm = nn.LayerNorm(emb_dim)
        # multi-head self-attention
        self.self_attention = MultiHeadAttentionLayer(emb_dim, n_heads, dropout)
        # position-wise feedforward
        self.feedforward = FeedforwardLayer(emb_dim, pf_dim, dropout)

        self.dropout = nn.Dropout(dropout)

    def forward(self, src, src_mask):
        #src=[batch_size, seq_len, emb_dim]
        #src_mask=[batch_size, 1, 1, seq_len]

        # self-attention
        # _src=[batch size, query_len, emb_dim]
        _src, _ = self.self_attention(src, src, src, src_mask)

        # dropout, residual connection, and layer norm
        # src=[batch_size, seq_len, emb_dim]
        src = self.self_attn_layer_norm(src + self.dropout(_src))

        # feedforward network
        # _src=[batch_size, seq_len, emb_dim]
        _src = self.feedforward(src)

        # dropout, residual connection, and layer norm
        # src=[batch_size, seq_len, emb_dim]
        src = self.ff_layer_norm(src + self.dropout(_src))

        return src
'''
Multi-head attention:
    1. Q, K, and V are computed from the input by the linear layers fc_q, fc_k, fc_v.
    2. The emb_dim of query, key, and value is split into n_heads heads.
    3. The energy (attention scores) is computed as Q*K^T / scale.
    4. The mask hides tokens that should not be attended to.
    5. Softmax and dropout are applied.
    6. The result of step 5 is multiplied with V.
    7. The final output is produced by the linear layer fc_o.
Note: Q, K, and V have the same length here (self-attention).
'''
class MultiHeadAttentionLayer(nn.Module):
    def __init__(self, emb_dim, n_heads, dropout):
        super(MultiHeadAttentionLayer, self).__init__()
        assert emb_dim % n_heads == 0
        self.emb_dim = emb_dim
        self.n_heads = n_heads
        self.head_dim = emb_dim//n_heads

        self.fc_q = nn.Linear(emb_dim, emb_dim)
        self.fc_k = nn.Linear(emb_dim, emb_dim)
        self.fc_v = nn.Linear(emb_dim, emb_dim)

        self.fc_o = nn.Linear(emb_dim, emb_dim)

        self.dropout = nn.Dropout(dropout)

        self.scale = torch.sqrt(torch.FloatTensor([self.head_dim])).to(DEVICE)

    def forward(self, query, key, value, mask=None):
        # query=[batch_size, query_len, emb_dim]
        # key=[batch_size, key_len, emb_dim]
        # value=[batch_size, value_len, emb_dim]
        batch_size = query.shape[0]

        # Q=[batch_size, query_len, emb_dim]
        # K=[batch_size, key_len, emb_dim]
        # V=[batch_size, value_len, emb_dim]
        Q = self.fc_q(query)
        K = self.fc_k(key)
        V = self.fc_v(value)

        '''
        view vs. reshape:

        Both view() and reshape() can reshape a tensor; they differ in when they apply. view() only
        works on tensors that satisfy the contiguity condition, allocates no new memory, and returns
        a view (an alias) of the original storage. reshape() may return either a view or a copy: when
        the contiguity condition holds it returns a view, otherwise a copy (equivalent to calling
        contiguous() first and then view()). So when unsure whether view() is applicable, use
        reshape(); use view() when you must avoid a copy and need the result to share storage with
        the original tensor. (See the short demo after this class.)
        '''

        # Q=[batch_size, n_heads, query_len, head_dim]
        # K=[batch_size, n_heads, key_len, head_dim]
        # V=[batch_size, n_heads, value_len, head_dim]
        Q = Q.view(batch_size, -1, self.n_heads, self.head_dim).permute(0, 2, 1, 3)
        K = K.view(batch_size, -1, self.n_heads, self.head_dim).permute(0, 2, 1, 3)
        V = V.view(batch_size, -1, self.n_heads, self.head_dim).permute(0, 2, 1, 3)

        # attention score matrix: [batch_size, n_heads, query_len, head_dim] x [batch_size, n_heads, head_dim, key_len]
        #   = [batch_size, n_heads, query_len, key_len]
        energy = torch.matmul(Q, K.permute(0, 1, 3, 2)) / self.scale

        if mask is not None:
            energy = energy.masked_fill(mask == 0, -1e10)
        # [batch_size, n_heads, query_len, key_len]
        attention = torch.softmax(energy, dim=-1)

        # [batch_size, n_heads, query_len, key_len]*[batch_size, n_heads, value_len, head_dim]=[batch_size, n_heads, query_len, head_dim]
        x = torch.matmul(self.dropout(attention), V)

        # [batch_size, query_len, n_heads, head_dim]
        x = x.permute(0, 2, 1, 3).contiguous()

        # [batch_size, query_len, emb_dim]
        x = x.view(batch_size, -1, self.emb_dim)

        # [batch_size, query_len, emb_dim]
        x = self.fc_o(x)

        return x, attention
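
# A short self-contained demo of the view/reshape distinction discussed above, using
# the same permute-then-flatten pattern as the attention output. Illustrative only.
t = torch.randn(2, 3, 4).permute(0, 2, 1)   # shape [2, 4, 3], no longer contiguous
# t.view(2, 12)                             # would raise a RuntimeError on this tensor
a = t.reshape(2, 12)                        # works: returns a copy here, since t is non-contiguous
b = t.contiguous().view(2, 12)              # works: explicit copy first, then a true view
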

'''
Position-wise feedforward layer
'''
class FeedforwardLayer(nn.Module):
    def __init__(self, emb_dim, pf_dim, dropout):
        super(FeedforwardLayer, self).__init__()
        self.fc_1 = nn.Linear(emb_dim, pf_dim)
        self.fc_2 = nn.Linear(pf_dim, emb_dim)

        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        # x=[batch_size, seq_len, emb_dim]

        # x=[batch_size, seq_len, pf_dim]
        x = self.dropout(torch.relu(self.fc_1(x)))

        # x=[batch_size, seq_len, emb_dim]
        x = self.fc_2(x)

        return x
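
# With all modules now defined, a quick shape sanity check. This is an illustrative
# sketch with assumed toy hyperparameters, not the training configuration used below.
enc_demo = TransformerEncoder(input_dim=100, output_dim=9, emb_dim=32, n_layers=2,
                              n_heads=4, pf_dim=64, dropout=0.1, position_length=20, pad_idx=1).to(DEVICE)
dummy = torch.randint(0, 100, (8, 20)).to(DEVICE)  # [batch_size=8, seq_len=20]
print(enc_demo(dummy).shape)                       # expected: torch.Size([8, 9])
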
# TensorBoard logging
from torch.utils.tensorboard import SummaryWriter
writer = SummaryWriter(os.getcwd()+'/log', comment='transformer-encoder')

# training configuration
input_dim = len(TEXT.vocab)
output_dim = 9
emb_dim = 256
n_layers = 3
n_heads = 8
pf_dim = 512
dropout = 0.5
position_length = 20

# index of the <pad> token
pad_index = TEXT.vocab.stoi[TEXT.pad_token]

# build the model
model = TransformerEncoder(input_dim, output_dim, emb_dim, n_layers, n_heads, pf_dim, dropout, position_length, pad_index).to(DEVICE)
# initialize the embedding from the pre-trained vectors; requires_grad stays True so it can be fine-tuned
# model.token_embedding.weight.data.copy_(TEXT.vocab.vectors)
# training mode
model.train()
# optimizer and loss
optimizer = torch.optim.Adam(model.parameters(), lr=0.001, weight_decay=0.01)
# optimizer = torch.optim.SGD(model.parameters(), lr=0.001, momentum=0.9, nesterov=True)
criterion = nn.CrossEntropyLoss()

with writer:
    for epoch in range(300):
        for i, batch in enumerate(train_iter):
            train_text = batch.text.to(DEVICE)
            train_label = batch.label.to(DEVICE)
            out = model(train_text)
            loss = criterion(out, train_label)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
        if (epoch+1) % 10 == 0:
            print('epoch [{}/{}], Loss: {:.4f}'.format(epoch+1, 300, loss.item()))
        #writer.add_graph(model, input_to_model=train_text, verbose=False)
        # log the last batch's loss once per epoch
        writer.add_scalar('loss', loss.item(), global_step=epoch+1)
    writer.flush()
    writer.close()
            
# save the trained weights
model_path = os.path.join(os.getcwd(), "model.h5")
torch.save(model.state_dict(), model_path)
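
# For completeness, a minimal inference sketch: reload the pickled vocabulary and the
# saved weights, then classify one question. The predict() helper below is illustrative
# and assumed, not part of the original program; mapping the returned class id back to
# an intent name depends on how the training labels were encoded.
with open(words_path, 'rb') as f_words:
    vocab = pickle.load(f_words)

infer_model = TransformerEncoder(len(vocab), output_dim, emb_dim, n_layers, n_heads,
                                 pf_dim, dropout, position_length, pad_index).to(DEVICE)
infer_model.load_state_dict(torch.load(model_path, map_location=DEVICE))
infer_model.eval()

def predict(question):
    # character-level tokens, padded/truncated to fix_length=20 as during training
    tokens = question.split(' ')[:20]
    ids = [vocab.stoi.get(tok, vocab.stoi['<unk>']) for tok in tokens]
    ids += [pad_index] * (20 - len(ids))
    x = torch.tensor([ids]).to(DEVICE)
    with torch.no_grad():
        logits = infer_model(x)
    return logits.argmax(dim=-1).item()  # predicted intent class id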

