Ubuntu14.04+Beanstalkd1.9最佳實踐


目錄

1、基本概念

1.1、什么是Beanstalkd?

  Beanstalkd 是一個輕量級消息中間件,它最大特點是將自己定位為基於管道 (tube) 和任務 (job) 的工作隊列。
  Beanstalkd 支持任務優先級 (priority), 延時 (delay), 超時重發 (time-to-run) 和預留 (buried), 能夠很好的支持分布式的后台任務和定時任務處理。它的內部采用libevent,服務器-客戶端之間采用類似Memcached的輕量級通訊協議,因此性能很高(enque: 9000 jobs/second, worker: 5200 jobs/second)
  盡管是內存隊列, Beanstalkd 提供了 binlog 機制, 當重啟 beanstalkd 時,當前任務狀態能夠從紀錄的本地 binlog 中恢復。Beanstalkd支持過有9.5 million用戶的Facebook Causes應用。后來開源,現在有PostRank大規模部署和使用,每天處理百萬級任務。Beanstalkd是典型的類Memcached設計,協議和使用方式都是同樣的風格,所以使用過Memcached的用戶會覺得Beanstalkd似曾相識。

  Beanstalkd支持的語言有很多,可以參考這里:https://github.com/kr/beanstalkd/wiki/client-libraries

1.2、Beanstalkd設計的核心概念?

  • job

  一個需要異步處理的任務,是Beanstalkd中的基本單元,需要放在一個tube中。

  • tube

  一個有名的任務隊列,用來存儲統一類型的job,是producer和consumer操作的對象。

  • producer

  Job的生產者,通過put命令來將一個job放到一個tube中。

  • consumer

  Job的消費者,通過reserve/release/bury/delete命令來獲取job或改變job的狀態。

1.3、Beanstalkd中Job的生命周期介紹

  Beanstalkd的流程如下圖:

  http://idoall.org

  • 當producer直接put一個job時,job就處於READY狀態,等待consumer來處理,如果選擇延遲put,job就先到DELAYED狀態,等待時間過后才遷移到READY狀態。
  • consumer獲取了當前READY的job后,該job的狀態就遷移到RESERVED,這樣其他的consumer就不能再操作該job。
  • 當consumer完成該job后,可以選擇delete, release或者bury操作;
    • delete之后,job從系統消亡,之后不能再獲取
    • release操作可以重新把該job狀態遷移回READY(也可以延遲該狀態遷移操作),使其他的consumer可以繼續獲取和執行該job
    • bury操作,可以把該job休眠,等到需要的時候,再將休眠的job kick回READY狀態,也可以delete buride狀態的job

  正是有這些有趣的操作和狀態,才可以基於此做出很多意思的應用,比如要實現一個循環隊列,就可以將RESERVED狀態的job休眠掉,等沒有READY狀態的job時再將BURIED狀態的job一次性kick回READY狀態。

1.4、Beanstalkd有什么不足?

  Beanstalkd 沒有提供主備同步 + 故障切換機制, 在應用中有成為單點的風險。實際應用中,可以用數據庫為任務 (job) 提供持久化存儲。 和 Memcached 類似, Beanstalkd 依賴 libevent 的單線程事件分發機制, 不能有效利用多核 cpu 的性能。這一點可以通過單機部署多個實例克服。

  http://idoall.org

  一個Beanstalkd尚無提供刪除一個tube的操作,只能將tube的job依次刪除,並讓Beanstalkd來自動刪除空tube。還有就是Beanstalkd不支持客戶端認證機制(開發者將應用場景定位在局域網)。

  Beanstalk速度非常快,協議簡單,占用內存空間少,而且支持持久化。唯一的不足是掛了之后恢復慢,3G日志數據恢復了十多分鍾。

2、Beanstalkd的官網在哪里?

  http://kr.github.io/beanstalkd/

3、Beanstalkd在哪里下載?

  http://kr.github.io/beanstalkd/download.html

4、如何安裝Beanstalkd?

  使用下面的命令進行安裝,同時查看版本:

lion@node1:~$ sudo apt-get install beanstalkd
lion@node1:~$ beanstalkd -v
beanstalkd 1.9

  Beanstalkd可以使用以下命令停止和啟動:

lion@node1:~$ sudo service beanstalkd stop	#停止
lion@node1:~$ sudo service beanstalkd start	#啟動

  通過apt-get安裝后的配置文件目錄在/etc/default/beanstalkd,里面描述了Beanstalkd監聽的地址和端口

lion@node1:~$ cat /etc/default/beanstalkd
## Defaults for the beanstalkd init script, /etc/init.d/beanstalkd on
## Debian systems.

BEANSTALKD_LISTEN_ADDR=127.0.0.1
BEANSTALKD_LISTEN_PORT=11300

# You can use BEANSTALKD_EXTRA to pass additional options. See beanstalkd(1)
# for a list of the available options. Uncomment the following line for
# persistent job storage.
#BEANSTALKD_EXTRA="-b /var/lib/beanstalkd"

4.1、安裝beanstool管理工具

  beanstool是一個Beanstalkd管理工具,不需要任何依賴就可以使用,是用Golang編寫的。

lion@node1:~/_app$ wget https://github.com/src-d/beanstool/releases/download/v0.2.0/beanstool_v0.2.0_linux_amd64.tar.gz
lion@node1:~/_app$ tar -xvzf beanstool_v0.2.0_linux_amd64.tar.gz
lion@node1:~/_app$ sudo cp beanstool_v0.2.0_linux_amd64/beanstool /usr/local/bin/

5、Beanstalkd調用案例

  我們是使用Golang語言來調用Beanstalkd的,在官網上可以看到有現成的Golang包Beanstalk,使用下面的命令先安裝Beanstalk

lion@node1:~$ go get github.com/kr/beanstalk

5.1、使用默認的Tube發送、接收消息

  producer_default.go(Produce jobs):

package main

import (
  "fmt"
  "github.com/kr/beanstalk"
  "time"
  "log"
)

const (
  //uri
  uri           =  "127.0.0.1:11300"
  //network
  network =  "tcp"
  //
  content = "hello idoall.org"
)

//如果存在錯誤,則輸出
func failOnError(err error, msg string) {
  if err != nil {
    log.Fatalf("%s: %s", msg, err)
    panic(fmt.Sprintf("%s: %s", msg, err))
  }
}


func main(){
  conn, err := beanstalk.Dial(network, uri)
  failOnError(err, "Failed to connect to beanstalkd")
  defer conn.Close()

  //把job放入tube中並設置優先級
  //
  id, err := conn.Put(
    []byte(content), //內容
    1,    //優先級  0~2^32 個優先級, 0 代表最高優先級,默認優先級為1024。 
    0,    //要延遲發送的時間,0表示不延遲 
    time.Minute)
  failOnError(err, "Failed puts a job into tube")
  fmt.Println("job", id)
}

  consumer_default.go:

package main

import (
  "fmt"
  "github.com/kr/beanstalk"
  "time"
  "log"
)

const (
  //uri
  uri           =  "127.0.0.1:11300"
  //network
  network =  "tcp"
)

//如果存在錯誤,則輸出
func failOnError(err error, msg string) {
  if err != nil {
    log.Fatalf("%s: %s", msg, err)
    panic(fmt.Sprintf("%s: %s", msg, err))
  }
}


func main(){
  conn, err := beanstalk.Dial(network, uri)
  failOnError(err, "Failed to connect to beanstalkd")
  defer conn.Close()


  //
  id, body, err := conn.Reserve(1 * time.Second)
  failOnError(err, "returns a job from one of the tubes failed")
  fmt.Println("job", id)
  fmt.Println(string(body))
  err = conn.Delete(id)
  failOnError(err, "delete jobs failed")
}




  **查看結果 **


  Console1(Procuder):

lion@node1:~/_code/_beanstalk/_golang$ go run producer.go
job 1

  使用beanstool工具查看隊列狀態:

lion@node1:~/_app$ beanstool stats
+---------+----------+----------+----------+----------+----------+----------+----------+
| Name    | Buried   | Delayed  | Ready    | Reserved | Urgent   | Waiting  | Total    |
+---------+----------+----------+----------+----------+----------+----------+----------+
| default | 0        | 0        | 1        | 0        | 1        | 0        | 1        |
+---------+----------+----------+----------+----------+----------+----------+----------+

  Console2(Consumer):

lion@node1:~/_code/_beanstalk/_golang$ go run consumer_default.go
job 1
hello idoall.org

5.2、使用指定的Tube發送、接收消息

  producer_tube_idoall.go(Produce jobs):

package main

import (
  "fmt"
  "github.com/kr/beanstalk"
  "time"
  "log"
)

const (
  //uri
  uri           =  "127.0.0.1:11300"
  //network
  network =  "tcp"
  //
  tubeName = "test-idoall-tube"
  //
  content = "hello idoall.org from idoall tube"
)

//如果存在錯誤,則輸出
func failOnError(err error, msg string) {
  if err != nil {
    log.Fatalf("%s: %s", msg, err)
    panic(fmt.Sprintf("%s: %s", msg, err))
  }
}


func main(){
  conn, err := beanstalk.Dial(network, uri)
  failOnError(err, "Failed to connect to beanstalkd")
  defer conn.Close()

  tube := &beanstalk.Tube{conn, tubeName}

  //把job放入tube中並設置優先級
  //
  id, err := tube.Put(
    []byte(content), //內容
    1,    //優先級  0~2^32 個優先級, 0 代表最高優先級,默認優先級為1024。 
    0,    //要延遲發送的時間,0表示不延遲 
    time.Minute)
  failOnError(err, "Failed puts a job into tube")
  fmt.Println("job", id)
  conn.Close()
}

  consumer_tube_idoall.go:

package main

import (
  "fmt"
  "github.com/kr/beanstalk"
  "time"
  "log"
)

const (
  //uri
  uri           =  "127.0.0.1:11300"
  //network
  network =  "tcp"
  //
  tubeName = "test-idoall-tube"
)

//如果存在錯誤,則輸出
func failOnError(err error, msg string) {
  if err != nil {
    log.Fatalf("%s: %s", msg, err)
    panic(fmt.Sprintf("%s: %s", msg, err))
  }
}


func main(){
  conn, err := beanstalk.Dial(network, uri)
  failOnError(err, "Failed to connect to beanstalkd")
  defer conn.Close()


  tubeSet := beanstalk.NewTubeSet(conn, tubeName)
  //
  id, body, err := tubeSet.Reserve(10 * time.Hour)
  failOnError(err, "returns a job from one of the tubes failed")
  fmt.Println("job", id)
  fmt.Println(string(body))
  err = conn.Delete(id)
  failOnError(err, "delete jobs failed")
}




  查看結果


  Console1(Procuder):

lion@node1:~/_code/_beanstalk/_golang$ go run producer_tube_idoall.go
job 5

  使用beanstool工具查看隊列狀態:

lion@node1:~/_app$ beanstool stats
+------------------+----------+----------+----------+----------+----------+----------+----------+
| Name             | Buried   | Delayed  | Ready    | Reserved | Urgent   | Waiting  | Total    |
+------------------+----------+----------+----------+----------+----------+----------+----------+
| default          | 0        | 0        | 0        | 0        | 0        | 0        | 4        |
| test-idoall-tube | 0        | 0        | 1        | 0        | 1        | 0        | 1        |
+------------------+----------+----------+----------+----------+----------+----------+----------+

  Console2(Consumer):

lion@node1:~/_code/_beanstalk/_golang$ go run consumer_tube_idoall.go
job 5
hello idoall.org from idoall tube

5.3、Beanstalkd的消息持久化

  持久化消息非常簡單,在啟動的時候使用-b參數,指定目錄:

lion@node1:~$ /usr/bin/beanstalkd -l 127.0.0.1 -p 11300 -b /home/lion/tmp

  可以參考文章使用Supervisor3.2.1基於Mac10.10.3對系統進程進行管理將Beanstalkd的命令做為系統進程,在開機的時候隨系統啟動。

6、參考資料

  官方示例




博文作者:迦壹 博客地址:[Ubuntu14.04+Beanstalkd1.9最佳實踐](http://idoall.org/blog/post/lion/16) 轉載聲明:可以轉載, 但必須以超鏈接形式標明文章原始出處和作者信息及版權聲明,謝謝合作!



免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM