概況:
Beanstalkd,一個高性能、輕量級的分布式內存隊列系統,最初設計的目的是想通過后台異步執行耗時的任務來降低高容量Web應用系統的頁面訪問延遲,支持過有9.5 million用戶的Facebook Causes應用。后來開源,現在有PostRank大規模部署和使用,每天處理百萬級任務。Beanstalkd是典型的類Memcached設計,協議和使用方式都是同樣的風格,所以使用過memcached的用戶會覺得Beanstalkd似曾相識。
beanstalk核心概念:
job:一個需要異步處理的任務,需要放在一個tube中。
tube:一個有名的任務隊列,用來存儲統一類型的job
producer:job的生產者
consumer:job的消費者
簡單來說流程就一句話:
由 producer 產生一個任務 job ,並將 job 推進到一個 tube 中,
然后由 consumer 從 tube 中取出 job 執行(當然了,這一切的操作的前提是beanstalk服務正在運行中)。
一個job有READY(時刻准備着被消費者取出), RESERVED(任務正在被一個消費者處理中), DELAYED(延遲任務,設定的延遲時間后進入ready狀態), BURIED(休眠中,需要轉移狀態后才能操作)四種狀態。當producer直接put一個job時,job就處於READY狀態,等待consumer來處理,如果選擇延遲put,job就先到DELAYED狀態,等待時間過后才遷移到READY狀態。consumer獲取了當前READY的job后,該job的狀態就遷移到RESERVED,這樣其他的consumer就不能再操作該job。當consumer完成該job后,可以選擇delete, release或者bury操作;delete之后,job從系統消亡,之后不能再獲取;release操作可以重新把該job狀態遷移回READY(也可以延遲該狀態遷移操作),使其他的consumer可以繼續獲取和執行該job;有意思的是bury操作,可以把該job休眠,等到需要的時候,再將休眠的job kick回READY狀態,也可以delete BURIED狀態的job。正是有這些有趣的操作和狀態,才可以基於此做出很多意思的應用,比如要實現一個循環隊列,就可以將RESERVED狀態的job休眠掉,等沒有READY狀態的job時再將BURIED狀態的job一次性kick回READY狀態。
beanstalkd擁有的一些特性:
++ producer產生的任務可以給他分配一個優先級,支持0到2**32的優先級,值越小,優先級越高,默認優先級為1024。
優先級高的會被消費者首先執行
++ 持久化,可以通過binlog將job及其狀態記錄到文件里面,在Beanstalkd下次啟動時可以
通過讀取binlog來恢復之前的job及狀態。
++ 分布式容錯,分布式設計和Memcached類似,beanstalkd各個server之間並不知道彼此的存在,
都是通過client來實現分布式以及根據tube名稱去特定server獲取job。
++ 超時控制,為了防止某個consumer長時間占用任務但不能處理的情況,
Beanstalkd為reserve操作設置了timeout時間,如果該consumer不能在指定時間內完成job,
job將被遷移回READY狀態,供其他consumer執行。
枯燥的文字介紹結束,舉個例子看看如何在實際項目中應用
如果你有疑問為什么要用消息隊列,請跳到文末【使用消息隊列的10個理由】
這里還是要提醒一句:
消息隊列有很多用途,也在很多大型網站中有不同程度的使用,使用靈活,能解決很多
問題,但是! 在靈活的背后也是有很多潛在代價的,比如隊列意外掛掉,系統維護難度增大,濫用隊列導致
系統性能下降等。 所以,請確保你的業務邏輯適合使用消息隊列,並且你能處理好意外情況。
測試環境 ( Ubuntu Server 14.04 + PHP5.5 + Beanstalk V1.10 )
例子分析:微博是一個很典型的例子:
1,發一個微博
2,推送給他的粉絲 (如果有100w個粉絲,這個地方會堵塞很久,用戶感受到的就是延遲)
在微博上發布一條內容要做上面兩件事情才算完整,發一條微博只需要進行一次簡單的數據庫操作,
但是推送給他的粉絲卻要操作100w次數據庫,導致用戶發一個微博要等待很長的延遲才能返回結果發送成功。
采用隊列的方式,用戶發送一條微博立馬返回結果,發送成功,剩下的推送就放到隊列里面異步執行,
推送並不需要特別及時,延遲過幾秒幾十秒都是可以接受的。
下面用代碼來實現上面的思路:
首先在ubuntu上安裝beanstalkd服務,
apt-get install beanstalkd
(官網地址:http://kr.github.io/beanstalkd/download.html)
運行beanstalk /etc/init.d/beanstalkd start
如果啟動失敗,在 /etc/default/beanstalkd 中添加 START=yes
需要三個文件:
a.php (任務生產者)
b.php (任務消費者)
beanstalk.php (beanstalkd的php客戶端,這里選的是 https://github.com/davidpersson/beanstalk)
beanstalk支持的客戶端下載:
https://github.com/kr/beanstalkd/wiki/client-libraries
// a.php // 偽代碼如下: // require_once "beanstalk.php"; //接受參數 $user = $_POST['uid']; //發微博的用戶 $content = $_POST['content']; //發送內容 // 插入微博 // 取出當前插入微博的id => wid // 查找出用戶的所有粉絲uid => fensi // 把推送放進隊列 newtask($wid,$fensi); function newtask($wid,$fensi) { $beanstalk = new Client(); $beanstalk->connect(); $beanstalk->useTube('test'); foreach ($fensi as $key => $value) { //參數說明 優先級 延遲執行 執行超時 任務字符串 $task = $value . "#" . $wid; // 粉絲id和微博id $beanstalk->put( 1024, 0, 60, $task ); } $beanstalk->disconnect(); }
//b.php // 偽代碼如下: // require_once "beanstalk.php"; $beanstalk = new Client(); $beanstalk->connect(); $beanstalk->watch('test'); while(true) { $job = $beanstalk->reserve(0); if( !$job ) { // 這個地方一定要休眠 如果不休眠 while循環會導致cpu跑滿 sleep(1); continue; } $result = $job['body']; $pos = explode("#", $result); $weibo = $pos[1]; $uid = $pos[0]; // 插入粉絲推送數據 // ................. // ................. $beanstalk->delete($job['id']); } $beanstalk->disconnect();
beanstalk.php 在這里: https://github.com/davidpersson/beanstalk
使用消息隊列的10個理由:
1. 解耦
在項目啟動之初來預測將來項目會碰到什么需求,是極其困難的。消息隊列在處理過程中間插入了一個隱含的、基於數據的接口層,兩邊的處理過程都要實現這一接口。這允許你獨立的擴展或修改兩邊的處理過程,只要確保它們遵守同樣的接口約束。
2. 冗余
有時在處理數據的時候處理過程會失敗。除非數據被持久化,否則將永遠丟失。消息隊列把數據進行持久化直到它們已經被完全處理,通過這一方式規避了數據丟失風險。在被許多消息隊列所采用的"插入-獲取-刪除"范式中,在把一個消息從隊列中刪除之前,需要你的處理過程明確的指出該消息已經被處理完畢,確保你的數據被安全的保存直到你使用完畢。
3. 擴展性
因為消息隊列解耦了你的處理過程,所以增大消息入隊和處理的頻率是很容易的;只要另外增加處理過程即可。不需要改變代碼、不需要調節參數。擴展就像調大電力按鈕一樣簡單。
4. 靈活性 & 峰值處理能力
當你的應用上了Hacker News的首頁,你將發現訪問流量攀升到一個不同尋常的水平。在訪問量劇增的情況下,你的應用仍然需要繼續發揮作用,但是這樣的突發流量並不常見;如果為以能處理這類峰值訪問為標准來投入資源隨時待命無疑是巨大的浪費。使用消息隊列能夠使關鍵組件頂住增長的訪問壓力,而不是因為超出負荷的請求而完全崩潰。請查看我們關於峰值處理能力的博客文章了解更多此方面的信息。
5. 可恢復性
當體系的一部分組件失效,不會影響到整個系統。消息隊列降低了進程間的耦合度,所以即使一個處理消息的進程掛掉,加入隊列中的消息仍然可以在系統恢復后被處理。而這種允許重試或者延后處理請求的能力通常是造就一個略感不便的用戶和一個沮喪透頂的用戶之間的區別。
6. 送達保證
消息隊列提供的冗余機制保證了消息能被實際的處理,只要一個進程讀取了該隊列即可。在此基礎上,IronMQ提供了一個"只送達一次"保證。無論有多少進程在從隊列中領取數據,每一個消息只能被處理一次。這之所以成為可能,是因為獲取一個消息只是"預定"了這個消息,暫時把它移出了隊列。除非客戶端明確的表示已經處理完了這個消息,否則這個消息會被放回隊列中去,在一段可配置的時間之后可再次被處理。
7.排序保證
在許多情況下,數據處理的順序都很重要。消息隊列本來就是排序的,並且能保證數據會按照特定的順序來處理。IronMO保證消息漿糊通過FIFO(先進先出)的順序來處理,因此消息在隊列中的位置就是從隊列中檢索他們的位置。
8.緩沖
在任何重要的系統中,都會有需要不同的處理時間的元素。例如,加載一張圖片比應用過濾器花費更少的時間。消息隊列通過一個緩沖層來幫助任務最高效率的執行--寫入隊列的處理會盡可能的快速,而不受從隊列讀的預備處理的約束。該緩沖有助於控制和優化數據流經過系統的速度。
9. 理解數據流
在一個分布式系統里,要得到一個關於用戶操作會用多長時間及其原因的總體印象,是個巨大的挑戰。消息系列通過消息被處理的頻率,來方便的輔助確定那些表現不佳的處理過程或領域,這些地方的數據流都不夠優化。
10. 異步通信
很多時候,你不想也不需要立即處理消息。消息隊列提供了異步處理機制,允許你把一個消息放入隊列,但並不立即處理它。你想向隊列中放入多少消息就放多少,然后在你樂意的時候再去處理它們。
補充一個: 多語言通信,比如用php生產一個job,用python或者其他語言作為消費者來處理