RabbitMQ隊列


官方文檔

http://www.rabbitmq.com/install-rpm.html

 

二、安裝rabbitmq
1、下載rabbitmq-server-generic-unix-3.6.5.tar.xz
2、tar xvf rabbitmq-server-generic-unix-3.6.5.tar.xz
3、mv rabbitmq_server-3.6.5/ /usr/local/rabbitmq
4、啟動:
    #啟動rabbitmq服務
    /usr/local/rabbitmq/sbin/rabbitmq-server
    #后台啟動
    /usr/local/rabbitmq/sbin/rabbitmq-server -detached
    #關閉rabbitmq服務
    /usr/local/rabbitmq/sbin/rabbitmqctl stop
    或
    ps -ef | grep rabbit 和 kill -9 xxx

    #開啟插件管理頁面
    /usr/local/rabbitmq/sbin/rabbitmq-plugins enable rabbitmq_management

    #創建用戶
    /usr/local/rabbitmq/sbin/rabbitmqctl add_user rabbitadmin 123456
    usr/local/rabbitmq/sbin/rabbitmqctl set_user_tags rabbitadmin administrator
5、登錄
    #WEB登錄
    http://10.10.3.63:15672
    用戶名:rabbitadmin
    密碼:123456
    
    授權url
    /usr/local/rabbitmq/sbin/rabbitmqctl set_permissions -p / rabbitadmin ".*" ".*" ".*"

    
View Code

url訪問權限  /usr/local/rabbitmq/sbin/rabbitmqctl set_permssions -p / rabbitadmin ".*"".*"".*"

命令行查看消息   /usr/local/rabbitmq/sbin/rabbitmqctl lixt_queues

 開機啟動  chkconfig rabbitmq-server on

實現最簡單的隊列通信

 

這是一個公平的一次分發,就是每個人輪

send端

import pika

credentials=pika.PlainCredentials("rabbitadmin","123456")
connection = pika.BlockingConnection(pika.ConnectionParameters(
    '192.168.31.128',credentials=credentials))


channel=connection.channel()#建立的rabbit 協議的通道
#聲明queue
channel.queue_declare(queue='hello')#聲明隊列

#n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange.

channel.basic_publish(exchange='',
                      routing_key="hello",#把消息發到這個隊列
                      body='Hello world'#發送的內容
                      )

print("[x]Sent 'Hello World!")
connection.close()  #把隊列關掉

receive端

import pika
import time
credentials=pika.PlainCredentials("rabbitadmin","123456")
connection=pika.BlockingConnection(pika.ConnectionParameters(
    '192.168.31.128',credentials=credentials))

channel=connection.channel()    #通道的實例
#You may ask why we declare the queue again ‒ we have already declared it in our previous code.
# We could avoid that if we were sure that the queue already exists. For example if send.py program
#was run before. But we're not yet sure which program to run first. In such cases it's a good
# practice to repeat declaring the queue in both programs.
channel.queue_declare(queue='hello')    #當消費者先來的時候消費者申明列隊
#消費者先來,但包子鋪還沒有,消費者就先建一個包子鋪等包子
#ch=通道的實例 method=服務器發來的頭部消息 body=發來的內容 def callback(ch, method, properties, body): print("received msg...start processing....",body) time.sleep(20) print(" [x] Received %r" % body) channel.basic_consume(callback, #拿到消息后調用callback函數 queue="hello", #從這個隊列里拿數據 no_ack=True) print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming()

Work Queues

 

在這種模式下,RabbitMQ會默認把p發的消息依次分發給各個消費者(c),跟負載均衡差不多

 

假如說處理任務需要花一分鍾,處理的時候處理到一半死掉了,怎么辦?應該為了安全起見讓另外一個消費者處理。不是消費者把包子放回去的,而是包子鋪嚴格的檢測看看你吃完之后會不會死,為了防止接受任務沒處理完,就讓消費者吃完包子給一個回饋說你真吃完了證明安全的完整的處理完了還沒死,如果吃了一半沒給回復那就認為已經死了。所以這個機制就是:必須讓消費者把任務處理完,必須給客戶端有一個響應說任務處理完了。包子鋪才會真正的在記賬本上記一筆賬說包子被真正的消費掉了

只要鏈接斷了沒給反應就是死了

在RabbitMQ里消息被拿走之后隊列里並沒有把它刪除,雖然沒在隊列里但是存到了server的其他地方,等消費者給出響應確認消息執行完了才會把消息從RabbitMQ里刪掉。如果沒有響應就會把消息再放到隊列里,其他的客戶端就會再收到這條消息了

消息提供者代碼

import pika
import time
connection = pika.BlockingConnection(pika.ConnectionParameters(
    'localhost'))
channel = connection.channel()#實例對象
 
# 聲明queue
channel.queue_declare(queue='task_queue',durble=True)    #durble=True讓隊列持久化,讓隊列永久保存
 
# n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange.
import sys
 
#把腳本收到的參數當作參數 沒有參數就發hello world message
= ' '.join(sys.argv[1:]) or "Hello World! %s" % time.time() channel.basic_publish(exchange='', routing_key='task_queue', #往這個隊列發 body=message, #發的內容 properties=pika.BasicProperties( delivery_mode=2, # make message persistent #消息持久,保留消息 ) ) print(" [x] Sent %r" % message) connection.close()

消費者代碼

import pika, time
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
    'localhost'))
channel = connection.channel()    #實例對象
 
 
def callback(ch, method, properties, body):    #函數
    print(" [x] Received %r" % body)    #內容
    time.sleep(20)
    print(" [x] Done")
    print("method.delivery_tag",method.delivery_tag)
    ch.basic_ack(delivery_tag=method.delivery_tag)#唯一標記,標識符    no_ack 注掉
     #ackownledgement確認的意思
 
channel.basic_consume(callback,
                      queue='task_queue',
                      no_ack=True    #false 沒處理完則一直保留在列隊 消息持久化   和上面的標識符一起用    #執行完了把標識符返回給客服端確認消息處理完了
                      )
 
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

 

 

此時,先啟動消息生產者,然后再分別啟動3個消費者,通過生產者多發送幾條消息,你會發現,這幾條消息會被依次分配到各個消費者身上  

Doing a task can take a few seconds. You may wonder what happens if one of the consumers starts a long task and dies with it only partly done. With our current code once RabbitMQ delivers message to the customer it immediately removes it from memory. In this case, if you kill a worker we will lose the message it was just processing. We'll also lose all the messages that were dispatched to this particular worker but were not yet handled.

But we don't want to lose any tasks. If a worker dies, we'd like the task to be delivered to another worker.

In order to make sure a message is never lost, RabbitMQ supports message acknowledgments. An ack(nowledgement) is sent back from the consumer to tell RabbitMQ that a particular message had been received, processed and that RabbitMQ is free to delete it.

If a consumer dies (its channel is closed, connection is closed, or TCP connection is lost) without sending an ack, RabbitMQ will understand that a message wasn't processed fully and will re-queue it. If there are other consumers online at the same time, it will then quickly redeliver it to another consumer. That way you can be sure that no message is lost, even if the workers occasionally die.

There aren't any message timeouts; RabbitMQ will redeliver the message when the consumer dies. It's fine even if processing a message takes a very, very long time.

Message acknowledgments are turned on by default. In previous examples we explicitly turned them off via the no_ack=True flag. It's time to remove this flag and send a proper acknowledgment from the worker, once we're done with a task.

def callback(ch, method, properties, body):
    print " [x] Received %r" % (body,)
    time.sleep( body.count('.') )
    print " [x] Done"
    ch.basic_ack(delivery_tag = method.delivery_tag)
 
channel.basic_consume(callback,
                      queue='hello')

  

消息持久化  

We have learned how to make sure that even if the consumer dies, the task isn't lost(by default, if wanna disable  use no_ack=True). But our tasks will still be lost if RabbitMQ server stops.

When RabbitMQ quits or crashes it will forget the queues and messages unless you tell it not to. Two things are required to make sure that messages aren't lost: we need to mark both the queue and messages as durable.

First, we need to make sure that RabbitMQ will never lose our queue. In order to do so, we need to declare it as durable:

channel.queue_declare(queue='hello', durable=True)

  Although this command is correct by itself, it won't work in our setup. That's because we've already defined a queue called hello which is not durable. RabbitMQ doesn't allow you to redefine an existing queue with different parameters and will return an error to any program that tries to do that. But there is a quick workaround - let's declare a queue with different name, for exampletask_queue:

channel.queue_declare(queue='task_queue', durable=True)

This queue_declare change needs to be applied to both the producer and consumer code.

At that point we're sure that the task_queue queue won't be lost even if RabbitMQ restarts. Now we need to mark our messages as persistent - by supplying a delivery_mode property with a value 2.

channel.basic_publish(exchange='',
                      routing_key="task_queue",
                      body=message,
                      properties=pika.BasicProperties(
                         delivery_mode = 2, # make message persistent
                      ))

消息公平分發

如果Rabbit只管按順序把消息發到各個消費者身上,不考慮消費者負載的話,很可能出現,一個機器配置不高的消費者那里堆積了很多消息處理不完,同時配置高的消費者卻一直很輕松。為解決此問題,可以在各個消費者端,配置perfetch=1,意思就是告訴RabbitMQ在我這個消費者當前消息還沒處理完的時候就不要再給我發新消息了。

channel.basic_qos(prefetch_count=1)

帶消息持久化+公平分發的完整代碼

生產者端

import pika
import time
credentials=pika.PlainCredentials("rabbitadmin","123456")
connection = pika.BlockingConnection(pika.ConnectionParameters(
    '192.168.31.128',credentials=credentials))
channel = connection.channel()

# 聲明queue                       #durable可以讓隊列永久保存
channel.queue_declare(queue='task_queue',durable=True)   #創建隊列

# n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange.
import sys

message = ' '.join(sys.argv[1:]) or "Hello World! %s" % time.time()
#把腳本的參數拼接起來
channel.basic_publish(exchange='',
                      routing_key='task_queue', #往這個隊列發
                      body=message, #發的內容
                      properties=pika.BasicProperties(
                          delivery_mode=2,  # make message persistent#消息持久,保留消息
                      )
                      )
print(" [x] Sent %r" % message)
connection.close()

 消費者端

import pika, time

credentials=pika.PlainCredentials("rabbitadmin","123456")
connection=pika.BlockingConnection(pika.ConnectionParameters(
    '192.168.31.128',credentials=credentials))

channel = connection.channel()

# 聲明queue                       #durable可以讓隊列永久保存
# channel.queue_declare(queue='task_queue')   #創建隊列     #如果另一頭聲明了這這邊不能聲明了


def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep(20)
    print(" [x] Done")
    print("method.delivery_tag", method.delivery_tag)
    ch.basic_ack(delivery_tag=method.delivery_tag)#唯一標記,標識符    no_ack 注掉
    #ackownledgement確認的意思

channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
                      queue='task_queue',
                      # no_ack=True     #false 沒處理完則一直保留在列隊   和上面的標識符一起用
                      )

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

直播流程

要確保全國各地都不卡,所有人都跑到北京的機房就癱了,

中心節點,然后外面圍一圈,把直播的視頻切片,每幾秒鍾就切一次片推到外面那一圈上面去,再傳到各個地方去,用戶連的是各個地方的而不是北京的是視頻鏡像,鏡像不斷地同步

Publish\Subscribe(消息發布\訂閱) 

之前的例子都基本都是1對1的消息發送和接收,即消息只能發送到指定的queue里,但有些時候你想讓你的消息被所有的Queue收到,類似廣播的效果,這時候就要用到exchange了,

An exchange is a very simple thing. On one side it receives messages from producers and the other side it pushes them to queues. The exchange must know exactly what to do with a message it receives. Should it be appended to a particular queue? Should it be appended to many queues? Or should it get discarded. The rules for that are defined by the exchange type.

Exchange在定義的時候是有類型的,以決定到底是哪些Queue符合條件,可以接收消息


fanout: 所有bind到此exchange的queue都可以接收消息
direct: 通過routingKey和exchange決定的那個唯一的queue可以接收消息
topic:所有符合routingKey(此時可以是一個表達式)的routingKey所bind的queue可以接收消息

   表達式符號說明:#代表一個或多個字符,*代表任何字符
      例:#.a會匹配a.a,aa.a,aaa.a等
          *.a會匹配a.a,b.a,c.a等
     注:使用RoutingKey為#,Exchange Type為topic的時候相當於使用fanout 

headers: 通過headers 來決定把消息發給哪些queue

  

廣播

消息publisher

fanout就是廣播,不用聲明隊列,不直接往隊列里發消息

import pika
import sys

credentials=pika.PlainCredentials("rabbitadmin","123456")
connection = pika.BlockingConnection(pika.ConnectionParameters(
    '192.168.31.128',credentials=credentials))
channel = connection.channel()

channel.exchange_declare(exchange='logs',   #級別
                         type='fanout')     #廣播

message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs',  #廣播發給所有人
                      routing_key='',   #之前轉發到那個隊列,現在是廣播不用指定
                      body=message)
print(" [x] Sent %r" % message)
connection.close()

消息subscriber

每個人都要聲明一個隊列,隊列的名字不能重復,每次斷開Q就會自動刪掉

把自己聲明的Q綁定到交換機上,才能接受到廣播

import pika

credentials=pika.PlainCredentials("rabbitadmin","123456")
connection=pika.BlockingConnection(pika.ConnectionParameters(
    '192.168.31.128',credentials=credentials))
channel = connection.channel()

channel.exchange_declare(exchange='logs',type='fanout')#聲明隊列,怕另一端沒有q

result = channel.queue_declare(exclusive=True)  # 不指定queue名字,rabbit會隨機分配一個名字,exclusive=True會在使用此queue的消費者斷開后,自動將queue刪除
queue_name = result.method.queue    #Q的名字

channel.queue_bind(exchange='logs',queue=queue_name)#把自己綁定到交換機上

print(' [*] Waiting for logs. To exit press CTRL+C')


def callback(ch, method, properties, body):
    print(" [x] %r" % body)


channel.basic_consume(callback,
                      queue=queue_name, #從這個隊列取
                      no_ack=True)

channel.start_consuming()

有選擇的接收消息(exchange type=direct) 

RabbitMQ還支持根據關鍵字發送,即:隊列綁定關鍵字,發送者將數據根據關鍵字發送到消息exchange,exchange根據 關鍵字 判定應該將數據發送至指定隊列。

publisher

#組播
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',type='direct')    #類型  生成了一個新的交換機

severity = sys.argv[1] if len(sys.argv) > 1 else 'info'#嚴重程度,級別的意思
#python direct_send.py info

message = ' '.join(sys.argv[2:]) or 'Hello World!'    #第一個參數是級別,第二個參數才是內容
channel.basic_publish(exchange='direct_logs',
                      routing_key=severity, #廣播的時候是空的,現在是發到這個級別,哪些隊列綁定了這個級別哪些隊列就能收到消息
                      body=message)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()

subscriber

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',
                         type='direct')

#生成隨機Q
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

severities = sys.argv[1:]   #有幾個參數就取幾個
if not severities:
    sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
    sys.exit(1)
#python direct_receive.py info warning error

#有幾個參數就綁定幾個
for severity in severities:
    channel.queue_bind(exchange='direct_logs',#級別
                       queue=queue_name,
                       routing_key=severity)#綁定隊列在這綁定

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()

更細致的消息過濾

Although using the direct exchange improved our system, it still has limitations - it can't do routing based on multiple criteria.

In our logging system we might want to subscribe to not only logs based on severity, but also based on the source which emitted the log. You might know this concept from the syslog unix tool, which routes logs based on both severity (info/warn/crit...) and facility (auth/cron/kern...).

That would give us a lot of flexibility - we may want to listen to just critical errors coming from 'cron' but also all logs from 'kern'.

publisher

#和組播差不多
#但是可以加條件過濾
import pika
import sys

credentials=pika.PlainCredentials("rabbitadmin","123456")
connection = pika.BlockingConnection(pika.ConnectionParameters(
    '192.168.31.128',credentials=credentials))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs',
                         type='topic')

routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'#匿名
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='topic_logs',
                      routing_key=routing_key,
                      body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
connection.close()

subscriber

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs',
                         type='topic')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

binding_keys = sys.argv[1:] #
if not binding_keys:
    sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
    sys.exit(1)

for binding_key in binding_keys:    #也循環綁定
    channel.queue_bind(exchange='topic_logs',
                       queue=queue_name,
                       routing_key=binding_key)

print(' [*] Waiting for logs. To exit press CTRL+C')


def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))


channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()

To receive all the logs run:

python receive_logs_topic.py "#"

To receive all logs from the facility "kern":

python receive_logs_topic.py "kern.*"

Or if you want to hear only about "critical" logs:

python receive_logs_topic.py "*.critical"

You can create multiple bindings:

python receive_logs_topic.py "kern.*" "*.critical" 

And to emit a log with a routing key "kern.critical" type:

python emit_log_topic.py "kern.critical" "A critical kernel error"

  

Remote procedure call (RPC)

To illustrate how an RPC service could be used we're going to create a simple client class. It's going to expose a method named call which sends an RPC request and blocks until the answer is received:

fibonacci_rpc = FibonacciRpcClient()
result = fibonacci_rpc.call(4)
print("fib(4) is %r" % result)

RPC server

import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))

channel = connection.channel()

channel.queue_declare(queue='rpc_queue')    #聲明Q


def fib(n):     #執行函數返命令回結果
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        return fib(n - 1) + fib(n - 2)


def on_request(ch, method, props, body):
    n = int(body)

    print(" [.] fib(%s)" % n)
    response = fib(n)   #執行任務,拿到結果

    ch.basic_publish(exchange='',
                     routing_key=props.reply_to,    #放到消息頭里
                     properties=pika.BasicProperties(correlation_id= \
                                                         props.correlation_id),#把唯一標識符也寫進去
                     body=str(response))
    ch.basic_ack(delivery_tag=method.delivery_tag)  #確認消息


channel.basic_qos(prefetch_count=1)     #當有任務沒處理完時,不接新任務
channel.basic_consume(on_request, queue='rpc_queue')    #接受任務,調函數

print(" [x] Awaiting RPC requests")
channel.start_consuming()

 RPC client

import pika
import uuid


class FibonacciRpcClient(object):
    def __init__(self):
        credentials = pika.PlainCredentials("rabbitadmin", "123456")
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(
            '192.168.31.128', credentials=credentials))
        channel = self.connection.channel()
        self.channel = self.connection.channel()    #連上遠程的Q

        result = self.channel.queue_declare(exclusive=True)
        self.callback_queue = result.method.queue   #生成隨機一個Q

        self.channel.basic_consume(self.on_response,#當收到消息時調用的函數
                                   no_ack=True,#准備接受命令結果
                                   queue=self.callback_queue)

    def on_response(self, ch, method, props, body):#有消息是執行
        if self.corr_id == props.correlation_id:    #判斷發出去的id和接受的id是否相等
            self.response = body    #把執行的結果賦給body

    def call(self, n):
        self.response = None    #標識符,有結果則改變
        self.corr_id = str(uuid.uuid4())    #唯一標識符
        self.channel.basic_publish(exchange='',
                                   routing_key='rpc_queue', #聲明一個Q
                                   properties=pika.BasicProperties(
                                       reply_to=self.callback_queue,    #把結果發到這里
                                       correlation_id=self.corr_id,     #發的時候加唯一標識符
                                   ),
                                   body=str(n))     #發的消息
        while self.response is None:
            self.connection.process_data_events()   #檢查隊列里有沒有新消息,但是不會阻塞,沒有則就、繼續往下走
        return int(self.response)   #不為Noen時返回結果


fibonacci_rpc = FibonacciRpcClient()    #實例化

print(" [x] Requesting fib(30)")
response = fibonacci_rpc.call(30)   #執行call     #拿到self.response的值
print(" [.] Got %r" % response)

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM