MMD理論推導和python實現


最先提出是這篇論文:https://dl.acm.org/doi/10.5555/2503308.2188410
用於判斷兩個分布p和q是否相同。
基本假設:如果兩個分布生成足夠多的樣本在f上對應的均值都相等,那么可以認為這兩個分布是同一個分布。
基本兩個分布的樣本,通過尋找在樣本空間上的連續函數f,求不用分布的樣本在f上的函數值的均值,通過把兩個均值做差可以得到兩個分布對應於f的mean discrepancy。尋找一個f,使得這個mean discrepancy有最大值。這個最大值就是MMD。取MMD作為檢驗統計量,如果這個值足夠的小,則認為兩個分布相同。否則認為它們不相同。
定義如下:

MMD的經驗估計如下。x,y分別是從p和q通過獨立同分布采樣得到的兩個數據集。

在給定兩個分布的觀測集X,Y的情況下,這個結果會嚴重依賴於給定的函數集F。為了能表示MMD的性質:當且僅當p和q是相同分布的時候MMD為0,那么要求F足夠rich;另一方面為了使檢驗具有足夠的連續性(be consistent in power),從而使得MMD的經驗估計可以隨着觀測集規模增大迅速收斂到它的期望,F必須足夠restrictive。文中證明了當F是universal RKHS上的(unit ball)單位球時,可以滿足上面兩個性質。




上界就是f:be a unit ball in a universal RKHS,比如高斯核或者拉普拉斯核。進一步給定RKHS對應合函數,則MMD的平方可以表示為:

估計如下:


補充:





如何得到。參看知乎https://zhuanlan.zhihu.com/p/1142648310 和 http://iera.name/a-story-of-basis-and-kernel-part-ii-reproducing-kernel-hilbert-space/
代碼參考https://github.com/easezyc/deep-transfer-learning/blob/5e94d519b7bb7f94f0e43687aa4663aca18357de/MUDA/MFSAN/MFSAN_3src/mmd.py

import torch

def guassian_kernel(source, target, kernel_mul=2.0, kernel_num=5, fix_sigma=None):
    '''
    將源域數據和目標域數據轉化為核矩陣,即上文中的K
    Params: 
     source: 源域數據,行表示樣本數目,列表示樣本數據維度
     target: 目標域數據 同source
     kernel_mul: 多核MMD,以bandwidth為中心,兩邊擴展的基數,比如bandwidth/kernel_mul, bandwidth, bandwidth*kernel_mul
     kernel_num: 取不同高斯核的數量
     fix_sigma: 是否固定,如果固定,則為單核MMD
 Return:
  sum(kernel_val): 多個核矩陣之和
    '''
    n_samples = int(source.size()[0])+int(target.size()[0])
    # 求矩陣的行數,即兩個域的的樣本總數,一般source和target的尺度是一樣的,這樣便於計算
    total = torch.cat([source, target], dim=0)#將source,target按列方向合並
    #將total復制(n+m)份
    total0 = total.unsqueeze(0).expand(int(total.size(0)), int(total.size(0)), int(total.size(1)))
    #將total的每一行都復制成(n+m)行,即每個數據都擴展成(n+m)份
    total1 = total.unsqueeze(1).expand(int(total.size(0)), int(total.size(0)), int(total.size(1)))
    # total1 - total2 得到的矩陣中坐標(i,j, :)代表total中第i行數據和第j行數據之間的差 
    # sum函數,對第三維進行求和,即平方后再求和,獲得高斯核指數部分的分子,是L2范數的平方
    L2_distance_square = ((total0-total1)**2).sum(2) 
    #調整高斯核函數的sigma值
    if fix_sigma:
        bandwidth = fix_sigma
    else:
        bandwidth = torch.sum(L2_distance_square) / (n_samples**2-n_samples)
    # 多核MMD
    #以fix_sigma為中值,以kernel_mul為倍數取kernel_num個bandwidth值(比如fix_sigma為1時,得到[0.25,0.5,1,2,4]
    bandwidth /= kernel_mul ** (kernel_num // 2)
    bandwidth_list = [bandwidth * (kernel_mul**i) for i in range(kernel_num)]
    print(bandwidth_list)
    #高斯核函數的數學表達式
    kernel_val = [torch.exp(-L2_distance_square / bandwidth_temp) for bandwidth_temp in bandwidth_list]
    #得到最終的核矩陣
    return sum(kernel_val)#/len(kernel_val)

def mmd_rbf(source, target, kernel_mul=2.0, kernel_num=5, fix_sigma=None):
    '''
    計算源域數據和目標域數據的MMD距離
    Params: 
     source: 源域數據,行表示樣本數目,列表示樣本數據維度
     target: 目標域數據 同source
     kernel_mul: 多核MMD,以bandwidth為中心,兩邊擴展的基數,比如bandwidth/kernel_mul, bandwidth, bandwidth*kernel_mul
     kernel_num: 取不同高斯核的數量
     fix_sigma: 是否固定,如果固定,則為單核MMD
 Return:
  loss: MMD loss
    '''
    source_num = int(source.size()[0])#一般默認為源域和目標域的batchsize相同
    target_num = int(target.size()[0])
    kernels = guassian_kernel(source, target,
        kernel_mul=kernel_mul, kernel_num=kernel_num, fix_sigma=fix_sigma)
    #根據式(3)將核矩陣分成4部分
    XX = torch.mean(kernels[:source_num, :source_num])
    YY = torch.mean(kernels[source_num:, source_num:])
    XY = torch.mean(kernels[:source_num, source_num:])
    YX = torch.mean(kernels[source_num:, :source_num])
    loss = XX + YY -XY - YX
    return loss#因為一般都是n==m,所以L矩陣一般不加入計算
import random
import matplotlib
import matplotlib.pyplot as plt

SAMPLE_SIZE = 500
buckets = 50

#第一種分布:對數正態分布,得到一個中值為mu,標准差為sigma的正態分布。mu可以取任何值,sigma必須大於零。
plt.subplot(1,2,1)
plt.xlabel("random.lognormalvariate")
mu = -0.6
sigma = 0.15#將輸出數據限制到0-1之間
res1 = [random.lognormvariate(mu, sigma) for _ in range(1, SAMPLE_SIZE)]
plt.hist(res1, buckets)

#第二種分布:beta分布。參數的條件是alpha 和 beta 都要大於0, 返回值在0~1之間。
plt.subplot(1,2,2)
plt.xlabel("random.betavariate")
alpha = 1
beta = 10
res2 = [random.betavariate(alpha, beta) for _ in range(1, SAMPLE_SIZE)]
plt.hist(res2, buckets)

plt.savefig('data.jpg')
plt.show()
from torch.autograd import Variable

#參數值見上段代碼
#分別從對數正態分布和beta分布取兩組數據
diff_1 = []
for i in range(10):
    diff_1.append([random.lognormvariate(mu, sigma) for _ in range(1, SAMPLE_SIZE)])

diff_2 = []
for i in range(10):
    diff_2.append([random.betavariate(alpha, beta) for _ in range(1, SAMPLE_SIZE)])

X = torch.Tensor(diff_1)
Y = torch.Tensor(diff_2)
X,Y = Variable(X), Variable(Y)
print(mmd_rbf(X,Y))
from torch.autograd import Variable

#參數值見以上代碼
#從對數正態分布取兩組數據
same_1 = []
for i in range(10):
    same_1.append([random.lognormvariate(mu, sigma) for _ in range(1, SAMPLE_SIZE)])

same_2 = []
for i in range(10):
    same_2.append([random.lognormvariate(mu, sigma) for _ in range(1, SAMPLE_SIZE)])

X = torch.Tensor(same_1)
Y = torch.Tensor(same_2)
X,Y = Variable(X), Variable(Y)
print(mmd_rbf(X,Y))

[1] https://blog.csdn.net/xiaocong1990/article/details/72051375
[2] https://blog.csdn.net/sinat_34173979/article/details/105876584
[3] https://zhuanlan.zhihu.com/p/114264831
[4] http://songcy.net/posts/story-of-basis-and-kernel-part-2/


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM