一種消息和任務隊列——beanstalkd


 

 

beanstalkd 是一個輕量級消息中間件,其主要特性:
  • 基於管道  (tube) 和任務 (job) 的工作隊列 (work-queue):d
      管道(tube),tube類似於消息主題(topic),在一個beanstalkd中可以支持多個tube,每個tube都有自己的producer和consumer;
          任務(job),beanstalkd用job代替了message的概念,與消息不同,job有一系列狀態: 
  • 內部實現采用了 libevent, 服務器-客戶端之間用類似 memcached 的輕量級通訊協議,具有有很高的性能。
  • 盡管是內存隊列,beanstalkd 提供了 binlog 機制,當重啟 beanstalkd 時,當前任務狀態能夠從紀錄的本地 binlog 中恢復。
  • 優先級(priority):job可以有0~2^32個優先級,0代表最高優先級,beanstalkd使用最大最小堆處理job的優先級排序,因此reserve命令的時間復雜度是O(logN);
  • 延時(delay),有兩種方式可以執行延時任務:producer發布任務時指定延時;或者當任務處理完畢后, consumer再次將任務放入隊列延時執行 (RELEASE with <delay>);
  • 超時重發(time-to-run),Beanstalkd 把job返回給consumer以后:consumer必須在預設的 TTR (time-to-run) 時間內發送 delete / release/ bury 改變任務狀態;否則 Beanstalkd 會認為消息處理失敗,然后把job交給另外的消費者節點執行。如果consumer預計在 TTR (time-to-run) 時間內無法完成任務, 也可以發送 touch 命令, 它的作用是讓 Beanstalkd 從系統時間重新計算 TTR ;
  • 任務預留(buried),如果job因為某些原因無法執行, consumer可以把任務置為 buried 狀態讓 Beanstalkd 保留這些任務。管理員可以通過 peek buried 命令查詢被保留的任務,並且進行人工干預。簡單的, kick <n> 能夠一次性把 n 條被保留的任務踢回隊列。
 
 
 
 

job的狀態

  • READY,需要立即處理的任務,當延時 (DELAYED) 任務到期后會自動成為當前任務;
  • DELAYED,延遲執行的任務, 當消費者處理任務后,可以用將消息再次放回 DELAYED 隊列延遲執行;
  • RESERVED,已經被消費者獲取, 正在執行的任務,Beanstalkd 負責檢查任務是否在 TTR(time-to-run) 內完成;
  • BURIED,保留的任務: 任務不會被執行,也不會消失,除非有人把它 "踢" 回隊列;
  • DELETED,消息被徹底刪除。Beanstalkd 不再維持這些消息。
 
如下,是一個典型任務的生命周期:
producer執行put命令將job放入隊列,consumer執行reserve命令從隊列取出job,執行完畢后發送delete命令告訴beanstalkd刪除該job。
如果沒有執行delete命令,beanstalkd將在一個TTR周期(默認120s)后重新將該job加入隊列;
 
 put            reserve               delete
  -----> [READY] ---------> [RESERVED] --------> *poof*
 
下面是一個使用 beanstalkc(python客戶端)操作beanstalkd的例子:
#!/usr/bin/env python
import beanstalkc

beanstalk=beanstalkc.Connection(host="127.0.0.1",port=11300)

# pruducer
beanstalk.put('hello')
beanstalk.put('world')

# consumer
job1=beanstalk.reserve()
print "job1: " + job1.body
job1.delete()

job2=beanstalk.reserve(timeout=1)
print "job2: " + job2.body
job2.delete()

job3=beanstalk.reserve(timeout=1)       # Error,隊列中已經沒有job了
print "job3: " + job3.body
job3.delete()

 

 

tube的管理

beanstalkd通過tube維護多個隊列,每個tube都是一個獨立的queue,可以使用use命令切換tube,如果切換的tube不存在,會自動創建一個:
print beanstalk.tubes()     # default
print beanstalk.using()     # default 

beanstalk.use('queue1')
print beanstalk.using()     # queue1

beanstalk.use('queue2')     # queue2
print beanstalk.using()

print beanstalk.tubes()     # default, queue2

如上面的例子,在最后的tubes()命令打印所有tube的時候,並沒有看到queue1,這是因為沒有任何client 在using或者watching的tube會自動消失。

可以使用watch命令讓client同時處理多個tube,而不用擔心tube會被銷毀:

print beanstalk.tubes()     # default
print beanstalk.using()     # default

beanstalk.use('queue1')
beanstalk.watch('queue1')
print beanstalk.using()     # queue1

beanstalk.use('queue2')     # queue2
print beanstalk.using()

print beanstalk.tubes()     # default, queue1, queue2
print beanstalk.watching()  # default, queue1

 

watch的tube如果不存在,會被自動創建,可以用ignore命令取消關注tube:

beanstalk.watch('queue3')
print beanstalk.watching()  # default, queue3
beanstalk.ignore('queue3')
print beanstalk.watching()  # default

 

注意,watch和use是兩個獨立的動作,use一個tube不代表watching它了,反之watch一個tube也不代表using它;
 
 
 

beanstalkc命令

如下,是一個job更完整的狀態變遷和生命周期:
 
put with delay               release with delay
  ----------------> [DELAYED] <------------.
                        |                   |
                        | (time passes)     |
                        |                   |
   put                  v     reserve       |       delete
  -----------------> [READY] ---------> [RESERVED] --------> *poof*
                       ^  ^                |  |
                       |   \  release      |  |
                       |    `-------------'   |
                       |                      |
                       | kick                 |
                       |                      |
                       |       bury           |
                    [BURIED] <---------------'
                       |
                       |  delete
                        `--------> *poof*
 
一些例子:
put的時候加上delay參數,可以延遲發布job:
# pruducer
beanstalk.put('hello', delay=10)
beanstalk.put('world')

# consumer
job1=beanstalk.reserve()
print "job1: " + job1.body  # World
job1.delete()

job2=beanstalk.reserve(timeout=0) 
print "job2: " + job2.body  # Error,

 

put命令也支持優先級參數:

# pruducer
beanstalk.put('hello', priority=10)
beanstalk.put('world', priority=9)

# consumer
job1=beanstalk.reserve()
print "job1: " + job1.body  # world
job1.delete()

job2=beanstalk.reserve(timeout=0) 
print "job2: " + job2.body  # hello
job2.delete()

 

 

release命令可以釋放job回隊列:

# pruducer
beanstalk.put('hello') 
beanstalk.put('world')

# consumer
job1=beanstalk.reserve()
print "job1: " + job1.body  # hello
job1.release()

job2=beanstalk.reserve(timeout=0) 
print "job2: " + job2.body  # hello

 

 bury命令將job放到一個特殊的FIFO隊列中,之后不能被reserve命令獲取,但可以用kick命令扔回工作隊列中,之后就能被消費了:

# pruducer
beanstalk.put('hello')
beanstalk.put('world')

# consumer
job1=beanstalk.reserve(timeout=0)
job1.bury()
print job1.stats()['state'] # buried

job2=beanstalk.reserve(timeout=0)
print "job2: " + job2.body  # world
job2.delete()

beanstalk.kick()

job3=beanstalk.reserve(timeout=0)
print "job3: " + job3.body  # hello
job3.delete()

 

peek命令允許查看一個job,但不會reserve它;

# pruducer
beanstalk.put('hello')
beanstalk.put('world')

#print(beanstalk.stats())  

# consumer
job1=beanstalk.reserve(timeout=0)
job1_id=job1.stats()['id'] 
print job1_id
job1_r=beanstalk.peek(job1_id)
print "job1 " + job1_r.body  # hello

job2=beanstalk.reserve(timeout=0)
print "job2: " + job2.body  # hello
job2.delete()

job3=beanstalk.reserve(timeout=0) 
print "job3: " + job3.body  # world
job3.delete()

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM