一、直接采樣
直接采樣的思想是,通過對均勻分布采樣,實現對任意分布的采樣。因為均勻分布采樣好猜,我們想要的分布采樣不好采,那就采取一定的策略通過簡單采取求復雜采樣。
假設y服從某項分布p(y),其累積分布函數CDF為h(y),有樣本z~Uniform(0,1),我們令 z = h(y),即 y = h(z)^(-1),結果y即為對分布p(y)的采樣。
直接采樣的核心思想在與CDF以及逆變換的應用。在原分布p(y)中,如果某個區域[a, b]的分布較多,然后對應在CDF曲線中,[h(a), h(b)]的曲線斜率便較大。那么,經過逆變換之后,對y軸(z)進行均勻分布采樣時,分布多的部分(占據y軸部分多)對應抽樣得到的概率便更大,
局限性
實際中,所有采樣的分布都較為復雜,CDF的求解和反函數的求解都不太可行。
二、拒絕采樣
拒絕采樣是由一個易於采樣的分布出發,為一個難以直接采樣的分布產生采樣樣本的通用算法。既然 p(x) 太復雜在程序中沒法直接采樣,那么便一個可抽樣的分布 q(x) 比如高斯分布,然后按照一定的方法拒絕某些樣本,達到接近 p(x) 分布的目的。
計算步驟
設定一個方便抽樣的函數 q(x),以及一個常量 k,使得 p(x) 總在 k*q(x) 的下方。(參考上圖)
- x 軸方向:從 q(x) 分布抽樣得到 a;
- y 軸方向:從均勻分布(0, k*q(a)) 中抽樣得到 u;
- 如果剛好落到灰色區域: u > p(a), 拒絕, 否則接受這次抽樣;
- 重復以上過程。
計算步驟(BN)
- 根據網絡指定的先驗概率分布生成采樣樣本;
- 拒絕所有與證據不匹配的樣本;
- 在剩余樣本中對事件X=x的出現頻繁程度計數從而得到估計概率、
局限性
- 拒絕了太多的樣本!隨着證據變量個數的增多,與證據e相一致的樣本在所有樣本中所占的比例呈指數級下降,所以對於復雜問題這種方法是完全不可用的。
- 難以找到合適的k*q(a),接受概率可能會很低。
三、重要性采樣(似然加權)
重要性采樣主要是用於求一個復雜分布p(x)的均值,最后並沒有得到樣本。
重要性采樣的思想是借助一個易於采樣的簡單分布q(x),對這個簡單分布q(x)所得到的樣本全部接受。但是以此得到的樣本肯定不滿足分布p(x),因此需要對每一個樣本附加相應的重要性權重。在重要性采樣中,以p(x0)/q(x0)的值作為每個樣本的權重。這樣,當樣本和分布p(x)相近時,對應的權重大;與分布p(x)相差過多時,對應的權重小。這個方法采樣得到的是帶有重要性權重的服從q(z)分布的樣本,這個權重乘以樣本之后的結果其實就是服從p(z)分布的。
通過上述公式,我們可以知道重要性采樣可以用於近似復雜分布的均值。
四、吉布斯采樣
假設有一個例子:E:吃飯、學習、打球;時間T:上午、下午、晚上;天氣W:晴朗、刮風、下雨。樣本(E,T,W)滿足一定的概率分布。現要對其進行采樣,如:打球+下午+晴朗。
問題是我們不知道p(E,T,W),或者說,不知道三件事的聯合分布。當然,如果知道的話,就沒有必要用吉布斯采樣了。但是,我們知道三件事的條件分布。也就是說,p(E|T,W), p(T|E,W), p(W|E,T)。現在要做的就是通過這三個已知的條件分布,再用Gibbs sampling的方法,得到聯合分布。
具體方法:首先隨便初始化一個組合,i.e. 學習+晚上+刮風,然后依條件概率改變其中的一個變量。具體說,假設我們知道晚上+刮風,我們給E生成一個變量,比如,學習→吃飯。我們再依條件概率改下一個變量,根據學習+刮風,把晚上變成上午。類似地,把刮風變成刮風(當然可以變成相同的變量)。這樣學習+晚上+刮風→吃飯+上午+刮風。同樣的方法,得到一個序列,每個單元包含三個變量,也就是一個馬爾可夫鏈。然后跳過初始的一定數量的單元(比如100個),然后隔一定的數量取一個單元(比如隔20個取1個)。這樣sample到的單元,是逼近聯合分布的。
五、蓄水池采樣
蓄水池抽樣(Reservoir Sampling ),即能夠在o(n)時間內對n個數據進行等概率隨機抽取,例如:從1000個數據中等概率隨機抽取出100個。另外,如果數據集合的量特別大或者還在增長(相當於未知數據集合總量),該算法依然可以等概率抽樣。
算法步驟:
- 先選取數據流中的前k個元素,保存在集合A中;
- 從第j(k + 1 <= j <= n)個元素開始,每次先以概率p = k/j選擇是否讓第j個元素留下。若j被選中,則從A中隨機選擇一個元素並用該元素j替換它;否則直接淘汰該元素;
- 重復步驟2直到結束,最后集合A中剩下的就是保證隨機抽取的k個元素。
六、MCMC算法
馬氏鏈收斂定理
馬氏鏈定理:如果一個非周期馬氏鏈具有轉移概率矩陣P,且它的任何兩個狀態是連通的,那么\(\lim_{p\to\infty}P_{ij}^n\)存在且與i無關,記\(\lim_{p\to\infty}P_{ij}^n = \pi(j)\),我們有:
其中\(\pi = [\pi(1), \pi(2), ... , \pi(j), ...], \sum_{i=0}^{\infty}\pi_i = 1, \pi\)稱為馬氏鏈的平穩分布。
所有的 MCMC(Markov Chain Monte Carlo) 方法都是以這個定理作為理論基礎的。
說明:
- 該定理中馬氏鏈的狀態不要求有限,可以是有無窮多個的;
- 定理中的“非周期“這個概念不解釋,因為我們遇到的絕大多數馬氏鏈都是非周期的;
細致平穩條件
針對一個新的分布,如何構造對應的轉移矩陣?
對於一個分布\(\pi(x)\),根據細致平穩條件,如果構造的轉移矩陣P滿足\(\pi(i)P_{ij} = \pi(j)P_{ji}\),那么\(\pi(x)\)即為該馬氏鏈的平穩分布,因此可以根據這個條件去構造轉移矩陣。
通常情況下,初始的轉移矩陣\(P\)一般不滿足細致平穩條件,因此我們通過引入接受率構造出新的轉移矩陣\(P'\),使其和\(\pi(x)\)滿足細致平穩條件。由此,我們便可以用任何轉移概率矩陣(均勻分布、高斯分布)作為狀態間的轉移概率。
如果我們假設狀態之間的轉移概率是相同的,那么在算法實現時,接收率可以簡單得用\(\pi(j)/\pi(i)\)表示。
Metropolis-Hastings采樣
對於給定的概率分布p(x),我們希望能有便捷的方式生成它對應的樣本。由於馬氏鏈能收斂到平穩分布, 於是一個很的漂亮想法是:如果我們能構造一個轉移矩陣為P的馬氏鏈,使得該馬氏鏈的平穩分布恰好是p(x), 那么我們從任何一個初始狀態x0出發沿着馬氏鏈轉移, 得到一個轉移序列 x0,x1,x2,⋯xn,xn+1⋯,, 如果馬氏鏈在第n步已經收斂了,於是我們就得到了 π(x) 的樣本xn,xn+1⋯。
在馬爾科夫狀態鏈中,每一個狀態代表一個樣本\(x_n\),即所有變量的賦值情況。
通過分析MCMC源碼,可以知道:假設狀態間的轉移概率相同,那么下一個樣本的采樣會依賴於上一個樣本。假設上一個樣本所對應的原始分布概率\(\pi(x)\)很小,那么下一個樣本的接受率很大概率為1;反之如果上一個樣本的原始分布概率\(\pi(x)\)很大,那么下一個樣本會有挺大概率被拒絕。這樣的機制保證了生成的樣本服從分布\(\pi(x)\)。
從上述分析可以看出,假如初始狀態的樣本對應的分布概率很小,那么在算法剛開始運行時所產生的樣本(即使是分布概率很小的樣本)很大可能都會被接收,從而使得算法剛開始運行時采樣的樣本不滿足原始分布\(\pi(x)\)。只要算法采樣到分布概率大的樣本(此時即為收斂!),那么之后所采樣得到的樣本就會基本服從原始分布。當然,從初始狀態遍歷到分布概率大的狀態時需要運行一段時間,這段過程即為收斂的過程。MCMC算法在收斂之后,保證了在分布概率\(\pi(x)\)大的地方產生更多的樣本,在分布概率\(\pi(x)\)小的地方產生較少的樣本。
一個馬爾可夫鏈需要經過多次的狀態轉移過程采用達到一個穩定狀態,這時候采樣才比較接近真實的分布。此過程稱為burn in。一般可通過丟棄前面的N個采樣結果來達到burn in。
疑問
-
MCMC的收斂是什么意思?這個過程中是什么參數會更新導致收斂?如何確定何時收斂?
收斂過程沒有參數會更新,收斂的思想類似於大數定理。應用MCMC算法采樣時,初始的樣本量少,服從的分布可能和復雜分布\(\pi(x)\)相差挺遠,但隨着狀態轉移數的增加(轉移矩陣P的應用),根據上述定理的證明,最終的樣本分布會逐漸服從復雜分布\(\pi(x)\)。
-
\(\pi\)是每個狀態所對應的概率分布嗎?如果是的話,初始選定一個狀態后,這個\(\pi\)如何設定?或則在MCMC證明過程中,初始\(\pi\)的概率分布如何設置?
在MCMC的證明過程中,\(\pi\)是每個狀態所對應的概率分布。證明中所給定的初始\(\pi\)應該只是為了證明無論初始樣本符合什么分布,在經過一定數量的轉移之后,得到的樣本會服從復雜分布\(\pi (x)\),在實際代碼實現中,不用對這個\(\pi\)進行設定。
七、代碼
import numpy as np
import random
import matplotlib.pyplot as plt
import pandas as pdf
Rejection Sampling
def f(x):
if 0 <= x and x <= 0.25:
y = 8 * x
elif 0.25 < x and x <= 1:
y = (1 - x) * 8/3
else:
y = 0
return y
def g(x):
if 0 <= x and x <= 1:
y = 1
else:
y = 0
return y
def plot(fun):
X = np.arange(0, 1.0, 0.01)
Y = []
for x in X:
Y.append(fun(x))
plt.plot(X, Y)
plt.xlabel("x")
plt.ylabel("y")
plt.show()
plot(f)
plot(g)
def rejection_sampling(N=10000):
M = 3
cnt = 0
samples = {}
while cnt < N:
x = random.random()
acc_rate = f(x) / (M * g(x))
u = random.random()
if acc_rate >= u:
if samples.get(x) == None:
samples[x] = 1
else:
samples[x] = samples[x] + 1
cnt = cnt + 1
return samples
s = rejection_sampling(100000)
X = []
Y = []
for k, v in s.items():
X.append(k)
Y.append(v)
plt.hist(X, bins=100, edgecolor='None')
MCMC Sampling
Metropolis-Hastings Algorithm
PI = 3.1415926
def get_p(x):
# 模擬pi函數
return 1/(2*PI)*np.exp(- x[0]**2 - x[1]**2)
def get_tilde_p(x):
# 模擬不知道怎么計算Z的PI,20這個值對於外部采樣算法來說是未知的,對外只暴露這個函數結果
return get_p(x)
def domain_random(): #計算定義域一個隨機值
return np.random.random()*3.8-1.9
def metropolis(x):
new_x = (domain_random(),domain_random()) #新狀態
#計算接收概率
acc = min(1,get_tilde_p((new_x[0],new_x[1]))/get_tilde_p((x[0],x[1])))
#使用一個隨機數判斷是否接受
u = np.random.random()
if u<acc:
return new_x
return x
def testMetropolis(counts = 100,drawPath = False):
plt.figure()
#主要邏輯
x = (domain_random(),domain_random()) #x0
xs = [x] #采樣狀態序列
for i in range(counts):
xs.append(x)
x = metropolis(x) #采樣並判斷是否接受
#在各個狀態之間繪制跳轉的線條幫助可視化
X1 = [x[0] for x in xs]
X2 = [x[1] for x in xs]
if drawPath:
plt.plot(X1, X2, 'k-',linewidth=0.5)
##繪制采樣的點
plt.scatter(X1, X2, c = 'g',marker='.')
plt.show()
testMetropolis(5000)
def metropolis(x):
new_x = domain_random()
#計算接收概率
acc = min(1,f(new_x)/f(x))
#使用一個隨機數判斷是否接受
u = np.random.random()
if u<acc:
return new_x
return x
def testMetropolis(counts = 100,drawPath = False):
plt.figure()
#主要邏輯
x = domain_random()
xs = [x] #采樣狀態序列
for i in range(counts):
xs.append(x)
x = metropolis(x) #采樣並判斷是否接受
#在各個狀態之間繪制跳轉的線條幫助可視化
plt.hist(xs, bins=100, edgecolor='None')
# plt.plot(xs)
plt.show()
testMetropolis(100000)
Gibbs Sampling
def partialSampler(x,dim):
xes = []
for t in range(10): #隨機選擇10個點
xes.append(domain_random())
tilde_ps = []
for t in range(10): #計算這10個點的未歸一化的概率密度值
tmpx = x[:]
tmpx[dim] = xes[t]
tilde_ps.append(get_tilde_p(tmpx))
#在這10個點上進行歸一化操作,然后按照概率進行選擇。
norm_tilde_ps = np.asarray(tilde_ps)/sum(tilde_ps)
u = np.random.random()
sums = 0.0
for t in range(10):
sums += norm_tilde_ps[t]
if sums>=u:
return xes[t]
def gibbs(x):
rst = np.asarray(x)[:]
path = [(x[0],x[1])]
for dim in range(2): #維度輪詢,這里加入隨機也是可以的。
new_value = partialSampler(rst,dim)
rst[dim] = new_value
path.append([rst[0],rst[1]])
#這里最終只畫出一輪輪詢之后的點,但會把路徑都畫出來
return rst,path
def testGibbs(counts = 100,drawPath = False):
plt.figure()
x = (domain_random(),domain_random())
xs = [x]
paths = [x]
for i in range(counts):
xs.append([x[0],x[1]])
x,path = gibbs(x)
paths.extend(path) #存儲路徑
p1 = [x[0] for x in paths]
p2 = [x[1] for x in paths]
xs1 = [x[0] for x in xs]
xs2 = [x[1] for x in xs]
if drawPath:
plt.plot(p1, p2, 'k-',linewidth=0.5)
##繪制采樣的點
plt.scatter(xs1, xs2, c = 'g',marker='.')
plt.show()
testGibbs(5000)