之前在微博上調查過大家正在使用的分布式內存隊列系統,反饋有Memcacheq,Fqueue, RabbitMQ, Beanstalkd以及linkedin的kafka。RabbitMQ使用比較廣泛,Beanstalkd是后起之秀。Beanstalkd之於RabbitMQ,就好比Nginx之於Apache,Varnish之於Squid。后面在項目中使用Beanstalkd的過程中,更發現其簡單、輕量級、高性能、易使用等特點,以及優先級、多隊列、持久化、分布式容錯、超時控制等特性。下面就簡單介紹一下Beanstalkd,不足之處請大家指正。
設計思想
高性能離不開異步,異步離不開隊列,而其內部都是Producer-Comsumer模式的原理。

圖1 Producer-Comsumer模式
應用
Beanstalkd,一個高性能、輕量級的分布式內存隊列系統,最初設計的目的是想通過后台異步執行耗時的任務來降低高容量Web應用系統的頁面訪問延遲,支持過有9.5 million用戶的Facebook Causes應用。后來開源,現在有PostRank大規模部署和使用,每天處理百萬級任務。Beanstalkd是典型的類Memcached設計,協議和使用方式都是同樣的風格,所以使用過memcached的用戶會覺得Beanstalkd似曾相識。
核心概念
Beanstalkd設計里面的核心概念:
◆ job
一個需要異步處理的任務,是Beanstalkd中的基本單元,需要放在一個tube中。
◆ tube
一個有名的任務隊列,用來存儲統一類型的job,是producer和consumer操作的對象。
◆ producer
Job的生產者,通過put命令來將一個job放到一個tube中。
◆ consumer
Job的消費者,通過reserve/release/bury/delete命令來獲取job或改變job的狀態。
Beanstalkd中一個job的生命周期如圖2所示。一個job有READY, RESERVED, DELAYED, BURIED四種狀態。當producer直接put一個job時,job就處於READY狀態,等待consumer來處理,如果選擇延遲put,job就先到DELAYED狀態,等待時間過后才遷移到READY狀態。consumer獲取了當前READY的job后,該job的狀態就遷移到RESERVED,這樣其他的consumer就不能再操作該job。當consumer完成該job后,可以選擇delete, release或者bury操作;delete之后,job從系統消亡,之后不能再獲取;release操作可以重新把該job狀態遷移回READY(也可以延遲該狀態遷移操作),使其他的consumer可以繼續獲取和執行該job;有意思的是bury操作,可以把該job休眠,等到需要的時候,再將休眠的job kick回READY狀態,也可以delete BURIED狀態的job。正是有這些有趣的操作和狀態,才可以基於此做出很多意思的應用,比如要實現一個循環隊列,就可以將RESERVED狀態的job休眠掉,等沒有READY狀態的job時再將BURIED狀態的job一次性kick回READY狀態。

圖2 Beanstalkd中job的生命周期
特性
Beanstalkd基於的源碼安裝和使用很簡單,在此略過。這里重點介紹一下其幾個很nice的特性。
◆ 優先級
支持0到2**32的優先級,值越小,優先級越高,默認優先級為1024。
◆ 持久化
可以通過binlog將job及其狀態記錄到文件里面,在Beanstalkd下次啟動時可以通過讀取binlog來恢復之前的job及狀態。
◆ 分布式容錯
分布式設計和Memcached類似,beanstalkd各個server之間並不知道彼此的存在,都是通過client來實現分布式以及根據tube名稱去特定server獲取job。
◆ 超時控制
為了防止某個consumer長時間占用任務但不能處理的情況,Beanstalkd為reserve操作設置了timeout時間,如果該consumer不能在指定時間內完成job,job將被遷移回READY狀態,供其他consumer執行。
不足
在使用中發現一個Beanstalkd尚無提供刪除一個tube的操作,只能將tube的job依次刪除,並讓Beanstalkd來自動刪除空tube。還有就是Beanstalkd不支持客戶端認證機制(開發者將應用場景定位在局域網)。
后續工作
1.介紹Beanstalkd的命令和使用
2. 翻譯Beanstalkd協議
3. 分析Beanstalkd源碼
原文:http://rdc.taobao.com/blog/cs/?p=1201
最近在做一個項目,需要用戶在提交相關信息后,分析信息內容,然后將分析結果推送到相關的用戶的信息模塊中,用到了beanstalk這個隊列系統。
beanstalkd介紹:
Beanstalkd,一個高性能、輕量級的分布式內存隊列系統,最初設計的目的是想通過后台異步執行耗時的任務來降低高容量Web應用系統的頁面訪問延遲,支持過有9.5 million用戶的Facebook Causes應用。后來開源,現在有PostRank大規模部署和使用,每天處理百萬級任務。Beanstalkd是典型的類Memcached設計,協議和使用方式都是同樣的風格,所以使用過memcached的用戶會覺得Beanstalkd似曾相識。
Beanstalkd中一個job的生命周期如圖所示。一個job有READY, RESERVED, DELAYED, BURIED四種狀態。當producer直接put一個job時,job就處於READY狀態,等待consumer來處理,如果選擇延遲put,job就先到DELAYED狀態,等待時間過后才遷移到READY狀態。consumer獲取了當前READY的job后,該job的狀態就遷移到RESERVED,這樣其他的consumer就不能再操作該job。當consumer完成該job后,可以選擇delete, release或者bury操作;delete之后,job從系統消亡,之后不能再獲取;release操作可以重新把該job狀態遷移回READY(也可以延遲該狀態遷移操作),使其他的consumer可以繼續獲取和執行該job;有意思的是bury操作,可以把該job休眠,等到需要的時候,再將休眠的job kick回READY狀態,也可以delete BURIED狀態的job。正是有這些有趣的操作和狀態,才可以基於此做出很多意思的應用,比如要實現一個循環隊列,就可以將RESERVED狀態的job休眠掉,等沒有READY狀態的job時再將BURIED狀態的job一次性kick回READY狀態。
特性:
為了防止某個consumer長時間占用任務但不能處理的情況,Beanstalkd為reserve操作設置了timeout時間,如果該consumer不能在指定時間內完成job,job將被遷移回READY狀態,供其他consumer執行。
下載:
服務端:http://kr.github.io/beanstalkd/download.html
客戶端:https://github.com/kr/beanstalkd/wiki/client-libraries
安裝:
ubuntu
sudo apt-get install beanstalkd
centos
yum install beanstalkd
源碼安裝
tar -zxvf /usr/bin/beanstalkd/beanstalkd-1.9.tar.gz cd beanstalkd make install PERFIX=/usr/bin/beanstalkd
后台啟動:
beanstalkd -l 地址 -p 端口號 -z 最大的任務大小(byte) -c &
如果是外部客戶端連接,ip地址要寫外網地址,這樣才能連接上
啟動選項
-b DIR wal directory
-f MS fsync at most once every MS milliseconds (use -f0 for “always fsync”)
-F never fsync (default)
-l ADDR listen on address (default is 0.0.0.0)
-p PORT listen on port (default is 11300)
-u USER become user and group
-z BYTES set the maximum job size in bytes (default is 65535)
-s BYTES set the size of each wal file (default is 10485760)
(will be rounded up to a multiple of 512 bytes)
-c compact the binlog (default)
-n do not compact the binlog
-v show version information
-V increase verbosity
-h show this help
php客戶端的使用:我使用的是這個簡易的類 https://github.com/davidpersson/beanstalk
發送任務:
<?php //發送任務 require_once 'src/Socket/Beanstalk.php'; //實例化beanstalk $beanstalk = new Socket_Beanstalk(array( 'persistent' => false, //是否長連接 'host' => 'ip地址', 'port' => 11600, //端口號默認11300 'timeout' => 3 //連接超時時間 )); if (!$beanstalk->connect()) { exit(current($beanstalk->errors())); } //選擇使用的tube $beanstalk->useTube('test'); //往tube中增加數據 $put = $beanstalk->put( 23, // 任務的優先級. 0, // 不等待直接放到ready隊列中. 60, // 處理任務的時間. 'hello, beanstalk' // 任務內容 ); if (!$put) { exit('commit job fail'); } $beanstalk->disconnect();
處理任務:
<?php require_once 'src/Socket/Beanstalk.php'; //實例化beanstalk $beanstalk = new Socket_Beanstalk(array( 'persistent' => false, //是否長連接 'host' => 'ip地址', 'port' => 11600, //端口號默認11300 'timeout' => 3 //連接超時時間 )); if (!$beanstalk->connect()) { exit(current($beanstalk->errors())); } //查看beanstalkd狀態 //var_dump($beanstalk->stats()); //查看有多少個tube //var_dump($beanstalk->listTubes()); $beanstalk->useTube('test'); //設置要監聽的tube $beanstalk->watch('test'); //取消對默認tube的監聽,可以省略 $beanstalk->ignore('default'); //查看監聽的tube列表 //var_dump($beanstalk->listTubesWatched()); //查看test的tube當前的狀態 //var_dump($beanstalk->statsTube('test')); while (true) { //獲取任務,此為阻塞獲取,直到獲取有用的任務為止 $job = $beanstalk->reserve(); //返回格式array('id' => 123, 'body' => 'hello, beanstalk') //處理任務 $result = doJob($job['body']); if ($result) { //刪除任務 $beanstalk->delete($job['id']); } else { //休眠任務 $beanstalk->bury($job['id']); } //跳出無限循環 if (file_exists('shutdown')) { file_put_contents('shutdown', 'beanstalkd在'.date('Y-m-d H:i:s').'關閉'); break; } } $beanstalk->disconnect();
原文:
http://blog.chedushi.com/archives/8026