RabbitMQ消息隊列


  一、簡介

  RabbitMQ是一個在AMQP基礎上完整的、可復用的企業消息系統,遵循Mozilla Public License開源協議。MQ全稱Message Queue(消息隊列),它是一種應用程序對應用程序的通信方式。應用程序通過讀寫入隊列的消息(針對應用程序的數據)來通信,而無需專用連接來鏈接他們。消息傳遞指的是程序之間通過在消息中發送數據通信,而不是直接調用彼此來通信,直接調用通常用於諸如遠程過程調用的技術。排隊指的是應用程序通過隊列來通信。隊列的使用除去了接收和發送應用程序同時執行的要求。

  應用場景:

    1,系統集成,分布式系統的設計。各種子系統通過消息來對接,這種解決方案也逐步發展成一種架構風格,即‘通過消息傳遞的架構’

    2,當系統中的同步處理方式嚴重影響了吞吐量,比如日志記錄。假如需要記錄系統中所有的用戶行為日志,如果通過同步的方式記錄日志勢必會影響系統的響應速度,當我們將日志消息發送到消息隊列,記錄日志的子系統就會通過異步的方式拿到日志消息。

    3,系統的高可用性,比如電商的秒殺場景,當某一時刻應用服務器或數據庫服務器收到大量請求,將會出現系統宕機。如果能夠將請求轉發到消息隊列,再由服務器去拿到這些消息,將會使得請求平穩,提高系統的可用性。

  二、下載及安裝

  1,安裝erlang,在官網下載,然后一直點下一步進行安裝

  2,安裝RabbitMQ,也是官網下載,直接安裝

  3,配置

  用cmd進入到RabbitMQ Server\rabbitmg_server-3.6.5\sbin目錄下,輸入:rabbitmg-plugins enable rabbitmg-management,這樣就配置好了。此時我們就可以以管理員身份打開cmd,輸入:net start rabbitmq 啟動服務;輸入:net stop rabbitmq 關閉服務

  三、RabbitMQ簡單模式

  在使用過程中,始終貫穿着三個部分,一是生產者,二是消費者,三是RabbitMQ Server(是運行在某個服務器上的),生產者是往消息隊列中放數據的,而消費者是從消息隊列中取數據的。我們是在python中實現的,所以得安裝一個pika的模塊,幫我們連接隊列。

  1,基本代碼

  生產者,producer.py

import pika
#連接rabbitMQ
connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost'))
channel = connection.channel()
#創建隊列,隊列名為‘hello’,這個名字隨意
channel.queue_declare(queue='hello')
#往隊列里添加值,routing_key是表示我們要往‘hello’隊列放數據,body表示我們這次放入的數據為‘hello world’
channel.basic_publish(exchange='',
                      routing_key='hello',
                      body='Hello World!')
#這是關閉連接 connection.close()

  消費者,consumer.py

import pika
#連接rabbitMQ
connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost'))
channel = connection.channel()
#創建隊列,這里也是創建隊列的意思,消費者和生產者說不定哪一個先啟動,所以誰先啟動就誰創建,當另一個進來后。如果隊列存在了,就不創建了
channel.queue_declare(queue='hello')
#回調函數
def callback(ch, method, properties, body):
    print(body)
#確定監聽隊列事件,當隊列里有值,就會取值,然后返回給回調函數
channel.basic_consume( callback,
                       queue='hello',
                       no_ack=True)
#這才是真正的開始監聽
channel.start_consuming()

  2,no_ack參數

  2.1 no_ack=True時,為無應答模式,這里的應答指的是消費者不給隊列回應。這種情況下,消費者從隊列中拿走一條數據,隊列會立即把這條數據刪掉,當消費者在處理這條數據時出現錯誤導致消費者斷開而沒有完成任務時,消費者是不可能再次從隊列里拿到剛才的那條數據,也就意味着這條數據沒有處理但是消失了,從而這條數據永遠也得不到處理了。

  2.2 no-ack=false,為應答模式,消費者每取一條數據,當處理成功后會給隊列一個應答,此時,隊列收到應答才會把數據刪除;當消費者處理數據失敗而沒有給隊列應答,隊列是不會刪除這條數據,等着下一個消費者再次來取這個數據,當收到應答后才會刪除這條數據

  2.3 代碼,這過程只是消費者與隊列的關系變化,所以只用改變消費者的代碼既可

  消費者,consumer_ack.py

import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='10.211.55.4'))
channel = connection.channel()
channel.queue_declare(queue='hello')
def callback(ch, method, properties, body):
    print(body)
#在這加上一句應答 ch.basic_ack(delivery_tag
= method.delivery_tag) channel.basic_consume(callback, queue='hello', no_ack=False) #把no-ack設置為False channel.start_consuming()

  消費者在處理過程中由於某種原因(比如bug等)斷開連接后,消息是不會丟失的,這個數據會給下一個來拿去數據的消費者

  3,durable參數,也就是數據持久化存儲

  生產者把數據放在隊列中,當消費者還沒拿取數據,隊列所在的服務器崩了,此時,隊列里面的數據就會消失了。我們要想吹這種情況,那只有讓隊列里的數據持久化存儲了,這需要我們在定義隊列是就應該聲明。

  生產者,producer_durable.py

import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.211.55.4'))
channel = connection.channel()
#給durable賦為True既可,也就是讓其持久化存儲 channel.queue_declare(queue
='hello', durable=True) channel.basic_publish(exchange='', routing_key='hello', body='Hello World!',
              #這里要把模式設置為2 properties
=pika.BasicProperties( delivery_mode=2, )) connection.close()

  消費者,consumer_durable.py

import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.211.55.4'))
channel = connection.channel()
#durable設置為True channel.queue_declare(queue
='hello', durable=True) def callback(ch, method, properties, body): print(body) ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_consume(callback, queue='hello', no_ack=False) channel.start_consuming()

  rabbitMQ服務器宕機,數據不丟失

  4,消息獲取順序

  隊列的數據默認是按照先后順序取值,也就是有三個消費者,假如第一波取值順序為a-b-c,那以后的順序都是a-b-c,不管a處理數據的快慢,比如說a還在處理數據,然而b已經處理完了,但b還是不能拿值,必須a先拿值,然后b才能拿值。這種形式效率太低。

  channel.basic_qos(prefetch_count=1)設置這個參數后,就不是按順序取值,而是誰先來誰取值。這只是消費者有關的設置。

  消費者,consumer_prefetch.py

import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.211.55.4'))
channel = connection.channel()
channel.queue_declare(queue='hello')
def callback(ch, method, properties, body):
    print(body)
    ch.basic_ack(delivery_tag = method.delivery_tag)
#加上這句就行
channel.basic_qos(prefetch_count=1)

channel.basic_consume(callback,
                      queue='hello',
                      no_ack=False)
channel.start_consuming()

  四、RabbitMQ的exchange模式

  1,發布訂閱模式

  簡單模式下,一條數據只會給一個消費者;發布訂閱模式下,一條消息給所有訂閱的消費者。

  生產者把消息放在一個指定的exchange里面,然后每個消費者創建一個隊列跟這個exchange綁定,從而消費者就可以拿到訂閱的數據了。

  發布者,

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='192.168.137.208'))
channel = connection.channel()
#模式要更改
channel.exchange_declare(exchange='logs',
                         exchange_type='fanout')

message = "Hello World!"
#這里發布者發送消息到exchange channel.basic_publish(exchange='logs', routing_key='', body=message) connection.close()

  訂閱者

import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='192.168.137.208'))
channel = connection.channel()
#模式也要改
channel.exchange_declare(exchange='logs',
                         exchange_type='fanout')
# 隨機創建隊列
result = channel.queue_declare(exclusive=True)
#拿到隊列名字 queue_name
= result.method.queue # 把隊列綁定到exchange channel.queue_bind(exchange='logs', queue=queue_name) def callback(ch, method, properties, body): print( body) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()

  2,關鍵字模式

  在發布者發布消息時,會含有關鍵字;而訂閱者這次不單單只是把隊列跟exchange綁定,還要綁定關鍵字,當發布者的關鍵字和綁定的關鍵字相同時,訂閱者才能拿到消息,然而一個隊列可以跟一個exchange綁定多個關鍵字。

  發布者

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

# 聲明一個交換機
channel.exchange_declare(exchange='direct_logs',exchange_type="direct")

message ="warning: Hello World!"
channel.basic_publish(exchange='direct_logs',
                      routing_key='warning',    #這是發布者發送消息的帶的關鍵字
                      body=message)
connection.close()

  訂閱者

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',
                         exchange_type='direct')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

channel.queue_bind(exchange='direct_logs',
                   queue=queue_name,
                   routing_key="error")#這是隊列跟exchange綁定的關鍵字def callback(ch, method, properties, body):
    print(body))

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()

  3,模糊匹配

  這是基於關鍵字的,但這次不是要相同了,而是用模糊匹配,‘#’代表匹配0或多個字符,‘*’表示匹配一個任意字符

  發布者

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

# 聲明一個交換機
channel.exchange_declare(exchange='topic_logs',exchange_type="topic")

message ="Hello World!"
channel.basic_publish(exchange='topic_logs',
                      routing_key='banana.apple.xigua.juzi',     #這是發布時帶着的關鍵字
                      body=message)
connection.close()

  訂閱者

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs',
                         exchange_type='topic')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

channel.queue_bind(exchange='topic_logs',
                   queue=queue_name,
                   routing_key="*.apple.#")    #這是隊列跟exchange綁定的關鍵字,但這里是模糊匹配,能匹配上,就可以拿到值
def callback(ch, method, properties, body): print(body)) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM