動手學pytorch-Word2vec


Word2vec

1.詞嵌入基礎

2.PTB數據集

3.Skip-Gram

4.負采樣近似

5.訓練模型

1.詞嵌入基礎

使用 one-hot 向量表示單詞,雖然它們構造起來很容易,但通常並不是一個好選擇。一個主要的原因是,one-hot 詞向量無法准確表達不同詞之間的相似度,如常常使用的余弦相似度。

Word2Vec 詞嵌入工具的提出正是為了解決上面這個問題,它將每個詞表示成一個定長的向量,並通過在語料庫上的預訓練使得這些向量能較好地表達不同詞之間的相似和類比關系,以引入一定的語義信息。基於兩種概率模型的假設,可以定義兩種 Word2Vec 模型:

  • Skip-Gram 跳字模型:假設背景詞由中心詞生成,即建模 \(P(w_o\mid w_c)\),其中 \(w_c\) 為中心詞,\(w_o\) 為任一背景詞;

Image Name

Image Name

本文主要介紹 Skip-Gram 模型的實現,CBOW 實現與其類似。

import collections
import math
import random
import sys
import time
import os
import numpy as np
import torch
from torch import nn
import torch.utils.data as Data

2.PTB 數據集

簡單來說,Word2Vec 能從語料中學到如何將離散的詞映射為連續空間中的向量,並保留其語義上的相似關系。本文使用經典的 PTB 語料庫進行訓練。PTB (Penn Tree Bank) 是一個常用的小型語料庫,它采樣自《華爾街日報》的文章,包括訓練集、驗證集和測試集。在PTB訓練集上訓練詞嵌入模型。

載入數據集

with open('../../inputs/ptb.train.txt', 'r') as f:
    lines = f.readlines() # 該數據集中句子以換行符為分割
    raw_dataset = [st.split() for st in lines] # st是sentence的縮寫,單詞以空格為分割
print('# sentences: %d' % len(raw_dataset))

# 對於數據集的前3個句子,打印每個句子的詞數和前5個詞
# 句尾符為 '' ,生僻詞全用 '' 表示,數字則被替換成了 'N'
for st in raw_dataset[:3]:
    print('# tokens:', len(st), st[:5])
# sentences: 42068
# tokens: 24 ['aer', 'banknote', 'berlitz', 'calloway', 'centrust']
# tokens: 15 ['pierre', '<unk>', 'N', 'years', 'old']
# tokens: 11 ['mr.', '<unk>', 'is', 'chairman', 'of']

建立詞語索引

counter = collections.Counter([tk for st in raw_dataset for tk in st]) # tk是token的縮寫
counter = dict(filter(lambda x: x[1] >= 5, counter.items())) # 只保留在數據集中至少出現5次的詞

idx_to_token = [tk for tk, _ in counter.items()]
token_to_idx = {tk: idx for idx, tk in enumerate(idx_to_token)}
dataset = [[token_to_idx[tk] for tk in st if tk in token_to_idx]
           for st in raw_dataset] # raw_dataset中的單詞在這一步被轉換為對應的idx
num_tokens = sum([len(st) for st in dataset])
'# tokens: %d' % num_tokens
# '# tokens: 887100'

二次采樣

文本數據中一般會出現一些高頻詞,如英文中的“the”“a”和“in”。通常來說,在一個背景窗口中,一個詞(如“chip”)和較低頻詞(如“microprocessor”)同時出現比和較高頻詞(如“the”)同時出現對訓練詞嵌入模型更有益。因此,訓練詞嵌入模型時可以對詞進行二次采樣。 具體來說,數據集中每個被索引詞 \(w_i\) 將有一定概率被丟棄,該丟棄概率為

\[P(w_i)=\max(1-\sqrt{\frac{t}{f(w_i)}},0) \]

其中 \(f(w_i)\) 是數據集中詞 \(w_i\) 的個數與總詞數之比,常數 \(t\) 是一個超參數(實驗中設為 \(10^{−4}\))。可見,只有當 \(f(w_i)>t\) 時,才有可能在二次采樣中丟棄詞 \(w_i\),並且越高頻的詞被丟棄的概率越大。具體的代碼如下:

def discard(idx):
    '''
    @params:
        idx: 單詞的下標
    @return: True/False 表示是否丟棄該單詞
    '''
    return random.uniform(0, 1) < 1 - math.sqrt(
        1e-4 / (counter[idx_to_token[idx]] / num_tokens))

subsampled_dataset = [[tk for tk in st if not discard(tk)] for st in dataset]
print('# tokens: %d' % sum([len(st) for st in subsampled_dataset]))

def compare_counts(token):
    return '# %s: before=%d, after=%d' % (token, sum(
        [st.count(token_to_idx[token]) for st in dataset]), sum(
        [st.count(token_to_idx[token]) for st in subsampled_dataset]))

print(compare_counts('the'))
print(compare_counts('join'))
# tokens: 376269
# the: before=50770, after=2204
# join: before=45, after=45

提取中心詞和背景詞

def get_centers_and_contexts(dataset, max_window_size):
    '''
    @params:
        dataset: 數據集為句子的集合,每個句子則為單詞的集合,此時單詞已經被轉換為相應數字下標
        max_window_size: 背景詞的詞窗大小的最大值
    @return:
        centers: 中心詞的集合
        contexts: 背景詞窗的集合,與中心詞對應,每個背景詞窗則為背景詞的集合
    '''
    centers, contexts = [], []
    for st in dataset:
        if len(st) < 2:  # 每個句子至少要有2個詞才可能組成一對“中心詞-背景詞”
            continue
        centers += st
        for center_i in range(len(st)):
            window_size = random.randint(1, max_window_size) # 隨機選取背景詞窗大小
            indices = list(range(max(0, center_i - window_size),
                                 min(len(st), center_i + 1 + window_size)))
            indices.remove(center_i)  # 將中心詞排除在背景詞之外
            contexts.append([st[idx] for idx in indices])
    return centers, contexts

all_centers, all_contexts = get_centers_and_contexts(subsampled_dataset, 5)

tiny_dataset = [list(range(7)), list(range(7, 10))]
print('dataset', tiny_dataset)
for center, context in zip(*get_centers_and_contexts(tiny_dataset, 2)):
    print('center', center, 'has contexts', context)
dataset [[0, 1, 2, 3, 4, 5, 6], [7, 8, 9]]
center 0 has contexts [1]
center 1 has contexts [0, 2, 3]
center 2 has contexts [1, 3]
center 3 has contexts [1, 2, 4, 5]
center 4 has contexts [2, 3, 5, 6]
center 5 has contexts [3, 4, 6]
center 6 has contexts [4, 5]
center 7 has contexts [8]
center 8 has contexts [7, 9]
center 9 has contexts [8]

注:數據批量讀取的實現需要依賴負采樣近似的實現,故放於負采樣近似部分進行講解。

3.Skip-Gram

在跳字模型中,每個詞被表示成兩個 \(d\) 維向量,用來計算條件概率。假設這個詞在詞典中索引為 \(i\) ,當它為中心詞時向量表示為 \(\boldsymbol{v}_i\in\mathbb{R}^d\),而為背景詞時向量表示為 \(\boldsymbol{u}_i\in\mathbb{R}^d\) 。設中心詞 \(w_c\) 在詞典中索引為 \(c\),背景詞 \(w_o\) 在詞典中索引為 \(o\),假設給定中心詞生成背景詞的條件概率滿足下式:

\[P(w_o\mid w_c)=\frac{\exp(\boldsymbol{u}_o^\top \boldsymbol{v}_c)}{\sum_{i\in\mathcal{V}}\exp(\boldsymbol{u}_i^\top \boldsymbol{v}_c)} \]

PyTorch 預置的 Embedding 層

embed = nn.Embedding(num_embeddings=10, embedding_dim=4)
print(embed.weight)

x = torch.tensor([[1, 2, 3], [4, 5, 6]], dtype=torch.long)
print(embed(x))
Parameter containing:
tensor([[ 0.1457,  0.3388, -2.6896, -1.6433],
        [ 1.3397,  1.9627,  0.0307, -0.1593],
        [ 0.4292, -0.2249, -0.3765, -0.4930],
        [ 1.5861, -0.4672, -0.4935,  1.2721],
        [-0.9143,  0.5252,  1.3651,  1.2648],
        [-0.3117, -0.0466,  0.9327, -0.8765],
        [-0.1167,  1.0003, -1.6003,  1.1744],
        [-0.6663, -0.4376,  0.1040, -0.4025],
        [-1.1091, -0.0036, -0.3064,  0.9317],
        [-1.2386,  0.2226, -0.6514,  1.5125]], requires_grad=True)
tensor([[[ 1.3397,  1.9627,  0.0307, -0.1593],
         [ 0.4292, -0.2249, -0.3765, -0.4930],
         [ 1.5861, -0.4672, -0.4935,  1.2721]],

        [[-0.9143,  0.5252,  1.3651,  1.2648],
         [-0.3117, -0.0466,  0.9327, -0.8765],
         [-0.1167,  1.0003, -1.6003,  1.1744]]], grad_fn=<EmbeddingBackward>)

PyTorch 預置的批量乘法

X = torch.ones((2, 1, 4))
Y = torch.ones((2, 4, 6))
print(torch.bmm(X, Y).shape)
# torch.Size([2, 1, 6])

Skip-Gram 模型的前向計算

def skip_gram(center, contexts_and_negatives, embed_v, embed_u):
    '''
    @params:
        center: 中心詞下標,形狀為 (n, 1) 的整數張量
        contexts_and_negatives: 背景詞和噪音詞下標,形狀為 (n, m) 的整數張量
        embed_v: 中心詞的 embedding 層
        embed_u: 背景詞的 embedding 層
    @return:
        pred: 中心詞與背景詞(或噪音詞)的內積,之后可用於計算概率 p(w_o|w_c)
    '''
    v = embed_v(center) # shape of (n, 1, d)
    u = embed_u(contexts_and_negatives) # shape of (n, m, d)
    pred = torch.bmm(v, u.permute(0, 2, 1)) # bmm((n, 1, d), (n, d, m)) => shape of (n, 1, m)
    return pred

4.負采樣近似

由於 softmax 運算考慮了背景詞可能是詞典 \(\mathcal{V}\) 中的任一詞,對於含幾十萬或上百萬詞的較大詞典,就可能導致計算的開銷過大。將以 skip-gram 模型為例,介紹負采樣 (negative sampling) 的實現來嘗試解決這個問題。

負采樣方法用以下公式來近似條件概率 \(P(w_o\mid w_c)=\frac{\exp(\boldsymbol{u}_o^\top \boldsymbol{v}_c)}{\sum_{i\in\mathcal{V}}\exp(\boldsymbol{u}_i^\top \boldsymbol{v}_c)}\)

\[P(w_o\mid w_c)=P(D=1\mid w_c,w_o)\prod_{k=1,w_k\sim P(w)}^K P(D=0\mid w_c,w_k) \]

其中 \(P(D=1\mid w_c,w_o)=\sigma(\boldsymbol{u}_o^\top\boldsymbol{v}_c)\)\(\sigma(\cdot)\) 為 sigmoid 函數。對於一對中心詞和背景詞,從詞典中隨機采樣 \(K\) 個噪聲詞(實驗中設 \(K=5\))。根據 Word2Vec 論文的建議,噪聲詞采樣概率 \(P(w)\) 設為 \(w\) 詞頻與總詞頻之比的 \(0.75\) 次方。

def get_negatives(all_contexts, sampling_weights, K):
    '''
    @params:
        all_contexts: [[w_o1, w_o2, ...], [...], ... ]
        sampling_weights: 每個單詞的噪聲詞采樣概率
        K: 隨機采樣個數
    @return:
        all_negatives: [[w_n1, w_n2, ...], [...], ...]
    '''
    all_negatives, neg_candidates, i = [], [], 0
    population = list(range(len(sampling_weights)))
    for contexts in all_contexts:
        negatives = []
        while len(negatives) < len(contexts) * K:
            if i == len(neg_candidates):
                # 根據每個詞的權重(sampling_weights)隨機生成k個詞的索引作為噪聲詞。
                # 為了高效計算,可以將k設得稍大一點
                i, neg_candidates = 0, random.choices(
                    population, sampling_weights, k=int(1e5))
            neg, i = neg_candidates[i], i + 1
            # 噪聲詞不能是背景詞
            if neg not in set(contexts):
                negatives.append(neg)
        all_negatives.append(negatives)
    return all_negatives

sampling_weights = [counter[w]**0.75 for w in idx_to_token]
all_negatives = get_negatives(all_contexts, sampling_weights, 5)

print(all_contexts[1])
print(all_negatives[1])
# [0, 6, 11]
# [6113, 84, 705, 854, 247, 2311, 9608, 340, 53, 7686, 725, 317, 907, 419, 2487]

注:除負采樣方法外,還有層序 softmax (hiererarchical softmax) 方法也可以用來解決計算量過大的問題,參考原書10.2.2節

批量讀取數據

class MyDataset(torch.utils.data.Dataset):
    def __init__(self, centers, contexts, negatives):
        assert len(centers) == len(contexts) == len(negatives)
        self.centers = centers
        self.contexts = contexts
        self.negatives = negatives
        
    def __getitem__(self, index):
        return (self.centers[index], self.contexts[index], self.negatives[index])

    def __len__(self):
        return len(self.centers)
    
def batchify(data):
    '''
    用作DataLoader的參數collate_fn
    @params:
        data: 長為batch_size的列表,列表中的每個元素都是__getitem__得到的結果
    @outputs:
        batch: 批量化后得到 (centers, contexts_negatives, masks, labels) 元組
            centers: 中心詞下標,形狀為 (n, 1) 的整數張量
            contexts_negatives: 背景詞和噪聲詞的下標,形狀為 (n, m) 的整數張量
            masks: 與補齊相對應的掩碼,形狀為 (n, m) 的0/1整數張量
            labels: 指示中心詞的標簽,形狀為 (n, m) 的0/1整數張量
    '''
    max_len = max(len(c) + len(n) for _, c, n in data)
    centers, contexts_negatives, masks, labels = [], [], [], []
    for center, context, negative in data:
        cur_len = len(context) + len(negative)
        centers += [center]
        contexts_negatives += [context + negative + [0] * (max_len - cur_len)]
        masks += [[1] * cur_len + [0] * (max_len - cur_len)] # 使用掩碼變量mask來避免填充項對損失函數計算的影響
        labels += [[1] * len(context) + [0] * (max_len - len(context))]
        batch = (torch.tensor(centers).view(-1, 1), torch.tensor(contexts_negatives),
            torch.tensor(masks), torch.tensor(labels))
    return batch

batch_size = 512
num_workers = 0 if sys.platform.startswith('win32') else 4

dataset = MyDataset(all_centers, all_contexts, all_negatives)
data_iter = Data.DataLoader(dataset, batch_size, shuffle=True,
                            collate_fn=batchify, 
                            num_workers=num_workers)
for batch in data_iter:
    for name, data in zip(['centers', 'contexts_negatives', 'masks',
                           'labels'], batch):
        print(name, 'shape:', data.shape)
    break

# centers shape: torch.Size([512, 1])
# contexts_negatives shape: torch.Size([512, 60])
# masks shape: torch.Size([512, 60])
# labels shape: torch.Size([512, 60])

5.訓練模型

損失函數

應用負采樣方法后,可利用最大似然估計的對數等價形式將損失函數定義為如下

\[\sum_{t=1}^T\sum_{-m\le j\le m,j\ne 0} [-\log P(D=1\mid w^{(t)},w^{(t+j)})-\sum_{k=1,w_k\sim P(w)^K}\log P(D=0\mid w^{(t)},w_k)] \]

根據這個損失函數的定義,可以直接使用二元交叉熵損失函數進行計算:

class SigmoidBinaryCrossEntropyLoss(nn.Module):
    def __init__(self):
        super(SigmoidBinaryCrossEntropyLoss, self).__init__()
    def forward(self, inputs, targets, mask=None):
        '''
        @params:
            inputs: 經過sigmoid層后為預測D=1的概率
            targets: 0/1向量,1代表背景詞,0代表噪音詞
        @return:
            res: 平均到每個label的loss
        '''
        inputs, targets, mask = inputs.float(), targets.float(), mask.float()
        res = nn.functional.binary_cross_entropy_with_logits(inputs, targets, reduction="none", weight=mask)
        res = res.sum(dim=1) / mask.float().sum(dim=1)
        return res

loss = SigmoidBinaryCrossEntropyLoss()

pred = torch.tensor([[1.5, 0.3, -1, 2], [1.1, -0.6, 2.2, 0.4]])
label = torch.tensor([[1, 0, 0, 0], [1, 1, 0, 0]]) # 標簽變量label中的1和0分別代表背景詞和噪聲詞
mask = torch.tensor([[1, 1, 1, 1], [1, 1, 1, 0]])  # 掩碼變量
print(loss(pred, label, mask))

def sigmd(x):
    return - math.log(1 / (1 + math.exp(-x)))
print('%.4f' % ((sigmd(1.5) + sigmd(-0.3) + sigmd(1) + sigmd(-2)) / 4)) # 注意1-sigmoid(x) = sigmoid(-x)
print('%.4f' % ((sigmd(1.1) + sigmd(-0.6) + sigmd(-2.2)) / 3))

# tensor([0.8740, 1.2100])
# 0.8740
# 1.2100

模型初始化

embed_size = 100
# 兩層Embedding 分別編碼中心詞向量和背景向量
net = nn.Sequential(nn.Embedding(num_embeddings=len(idx_to_token), embedding_dim=embed_size),
                    nn.Embedding(num_embeddings=len(idx_to_token), embedding_dim=embed_size))

訓練模型

def train(net, lr, num_epochs):
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    print("train on", device)
    net = net.to(device)
    optimizer = torch.optim.Adam(net.parameters(), lr=lr)
    for epoch in range(num_epochs):
        start, l_sum, n = time.time(), 0.0, 0
        for batch in data_iter:
            center, context_negative, mask, label = [d.to(device) for d in batch]
            
            pred = skip_gram(center, context_negative, net[0], net[1])
            
            l = loss(pred.view(label.shape), label, mask).mean() # 一個batch的平均loss
            optimizer.zero_grad()
            l.backward()
            optimizer.step()
            l_sum += l.cpu().item()
            n += 1
        print('epoch %d, loss %.2f, time %.2fs'
              % (epoch + 1, l_sum / n, time.time() - start))

train(net, 0.01, 5)

# train on cpu
# epoch 1, loss 0.61, time 221.30s
# epoch 2, loss 0.42, time 227.70s
# epoch 3, loss 0.38, time 240.50s
# epoch 4, loss 0.36, time 253.79s
# epoch 5, loss 0.34, time 238.51s

測試模型

def get_similar_tokens(query_token, k, embed):
    '''
    @params:
        query_token: 給定的詞語
        k: 近義詞的個數
        embed: 預訓練詞向量
    '''
    W = embed.weight.data
    x = W[token_to_idx[query_token]]
    # 添加的1e-9是為了數值穩定性
    cos = torch.matmul(W, x) / (torch.sum(W * W, dim=1) * torch.sum(x * x) + 1e-9).sqrt()
    _, topk = torch.topk(cos, k=k+1)
    topk = topk.cpu().numpy()
    for i in topk[1:]:  # 除去輸入詞
        print('cosine sim=%.3f: %s' % (cos[i], (idx_to_token[i])))
        
get_similar_tokens('chip', 3, net[0])

# cosine sim=0.446: intel
# cosine sim=0.427: computer
# cosine sim=0.427: computers


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM