一、安裝
curl -s https://packagecloud.io/install/repositories/rabbitmq/erlang/script.rpm.sh | sudo bash
yum install socat logrotate -y
修改repo文件
vi /etc/yum.repos.d/rabbitmq.repo
##
## Zero dependency Erlang
##
[rabbitmq_erlang]
name=rabbitmq_erlang
baseurl=https://packagecloud.io/rabbitmq/erlang/el/7/$basearch
repo_gpgcheck=1
gpgcheck=1
enabled=1
# PackageCloud's repository key and RabbitMQ package signing key
gpgkey=https://packagecloud.io/rabbitmq/erlang/gpgkey
https://github.com/rabbitmq/signing-keys/releases/download/2.0/rabbitmq-release-signing-key.asc
sslverify=1
sslcacert=/etc/pki/tls/certs/ca-bundle.crt
metadata_expire=300
[rabbitmq_erlang-source]
name=rabbitmq_erlang-source
baseurl=https://packagecloud.io/rabbitmq/erlang/el/7/SRPMS
repo_gpgcheck=1
gpgcheck=0
enabled=1
# PackageCloud's repository key and RabbitMQ package signing key
gpgkey=https://packagecloud.io/rabbitmq/erlang/gpgkey
sslverify=1
sslcacert=/etc/pki/tls/certs/ca-bundle.crt
metadata_expire=300
##
## RabbitMQ server
##
[rabbitmq_server]
name=rabbitmq_server
baseurl=https://packagecloud.io/rabbitmq/rabbitmq-server/el/7/$basearch
repo_gpgcheck=1
gpgcheck=0
enabled=1
gpgkey=https://packagecloud.io/rabbitmq/rabbitmq-server/gpgkey
sslverify=1
sslcacert=/etc/pki/tls/certs/ca-bundle.crt
metadata_expire=300
[rabbitmq_server-source]
name=rabbitmq_server-source
baseurl=https://packagecloud.io/rabbitmq/rabbitmq-server/el/7/SRPMS
repo_gpgcheck=1
gpgcheck=0
enabled=1
gpgkey=https://packagecloud.io/rabbitmq/rabbitmq-server/gpgkey
sslverify=1
sslcacert=/etc/pki/tls/certs/ca-bundle.crt
metadata_expire=300
安裝:
yum install erlang rabbitmq-server
安裝后啟動:
rabbitmq-server
二、概念
2.1 生產者
生成消息的進程
2.2 消費者
消費消息的進程
2.3 exchange交換器
在rabbitmq中,加入了exchange概念,即交換器,生產者實際是把消息發給交換器,然后由交換器來決定怎么把消息發到隊列,然后消費者從隊列中獲取消息。
交換器的類型有:
- fanout 扇區。會把消息發送給所有綁定了該交換器的隊列
- direct 在fanout基礎上支持全匹配過濾
- topic 在fanout基礎上支持模糊匹配過濾
- headers
查看server注冊的所有交換器:rabbitmqctl list_bindings
空字符串表示默認交換器
2.4 queue隊列
一個隊列里面的消息,只會發給一個消費者,如果需要多個消費者同時接收同一個消息,就需要為每個消費者定義一個隊列,然后把消息,發送多份到不同的隊列。
二、Python使用
安裝庫:
pip install pika
三、隊列模式
- 多個消費者監聽同一個隊列
- 消息只發送一份到隊列
- 一個消息只會被一個消費者消費
- 如果消費者拿了消息,沒有消費成功,會給另一個消費者消費
生產者
# encoding=utf8
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) # 定義rabbitmq服務的ip和端口
channel = connection.channel()
queue_name = 'hello' # 隊列名
channel.queue_declare(queue=queue_name) # 創建隊列,如果發送到一個不存在的隊列,消息會被丟棄
channel.basic_publish(exchange='',
routing_key=queue_name,
body='Hello World!11')
print(" [x] Sent 'Hello World!'")
消費者
import pika
def callback(ch, method, properties, body):
print('callback', ch, method, properties, body)
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
queue_name = 'hello'
channel.basic_consume(queue='hello',
auto_ack=True,
on_message_callback=callback)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
四、發布訂閱模式
-
一個消息會有屬性:exchange和routing_key
-
消息會發布給所有訂閱了該exchange和routing_key的消費者
-
這個模式需要用到fanout類型的交換器
-
生產者需要定義交換器名字(類似其他MQ里的topic)和類型,也就是
channel.exchange_declare(exchange='logs', exchange_type='fanout')
-
然后發布消息到該交換器
-
消費者需要指定自己的隊列ID,可以自己定義,也可以讓server隨機分配一個
channel.queue_declare(queue='', exclusive=True)
queue傳空就是隨機分配。 -
然后把自己的隊列綁定到交換器
queue_bind(exchange='logs',queue=result.method.queue)
-
然后消費者從該隊列接收消息
-
生產者發送消息到交換器后,交換器會把這個消息發送到所有綁定了該交換器的隊列。
生產者
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) # 定義rabbitmq服務的ip和端口
channel = connection.channel()
channel.exchange_declare(exchange='logs', exchange_type='fanout') # 聲明交換器,名字是logs 類型是fanout
channel.basic_publish(exchange='logs', routing_key='', body='body111')
print(" [x] Sent 'Hello World!'")
消費者
import pika
import time
def callback(ch, method, properties, body):
print('callback', ch, method, properties, body)
time.sleep(10)
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
result = channel.queue_declare(queue='', exclusive=True) # exclusive=True表示當消費者關閉后,隊列會相應被刪除
print(result.method.queue) # 分配到的隊列ID
channel.queue_bind(exchange='logs',queue=result.method.queue)
channel.basic_consume(queue=result.method.queue,
auto_ack=True,
on_message_callback=callback)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
五、帶過濾的發布訂閱模式
在發布訂閱模式中,可以加上過濾功能,就是某個消費者,只消費這個交換器里面的某一部分消息。過濾方法分為全匹配過濾和模糊匹配過濾
5.1全匹配過濾
這里需要使用到direct類型的交換器。全匹配是指消息的routing_key等於隊列綁定的routing_key,才會發送消息到該隊列
步驟:
- 生產者定義消息的routing_key值
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')
channel.basic_publish(exchange='direct_logs', routing_key='black', body='body111')
- 消費者綁定隊列時指定routing_key值,一個消費者可以綁定多個routing_key
channel.queue_bind(exchange='direct_logs', queue=result.method.queue, routing_key='black')
channel.queue_bind(exchange='direct_logs', queue=result.method.queue, routing_key='black1')
5.2模糊匹配過濾
這里需要使用到topic類型的交換器。模糊配是指消息的routing_key可以模糊匹配上隊列綁定的routing_key,才會發送消息到該隊列。
消息的routing_key不支持通配符
隊列的routing_key支持通配符:
*
表示任意一個字符#
表示0或多個字符
實驗發現*和#基本是一樣的,都是0-n個任意字符。不知道為什么。
- 生產者定義交換器和消息的routing_key
channel.exchange_declare(exchange='topic_logs', exchange_type='topic') # 聲明交換器,名字是logs 類型是fanout
channel.basic_publish(exchange='topic_logs', routing_key='logs1.ab', body='body111')
- 消費者定義接收的routing_key
channel.queue_bind(exchange='topic_logs', queue=result.method.queue, routing_key='logs.*')
channel.queue_bind(exchange='topic_logs', queue=result.method.queue, routing_key='logs.#')
當消息的routing_key等於
- logs.error 可以接收
- logs.a 可以接收
- logs1.error 不可以接收
六、RPC模式
- 客戶端通過隊列,發送任務給服務端
- 服務端執行任務后,通過另一個隊列,發送結果給客戶端
客戶端:
#!/usr/bin/env python
import pika
import uuid
class FibonacciRpcClient(object):
#!/usr/bin/env python
import pika
import uuid
class FibonacciRpcClient(object):
def __init__(self):
self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
self.channel = self.connection.channel()
result = self.channel.queue_declare(queue='', exclusive=True)
self.callback_queue = result.method.queue
# 接收結果的隊列
self.channel.basic_consume( queue=self.callback_queue,on_message_callback=self.on_response,auto_ack=True)
def on_response(self, ch, method, props, body):
if self.corr_id == props.correlation_id:
self.response = body
def call(self, n):
self.response = None
self.corr_id = str(uuid.uuid4())
self.channel.basic_publish(exchange='', routing_key='rpc_queue',
properties=pika.BasicProperties(reply_to=self.callback_queue,
correlation_id=self.corr_id, ),body=str(n))
while self.response is None:
self.connection.process_data_events()
return int(self.response)
fibonacci_rpc = FibonacciRpcClient()
print(" [x] Requesting fib(30)")
response = fibonacci_rpc.call(30)
print(" [.] Got %r" % response)
服務端:
# encoding=utf8
import pika
import time
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='rpc_queue')
def fib(n):
if n == 0:
return 0
elif n == 1:
return 1
else:
return fib(n - 1) + fib(n - 2)
def on_request(ch, method, props, body):
n = int(body)
print(" [.] fib(%s)" % n)
response = fib(n)
ch.basic_publish(exchange='', routing_key=props.reply_to,
properties=pika.BasicProperties(correlation_id=props.correlation_id), body=str(response))
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='rpc_queue', on_message_callback=on_request)
print(" [x] Awaiting RPC requests")
channel.start_consuming()
七、高可用和高並發
1. 普通集群模式
- 假如集群有3個節點,ABC
- ABC都會保存元數據,元數據包括哪個隊列在哪個節點
- 隊列的數據,也就是消息,只會保存在其中一個節點
- 所以
- 當生產者發送消息M,M只會被保存在其中一個節點,例如A
- 如果消費者連接到節點B,要消費M,發現B沒有M,但是因為B有元數據,知道M在A節點,所以會引導消費者去連接A
- 特點:
- 能解決高並發問題,也就是可以橫向拓展
- 不能解決高可用問題,因為其中一個節點掛了,數據也就丟失了
2. 鏡像集群模式
- 假如集群有3個節點,ABC
- 每個節點都會保存元數據和隊列的數據
- 所以:
- 當A掛了,生產者消費者可以連接到B,數據不會丟失
- 特點:
- 能解決高可用問題
- 不能解決高並發問題,因為當隊列很多,並不能通過橫向拓展來提升吞吐量。可以通過HA同步策略來解決
HA同步策略來解決
HA-mode | HA-params | 說明 |
---|---|---|
all | 無 | 隊列會復制到所有的節點 |
exactly | count | 隊列只會復制到count個節點。例如count=2,消息會保存在節點AB,C不會有。當A掛了,會把消息同步到C,使鏡像數量依然等於2 |
nodes | node_name | 指定復制到哪個節點,例如node_name=AB,會把消息復制到AB兩個節點。如果AB都不存在,保存在生產者當前連接的節點 |
當用上HA同步策略,可以實現:
- 高可用,隊列會被復制到多個節點,保證其中一個節點掛了,隊列依然正常服務
- 高並發,只要mode不是all,就可以實現橫向拓展
八、實踐
1. 異步服務
用於削峰,或者提升降低接口時延。例如發微博后,讀優先的分發邏輯可以放到異步。
使用隊列模式,因為一個消息只能被消費一次。消費失敗需要重試。
2.系統解耦
例如用戶注冊后,告訴其他幾個微服務,讓它們處理新用戶注冊邏輯,例如金錢服務要發送新用戶禮包。
使用發布訂閱模式,因為有多個微服務,都需要消費該消息。
七、問題
常見問題:
- 高可用
- 可靠傳輸
- 生產者到MQ
- 通過confirm機制
- MQ自身
- 持久化
- 鏡像復制
- MQ到消費者
- ack
- 生產者到MQ
- 順序消費
- 消息堆積
- 積壓幾個消息,消費滿
- 擴容,增加消費者
- MQ消息過期
- 設置消息不過期
- MQ滿了
- 先把消息導出,保存到硬盤,用腳本定期消費,或者在低鋒時間把消息重新發到MQ
- 積壓幾個消息,消費滿
其他:
- 消費失敗,重試問題
- 有ack機制,如果長時間沒有ack,server會把任務發給另一個消費者,所以底層最好做好冪等
- 發送任務失敗
- 只能重試發消息
- server宕機,啟用從庫
- 通過集群模式,數據有多份鏡像
- 主從不同步問題
- 發送消息后,保證消費都保存到多個鏡像,才返回成功
- 消息丟失
- server收到消息后,是否持久化?
- 有鏡像
- 如果不持久化,server宕機怎么辦?
- 通過鏡像實現持久化
- server收到消息后,是否持久化?
- 怎么橫行拓展,高並發?
- 集群模式