一. Model structure
The goal is to implement a simple Siamese network (structure shown in the figure above) for semantic similarity; a compact shape walk-through of the whole pipeline follows the step list below:
1. As the figure shows, the overall structure is fairly simple, and the left and right branches are essentially identical. Sentence A and sentence B are fed into the left and right branches respectively. The input to the network is token embedding + position_embedding.
2. The embeddings are then encoded by a cnn-encoder.
3. A multi-head attention layer follows. Its two inputs are the cnn-encoder output of the sentence itself and the cnn-encoder output of the other sentence, so it acts as the interaction layer between the two sentences.
4. The cnn-encoder output and the attention output are concatenated (cat).
5. A fully connected (fc) layer is applied.
6. Then an average-pooling layer.
7. Finally, cosine similarity is used to compute the matching score.
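To make the seven steps concrete, here is a minimal, self-contained shape walk-through in PyTorch. It is only a sketch: the dimensions (hid_dim=64, seq_len=30, kernel_size=3), the single conv block, and the single-head dot-product attention are illustrative stand-ins for the multi-layer, multi-head versions implemented in the actual code below.

import torch
import torch.nn as nn
import torch.nn.functional as F

batch_size, seq_len, hid_dim, vocab_size = 2, 30, 64, 1000

# 1. token embedding + position embedding
tok_emb = nn.Embedding(vocab_size, hid_dim)
pos_emb = nn.Embedding(seq_len, hid_dim)
sentA = torch.randint(0, vocab_size, (batch_size, seq_len))
sentB = torch.randint(0, vocab_size, (batch_size, seq_len))
pos = torch.arange(seq_len).unsqueeze(0).repeat(batch_size, 1)
embA = tok_emb(sentA) + pos_emb(pos)   # [batch_size, seq_len, hid_dim]
embB = tok_emb(sentB) + pos_emb(pos)

# 2. cnn-encoder: Conv1d -> GLU -> residual (one block shown here)
conv = nn.Conv1d(hid_dim, 2 * hid_dim, kernel_size=3, padding=1)
encA = F.glu(conv(embA.permute(0, 2, 1)), dim=1).permute(0, 2, 1) + embA   # [batch_size, seq_len, hid_dim]
encB = F.glu(conv(embB.permute(0, 2, 1)), dim=1).permute(0, 2, 1) + embB

# 3. interaction: each sentence's representation is attended with queries from the other sentence
attnA = torch.softmax(encB @ encA.transpose(1, 2) / hid_dim ** 0.5, dim=-1) @ encA   # [batch_size, seq_len, hid_dim]
attnB = torch.softmax(encA @ encB.transpose(1, 2) / hid_dim ** 0.5, dim=-1) @ encB

# 4.-5. concatenate encoder output with attention output, then an fc layer
fc = nn.Linear(2 * hid_dim, hid_dim)
repA = fc(torch.cat([attnA, encA], dim=2))   # [batch_size, seq_len, hid_dim]
repB = fc(torch.cat([attnB, encB], dim=2))

# 6. average pooling over the sequence (equivalent to F.avg_pool1d over the full length)
vecA = repA.mean(dim=1)   # [batch_size, hid_dim]
vecB = repB.mean(dim=1)

# 7. cosine similarity between the two sentence vectors
sim = torch.cosine_similarity(vecA, vecB, dim=1)   # [batch_size]
print(sim.shape)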
The structure of the cnn-encoder is as follows:
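The figure is not reproduced here. As a stand-in, the per-layer structure of the cnn-encoder used in the code below can be summarized as: Conv1d widens the channels to 2*hid_dim, a GLU activation halves them back, and a residual connection scaled by sqrt(0.5) closes the block; the encoder stacks n_layers of these blocks. A minimal sketch of one such block (the name GLUConvBlock is illustrative, not part of the project):

import torch
import torch.nn as nn
import torch.nn.functional as F

class GLUConvBlock(nn.Module):
    """One cnn-encoder layer: Conv1d -> GLU -> scaled residual (sketch mirroring the Encoder below)."""
    def __init__(self, hid_dim, kernel_size=3):
        super().__init__()
        self.conv = nn.Conv1d(hid_dim, 2 * hid_dim, kernel_size, padding=(kernel_size - 1) // 2)
        self.scale = 0.5 ** 0.5   # keeps the variance roughly constant across layers

    def forward(self, x):                        # x: [batch_size, hid_dim, seq_len]
        conved = F.glu(self.conv(x), dim=1)      # [batch_size, hid_dim, seq_len]
        return (conved + x) * self.scale         # residual connection

# the Encoder below stacks n_layers of such blocks and finally adds the
# (token + position) embedding back in as a second residual ("combined" output)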
二. Code (for the complete project see: https://github.com/jiangnanboy/semantic_matching/tree/master/model1)
# build the model
import torch
import torch.nn as nn
import torch.nn.functional as F

# DEVICE is defined once for the whole project; a typical definition:
DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')


class Encoder(nn.Module):
    def __init__(self, input_dim, hid_dim, n_layers, kernel_size, dropout, max_length=30):
        super(Encoder, self).__init__()
        # for kernel in kernel_size:
        assert kernel_size % 2 == 1, 'kernel size must be odd!'  # an odd kernel size makes symmetric padding on both sides straightforward
        self.scale = torch.sqrt(torch.FloatTensor([0.5])).to(DEVICE)  # keeps the variance of the network roughly constant
        self.tok_embedding = nn.Embedding(input_dim, hid_dim)   # token embedding
        self.pos_embedding = nn.Embedding(max_length, hid_dim)  # token position embedding
        # self.emb2hid = nn.Linear(emb_dim, hid_dim)  # linear layer: emb_dim -> hid_dim
        # self.hid2emb = nn.Linear(hid_dim, emb_dim)  # linear layer: hid_dim -> emb_dim

        # convolution blocks
        self.convs = nn.ModuleList([nn.Conv1d(in_channels=hid_dim,
                                              out_channels=2 * hid_dim,  # output channels; 2*hid_dim because the GLU activation halves them
                                              kernel_size=kernel_size,
                                              padding=(kernel_size - 1) // 2)  # zero-pad both ends so the sequence length stays unchanged
                                    for _ in range(n_layers)])
        '''
        Feature extraction with convolution kernels of different sizes:
        self.conv_1 = nn.ModuleList([nn.Conv1d(in_channels=hid_dim,
                                               out_channels=2 * hid_dim,  # 2*hid_dim for the GLU activation
                                               kernel_size=kernel_size[0],
                                               padding=(kernel_size[0] - 1) // 2)  # keep the sequence length unchanged
                                     for _ in range(n_layers)])
        self.conv_2 = nn.ModuleList([nn.Conv1d(in_channels=hid_dim,
                                               out_channels=2 * hid_dim,
                                               kernel_size=kernel_size[1],
                                               padding=(kernel_size[1] - 1) // 2)
                                     for _ in range(n_layers)])
        self.conv_3 = nn.ModuleList([nn.Conv1d(in_channels=hid_dim,
                                               out_channels=2 * hid_dim,
                                               kernel_size=kernel_size[2],
                                               padding=(kernel_size[2] - 1) // 2)
                                     for _ in range(n_layers)])
        # project the concatenated outputs of the conv modules back to hid_dim
        self.convhid2hid = nn.Linear(len(kernel_size) * hid_dim, hid_dim)
        '''
        self.dropout = nn.Dropout(dropout)

    def forward(self, src):
        # src: [batch_size, src_len]
        batch_size = src.shape[0]
        src_len = src.shape[1]

        # build the token position indices
        pos = torch.arange(src_len).unsqueeze(0).repeat(batch_size, 1).to(DEVICE)  # [batch_size, src_len]

        # embed the tokens and their positions
        tok_embedded = self.tok_embedding(src)         # [batch_size, src_len, emb_dim]
        pos_embedded = self.pos_embedding(pos.long())  # [batch_size, src_len, emb_dim]

        # element-wise sum of token embedding and position embedding
        embedded = self.dropout(tok_embedded + pos_embedded)  # [batch_size, src_len, emb_dim]

        # optionally map emb_dim -> hid_dim with a linear layer before the conv blocks
        # conv_input = self.emb2hid(embedded)  # [batch_size, src_len, hid_dim]

        # permute so the convolution runs over the last (sequence) dimension
        conv_input = embedded.permute(0, 2, 1)  # [batch_size, hid_dim, src_len]

        # convolution blocks
        for i, conv in enumerate(self.convs):
            # convolution
            conved = conv(self.dropout(conv_input))  # [batch_size, 2*hid_dim, src_len]
            # GLU activation
            conved = F.glu(conved, dim=1)  # [batch_size, hid_dim, src_len]
            # residual connection
            conved = (conved + conv_input) * self.scale  # [batch_size, hid_dim, src_len]
            # input to the next conv block
            conv_input = conved

        # optionally map hid_dim -> emb_dim with a linear layer as the encoder's conv output feature
        # conved = self.hid2emb(conved.permute(0, 2, 1))  # [batch_size, src_len, emb_dim]

        '''
        Feature extraction with convolution kernels of different sizes:
        # first kernel_size
        conved_input = conv_input
        for i, conv in enumerate(self.conv_1):
            conved1 = conv(self.dropout(conved_input))        # convolution -> [batch_size, 2*hid_dim, src_len]
            conved1 = F.glu(conved1, dim=1)                   # GLU -> [batch_size, hid_dim, src_len]
            conved1 = (conved1 + conved_input) * self.scale   # residual -> [batch_size, hid_dim, src_len]
            conved_input = conved1                            # input to the next block
        combine_conv_module = conved1

        # second kernel_size
        conved_input = conv_input
        for i, conv in enumerate(self.conv_2):
            conved2 = conv(self.dropout(conved_input))        # [batch_size, 2*hid_dim, src_len]
            conved2 = F.glu(conved2, dim=1)                   # [batch_size, hid_dim, src_len]
            conved2 = (conved2 + conved_input) * self.scale   # [batch_size, hid_dim, src_len]
            conved_input = conved2
        combine_conv_module = torch.cat([combine_conv_module, conved2], dim=1)

        # third kernel_size
        conved_input = conv_input
        for i, conv in enumerate(self.conv_3):
            conved3 = conv(self.dropout(conved_input))        # [batch_size, 2*hid_dim, src_len]
            conved3 = F.glu(conved3, dim=1)                   # [batch_size, hid_dim, src_len]
            conved3 = (conved3 + conved_input) * self.scale   # [batch_size, hid_dim, src_len]
            conved_input = conved3
        combine_conv_module = torch.cat([combine_conv_module, conved3], dim=1)

        conved = self.convhid2hid(combine_conv_module.permute(0, 2, 1))  # [batch_size, src_len, hid_dim]
        '''

        # one more residual connection: element-wise sum as the encoder's combined output feature
        combined = (conved.permute(0, 2, 1) + embedded) * self.scale  # [batch_size, src_len, emb_dim]

        return conved, combined


'''
Multi-head attention
'''
class MultiHeadAttentionLayer(nn.Module):
    def __init__(self, hid_dim, n_heads, dropout):
        super(MultiHeadAttentionLayer, self).__init__()
        assert hid_dim % n_heads == 0
        self.hid_dim = hid_dim
        self.n_heads = n_heads
        self.head_dim = hid_dim // n_heads
        self.fc_q = nn.Linear(hid_dim, hid_dim)
        self.fc_k = nn.Linear(hid_dim, hid_dim)
        self.fc_v = nn.Linear(hid_dim, hid_dim)
        self.fc_o = nn.Linear(hid_dim, hid_dim)
        self.dropout = nn.Dropout(dropout)
        self.scale = torch.sqrt(torch.FloatTensor([self.hid_dim])).to(DEVICE)  # scaling factor

    def forward(self, query, key, value, mask=None):
        '''
        query: [batch_size, query_len, hid_dim]
        key:   [batch_size, key_len, hid_dim]
        value: [batch_size, value_len, hid_dim]
        '''
        batch_size = query.shape[0]

        Q = self.fc_q(query)  # [batch_size, query_len, hid_dim]
        K = self.fc_k(key)    # [batch_size, key_len, hid_dim]
        V = self.fc_v(value)  # [batch_size, value_len, hid_dim]

        Q = Q.view(batch_size, -1, self.n_heads, self.head_dim).permute(0, 2, 1, 3)  # [batch_size, n_heads, query_len, head_dim]
        K = K.view(batch_size, -1, self.n_heads, self.head_dim).permute(0, 2, 1, 3)  # [batch_size, n_heads, key_len, head_dim]
        V = V.view(batch_size, -1, self.n_heads, self.head_dim).permute(0, 2, 1, 3)  # [batch_size, n_heads, value_len, head_dim]

        # [batch_size, n_heads, query_len, head_dim] * [batch_size, n_heads, head_dim, key_len]
        energy = torch.matmul(Q, K.permute(0, 1, 3, 2)) / self.scale  # [batch_size, n_heads, query_len, key_len]

        if mask is not None:
            energy = energy.masked_fill(mask == 0, -1e10)

        attention = torch.softmax(energy, dim=-1)  # [batch_size, n_heads, query_len, key_len]

        # [batch_size, n_heads, query_len, key_len] * [batch_size, n_heads, value_len, head_dim]
        x = torch.matmul(self.dropout(attention), V)  # [batch_size, n_heads, query_len, head_dim]

        x = x.permute(0, 2, 1, 3).contiguous()    # [batch_size, query_len, n_heads, head_dim]
        x = x.view(batch_size, -1, self.hid_dim)  # [batch_size, query_len, hid_dim]
        x = self.fc_o(x)                          # [batch_size, query_len, hid_dim]

        return x, attention


class SiameseNetwork(nn.Module):
    def __init__(self, EncoderA, hid_dim, n_heads, dropout):
        super(SiameseNetwork, self).__init__()
        self.EncoderA = EncoderA
        # self.EncoderB = EncoderB
        # self.dropout = nn.Dropout(dropout)
        # multi-head attention
        self.self_attention = MultiHeadAttentionLayer(hid_dim, n_heads, dropout)
        self.fcA = nn.Linear(2 * hid_dim, hid_dim)
        self.fcB = nn.Linear(2 * hid_dim, hid_dim)
        self.fc_out = nn.Linear(5 * hid_dim, 2)

    def calculate_attention(self, convedA, convedB):
        '''
        convedA: [batch_size, len, hid_dim]
        convedB: [batch_size, len, hid_dim]
        '''
        energy = torch.matmul(convedA, convedB.permute(0, 2, 1))  # [batch_size, trg_len, src_len]
        attention = F.softmax(energy, dim=2)                      # [batch_size, trg_len, src_len]
        attention_encoding = torch.matmul(attention, convedB)     # [batch_size, trg_len, hid_dim]
        return attention, attention_encoding

    def forward(self, sentA, sentB):
        convedA, combinedA = self.EncoderA(sentA)
        convedB, combinedB = self.EncoderA(sentB)

        # plain dot-product attention
        # attentionA, attended_encodingA = self.calculate_attention(combinedB, combinedA)
        # attentionB, attended_encodingB = self.calculate_attention(combinedA, combinedB)

        # multi-head attention, as in the Transformer model
        self_attentionA, attentionA = self.self_attention(combinedB, combinedA, combinedA)
        self_attentionB, attentionB = self.self_attention(combinedA, combinedB, combinedB)

        combinedA = torch.cat([self_attentionA, combinedA], dim=2)  # [batch_size, len, 2 * hid_dim]
        combinedB = torch.cat([self_attentionB, combinedB], dim=2)  # [batch_size, len, 2 * hid_dim]

        combinedA = self.fcA(combinedA)  # [batch_size, len, hid_dim]
        combinedB = self.fcB(combinedB)  # [batch_size, len, hid_dim]

        combinedA = F.avg_pool1d(combinedA.permute(0, 2, 1), combinedA.shape[1]).squeeze(2)  # [batch_size, hid_dim]
        combinedB = F.avg_pool1d(combinedB.permute(0, 2, 1), combinedB.shape[1]).squeeze(2)  # [batch_size, hid_dim]

        similarity = torch.cosine_similarity(combinedA, combinedB, dim=1)  # compute and learn the similarity directly

        # binary classification variant instead of cosine similarity:
        # [p, q, p+q, p-q, p*q]
        # fc_out = self.fc_out(torch.cat([combinedA, combinedB, combinedA + combinedB, combinedA - combinedB, combinedA * combinedB], dim=1))  # [batch_size, 2]

        return similarity
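A minimal usage sketch, assuming the classes and DEVICE defined above are in scope; the hyperparameter values here are illustrative placeholders rather than the project's actual configuration:

INPUT_DIM = 5000     # vocabulary size (assumed)
HID_DIM = 128
N_LAYERS = 2
KERNEL_SIZE = 3      # must be odd
DROPOUT = 0.3
N_HEADS = 8          # must divide HID_DIM
MAX_LEN = 30

encoder = Encoder(INPUT_DIM, HID_DIM, N_LAYERS, KERNEL_SIZE, DROPOUT, max_length=MAX_LEN).to(DEVICE)
model = SiameseNetwork(encoder, HID_DIM, N_HEADS, DROPOUT).to(DEVICE)

# two batches of padded token-id sequences (sentence A and sentence B)
sentA = torch.randint(0, INPUT_DIM, (4, MAX_LEN)).to(DEVICE)
sentB = torch.randint(0, INPUT_DIM, (4, MAX_LEN)).to(DEVICE)

similarity = model(sentA, sentB)   # [batch_size], cosine similarity in [-1, 1]
print(similarity.shape, similarity)

# One option for training is to regress the similarity against 0/1 match labels
# (e.g. with nn.MSELoss); alternatively, the commented-out fc_out branch turns the
# model into a 2-class classifier trained with cross-entropy.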