rabbitMQ常用命令:
rabbitmq-server (啟動rabbitmq, 必須進入rabbitmq安裝sbin目錄下執行)
rabbitmqctl list_queues (查看所有隊列信息)
rabbitmqctl stop_app (關閉應用)
rabbitmqctl start_app (開啟應用,啟動應用,和上述關閉命令配合使用,達到清空隊列的目的)
rabbitmqctl reset (清除所有隊列)
rabbitmqctl status (查看運行信息)
rabbitmqctl list_exchanges (查看 exchange(過濾) 模式)
rabbitmqctl list_bindings (查看 捆綁 信息 )
如何保證數據不丟失?
-
1. 在隊列里,設置durable=true #代表 隊列持久化,就算斷電隊列也不會消失,但是消息會丟失
2. 在生產者端,
properties = pika.BasicProperties(
delivery_mode=2, #確保生產者生產的消息持久化到隊列里面,就算斷電消息也不會消失
)
3. 在消費者端 設置auto_ack
channel.basic_consume(
#auto_ack=False, 確認報消費者取到消息,每次消費者獲取消息的時候都會和生產這進行確認; #auto_ack=True,不進行確認,
queue='test', on_message_callback=callback, auto_ack=False)
ps:白話 關於auto_ack的設置,為True時 負責任的態度,生產者會關注消費者是否拿到消息(消費者拿取得時候,會告訴生產者已經拿到),為False時,不關注消費者是否消費,(不負責任的態度,只管生產不管你是否消費)
exchange的三種模式:
fanout : 廣播 -----所有捆綁了得客戶都能收到消息。 direct : 組播 -------只有同一組的客戶才能收到。 topic : 規則波 ------ 滿足某一要求的所有可客戶都能收到。
關於公司月活兩等問題
QPS: 每秒訪問的次數 DAU: 日活躍用戶數 MAU: 月活躍用戶數
關於:QPS 、DAU、這些統計圖表都是運維做的,我們不做,他們做好后有一個鏈接,我們點進去就能直接看到了。
1、linux ----- docker
2、shell 腳本語言
3、numpy,pandas
4、后端面試常問: mysql、隊列相關、緩存相關、服務器相關
常規使用隊列:
producer.py
import pika
#建立連接
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
#實例化一個對象
channel = connection.channel()
### 聲明隊列 ,durable=True 開啟隊列持久化,就算斷電隊列也不會消失
channel.queue_declare(queue='test',durable=True)
#發送的信息
channel.basic_publish(exchange='',
routing_key='test',
body='Hello World!',
#確保生產者生產的消息持久化到隊列里面,就算斷電消息也不會消失
properties = pika.BasicProperties(
delivery_mode=2, # make message persistent
)
)
print(" [x] Sent 'Hello World!'")
#回收資源
connection.close()
consumer.py
import pika
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
#建立連接
channel.queue_declare(queue='test', durable=True)
#定義回調方法,
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
ch.basic_ack(delivery_tag=method.delivery_tag)
#配置參數
channel.basic_consume(
#auto_ack=False, 確認報消費者取到消息,每次消費者獲取消息的時候都會和生產這進行確認。auto_ack=True,不進行確認,
queue='test', on_message_callback=callback, auto_ack=False)
print(' [*] Waiting for messages. To exit press CTRL+C')
#開始接收
channel.start_consuming()
exchange 的三種方式實現:
第一種:fanout
producer.py
import sys
import pika
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
#設置消息過濾條件與 發送級別 條件logs 級別fanout---廣播
channel.exchange_declare(exchange='logs', exchange_type='fanout')
#會自動創建一個連接着 logs 的 唯一的隊列:queue_name
result = channel.queue_declare('', exclusive=True) ### exclusive 排他的 唯一的
queue_name = result.method.queue
print("queue_name:", queue_name)
channel.queue_bind(exchange='logs', queue=queue_name)
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" [x] %r" % body)
#打包發送
channel.basic_consume(
queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.start_consuming()
consumer.py
import sys
import pika
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
#確定接收消息 條件與logs級別fanout
channel.exchange_declare(exchange='logs',exchange_type='fanout')
message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs', routing_key='', body=message)
print(" [x] Sent %r" % message)
connection.close()
第二種:direct --- 組播
producer.py
import sys
import pika
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')
#設置 過濾 級別
log_levels = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(
exchange='direct_logs', routing_key=log_levels, body=message)
print(" [x] Sent %r:%r" % (log_levels, message))
connection.close()
consumer.py
import sys
import pika
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')
result = channel.queue_declare('', exclusive=True)
queue_name = result.method.queue
#拿到接收的 級別
log_levels = sys.argv[1:]
if not log_levels:
sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
sys.exit(1)
#可能有多個級別
for severity in log_levels:
channel.queue_bind(
exchange='direct_logs', queue=queue_name, routing_key=severity)
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" [x] %r:%r" % (method.routing_key, body))
channel.basic_consume(
queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.start_consuming()
第三種:topic - ------規則播
PS: 過濾條件用 正則表達式 表示
producer.py
import sys
import pika
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='topic_logs', exchange_type='topic')
#這里上面不同,這里是用 點 距隔
routing_key = sys.argv[1] if len(sys.argv) > 2 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(
exchange='topic_logs', routing_key=routing_key, body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
connection.close()
consumer.py
import pika
import sys
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='topic_logs', exchange_type='topic')
result = channel.queue_declare('', exclusive=True)
queue_name = result.method.queue
binding_keys = sys.argv[1:]
if not binding_keys:
sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
sys.exit(1)
for binding_key in binding_keys:
channel.queue_bind(
exchange='topic_logs', queue=queue_name, routing_key=binding_key)
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" [x] %r:%r" % (method.routing_key, body))
channel.basic_consume(
queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.start_consuming()