RabbitMQ隊列
首先我們在講rabbitMQ之前我們要說一下python里的queue:二者干的事情是一樣的,都是隊列,用於傳遞消息
在python的queue中有兩個一個是線程queue,一個是進程queue(multiprocessing中的queue)。線程queue不能夠跨進程,用於多個線程之間進行數據同步交互;進程queue只是用於父進程與子進程,或者同屬於同意父進程下的多個子進程 進行交互。也就是說如果是兩個完全獨立的程序,即使是python程序,也依然不能夠用這個進程queue來通信。那如果我們有兩個獨立的python程序,分屬於兩個進程,或者是python和其他語言
安裝:windows下
安裝pika:
之前安裝過了pip,直接打開cmd,運行pip install pika
安裝完畢之后,實現一個最簡單的隊列通信:
producer:
1 import pika 2 3 connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) 4 #聲明一個管道 5 channel = connection.channel() 6 7 #聲明queue 8 channel.queue_declare(queue = 'hello') 9 #routing_key是queue的名字 10 channel.basic_publish(exchange='', 11 routing_key='hello',#queue的名字 12 body='Hello World!', 13 ) 14 print("[x] Send 'Hello World!'") 15 connection.close()
先建立一個基本的socket,然后建立一個管道,在管道中發消息,然后聲明一個queue,起個隊列的名字,之后真正的發消息(basic_publish)
consumer:
1 import pika 2 connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) 3 channel = connection.channel() 4 5 channel.queue_declare(queue='hello') 6 7 8 def callback(ch, method, properties, body):#回調函數 9 print("---->",ch,method,properties) 10 print(" [x] Received %r" % body) 11 12 channel.basic_consume(callback,#如果收到消息,就調用callback來處理消息 13 queue='hello', 14 no_ack=True 15 ) 16 17 print(' [*] Waiting for messages. To exit press CTRL+C') 18 channel.start_consuming()
start_consuming()只要一啟動就一直運行下去,他不止收一條,永遠在這里卡住。
在上面不管是produce還是consume,里面都聲明了一個queue,這個是為什么呢?因為我們不知道是消費者先開始運行還是生產者先運行,這樣如果沒有聲明的話就會報錯。
下面我們來看一下一對多,即一個生產者對應多個消費者:
首先我們運行3個消費者,然后不斷的用produce去發送數據,我們可以看到消費者是通過一種輪詢的方式進行不斷的接受數據,每個消費者消費一個。
那么假如我們消費者收到了消息,然后處理這個消息需要30秒鍾,在處理的過程中,消費者斷電了宕機了,那消費者還沒有處理完,我們設這個任務我們必須處理完,那我們應該有一個確認的信息,說這個任務完成了或者是沒有完成,所以我的生產者要確認消費者是否把這個任務處理完了,消費者處理完之后要給這個生產者服務器端發送一個確認信息,生產者才會把這個任務從消息隊列中刪除。如果沒有處理完,消費者宕機了,沒有給生產者發送確認信息,那就表示沒有處理完,那我們看看rabbitMQ是怎么處理的
我們可以在消費者的callback中添加一個time.sleep()進行模擬宕機。callback是一個回調函數,只要事件一觸發就會調用這個函數。函數執行完了就代表消息處理完了,如果函數沒有處理完,那就說明。。。。
我們可以看到在消費者代碼中的basic_consume()中有一個參數叫no_ack=True,這個意思是這條消息是否被處理完都不會發送確認信息,一般我們不加這個參數,rabbitMQ默認就會給你設置成消息處理完了就自動發送確認,我們現在把這個參數去掉,並且在callback中添加一句話運行:ch.basic_ack(delivery_tag=method.delivery_tag)(手動處理)
def callback(ch, method, properties, body):#回調函數 print("---->",ch,method,properties) #time.sleep(30) print(" [x] Received %r" % body) ch.basic_ack(delivery_tag=method.delivery_tag)
運行的結果就是,我先運行一次生產者,數據被消費者1接收到了,但是我把消費者1宕機,停止運行,那么消費者2就接到了消息,即只要消費者沒有發送確認信息,生產者就不會把信息刪除。
RabbitMQ消息持久化:
我們可以生成好多的消息隊列,那我們怎么查看消息隊列的情況呢:rabbitmqctl.bat list_queues
現在的情況是,消息隊列中還有消息,但是服務器宕機了,那這個消息就丟了,那我就需要這個消息強制的持久化:
channel.queue_declare(queue='hello2',durable=True)
在每次聲明隊列的時候加上一個durable參數(客戶端和服務器端都要加上這個參數),
在這個情況下,我們把rabbitMQ服務器重啟,發現只有隊列名留下了,但是隊列中的消息沒有了,這樣我們還需要在生產者basic_publish中添加一個參數:properties
producer:
import pika connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) #聲明一個管道 channel = connection.channel() #聲明queue channel.queue_declare(queue = 'hello2',durable=True) #routing_key是queue的名字 channel.basic_publish(exchange='', routing_key='hello2', body='Hello World!', properties=pika.BasicProperties( delivery_mode=2,#make message persistent ) ) print("[x] Send 'Hello World!'") connection.close()
這樣就可以使得消息持久化
現在是一個生產者對應三個消費者,很公平的收發收發,但是實際的情況是,我們機器的配置是不一樣的,有的配置是單核的有的配置是多核的,可能i7處理器處理4條消息的時候和其他的處理器處理1條消息的時間差不多,那差的處理器那里就會堆積消息,而好的處理器那里就會形成閑置,在現實中做運維的,我們會在負載均衡中設置權重,誰的配置高權重高,任務就多一點,但是在rabbitMQ中,我們只做了一個簡單的處理就可以實現公平的消息分發,你有多大的能力就處理多少消息
即:server端給客戶端發送消息的時候,先檢查現在還有多少消息,如果當前消息沒有處理完畢,就不會發送給這個消費者消息。如果當前的消費者沒有消息就發送
這個只需要在消費者端進行修改加代碼:
import pika import time connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.channel() channel.queue_declare(queue='hello2',durable=True) def callback(ch, method, properties, body):#回調函數 print("---->",ch,method,properties) #time.sleep(30) print(" [x] Received %r" % body) ch.basic_ack(delivery_tag=method.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume(callback,#如果收到消息,就調用callback來處理消息 queue='hello2', #no_ack=False ) print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming()
我們在生成一個consume2,在callback中sleep20秒來模擬
我先啟動兩個produce,被consume接受,然后在啟動一個,就被consumer2接受,但是因為consumer2中sleep20秒,處理慢,所以這時候在啟動produce,就又給了consume進行處理
Publish\Subscrible(消息發布\訂閱)
前面都是1對1的發送接收數據,那我想1對多,想廣播一樣,生產者發送一個消息,所有消費者都收到消息。那我們怎么做呢?這個時候我們就要用到exchange了
exchange在一端收消息,在另一端就把消息放進queue,exchange必須精確的知道收到的消息要干什么,是否應該發到一個特定的queue還是發給許多queue,或者說把他丟棄,這些都被exchange的類型所定義
exchange在定義的時候是有類型的,以決定到底是那些queue符合條件,可以接受消息:
fanout:所有bind到此exchange的queue都可以接受消息
direct:通過rounroutingKey和exchange決定的那個唯一的queue可以接收消息
topic:所有符合routingKey的routingKey所bind的queue可以接受消息
headers:通過headers來決定把消息發給哪些queue
消息publisher:
1 import pika 2 import sys 3 4 connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) 5 6 channel = connection.channel() 7 8 channel.exchange_declare(exchange='log',type = 'fanout') 9 10 message = ' '.join(sys.argv[1:]) or 'info:Hello World!' 11 channel.basic_publish(exchange='logs',routing_key='',body=message) 12 print("[x] Send %r " % message) 13 connection.close()
這里的exchange之前是空的,現在賦值log;在這里也沒有聲明queue,廣播不需要寫queue
消息subscriber:
1 import pika 2 connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) 3 channel = connection.channel() 4 channel.exchange_declare(exchange='logs',exchange_type='fanout') 5 6 #exclusive唯一的,不指定queue名字,rabbit會隨機分配一個名字 7 #exclusive=True會在使用此queue的消費者斷開后,自動將queue刪除 8 result = channel.queue_declare(exclusive=True) 9 queue_name = result.method.queue 10 11 channel.queue_bind(exchange='logs',queue=queue_name) 12 13 print('[*] Waiting for logs,To exit press CTRL+C') 14 15 def callback(ch,method,properties,body): 16 print("[X] %r" % body) 17 channel.basic_consume(callback,queue = queue_name,no_ack=True) 18 channel.start_consuming()
在消費者這里我們有定義了一個queue,注意一下注釋中的內容。但是我們在發送端沒有聲明queue,為什么發送端不需要接收端需要呢?在consume里有一個channel.queue_bind()函數,里面綁定了exchange轉換器上,當然里面還需要一個queue_name
運行結果:
就相當於收音機一樣,實時廣播,打開三個消費者,生產者發送一條數據,然后3個消費者同時接收到
有選擇的接收消息(exchange_type = direct)
RabbitMQ還支持根據關鍵字發送,即:隊列綁定關鍵字,發送者將數據根據關鍵字發送到消息exchange,exchange根據關鍵字判定應該將數據發送到指定的隊列
publisher:
1 import pika 2 import sys 3 connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) 4 channel = connection.channel() 5 6 channel.exchange_declare(exchange='direct_logs',exchange_type='direct') 7 8 severity = sys.argv[1] if len(sys.argv)>1 else 'info' 9 message = ' '.join(sys.argv[2:]) or 'Hello World!' 10 channel.basic_publish(exchange='direct_logs',routing_key=severity,body=message) 11 12 print("[X] Send %r:%r" %(severity,message)) 13 connection.close()
subscriber:
import pika import sys connect = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connect.channel() channel.exchange_declare(exchange='direct_logs',exchange_type='direct') result = channel.queue_declare(exclusive=True) queue_name = result.method.queue severities = sys.argv[1:]# if not severities: sys.stderr.write("Usage:%s [info] [warning] [error]\n" %sys.argv[0]) sys.exit(1) for severity in severities: channel.queue_bind(exchange='direct_logs',queue=queue_name,routing_key=severity) print('[*]Waiting for logs.To exit press CTRL+c') def callback(ch,method,properties,body): print("[x] %r:%r"%(method.routing_key,body)) channel.basic_consume(callback,queue = queue_name,no_ack=True) channel.start_consuming()
更加細致的過濾(exchange_type=topic)
publish:
import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='topic_logs', exchange_type='topic') routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info' message = ' '.join(sys.argv[2:]) or 'Hello World!' channel.basic_publish(exchange='topic_logs', routing_key=routing_key, body=message) print(" [x] Sent %r:%r" % (routing_key, message)) connection.close()
subscriber:
1 import pika 2 import sys 3 4 connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) 5 channel = connection.channel() 6 7 channel.exchange_declare(exchange='topic_logs', 8 exchange_type='topic') 9 10 result = channel.queue_declare(exclusive=True) 11 queue_name = result.method.queue 12 13 binding_keys = sys.argv[1:] 14 if not binding_keys: 15 sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0]) 16 sys.exit(1) 17 18 for binding_key in binding_keys: 19 channel.queue_bind(exchange='topic_logs', 20 queue=queue_name, 21 routing_key=binding_key) 22 23 print(' [*] Waiting for logs. To exit press CTRL+C') 24 25 26 def callback(ch, method, properties, body): 27 print(" [x] %r:%r" % (method.routing_key, body)) 28 29 30 channel.basic_consume(callback, 31 queue=queue_name, 32 no_ack=True) 33 34 channel.start_consuming()
以上都是服務器端發消息,客戶端收消息,消息流是單向的,那如果我們想要發一條命令給遠程的客戶端去執行,然后想讓客戶端執行的結果返回,則這種模式叫做rpc
RabbitMQ RPC
rpc server:
1 import pika 2 import time 3 connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) 4 channel = connection.channel() 5 6 channel.queue_declare(queue='rpc_queue') 7 def fib(n): 8 if n==0: 9 return 0 10 elif n==1: 11 return 1 12 else: 13 return fib(n-1)+fib(n-2) 14 15 def on_request(ch,method,props,body): 16 n = int(body) 17 print("[.] fib(%s)" %n) 18 response = fib(n) 19 20 ch.basic_publish(exchange='',routing_key=props.reply_to, 21 properties=pika.BasicProperties(correlation_id=props.correlation_id), 22 body = str(response)) 23 ch.basic_ack(delivery_tag=method.delivery_tag)25 channel.basic_consume(on_request,queue='rpc_queue') 26 27 print("[x] Awaiting rpc requests") 28 channel.start_consuming()
rpc client:
1 import pika 2 import uuid,time 3 class FibonacciRpcClient(object): 4 def __init__(self): 5 self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) 6 7 self.channel = self.connection.channel() 8 9 result = self.channel.queue_declare(exclusive=True) 10 self.callback_queue = result.method.queue 11 12 self.channel.basic_consume(self.on_response,#回調函數,只要一收到消息就調用 13 no_ack=True,queue=self.callback_queue) 14 15 def on_response(self,ch,method,props,body): 16 if self.corr_id == props.correlation_id: 17 self.response = body 18 19 def call(self,n): 20 self.response = None 21 self.corr_id = str(uuid.uuid4()) 22 self.channel.basic_publish(exchange='',routing_key='rpc_queue', 23 properties=pika.BasicProperties( 24 reply_to=self.callback_queue, 25 correlation_id=self.corr_id 26 ), 27 body=str(n),#傳的消息,必須是字符串 28 ) 29 while self.response is None: 30 self.connection.process_data_events()#非阻塞版的start_consuming 31 print("no message....") 32 time.sleep(0.5) 33 return int(self.response) 34 fibonacci_rpc = FibonacciRpcClient() 35 print("[x] Requesting fib(30)") 36 response = fibonacci_rpc.call(30) 37 print("[.] Got %r"%response)
之前的start_consuming是進入一個阻塞模式,沒有消息就等待消息,有消息就收過來
self.connection.process_data_events()是一個非阻塞版的start_consuming,就是說發了一個東西給客戶端,每過一點時間去檢查有沒有消息,如果沒有消息,可以去干別的事情
reply_to = self.callback_queue是用來接收反應隊列的名字
corr_id = str(uuid.uuid4()),correlation_id第一在客戶端會通過uuid4生成,第二在服務器端返回執行結果的時候也會傳過來一個,所以說如果服務器端發過來的correlation_id與自己的id相同 ,那么服務器端發出來的結果就肯定是我剛剛客戶端發過去的指令的執行結果。現在就一個服務器端一個客戶端,無所謂缺人不確認。現在客戶端是非阻塞版的,我們可以不讓它打印沒有消息,而是執行新的指令,這樣就兩條消息,不一定按順序完成,那我們就需要去確認每個返回的結果是哪個命令的執行結果。
總體的模式是這樣的:生產者發了一個命令給消費者,不知道客戶端什么時候返回,還是要去收結果的,但是它又不想進入阻塞模式,想每過一段時間看這個消息收回來沒有,如果消息收回來了,就代表收完了。
運行結果:
服務器端開啟,然后在啟動客戶端,客戶端先是等待消息的發送,然后做出反應,直到算出斐波那契