RabbitMQ


一、安裝

curl -s https://packagecloud.io/install/repositories/rabbitmq/erlang/script.rpm.sh | sudo bash
yum install socat logrotate -y

修改repo文件

vi /etc/yum.repos.d/rabbitmq.repo

##
## Zero dependency Erlang
##

[rabbitmq_erlang]
name=rabbitmq_erlang
baseurl=https://packagecloud.io/rabbitmq/erlang/el/7/$basearch
repo_gpgcheck=1
gpgcheck=1
enabled=1
# PackageCloud's repository key and RabbitMQ package signing key
gpgkey=https://packagecloud.io/rabbitmq/erlang/gpgkey
       https://github.com/rabbitmq/signing-keys/releases/download/2.0/rabbitmq-release-signing-key.asc
sslverify=1
sslcacert=/etc/pki/tls/certs/ca-bundle.crt
metadata_expire=300

[rabbitmq_erlang-source]
name=rabbitmq_erlang-source
baseurl=https://packagecloud.io/rabbitmq/erlang/el/7/SRPMS
repo_gpgcheck=1
gpgcheck=0
enabled=1
# PackageCloud's repository key and RabbitMQ package signing key
gpgkey=https://packagecloud.io/rabbitmq/erlang/gpgkey
sslverify=1
sslcacert=/etc/pki/tls/certs/ca-bundle.crt
metadata_expire=300

##
## RabbitMQ server
##

[rabbitmq_server]
name=rabbitmq_server
baseurl=https://packagecloud.io/rabbitmq/rabbitmq-server/el/7/$basearch
repo_gpgcheck=1
gpgcheck=0
enabled=1
gpgkey=https://packagecloud.io/rabbitmq/rabbitmq-server/gpgkey
sslverify=1
sslcacert=/etc/pki/tls/certs/ca-bundle.crt
metadata_expire=300

[rabbitmq_server-source]
name=rabbitmq_server-source
baseurl=https://packagecloud.io/rabbitmq/rabbitmq-server/el/7/SRPMS
repo_gpgcheck=1
gpgcheck=0
enabled=1
gpgkey=https://packagecloud.io/rabbitmq/rabbitmq-server/gpgkey
sslverify=1
sslcacert=/etc/pki/tls/certs/ca-bundle.crt
metadata_expire=300

安裝:

yum install   erlang rabbitmq-server

安裝后啟動:

rabbitmq-server

二、概念

2.1 生產者

生成消息的進程

2.2 消費者

消費消息的進程

2.3 exchange交換器

在rabbitmq中,加入了exchange概念,即交換器,生產者實際是把消息發給交換器,然后由交換器來決定怎么把消息發到隊列,然后消費者從隊列中獲取消息。
交換器的類型有:

  • fanout 扇區。會把消息發送給所有綁定了該交換器的隊列
  • direct 在fanout基礎上支持全匹配過濾
  • topic 在fanout基礎上支持模糊匹配過濾
  • headers

查看server注冊的所有交換器:rabbitmqctl list_bindings

空字符串表示默認交換器

2.4 queue隊列

一個隊列里面的消息,只會發給一個消費者,如果需要多個消費者同時接收同一個消息,就需要為每個消費者定義一個隊列,然后把消息,發送多份到不同的隊列。

二、Python使用

教程

安裝庫:

  pip install pika

三、隊列模式

教程

  • 多個消費者監聽同一個隊列
  • 消息只發送一份到隊列
  • 一個消息只會被一個消費者消費
  • 如果消費者拿了消息,沒有消費成功,會給另一個消費者消費

生產者

# encoding=utf8
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))  # 定義rabbitmq服務的ip和端口
channel = connection.channel()
queue_name = 'hello'  # 隊列名
channel.queue_declare(queue=queue_name)  # 創建隊列,如果發送到一個不存在的隊列,消息會被丟棄
channel.basic_publish(exchange='',
                      routing_key=queue_name,
                      body='Hello World!11')
print(" [x] Sent 'Hello World!'")

消費者

import pika


def callback(ch, method, properties, body):
    print('callback', ch, method, properties, body)


connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
queue_name = 'hello'
channel.basic_consume(queue='hello',
                      auto_ack=True,
                      on_message_callback=callback)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

四、發布訂閱模式

  • 一個消息會有屬性:exchange和routing_key

  • 消息會發布給所有訂閱了該exchange和routing_key的消費者

  • 這個模式需要用到fanout類型的交換器

  • 生產者需要定義交換器名字(類似其他MQ里的topic)和類型,也就是channel.exchange_declare(exchange='logs', exchange_type='fanout')

  • 然后發布消息到該交換器

  • 消費者需要指定自己的隊列ID,可以自己定義,也可以讓server隨機分配一個channel.queue_declare(queue='', exclusive=True)queue傳空就是隨機分配。

  • 然后把自己的隊列綁定到交換器queue_bind(exchange='logs',queue=result.method.queue)

  • 然后消費者從該隊列接收消息

  • 生產者發送消息到交換器后,交換器會把這個消息發送到所有綁定了該交換器的隊列。

生產者

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))  # 定義rabbitmq服務的ip和端口
channel = connection.channel()

channel.exchange_declare(exchange='logs', exchange_type='fanout')  # 聲明交換器,名字是logs 類型是fanout
channel.basic_publish(exchange='logs', routing_key='', body='body111')
print(" [x] Sent 'Hello World!'")

消費者

import pika
import time

def callback(ch, method, properties, body):
    print('callback', ch, method, properties, body)
    time.sleep(10)


connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
result = channel.queue_declare(queue='', exclusive=True)  # exclusive=True表示當消費者關閉后,隊列會相應被刪除
print(result.method.queue) # 分配到的隊列ID
channel.queue_bind(exchange='logs',queue=result.method.queue)
channel.basic_consume(queue=result.method.queue,
                      auto_ack=True,
                      on_message_callback=callback)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

五、帶過濾的發布訂閱模式

在發布訂閱模式中,可以加上過濾功能,就是某個消費者,只消費這個交換器里面的某一部分消息。過濾方法分為全匹配過濾和模糊匹配過濾

5.1全匹配過濾

這里需要使用到direct類型的交換器。全匹配是指消息的routing_key等於隊列綁定的routing_key,才會發送消息到該隊列
步驟:

  1. 生產者定義消息的routing_key值
channel.exchange_declare(exchange='direct_logs', exchange_type='direct') 
channel.basic_publish(exchange='direct_logs', routing_key='black', body='body111')
  1. 消費者綁定隊列時指定routing_key值,一個消費者可以綁定多個routing_key
channel.queue_bind(exchange='direct_logs', queue=result.method.queue, routing_key='black')
channel.queue_bind(exchange='direct_logs', queue=result.method.queue, routing_key='black1')

5.2模糊匹配過濾

這里需要使用到topic類型的交換器。模糊配是指消息的routing_key可以模糊匹配上隊列綁定的routing_key,才會發送消息到該隊列。
消息的routing_key不支持通配符
隊列的routing_key支持通配符:

  • * 表示任意一個字符
  • # 表示0或多個字符

實驗發現*和#基本是一樣的,都是0-n個任意字符。不知道為什么。

  1. 生產者定義交換器和消息的routing_key
channel.exchange_declare(exchange='topic_logs', exchange_type='topic')  # 聲明交換器,名字是logs 類型是fanout
channel.basic_publish(exchange='topic_logs', routing_key='logs1.ab', body='body111')
  1. 消費者定義接收的routing_key
channel.queue_bind(exchange='topic_logs', queue=result.method.queue, routing_key='logs.*')
channel.queue_bind(exchange='topic_logs', queue=result.method.queue, routing_key='logs.#')

當消息的routing_key等於

  • logs.error 可以接收
  • logs.a 可以接收
  • logs1.error 不可以接收

六、RPC模式

  • 客戶端通過隊列,發送任務給服務端
  • 服務端執行任務后,通過另一個隊列,發送結果給客戶端

客戶端:

#!/usr/bin/env python
import pika
import uuid


class FibonacciRpcClient(object):
#!/usr/bin/env python
import pika
import uuid


class FibonacciRpcClient(object):

    def __init__(self):
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
        self.channel = self.connection.channel()

        result = self.channel.queue_declare(queue='', exclusive=True)
        self.callback_queue = result.method.queue
        # 接收結果的隊列
        self.channel.basic_consume(  queue=self.callback_queue,on_message_callback=self.on_response,auto_ack=True)

    def on_response(self, ch, method, props, body):
        if self.corr_id == props.correlation_id:
            self.response = body

    def call(self, n):
        self.response = None
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(exchange='', routing_key='rpc_queue',
                                   properties=pika.BasicProperties(reply_to=self.callback_queue,
                                                                   correlation_id=self.corr_id, ),body=str(n))
        while self.response is None:
            self.connection.process_data_events()
        return int(self.response)


fibonacci_rpc = FibonacciRpcClient()

print(" [x] Requesting fib(30)")
response = fibonacci_rpc.call(30)
print(" [.] Got %r" % response)

服務端:

# encoding=utf8
import pika
import time

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))

channel = connection.channel()

channel.queue_declare(queue='rpc_queue')


def fib(n):
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        return fib(n - 1) + fib(n - 2)


def on_request(ch, method, props, body):
    n = int(body)

    print(" [.] fib(%s)" % n)
    response = fib(n)

    ch.basic_publish(exchange='', routing_key=props.reply_to,
                     properties=pika.BasicProperties(correlation_id=props.correlation_id), body=str(response))
    ch.basic_ack(delivery_tag=method.delivery_tag)


channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='rpc_queue', on_message_callback=on_request)

print(" [x] Awaiting RPC requests")
channel.start_consuming()

七、高可用和高並發

教程

1. 普通集群模式

  • 假如集群有3個節點,ABC
  • ABC都會保存元數據,元數據包括哪個隊列在哪個節點
  • 隊列的數據,也就是消息,只會保存在其中一個節點
  • 所以
    • 當生產者發送消息M,M只會被保存在其中一個節點,例如A
    • 如果消費者連接到節點B,要消費M,發現B沒有M,但是因為B有元數據,知道M在A節點,所以會引導消費者去連接A
  • 特點:
    • 能解決高並發問題,也就是可以橫向拓展
    • 不能解決高可用問題,因為其中一個節點掛了,數據也就丟失了

2. 鏡像集群模式

  • 假如集群有3個節點,ABC
  • 每個節點都會保存元數據和隊列的數據
  • 所以:
    • 當A掛了,生產者消費者可以連接到B,數據不會丟失
  • 特點:
    • 能解決高可用問題
    • 不能解決高並發問題,因為當隊列很多,並不能通過橫向拓展來提升吞吐量。可以通過HA同步策略來解決

HA同步策略來解決

HA-mode HA-params 說明
all 隊列會復制到所有的節點
exactly count 隊列只會復制到count個節點。例如count=2,消息會保存在節點AB,C不會有。當A掛了,會把消息同步到C,使鏡像數量依然等於2
nodes node_name 指定復制到哪個節點,例如node_name=AB,會把消息復制到AB兩個節點。如果AB都不存在,保存在生產者當前連接的節點

當用上HA同步策略,可以實現:

  1. 高可用,隊列會被復制到多個節點,保證其中一個節點掛了,隊列依然正常服務
  2. 高並發,只要mode不是all,就可以實現橫向拓展

八、實踐

1. 異步服務

用於削峰,或者提升降低接口時延。例如發微博后,讀優先的分發邏輯可以放到異步。
使用隊列模式,因為一個消息只能被消費一次。消費失敗需要重試。

2.系統解耦

例如用戶注冊后,告訴其他幾個微服務,讓它們處理新用戶注冊邏輯,例如金錢服務要發送新用戶禮包。
使用發布訂閱模式,因為有多個微服務,都需要消費該消息。

七、問題

說明

常見問題:

  1. 高可用
  2. 可靠傳輸
    1. 生產者到MQ
      1. 通過confirm機制
    2. MQ自身
      1. 持久化
      2. 鏡像復制
    3. MQ到消費者
      1. ack
  3. 順序消費
  4. 消息堆積
    1. 積壓幾個消息,消費滿
      1. 擴容,增加消費者
    2. MQ消息過期
      1. 設置消息不過期
    3. MQ滿了
      1. 先把消息導出,保存到硬盤,用腳本定期消費,或者在低鋒時間把消息重新發到MQ

其他:

  • 消費失敗,重試問題
    • 有ack機制,如果長時間沒有ack,server會把任務發給另一個消費者,所以底層最好做好冪等
  • 發送任務失敗
    • 只能重試發消息
  • server宕機,啟用從庫
    • 通過集群模式,數據有多份鏡像
  • 主從不同步問題
    • 發送消息后,保證消費都保存到多個鏡像,才返回成功
  • 消息丟失
    • server收到消息后,是否持久化?
      • 有鏡像
    • 如果不持久化,server宕機怎么辦?
      • 通過鏡像實現持久化
  • 怎么橫行拓展,高並發?
    • 集群模式


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM