目錄
1、基本概念
1.1、什么是Beanstalkd?
Beanstalkd 是一個輕量級消息中間件,它最大特點是將自己定位為基於管道 (tube) 和任務 (job) 的工作隊列。
Beanstalkd 支持任務優先級 (priority), 延時 (delay), 超時重發 (time-to-run) 和預留 (buried), 能夠很好的支持分布式的后台任務和定時任務處理。它的內部采用libevent,服務器-客戶端之間采用類似Memcached的輕量級通訊協議,因此性能很高(enque: 9000 jobs/second, worker: 5200 jobs/second)
盡管是內存隊列, Beanstalkd 提供了 binlog 機制, 當重啟 beanstalkd 時,當前任務狀態能夠從紀錄的本地 binlog 中恢復。Beanstalkd支持過有9.5 million用戶的Facebook Causes應用。后來開源,現在有PostRank大規模部署和使用,每天處理百萬級任務。Beanstalkd是典型的類Memcached設計,協議和使用方式都是同樣的風格,所以使用過Memcached的用戶會覺得Beanstalkd似曾相識。
Beanstalkd支持的語言有很多,可以參考這里:https://github.com/kr/beanstalkd/wiki/client-libraries
1.2、Beanstalkd設計的核心概念?
- job
一個需要異步處理的任務,是Beanstalkd中的基本單元,需要放在一個tube中。
- tube
一個有名的任務隊列,用來存儲統一類型的job,是producer和consumer操作的對象。
- producer
Job的生產者,通過put命令來將一個job放到一個tube中。
- consumer
Job的消費者,通過reserve/release/bury/delete命令來獲取job或改變job的狀態。
1.3、Beanstalkd中Job的生命周期介紹
Beanstalkd的流程如下圖:
- 當producer直接put一個job時,job就處於READY狀態,等待consumer來處理,如果選擇延遲put,job就先到DELAYED狀態,等待時間過后才遷移到READY狀態。
- consumer獲取了當前READY的job后,該job的狀態就遷移到RESERVED,這樣其他的consumer就不能再操作該job。
- 當consumer完成該job后,可以選擇delete, release或者bury操作;
- delete之后,job從系統消亡,之后不能再獲取
- release操作可以重新把該job狀態遷移回READY(也可以延遲該狀態遷移操作),使其他的consumer可以繼續獲取和執行該job
- bury操作,可以把該job休眠,等到需要的時候,再將休眠的job kick回READY狀態,也可以delete buride狀態的job
正是有這些有趣的操作和狀態,才可以基於此做出很多意思的應用,比如要實現一個循環隊列,就可以將RESERVED狀態的job休眠掉,等沒有READY狀態的job時再將BURIED狀態的job一次性kick回READY狀態。
1.4、Beanstalkd有什么不足?
Beanstalkd 沒有提供主備同步 + 故障切換機制, 在應用中有成為單點的風險。實際應用中,可以用數據庫為任務 (job) 提供持久化存儲。 和 Memcached 類似, Beanstalkd 依賴 libevent 的單線程事件分發機制, 不能有效利用多核 cpu 的性能。這一點可以通過單機部署多個實例克服。
一個Beanstalkd尚無提供刪除一個tube的操作,只能將tube的job依次刪除,並讓Beanstalkd來自動刪除空tube。還有就是Beanstalkd不支持客戶端認證機制(開發者將應用場景定位在局域網)。
Beanstalk速度非常快,協議簡單,占用內存空間少,而且支持持久化。唯一的不足是掛了之后恢復慢,3G日志數據恢復了十多分鍾。
2、Beanstalkd的官網在哪里?
http://kr.github.io/beanstalkd/
3、Beanstalkd在哪里下載?
http://kr.github.io/beanstalkd/download.html
4、如何安裝Beanstalkd?
使用下面的命令進行安裝,同時查看版本:
lion@node1:~$ sudo apt-get install beanstalkd
lion@node1:~$ beanstalkd -v
beanstalkd 1.9
Beanstalkd可以使用以下命令停止和啟動:
lion@node1:~$ sudo service beanstalkd stop #停止
lion@node1:~$ sudo service beanstalkd start #啟動
通過apt-get安裝后的配置文件目錄在/etc/default/beanstalkd,里面描述了Beanstalkd監聽的地址和端口
lion@node1:~$ cat /etc/default/beanstalkd
## Defaults for the beanstalkd init script, /etc/init.d/beanstalkd on
## Debian systems.
BEANSTALKD_LISTEN_ADDR=127.0.0.1
BEANSTALKD_LISTEN_PORT=11300
# You can use BEANSTALKD_EXTRA to pass additional options. See beanstalkd(1)
# for a list of the available options. Uncomment the following line for
# persistent job storage.
#BEANSTALKD_EXTRA="-b /var/lib/beanstalkd"
4.1、安裝beanstool管理工具
beanstool是一個Beanstalkd管理工具,不需要任何依賴就可以使用,是用Golang編寫的。
lion@node1:~/_app$ wget https://github.com/src-d/beanstool/releases/download/v0.2.0/beanstool_v0.2.0_linux_amd64.tar.gz
lion@node1:~/_app$ tar -xvzf beanstool_v0.2.0_linux_amd64.tar.gz
lion@node1:~/_app$ sudo cp beanstool_v0.2.0_linux_amd64/beanstool /usr/local/bin/
5、Beanstalkd調用案例
我們是使用Golang語言來調用Beanstalkd的,在官網上可以看到有現成的Golang包Beanstalk,使用下面的命令先安裝Beanstalk
lion@node1:~$ go get github.com/kr/beanstalk
5.1、使用默認的Tube發送、接收消息
producer_default.go(Produce jobs):
package main
import (
"fmt"
"github.com/kr/beanstalk"
"time"
"log"
)
const (
//uri
uri = "127.0.0.1:11300"
//network
network = "tcp"
//
content = "hello idoall.org"
)
//如果存在錯誤,則輸出
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
panic(fmt.Sprintf("%s: %s", msg, err))
}
}
func main(){
conn, err := beanstalk.Dial(network, uri)
failOnError(err, "Failed to connect to beanstalkd")
defer conn.Close()
//把job放入tube中並設置優先級
//
id, err := conn.Put(
[]byte(content), //內容
1, //優先級 0~2^32 個優先級, 0 代表最高優先級,默認優先級為1024。
0, //要延遲發送的時間,0表示不延遲
time.Minute)
failOnError(err, "Failed puts a job into tube")
fmt.Println("job", id)
}
consumer_default.go:
package main
import (
"fmt"
"github.com/kr/beanstalk"
"time"
"log"
)
const (
//uri
uri = "127.0.0.1:11300"
//network
network = "tcp"
)
//如果存在錯誤,則輸出
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
panic(fmt.Sprintf("%s: %s", msg, err))
}
}
func main(){
conn, err := beanstalk.Dial(network, uri)
failOnError(err, "Failed to connect to beanstalkd")
defer conn.Close()
//
id, body, err := conn.Reserve(1 * time.Second)
failOnError(err, "returns a job from one of the tubes failed")
fmt.Println("job", id)
fmt.Println(string(body))
err = conn.Delete(id)
failOnError(err, "delete jobs failed")
}
**查看結果 **
Console1(Procuder):
lion@node1:~/_code/_beanstalk/_golang$ go run producer.go
job 1
使用beanstool工具查看隊列狀態:
lion@node1:~/_app$ beanstool stats
+---------+----------+----------+----------+----------+----------+----------+----------+
| Name | Buried | Delayed | Ready | Reserved | Urgent | Waiting | Total |
+---------+----------+----------+----------+----------+----------+----------+----------+
| default | 0 | 0 | 1 | 0 | 1 | 0 | 1 |
+---------+----------+----------+----------+----------+----------+----------+----------+
Console2(Consumer):
lion@node1:~/_code/_beanstalk/_golang$ go run consumer_default.go
job 1
hello idoall.org
5.2、使用指定的Tube發送、接收消息
producer_tube_idoall.go(Produce jobs):
package main
import (
"fmt"
"github.com/kr/beanstalk"
"time"
"log"
)
const (
//uri
uri = "127.0.0.1:11300"
//network
network = "tcp"
//
tubeName = "test-idoall-tube"
//
content = "hello idoall.org from idoall tube"
)
//如果存在錯誤,則輸出
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
panic(fmt.Sprintf("%s: %s", msg, err))
}
}
func main(){
conn, err := beanstalk.Dial(network, uri)
failOnError(err, "Failed to connect to beanstalkd")
defer conn.Close()
tube := &beanstalk.Tube{conn, tubeName}
//把job放入tube中並設置優先級
//
id, err := tube.Put(
[]byte(content), //內容
1, //優先級 0~2^32 個優先級, 0 代表最高優先級,默認優先級為1024。
0, //要延遲發送的時間,0表示不延遲
time.Minute)
failOnError(err, "Failed puts a job into tube")
fmt.Println("job", id)
conn.Close()
}
consumer_tube_idoall.go:
package main
import (
"fmt"
"github.com/kr/beanstalk"
"time"
"log"
)
const (
//uri
uri = "127.0.0.1:11300"
//network
network = "tcp"
//
tubeName = "test-idoall-tube"
)
//如果存在錯誤,則輸出
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
panic(fmt.Sprintf("%s: %s", msg, err))
}
}
func main(){
conn, err := beanstalk.Dial(network, uri)
failOnError(err, "Failed to connect to beanstalkd")
defer conn.Close()
tubeSet := beanstalk.NewTubeSet(conn, tubeName)
//
id, body, err := tubeSet.Reserve(10 * time.Hour)
failOnError(err, "returns a job from one of the tubes failed")
fmt.Println("job", id)
fmt.Println(string(body))
err = conn.Delete(id)
failOnError(err, "delete jobs failed")
}
查看結果
Console1(Procuder):
lion@node1:~/_code/_beanstalk/_golang$ go run producer_tube_idoall.go
job 5
使用beanstool工具查看隊列狀態:
lion@node1:~/_app$ beanstool stats
+------------------+----------+----------+----------+----------+----------+----------+----------+
| Name | Buried | Delayed | Ready | Reserved | Urgent | Waiting | Total |
+------------------+----------+----------+----------+----------+----------+----------+----------+
| default | 0 | 0 | 0 | 0 | 0 | 0 | 4 |
| test-idoall-tube | 0 | 0 | 1 | 0 | 1 | 0 | 1 |
+------------------+----------+----------+----------+----------+----------+----------+----------+
Console2(Consumer):
lion@node1:~/_code/_beanstalk/_golang$ go run consumer_tube_idoall.go
job 5
hello idoall.org from idoall tube
5.3、Beanstalkd的消息持久化
持久化消息非常簡單,在啟動的時候使用-b參數,指定目錄:
lion@node1:~$ /usr/bin/beanstalkd -l 127.0.0.1 -p 11300 -b /home/lion/tmp
可以參考文章使用Supervisor3.2.1基於Mac10.10.3對系統進程進行管理將Beanstalkd的命令做為系統進程,在開機的時候隨系統啟動。
6、參考資料
博文作者:迦壹 博客地址:[Ubuntu14.04+Beanstalkd1.9最佳實踐](http://idoall.org/blog/post/lion/16) 轉載聲明:可以轉載, 但必須以超鏈接形式標明文章原始出處和作者信息及版權聲明,謝謝合作!