最先提出是這篇論文:https://dl.acm.org/doi/10.5555/2503308.2188410
用於判斷兩個分布p和q是否相同。
基本假設:如果兩個分布生成足夠多的樣本在f上對應的均值都相等,那么可以認為這兩個分布是同一個分布。
基本兩個分布的樣本,通過尋找在樣本空間上的連續函數f,求不用分布的樣本在f上的函數值的均值,通過把兩個均值做差可以得到兩個分布對應於f的mean discrepancy。尋找一個f,使得這個mean discrepancy有最大值。這個最大值就是MMD。取MMD作為檢驗統計量,如果這個值足夠的小,則認為兩個分布相同。否則認為它們不相同。
定義如下:
MMD的經驗估計如下。x,y分別是從p和q通過獨立同分布采樣得到的兩個數據集。
在給定兩個分布的觀測集X,Y的情況下,這個結果會嚴重依賴於給定的函數集F。為了能表示MMD的性質:當且僅當p和q是相同分布的時候MMD為0,那么要求F足夠rich;另一方面為了使檢驗具有足夠的連續性(be consistent in power),從而使得MMD的經驗估計可以隨着觀測集規模增大迅速收斂到它的期望,F必須足夠restrictive。文中證明了當F是universal RKHS上的(unit ball)單位球時,可以滿足上面兩個性質。
上界就是f:be a unit ball in a universal RKHS,比如高斯核或者拉普拉斯核。進一步給定RKHS對應合函數,則MMD的平方可以表示為:
估計如下:
補充:
如何得到。參看知乎https://zhuanlan.zhihu.com/p/1142648310 和 http://iera.name/a-story-of-basis-and-kernel-part-ii-reproducing-kernel-hilbert-space/
代碼參考https://github.com/easezyc/deep-transfer-learning/blob/5e94d519b7bb7f94f0e43687aa4663aca18357de/MUDA/MFSAN/MFSAN_3src/mmd.py
import torch
def guassian_kernel(source, target, kernel_mul=2.0, kernel_num=5, fix_sigma=None):
'''
將源域數據和目標域數據轉化為核矩陣,即上文中的K
Params:
source: 源域數據,行表示樣本數目,列表示樣本數據維度
target: 目標域數據 同source
kernel_mul: 多核MMD,以bandwidth為中心,兩邊擴展的基數,比如bandwidth/kernel_mul, bandwidth, bandwidth*kernel_mul
kernel_num: 取不同高斯核的數量
fix_sigma: 是否固定,如果固定,則為單核MMD
Return:
sum(kernel_val): 多個核矩陣之和
'''
n_samples = int(source.size()[0])+int(target.size()[0])
# 求矩陣的行數,即兩個域的的樣本總數,一般source和target的尺度是一樣的,這樣便於計算
total = torch.cat([source, target], dim=0)#將source,target按列方向合並
#將total復制(n+m)份
total0 = total.unsqueeze(0).expand(int(total.size(0)), int(total.size(0)), int(total.size(1)))
#將total的每一行都復制成(n+m)行,即每個數據都擴展成(n+m)份
total1 = total.unsqueeze(1).expand(int(total.size(0)), int(total.size(0)), int(total.size(1)))
# total1 - total2 得到的矩陣中坐標(i,j, :)代表total中第i行數據和第j行數據之間的差
# sum函數,對第三維進行求和,即平方后再求和,獲得高斯核指數部分的分子,是L2范數的平方
L2_distance_square = ((total0-total1)**2).sum(2)
#調整高斯核函數的sigma值
if fix_sigma:
bandwidth = fix_sigma
else:
bandwidth = torch.sum(L2_distance_square) / (n_samples**2-n_samples)
# 多核MMD
#以fix_sigma為中值,以kernel_mul為倍數取kernel_num個bandwidth值(比如fix_sigma為1時,得到[0.25,0.5,1,2,4]
bandwidth /= kernel_mul ** (kernel_num // 2)
bandwidth_list = [bandwidth * (kernel_mul**i) for i in range(kernel_num)]
print(bandwidth_list)
#高斯核函數的數學表達式
kernel_val = [torch.exp(-L2_distance_square / bandwidth_temp) for bandwidth_temp in bandwidth_list]
#得到最終的核矩陣
return sum(kernel_val)#/len(kernel_val)
def mmd_rbf(source, target, kernel_mul=2.0, kernel_num=5, fix_sigma=None):
'''
計算源域數據和目標域數據的MMD距離
Params:
source: 源域數據,行表示樣本數目,列表示樣本數據維度
target: 目標域數據 同source
kernel_mul: 多核MMD,以bandwidth為中心,兩邊擴展的基數,比如bandwidth/kernel_mul, bandwidth, bandwidth*kernel_mul
kernel_num: 取不同高斯核的數量
fix_sigma: 是否固定,如果固定,則為單核MMD
Return:
loss: MMD loss
'''
source_num = int(source.size()[0])#一般默認為源域和目標域的batchsize相同
target_num = int(target.size()[0])
kernels = guassian_kernel(source, target,
kernel_mul=kernel_mul, kernel_num=kernel_num, fix_sigma=fix_sigma)
#根據式(3)將核矩陣分成4部分
XX = torch.mean(kernels[:source_num, :source_num])
YY = torch.mean(kernels[source_num:, source_num:])
XY = torch.mean(kernels[:source_num, source_num:])
YX = torch.mean(kernels[source_num:, :source_num])
loss = XX + YY -XY - YX
return loss#因為一般都是n==m,所以L矩陣一般不加入計算
import random
import matplotlib
import matplotlib.pyplot as plt
SAMPLE_SIZE = 500
buckets = 50
#第一種分布:對數正態分布,得到一個中值為mu,標准差為sigma的正態分布。mu可以取任何值,sigma必須大於零。
plt.subplot(1,2,1)
plt.xlabel("random.lognormalvariate")
mu = -0.6
sigma = 0.15#將輸出數據限制到0-1之間
res1 = [random.lognormvariate(mu, sigma) for _ in range(1, SAMPLE_SIZE)]
plt.hist(res1, buckets)
#第二種分布:beta分布。參數的條件是alpha 和 beta 都要大於0, 返回值在0~1之間。
plt.subplot(1,2,2)
plt.xlabel("random.betavariate")
alpha = 1
beta = 10
res2 = [random.betavariate(alpha, beta) for _ in range(1, SAMPLE_SIZE)]
plt.hist(res2, buckets)
plt.savefig('data.jpg')
plt.show()
from torch.autograd import Variable
#參數值見上段代碼
#分別從對數正態分布和beta分布取兩組數據
diff_1 = []
for i in range(10):
diff_1.append([random.lognormvariate(mu, sigma) for _ in range(1, SAMPLE_SIZE)])
diff_2 = []
for i in range(10):
diff_2.append([random.betavariate(alpha, beta) for _ in range(1, SAMPLE_SIZE)])
X = torch.Tensor(diff_1)
Y = torch.Tensor(diff_2)
X,Y = Variable(X), Variable(Y)
print(mmd_rbf(X,Y))
from torch.autograd import Variable
#參數值見以上代碼
#從對數正態分布取兩組數據
same_1 = []
for i in range(10):
same_1.append([random.lognormvariate(mu, sigma) for _ in range(1, SAMPLE_SIZE)])
same_2 = []
for i in range(10):
same_2.append([random.lognormvariate(mu, sigma) for _ in range(1, SAMPLE_SIZE)])
X = torch.Tensor(same_1)
Y = torch.Tensor(same_2)
X,Y = Variable(X), Variable(Y)
print(mmd_rbf(X,Y))
[1] https://blog.csdn.net/xiaocong1990/article/details/72051375
[2] https://blog.csdn.net/sinat_34173979/article/details/105876584
[3] https://zhuanlan.zhihu.com/p/114264831
[4] http://songcy.net/posts/story-of-basis-and-kernel-part-2/