rabbitmq 的exchange 三種使用方式


運行rabbitmq服務必須先啟動rabbitmq,服務夯住了才能進行 發送--接受 操作

rabbitMQ常用命令:

rabbitmq-server          (啟動rabbitmq, 必須進入rabbitmq安裝sbin目錄下執行)

rabbitmqctl list_queues (查看所有隊列信息)

rabbitmqctl stop_app     (關閉應用)

rabbitmqctl start_app   (開啟應用,啟動應用,和上述關閉命令配合使用,達到清空隊列的目的)
 
rabbitmqctl reset       (清除所有隊列)

rabbitmqctl status       (查看運行信息)

rabbitmqctl list_exchanges   (查看 exchange(過濾) 模式)

rabbitmqctl list_bindings   (查看 捆綁 信息 )

 

如何保證數據不丟失?

  1. 1. 在隊列里,設置durable=true  #代表 隊列持久化,就算斷電隊列也不會消失,但是消息會丟失
    2. 在生產者端,
      properties = pika.BasicProperties(
            delivery_mode=2, #確保生產者生產的消息持久化到隊列里面,就算斷電消息也不會消失
        )
    3. 在消費者端 設置auto_ack
    channel.basic_consume(
      #auto_ack=False, 確認報消費者取到消息,每次消費者獲取消息的時候都會和生產這進行確認; #auto_ack=True,不進行確認,
      queue='test', on_message_callback=callback, auto_ack=False)
       
    ps:白話 關於auto_ack的設置,為True時 負責任的態度,生產者會關注消費者是否拿到消息(消費者拿取得時候,會告訴生產者已經拿到),為False時,不關注消費者是否消費,(不負責任的態度,只管生產不管你是否消費)

exchange的三種模式:

fanout : 廣播 -----所有捆綁了得客戶都能收到消息。 direct : 組播 -------只有同一組的客戶才能收到。 topic : 規則波 ------ 滿足某一要求的所有可客戶都能收到。

 

 

關於公司月活兩等問題

QPS: 每秒訪問的次數 DAU: 日活躍用戶數 MAU: 月活躍用戶數

關於:QPS  、DAU、這些統計圖表都是運維做的,我們不做,他們做好后有一個鏈接,我們點進去就能直接看到了。 

 

1、linux ----- docker

2、shell 腳本語言

3、numpy,pandas

4、后端面試常問: mysql、隊列相關、緩存相關、服務器相關

常規使用隊列:

producer.py

import pika
#建立連接
connection = pika.BlockingConnection(
   pika.ConnectionParameters(host='localhost'))
#實例化一個對象
channel = connection.channel()

### 聲明隊列 ,durable=True 開啟隊列持久化,就算斷電隊列也不會消失
channel.queue_declare(queue='test',durable=True)
#發送的信息
channel.basic_publish(exchange='',
                     routing_key='test',
                     body='Hello World!',
                     #確保生產者生產的消息持久化到隊列里面,就算斷電消息也不會消失
                     properties = pika.BasicProperties(
                       delivery_mode=2,  # make message persistent
                    )
                    )

print(" [x] Sent 'Hello World!'")
#回收資源
connection.close()

consumer.py

import pika

connection = pika.BlockingConnection(
   pika.ConnectionParameters(host='localhost'))

channel = connection.channel()
#建立連接
channel.queue_declare(queue='test', durable=True)
#定義回調方法,
def callback(ch, method, properties, body):
   print(" [x] Received %r" % body)
   ch.basic_ack(delivery_tag=method.delivery_tag)
#配置參數
channel.basic_consume(
   #auto_ack=False, 確認報消費者取到消息,每次消費者獲取消息的時候都會和生產這進行確認。auto_ack=True,不進行確認,
   queue='test', on_message_callback=callback, auto_ack=False)

print(' [*] Waiting for messages. To exit press CTRL+C')
#開始接收
channel.start_consuming()

exchange 的三種方式實現:

第一種:fanout

producer.py

import sys
import pika

connection = pika.BlockingConnection(
   pika.ConnectionParameters(host='localhost'))

channel = connection.channel()
#設置消息過濾條件與 發送級別   條件logs 級別fanout---廣播
channel.exchange_declare(exchange='logs', exchange_type='fanout')

#會自動創建一個連接着 logs 的 唯一的隊列:queue_name
result = channel.queue_declare('', exclusive=True) ### exclusive 排他的 唯一的

queue_name = result.method.queue
print("queue_name:", queue_name)

channel.queue_bind(exchange='logs', queue=queue_name)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
   print(" [x] %r" % body)
#打包發送
channel.basic_consume(
   queue=queue_name, on_message_callback=callback, auto_ack=True)

channel.start_consuming()

consumer.py

import sys
import pika

connection = pika.BlockingConnection(
   pika.ConnectionParameters(host='localhost'))

channel = connection.channel()
#確定接收消息 條件與logs級別fanout
channel.exchange_declare(exchange='logs',exchange_type='fanout')

message = ' '.join(sys.argv[1:]) or "info: Hello World!"

channel.basic_publish(exchange='logs', routing_key='', body=message)

print(" [x] Sent %r" % message)

connection.close()
第二種:direct --- 組播

producer.py

import sys
import pika

connection = pika.BlockingConnection(
   pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs', exchange_type='direct')
#設置 過濾 級別
log_levels = sys.argv[1] if len(sys.argv) > 1 else 'info'

message = ' '.join(sys.argv[2:]) or 'Hello World!'

channel.basic_publish(
   exchange='direct_logs', routing_key=log_levels, body=message)
print(" [x] Sent %r:%r" % (log_levels, message))
connection.close()

consumer.py

import sys
import pika

connection = pika.BlockingConnection(
   pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs', exchange_type='direct')

result = channel.queue_declare('', exclusive=True)
queue_name = result.method.queue

#拿到接收的 級別
log_levels = sys.argv[1:]
if not log_levels:
   sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
   sys.exit(1)
#可能有多個級別
for severity in log_levels:
   channel.queue_bind(
       exchange='direct_logs', queue=queue_name, routing_key=severity)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
   print(" [x] %r:%r" % (method.routing_key, body))

channel.basic_consume(
   queue=queue_name, on_message_callback=callback, auto_ack=True)

channel.start_consuming()
第三種:topic - ------規則播

PS: 過濾條件用 正則表達式 表示

producer.py

import sys
import pika
connection = pika.BlockingConnection(
   pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs', exchange_type='topic')

#這里上面不同,這里是用 點 距隔
routing_key = sys.argv[1] if len(sys.argv) > 2 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'

channel.basic_publish(
   exchange='topic_logs', routing_key=routing_key, body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
connection.close()

consumer.py

import pika
import sys

connection = pika.BlockingConnection(
   pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs', exchange_type='topic')

result = channel.queue_declare('', exclusive=True)
queue_name = result.method.queue

binding_keys = sys.argv[1:]

if not binding_keys:
   sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
   sys.exit(1)

for binding_key in binding_keys:
   channel.queue_bind(
       exchange='topic_logs', queue=queue_name, routing_key=binding_key)

print(' [*] Waiting for logs. To exit press CTRL+C')


def callback(ch, method, properties, body):
   print(" [x] %r:%r" % (method.routing_key, body))


channel.basic_consume(
   queue=queue_name, on_message_callback=callback, auto_ack=True)

channel.start_consuming()

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM