之前的幾篇文章介紹了一下RabbitMQ的概念以及環境的搭建和配置,有了RabbitMQ環境就可以基於其實現一些特殊的任務場景了。RabbitMQ官方有個很好的Tutorials基本覆蓋了RabbitMQ的各中常見應用場景,現以代碼加注釋的方式以其Python客戶端pika為例簡單介紹如下。更詳盡的信息可參閱:http://www.rabbitmq.com/getstarted.html 。
之前的幾篇文章:
RabbitMQ概念及環境搭建(一)單節點安裝與配置
RabbitMQ概念及環境搭建(二)RabbitMQ Broker管理
RabbitMQ概念及環境搭建(三)RabbitMQ cluster
RabbitMQ概念及環境搭建(四)RabbitMQ High Availability
RabbitMQ概念及環境搭建(五)與web的整合
RabbitMQ是一個消息代理,從“生產者”接收消息並傳遞消息至“消費者”,期間可根據規則路由、緩存、持久化消息。“生產者”也即message發送者以下簡稱P,相對應的“消費者”乃message接收者以下簡稱C,message通過queue由P到C,queue存在於RabbitMQ,可存儲盡可能多的message,多個P可向同一queue發送message,多個C可從同一個queue接收message。
應用場景1-“Hello Word”
一個P向queue發送一個message,一個C從該queue接收message並打印。
send.py
producer,連接至RabbitMQ Server,聲明隊列,發送message,關閉連接,退出。
- #!/usr/bin/python27
- #encoding:utf8
- import pika
- #與RabbitMQ Server建立連接
- #連接到的broker在本機-localhost上
- connection = pika.BlockingConnection(pika.ConnectionParameters(
- host='localhost'))
- channel = connection.channel()
- #聲明隊列以向其發送消息消息
- #向不存在的位置發送消息時RabbitMQ將消息丟棄
- #queue='hello'指定隊列名字
- channel.queue_declare(queue='hello', durable=True)
- #message不能直接發送給queue,需經exchange到達queue,此處使用以空字符串標識的默認的exchange
- #使用默認exchange時允許通過routing_key明確指定message將被發送給哪個queue
- #body參數指定了要發送的message內容
- channel.basic_publish(exchange='',
- routing_key='hello',
- body='Hello World!')
- print " [x] Sent 'Hello World!'"
- #關閉與RabbitMq Server間的連接
- connection.close()
receive.py
consumer,連接至RabbitMQ Server,聲明隊列,接收消息並進行處理這里為打印出消息,退出。
- #!/usr/bin/env python
- #encoding:utf8
- import pika
- #建立到達RabbitMQ Server的connection
- #此處RabbitMQ Server位於本機-localhost
- connection = pika.BlockingConnection(pika.ConnectionParameters(
- host='localhost'))
- channel = connection.channel()
- #聲明queue,確認要從中接收message的queue
- #queue_declare函數是冪等的,可運行多次,但只會創建一次
- #若可以確信queue是已存在的,則此處可省略該聲明,如producer已經生成了該queue
- #但在producer和consumer中重復聲明queue是一個好的習慣
- channel.queue_declare(queue='hello')
- print ' [*] Waiting for messages. To exit press CTRL+C'
- #定義回調函數
- #一旦從queue中接收到一個message回調函數將被調用
- #ch:channel
- #method:
- #properties:
- #body:message
- def callback(ch, method, properties, body):
- print " [x] Received %r" % (body,)
- #從queue接收message的參數設置
- #包括從哪個queue接收message,用於處理message的callback,是否要確認message
- #默認情況下是要對消息進行確認的,以防止消息丟失。
- #此處將no_ack明確指明為True,不對消息進行確認。
- channel.basic_consume(callback,
- queue='hello',
- no_ack=True)
- #開始循環從queue中接收message並使用callback進行處理
- channel.start_consuming()
測試
- python send.py
- python receive.py
應用場景2-work queues
將耗時的消息處理通過隊列分配給多個consumer來處理,我們稱此處的consumer為worker,我們將此處的queue稱為Task Queue,其目的是為了避免資源密集型的task的同步處理,也即立即處理task並等待完成。相反,調度task使其稍后被處理。也即把task封裝進message並發送到task queue,worker進程在后台運行,從task queue取出task並執行job,若運行了多個worker,則task可在多個worker間分配。
new_task.py
建立連接,聲明隊列,發送可以模擬耗時任務的message,斷開連接、退出。
- #!/usr/bin/env python
- #encoding:utf8
- import pika
- import sys
- connection = pika.BlockingConnection(pika.ConnectionParameters(
- host='localhost'))
- channel = connection.channel()
- #僅僅對message進行確認不能保證message不丟失,比如RabbitMQ崩潰了queue就會丟失
- #因此還需使用durable=True聲明queue是持久化的,這樣即便Rabb崩潰了重啟后queue仍然存在
- channel.queue_declare(queue='task_queue', durable=True)
- #從命令行構造將要發送的message
- message = ' '.join(sys.argv[1:]) or "Hello World!"
- #除了要聲明queue是持久化的外,還需聲明message是持久化的
- #basic_publish的properties參數指定message的屬性
- #此處pika.BasicProperties中的delivery_mode=2指明message為持久的
- #這樣一來RabbitMQ崩潰重啟后queue仍然存在其中的message也仍然存在
- #需注意的是將message標記為持久的並不能完全保證message不丟失,因為
- #從RabbitMQ接收到message到將其存儲到disk仍需一段時間,若此時RabbitMQ崩潰則message會丟失
- #況且RabbitMQ不會對每條message做fsync動作
- #可通過publisher confirms實現更強壯的持久性保證
- channel.basic_publish(exchange='',
- routing_key='task_queue',
- body=message,
- properties=pika.BasicProperties(
- delivery_mode = 2, # make message persistent
- ))
- print " [x] Sent %r" % (message,)
- connection.close()
worker.py
建立連接,聲明隊列,不斷的接收message,處理任務,進行確認。
- #!/usr/bin/env python
- #encoding:utf8
- import pika
- import time
- #默認情況RabbirMQ將message以round-robin方式發送給下一個consumer
- #每個consumer接收到的平均message量是一樣的
- #可以同時運行兩個或三個該程序進行測試
- connection = pika.BlockingConnection(pika.ConnectionParameters(
- host='localhost'))
- channel = connection.channel()
- #僅僅對message進行確認不能保證message不丟失,比如RabbitMQ崩潰了
- #還需使用durable=True聲明queue是持久化的,這樣即便Rabb崩潰了重啟后queue仍然存在其中的message不會丟失
- #RabbitMQ中不允許使用不同的參數定義同名queue
- channel.queue_declare(queue='task_queue', durable=True)
- print ' [*] Waiting for messages. To exit press CTRL+C'
- #回調函數,函數體模擬耗時的任務處理:以message中'.'的數量表示sleep的秒數
- def callback(ch, method, properties, body):
- print " [x] Received %r" % (body,)
- time.sleep( body.count('.') )
- print " [x] Done"
- #對message進行確認
- ch.basic_ack(delivery_tag = method.delivery_tag)
- #若存在多個consumer每個consumer的負載可能不同,有些處理的快有些處理的慢
- #RabbitMQ並不管這些,只是簡單的以round-robin的方式分配message
- #這可能造成某些consumer積壓很多任務處理不完而一些consumer長期處於飢餓狀態
- #可以使用prefetch_count=1的basic_qos方法可告知RabbitMQ只有在consumer處理並確認了上一個message后才分配新的message給他
- #否則分給另一個空閑的consumer
- channel.basic_qos(prefetch_count=1)
- #這里移除了no_ack=True這個參數,也即需要對message進行確認(默認行為)
- #否則consumer在偶然down后其正在處理和分配到該consumer還未處理的message可能發生丟失
- #因為此時RabbitMQ在發送完message后立即從內存刪除該message
- #假如沒有設置no_ack=True則consumer在偶然down掉后其正在處理和分配至該consumer但還未來得及處理的message會重新分配到其他consumer
- #沒有設置no_ack=True則consumer在收到message后會向RabbitMQ反饋已收到並處理了message告訴RabbitMQ可以刪除該message
- #RabbitMQ中沒有超時的概念,只有在consumer down掉后重新分發message
- channel.basic_consume(callback,
- queue='task_queue')
- channel.start_consuming()
測試
- python new_task.py "A very hard task which takes two seconds.."
- python worker.py
應用場景3-Publish/Subscribe
在應用場景2中一個message(task)僅被傳遞給了一個comsumer(worker)。現在我們設法將一個message傳遞給多個consumer。這種模式被稱為publish/subscribe。此處以一個簡單的日志系統為例進行說明。該系統包含一個log發送程序和一個log接收並打印的程序。由log發送者發送到queue的消息可以被所有運行的log接收者接收。因此,我們可以運行一個log接收者直接在屏幕上顯示log,同時運行另一個log接收者將log寫入磁盤文件。
receive_logs.py
日志消息接收者:建立連接,聲明exchange,將exchange與queue進行綁定,開始不停的接收log並打印。
- #!/usr/bin/env python
- #encoding:utf8
- import pika
- connection = pika.BlockingConnection(pika.ConnectionParameters(
- host='localhost'))
- channel = connection.channel()
- #作為好的習慣,在producer和consumer中分別聲明一次以保證所要使用的exchange存在
- channel.exchange_declare(exchange='logs',
- type='fanout')
- #在不同的producer和consumer間共享queue時指明queue的name是重要的
- #但某些時候,比如日志系統,需要接收所有的log message而非一個子集
- #而且僅對當前的message 流感興趣,對於過時的message不感興趣,那么
- #可以申請一個臨時隊列這樣,每次連接到RabbitMQ時會以一個隨機的名字生成
- #一個新的空的queue,將exclusive置為True,這樣在consumer從RabbitMQ斷開后會刪除該queue
- result = channel.queue_declare(exclusive=True)
- #用於獲取臨時queue的name
- queue_name = result.method.queue
- #exchange與queue之間的關系成為binding
- #binding告訴exchange將message發送該哪些queue
- channel.queue_bind(exchange='logs',
- queue=queue_name)
- print ' [*] Waiting for logs. To exit press CTRL+C'
- def callback(ch, method, properties, body):
- print " [x] %r" % (body,)
- #從指定地queue中consume message且不確認
- channel.basic_consume(callback,
- queue=queue_name,
- no_ack=True)
- channel.start_consuming()
emit_log.py
日志消息發送者:建立連接,聲明fanout類型的exchange,通過exchage向queue發送日志消息,消息被廣播給所有接收者,關閉連接,退出。
- #!/usr/bin/env python
- #encoding:utf8
- import pika
- import sys
- connection = pika.BlockingConnection(pika.ConnectionParameters(
- host='localhost'))
- channel = connection.channel()
- #producer只能通過exchange將message發給queue
- #exchange的類型決定將message路由至哪些queue
- #可用的exchange類型:direct\topic\headers\fanout
- #此處定義一個名稱為'logs'的'fanout'類型的exchange,'fanout'類型的exchange簡單的將message廣播到它所知道的所有queue
- channel.exchange_declare(exchange='logs',
- type='fanout')
- message = ' '.join(sys.argv[1:]) or "info: Hello World!"
- #將message publish到名為log的exchange中
- #因為是fanout類型的exchange,這里無需指定routing_key
- channel.basic_publish(exchange='logs',
- routing_key='',
- body=message)
- print " [x] Sent %r" % (message,)
- connection.close()
測試
- python receive_logs.py
- python emit_log.py "info: This is the log message"
應用場景4-Routing
應用場景3中構建了簡單的log系統,可以將log message廣播至多個receiver。現在我們將考慮只把指定的message類型發送給其subscriber,比如,只把error message寫到log file而將所有log message顯示在控制台。
receive_logs_direct.py
log message接收者:建立連接,聲明direct類型的exchange,聲明queue,使用提供的參數作為routing_key將queue綁定到exchange,開始循環接收log message並打印。
- #!/usr/bin/env python
- import pika
- import sys
- connection = pika.BlockingConnection(pika.ConnectionParameters(
- host='localhost'))
- channel = connection.channel()
- #聲明一個名為direct_logs類型為direct的exchange
- #同時在producer和consumer中聲明exchage或queue是個好習慣,以保證其存在
- channel.exchange_declare(exchange='direct_logs',
- type='direct')
- result = channel.queue_declare(exclusive=True)
- queue_name = result.method.queue
- #從命令行獲取參數:routing_key
- severities = sys.argv[1:]
- if not severities:
- print >> sys.stderr, "Usage: %s [info] [warning] [error]" % (sys.argv[0],)
- sys.exit(1)
- for severity in severities:
- #exchange和queue之間的binding可接受routing_key參數
- #該參數的意義依賴於exchange的類型
- #fanout類型的exchange直接忽略該參數
- #direct類型的exchange精確匹配該關鍵字進行message路由
- #對多個queue使用相同的binding_key是合法的
- channel.queue_bind(exchange='direct_logs',
- queue=queue_name,
- routing_key=severity)
- print ' [*] Waiting for logs. To exit press CTRL+C'
- def callback(ch, method, properties, body):
- print " [x] %r:%r" % (method.routing_key, body,)
- channel.basic_consume(callback,
- queue=queue_name,
- no_ack=True)
- channel.start_consuming()
emit_log_direct.py
log message發送者:建立連接,聲明direct類型的exchange,生成並發送log message到exchange,關閉連接,退出。
- #!/usr/bin/env python
- #encoding:utf8
- import pika
- import sys
- connection = pika.BlockingConnection(pika.ConnectionParameters(
- host='localhost'))
- channel = connection.channel()
- #聲明一個名為direct_logs的direct類型的exchange
- #direct類型的exchange
- channel.exchange_declare(exchange='direct_logs',
- type='direct')
- #從命令行獲取basic_publish的配置參數
- severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
- message = ' '.join(sys.argv[2:]) or 'Hello World!'
- #向名為direct_logs的exchage按照設置的routing_key發送message
- channel.basic_publish(exchange='direct_logs',
- routing_key=severity,
- body=message)
- print " [x] Sent %r:%r" % (severity, message)
- connection.close()
測試:
python receive_logs_direct.py info
python emit_log_direct.py info "The message"
應用場景5-topic
應用場景4中改進的log系統中用direct類型的exchange替換應用場景3中的fanout類型exchange實現將不同的log message發送給不同的subscriber(也即分別通過不同的routing_key將queue綁定到exchange,這樣exchange便可將不同的message根據message內容路由至不同的queue)。但仍然存在限制,不能根據多個規則路由消息,比如接收者要么只能收error類型的log message要么只能收info類型的message。如果我們不僅想根據log的重要級別如info、warning、error等來進行log message路由還想同時根據log message的來源如auth、cron、kern來進行路由。為了達到此目的,需要topic類型的exchange。topic類型的exchange中routing_key中可以包含兩個特殊字符:“*”用於替代一個詞,“#”用於0個或多個詞。
receive_logs_topic.py
log message接收者:建立連接,聲明topic類型的exchange,聲明queue,根據程序參數構造routing_key,根據routing_key將queue綁定到exchange,循環接收並處理message。
- #!/usr/bin/env python
- import pika
- import sys
- connection = pika.BlockingConnection(pika.ConnectionParameters(
- host='localhost'))
- channel = connection.channel()
- #聲明一個名為direct_logs類型為direct的exchange
- #同時在producer和consumer中聲明exchage或queue是個好習慣,以保證其存在
- channel.exchange_declare(exchange='direct_logs',
- type='direct')
- result = channel.queue_declare(exclusive=True)
- queue_name = result.method.queue
- #從命令行獲取參數:routing_key
- severities = sys.argv[1:]
- if not severities:
- print >> sys.stderr, "Usage: %s [info] [warning] [error]" % (sys.argv[0],)
- sys.exit(1)
- for severity in severities:
- #exchange和queue之間的binding可接受routing_key參數
- #該參數的意義依賴於exchange的類型
- #fanout類型的exchange直接忽略該參數
- #direct類型的exchange精確匹配該關鍵字進行message路由
- #對多個queue使用相同的binding_key是合法的
- channel.queue_bind(exchange='direct_logs',
- queue=queue_name,
- routing_key=severity)
- print ' [*] Waiting for logs. To exit press CTRL+C'
- def callback(ch, method, properties, body):
- print " [x] %r:%r" % (method.routing_key, body,)
- channel.basic_consume(callback,
- queue=queue_name,
- no_ack=True)
- channel.start_consuming()
emit_log_topic.py
log message發送者:建立連接、聲明topic類型的exchange、根據程序參數構建routing_key和要發送的message,以構建的routing_key將message發送給topic類型的exchange,關閉連接,退出。
- #!/usr/bin/env python
- #encoding:utf8
- import pika
- import sys
- connection = pika.BlockingConnection(pika.ConnectionParameters(
- host='localhost'))
- channel = connection.channel()
- #聲明一個名為topic_logs的topic類型的exchange
- #topic類型的exchange可通過通配符對message進行匹配從而路由至不同queue
- channel.exchange_declare(exchange='topic_logs',
- type='topic')
- routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
- message = ' '.join(sys.argv[2:]) or 'Hello World!'
- channel.basic_publish(exchange='topic_logs',
- routing_key=routing_key,
- body=message)
- print " [x] Sent %r:%r" % (routing_key, message)
- connection.close()
測試:
- python receive_logs_topic.py "*.rabbit"
- python emit_log_topic.py red.rabbit Hello
應用場景6-PRC
在應用場景2中描述了如何使用work queue將耗時的task分配到不同的worker中。但是,如果我們task是想在遠程的計算機上運行一個函數並等待返回結果呢。這根場景2中的描述是一個完全不同的故事。這一模式被稱為遠程過程調用。現在,我們將構建一個RPC系統,包含一個client和可擴展的RPC server,通過返回斐波那契數來模擬RPC service。
rpc_server.py
RPC server:建立連接,聲明queue,定義了一個返回指定數字的斐波那契數的函數,定義了一個回調函數在接收到包含參數的調用請求后調用自己的返回斐波那契數的函數並將結果發送到與接收到message的queue相關聯的queue,並進行確認。開始接收調用請求並用回調函數進行請求處理。
- #!/usr/bin/env python
- #encoding:utf8
- import pika
- #建立到達RabbitMQ Server的connection
- connection = pika.BlockingConnection(pika.ConnectionParameters(
- host='localhost'))
- channel = connection.channel()
- #聲明一個名為rpc_queue的queue
- channel.queue_declare(queue='rpc_queue')
- #計算指定數字的斐波那契數
- def fib(n):
- if n == 0:
- return 0
- elif n == 1:
- return 1
- else:
- return fib(n-1) + fib(n-2)
- #回調函數,從queue接收到message后調用該函數進行處理
- def on_request(ch, method, props, body):
- #由message獲取要計算斐波那契數的數字
- n = int(body)
- print " [.] fib(%s)" % (n,)
- #調用fib函數獲得計算結果
- response = fib(n)
- #exchage為空字符串則將message發送個到routing_key指定的queue
- #這里queue為回調函數參數props中reply_ro指定的queue
- #要發送的message為計算所得的斐波那契數
- #properties中correlation_id指定為回調函數參數props中co的rrelation_id
- #最后對消息進行確認
- ch.basic_publish(exchange='',
- routing_key=props.reply_to,
- properties=pika.BasicProperties(correlation_id = \
- props.correlation_id),
- body=str(response))
- ch.basic_ack(delivery_tag = method.delivery_tag)
- #只有consumer已經處理並確認了上一條message時queue才分派新的message給它
- channel.basic_qos(prefetch_count=1)
- #設置consumeer參數,即從哪個queue獲取消息使用哪個函數進行處理,是否對消息進行確認
- channel.basic_consume(on_request, queue='rpc_queue')
- print " [x] Awaiting RPC requests"
- #開始接收並處理消息
- channel.start_consuming()
rpc_client.py
RPC client:遠程過程調用發起者:定義了一個類,類中初始化到RabbitMQ Server的連接、聲明回調queue、開始在回調queue上等待接收響應、定義了在回調queue上接收到響應后的處理函數on_response根據響應關聯的correlation_id屬性作出響應、定義了調用函數並在其中向調用queue發送包含correlation_id等屬性的調用請求、初始化一個client實例,以30為參數發起遠程過程調用。
- #!/usr/bin/env python
- #encoding:utf8
- import pika
- import uuid
- #在一個類中封裝了connection建立、queue聲明、consumer配置、回調函數等
- class FibonacciRpcClient(object):
- def __init__(self):
- #建立到RabbitMQ Server的connection
- self.connection = pika.BlockingConnection(pika.ConnectionParameters(
- host='localhost'))
- self.channel = self.connection.channel()
- #聲明一個臨時的回調隊列
- result = self.channel.queue_declare(exclusive=True)
- self.callback_queue = result.method.queue
- #此處client既是producer又是consumer,因此要配置consume參數
- #這里的指明從client自己創建的臨時隊列中接收消息
- #並使用on_response函數處理消息
- #不對消息進行確認
- self.channel.basic_consume(self.on_response, no_ack=True,
- queue=self.callback_queue)
- #定義回調函數
- #比較類的corr_id屬性與props中corr_id屬性的值
- #若相同則response屬性為接收到的message
- def on_response(self, ch, method, props, body):
- if self.corr_id == props.correlation_id:
- self.response = body
- def call(self, n):
- #初始化response和corr_id屬性
- self.response = None
- self.corr_id = str(uuid.uuid4())
- #使用默認exchange向server中定義的rpc_queue發送消息
- #在properties中指定replay_to屬性和correlation_id屬性用於告知遠程server
- #correlation_id屬性用於匹配request和response
- self.channel.basic_publish(exchange='',
- routing_key='rpc_queue',
- properties=pika.BasicProperties(
- reply_to = self.callback_queue,
- correlation_id = self.corr_id,
- ),
- #message需為字符串
- body=str(n))
- while self.response is None:
- self.connection.process_data_events()
- return int(self.response)
- #生成類的實例
- fibonacci_rpc = FibonacciRpcClient()
- print " [x] Requesting fib(30)"
- #調用實例的call方法
- response = fibonacci_rpc.call(30)
- print " [.] Got %r" % (response,)
測試:
- python rpc_server.py
- python rpc_client.py