python3之rabbitMQ


1、RabbitMQ介紹

  RabbitMQ是一個由erlang開發的AMQP(Advanced Message Queue )的開源實現。AMQP 的出現其實也是應了廣大人民群眾的需求,雖然在同步消息通訊的世界里有很多公開標准(如 COBAR的 IIOP ,或者是 SOAP 等),但是在異步消息處理中卻不是這樣,只有大企業有一些商業實現(如微軟的 MSMQ ,IBM 的 Websphere MQ 等),因此,在 2006 年的 6 月,Cisco 、Redhat、iMatix 等聯合制定了 AMQP 的公開標准。

MQ全稱為Message Queue, 消息隊列(MQ)是一種應用程序對應用程序的通信方法。應用程序通過讀寫出入隊列的消息(針對應用程序的數據)來通信,而無需專用連接來鏈接它們。消 息傳遞指的是程序之間通過在消息中發送數據進行通信,而不是通過直接調用彼此來通信,直接調用通常是用於諸如遠程過程調用的技術。排隊指的是應用程序通過 隊列來通信。隊列的使用除去了接收和發送應用程序同時執行的要求。

消息隊列技術是分布式應用間交換信息的一種技術;消息隊列可駐留在內存或磁盤上,隊列存儲消息直到它們被應用程序讀走;通過消息隊列,應用程序可獨立地執行--它們不需要知道彼此的位置、或在繼續執行前不需要等待接收程序接收此消息。

MQ主要作用是接受和轉發消息。你可以想想在生活中的一種場景:當你把信件的投進郵筒,郵遞員肯定最終會將信件送給收件人。我們可以把MQ比作 郵局和郵遞員

MQ和郵局的主要區別是,它不處理消息,但是,它會接受數據、存儲消息數據、轉發消息。

2、安裝RabbitMQ

linux上安裝:

安裝配置epel源

    $ rpm  - ivh http: / / dl.fedoraproject.org / pub / epel / 6 / i386 / epel - release - 6 - 8.noarch .rpm 
安裝erlang
    $ yum  - y install erlang
安裝RabbitMQ
    $ yum  - y install rabbitmq - server

windows上安裝:

(1)首先,您需要安裝支持的 Windows 版Erlang運行Erlang Windows安裝程序Erlang將出現在開始菜單中,設置erlang的環境變量(C:\erl9.2\bin;),測試erlang是否安裝正確:cmd-輸入:erl,能看到eshell版本號,說明安裝成功!

(2)下載rabbitMQ:(安裝下一步完成即可)

https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.7.3/rabbitmq-server-windows-3.7.3.zip

(3)安裝RabbitMQ-Plugins,這個相當於是一個管理界面,方便我們在瀏覽器界面查看RabbitMQ各個消息隊列以及交換的工作情況,安裝方法是:打開命令行cd進入rabbitmq的sbin目錄(我的目錄是:C:\ software \ rabbitmq \ rabbitmq_server-3.6.5 \ sbin),輸入:rabbitmq-plugins enable rabbitmq_management命令,稍等會會發現出現plugins安裝成功的提示,默認是安裝6個插件。

插件安裝完之后,在瀏覽器輸入的http://本地主機:15672進行驗證

如果不能打開頁面解決方法:首先在命令行輸入:rabbitmq-service stop,接着輸入rabbitmq-service remove,再接着輸入rabbitmq-service install,接着輸入rabbitmq-service start,最后重新輸入rabbitmq-plugins enable rabbitmq_management試試,我是這樣解決的。

創建用戶名,密碼,綁定角色:

RabbitMQ報錯解決方法:

C:\RabbitMQ Server\rabbitmq_server-3.7.3\sbin>rabbitmqctl.bat status
Status of node rabbit@DESKTOP-6JT7D2H ...
Error: unable to perform an operation on node 'rabbit@DESKTOP-6JT7D2H'. Please see diagnostics information and suggestions below.

Most common reasons for this are:

 * Target node is unreachable (e.g. due to hostname resolution, TCP connection or firewall issues)
 * CLI tool fails to authenticate with the server (e.g. due to CLI tool's Erlang cookie not matching that of the server)
 * Target node is not running

In addition to the diagnostics info below:

 * See the CLI, clustering and networking guides on http://rabbitmq.com/documentation.html to learn more
 * Consult server logs on node rabbit@DESKTOP-6JT7D2H

DIAGNOSTICS
===========

attempted to contact: ['rabbit@DESKTOP-6JT7D2H']

rabbit@DESKTOP-6JT7D2H:
  * connected to epmd (port 4369) on DESKTOP-6JT7D2H
  * epmd reports node 'rabbit' uses port 25672 for inter-node and CLI tool traffic
  * TCP connection succeeded but Erlang distribution failed

  * Authentication failed (rejected by the remote node), please check the Erlang cookie


Current node details:
 * node name: 'rabbitmqcli57@DESKTOP-6JT7D2H'
 * effective user's home directory: C:\Users\Administrator.DESKTOP-6JT7D2H
 * Erlang cookie hash: RmzKErjVZUcsMU8wSgBGbA==

解決方法:
將C:\Users\tracyclock\.erlang.cookie 文件拷貝到C:\Windows\System32\config\systemprofile替換掉.erlang.cookie文件
重啟rabbitMQ服務:net stop RabbitMQ && net start RabbitMQ

查看用戶及用戶角色:rabbitmqctl.bat list_users

C:\RabbitMQ Server\rabbitmq_server-3.7.3\sbin>rabbitmqctl.bat list_users
Listing users ...
guest   [administrator]

新增用戶:rabbitmqctl.bat add_user username password

C:\RabbitMQ Server\rabbitmq_server-3.7.3\sbin>rabbitmqctl.bat add_user admin 123456
Adding user "admin" ...

C:\RabbitMQ Server\rabbitmq_server-3.7.3\sbin>rabbitmqctl.bat list_users
Listing users ...
admin   []
guest   [administrator]

用戶角色:

rabbitmq用戶角色可分為五類:超級管理員, 監控者, 策略制定者, 普通管理者以及其他。

(1) 超級管理員(administrator)

可登陸管理控制台(啟用management plugin的情況下),可查看所有的信息,並且可以對用戶,策略(policy)進行操作。

(2) 監控者(monitoring)

可登陸管理控制台(啟用management plugin的情況下),同時可以查看rabbitmq節點的相關信息(進程數,內存使用情況,磁盤使用情況等) 

(3) 策略制定者(policymaker)

可登陸管理控制台(啟用management plugin的情況下), 同時可以對policy進行管理。

(4) 普通管理者(management)

僅可登陸管理控制台(啟用management plugin的情況下),無法看到節點信息,也無法對策略進行管理。

(5) 其他的

無法登陸管理控制台,通常就是普通的生產者和消費者。

授權用戶為超級管理員:rabbitmqctl.bat set_user_tags admin administrator

C:\RabbitMQ Server\rabbitmq_server-3.7.3\sbin>rabbitmqctl.bat set_user_tags admin administrator
Setting tags for user "admin" to [administrator] ...

C:\RabbitMQ Server\rabbitmq_server-3.7.3\sbin>rabbitmqctl.bat list_users
Listing users ...
admin   [administrator]
guest   [administrator]

用戶可以設置多個角色:rabbitmqctl.bat set_user_tags username tag1 tag2 ...

修改用戶密碼:rabbitmqctl change_password userName newPassword

C:\RabbitMQ Server\rabbitmq_server-3.7.3\sbin>rabbitmqctl.bat change_password admin 888888
Changing password for user "admin" ...

刪掉用戶:rabbitmqctl.bat delete_user username

C:\RabbitMQ Server\rabbitmq_server-3.7.3\sbin>rabbitmqctl.bat delete_user guest
Deleting user "guest" ...

C:\RabbitMQ Server\rabbitmq_server-3.7.3\sbin>rabbitmqctl.bat list_users
Listing users ...
admin   [administrator]

權限相關命令為:

(1) 設置用戶權限

rabbitmqctl  set_permissions  -p  VHostPath  User  ConfP  WriteP  ReadP

(2) 查看(指定hostpath)所有用戶的權限信息

rabbitmqctl  list_permissions  [-p  VHostPath]

(3) 查看指定用戶的權限信息

rabbitmqctl  list_user_permissions  User

(4)  清除用戶的權限信息

rabbitmqctl  clear_permissions  [-p VHostPath]  User

3、python3使用pika python客戶端

發出消息(生產者):

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time    : 2018/2/22 15:51
# @Author  : Py.qi
# @File    : rabbitMQ_send_1.py
# @Software: PyCharm

import pika,sys
print('send....start....')
while True:
    inputso=input('soinsideto:')
    if inputso == 'quit':
        break
#與RabbitMQ服務器建立鏈接
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
#建立隧道
    channel = connection.channel()
#創建隊列名稱zhang
    channel.queue_declare(queue='zhang')
#發送信息:exchange指定交換,routing_key指定隊列名,body指定消息內容    
  channel.basic_publish(exchange='',routing_key='zhang',body=inputso)
#關閉鏈接 connection.close()

接收者(消費者):

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time    : 2018/2/22 15:57
# @Author  : Py.qi
# @File    : rabbitMQ_rescv_1.py
# @Software: PyCharm

import pika
#創建鏈接
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
#建立隧道
channel = connection.channel()
#創建隊列名,此處也可省略,在找不到隊列時創建
channel.queue_declare(queue='zhang')
#使用回調函數callback來接收消息並打印消息
def callback(ch,method,properties,body):
    print('recived:',body)
#指定隊列接收消息,callback接收消息,queue指定隊列,no_ack不給發送者發送確認消息
channel.basic_consume(callback,queue='zhang',no_ack=True)
print('waiting for message,to exit press ctrl+c')
#持續接收消息,阻塞
channel.start_consuming()

 默認情況下,RabbitMQ會按順序將每條消息發送到下一個使用者,每個消費者按順序獲得同樣數量的消息,這種分配方式稱為循環法。

(1)消息持久化:

當RabbitMQ退出或者崩潰時,它會忘記隊列和消息,需要做兩件事來確保消息不會丟失:我們需要將隊列和消息標記為持久化。

在創建隊列時指定durable為True來標記隊列持久化:

channel.queue_declare(queue='hehe',durable=True)

將消息標記為持久化,通過提供值為2的delivery_mode屬性

channel.basic_publish(exchange='',
                          routing_key='hehe',
                          body=inputso,
                          properties=pika.BasicProperties(
                              delivery_mode=2,  # make message persistent
                          ))

(2)消息公平分發

如果RabbitMQ只管按順序把消息發送到每個消費者上,不考慮消費者的負載,很可能出現一個機器配置不高的消費者那里堆積很多消息處理不完,同時配置高的機器卻很輕松。為了解決這個問題,可以在每個消費者端,配置perfetch=1,意識就是告訴RabbitMQ在我這個消費者當前消息還沒處理完的時候就不要再發新消息了。

為了解決這個問題,我們可以使用basic.qos方法和 prefetch_count = 1設置。這告訴RabbitMQ一次不要向工作人員發送多於一條消息。或者換句話說,不要向工作人員發送新消息,直到它處理並確認了前一個消息。相反,它會將其分派給不是仍然忙碌的下一個工作人員。

channel.basic_qos(prefetch_count = 1)

完整代碼:

生產者:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time    : 2018/2/22 15:51
# @Author  : Py.qi
# @File    : rabbitMQ_send_1.py
# @Software: PyCharm

import pika
print('send....start....')
while True:
    inputso=input('soinsideto:')
    if inputso == 'quit':
        break
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.146.129'))
    channel = connection.channel()
    channel.queue_declare(queue='hehe',durable=True)
    channel.basic_publish(exchange='',
                          routing_key='hehe',
                          body=inputso,
                          properties=pika.BasicProperties(
                              delivery_mode=2,  # make message persistent
                          ))
    connection.close()

消費者:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time    : 2018/2/22 15:57
# @Author  : Py.qi
# @File    : rabbitMQ_rescv_1.py
# @Software: PyCharm

import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.146.129'))
channel = connection.channel()
channel.queue_declare(queue='hehe',durable=True)
def callback(ch,method,properties,body):
    print('recived:',body)
    ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_qos(prefetch_count=1) #確認發送消息個數
channel.basic_consume(callback,queue='hehe',no_ack=True)
print('waiting for message,to exit press ctrl+c')
channel.start_consuming()

(3)消息發布與訂閱

發布訂閱和簡單的消息隊列區別在於,發布訂閱會將消息發送給所有的訂閱者,而消息隊列中的數據被消費一次便消失。所以,RabbitMQ實現發布和訂閱時,會為每一個訂閱者創建一個隊列,而發布者發布消息時,會將消息放置在所有相關隊列中。

發布消息類似於廣播效果,需要用到exchange,在定義exchange時指定類型來決定哪些queue符合條件,可以接收消息:

有幾種可用的交換類型:direct, topic, headers 和fanout。我們將關注最后一個fanout。我們創建該類型的交換,並將其稱為logs

channel.exchange_declare(exchange='logs',
                         exchange_type='fanout')

fanout:所有bind到此exchange的queue都可以接收到消息

direct:通過routingKey和exchange決定的哪個唯一的queue可以接收消息

topic:所有符合routingKey(此時可以是一個表達式)的routingKey所bind的queue可以接收消息。

  表達式符號:#代表一個或多個字符,*代表任何字符

fanout交換,它只是將收到的所有消息廣播到它所知道的所有隊列中。

要列出服務器上的交換,可以使用命令rabbitmqctl:

sudo rabbitmqctl list_exchanges

廣播消息,fanout實例:

生產者:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time    : 2018/2/24 17:05
# @Author  : Py.qi
# @File    : rabbit_send_fanout.py
# @Software: PyCharm
import pika
import sys
connection=pika.BlockingConnection(pika.ConnectionParameters(host='192.168.146.129'))
channel=connection.channel()
#指定交換類型
channel.exchange_declare(exchange='logs',exchange_type='fanout')
#message = ' '.join(sys.argv[1:]) or 'info:hello world!'
for i in range(10):
    channel.basic_publish(exchange='logs',routing_key='',body=str(i))
    print('[x]sent %r'%i)
connection.close()

消費者:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time    : 2018/2/24 17:12
# @Author  : Py.qi
# @File    : rabbit_recv_fanout.py
# @Software: PyCharm

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='192.168.146.129'))
channel = connection.channel()
#指定交換類型
channel.exchange_declare(exchange='logs',
                         exchange_type='fanout')
#讓服務器隨機生成隊列名,一旦消費者鏈接關閉,隊列將被刪除
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
#將隊列綁定到交換上
channel.queue_bind(exchange='logs',
                   queue=queue_name)
print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r" % body)

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)
channel.start_consuming()

接收消息時指定隊列, exchange type = direct

RabbitMQ還支持根據關鍵字發送,即:隊列綁定關鍵字,發送者將數據根據關鍵字發送到消息exchange,exchange根據關鍵字判斷應該將數據發送至指定隊列。

生產者:發布消息

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time    : 2018/2/26 9:25
# @Author  : Py.qi
# @File    : rabbit_send_direct.py
# @Software: PyCharm

import pika
import sys

conn = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.146.129'))
channel = conn.channel()
#綁定exchange類型為指定接收
channel.exchange_declare(exchange='direct_logs',exchange_type='direct')
#判斷參數個數
severity=sys.argv[1] if len(sys.argv) > 1 else 'info'
#發送消息
message = ' '.join(sys.argv[2:]) or 'hello world'
#消息發送到exchange交換隊列
channel.basic_publish(exchange='direct_logs',routing_key=severity,body=message)
print('[x]sent %r:%r'%(severity,message))
conn.close()

消費者:接收消息

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time    : 2018/2/26 9:54
# @Author  : Py.qi
# @File    : rabbit_recver_direct.py
# @Software: PyCharm

import pika,sys
conn = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.146.129'))
channel = conn.channel()
channel.exchange_declare(exchange='direct_logs',exchange_type='direct')
#隨機隊列名,結束即刪除
result = channel.queue_declare(exclusive=True)
queue_name=result.method.queue
#接收隊列消息關鍵參數
severities=sys.argv[1:]
if not severities:
    sys.stderr.write('Usage:%s[info][warning][error]\n'%sys.argv[0])
    sys.exit(1)
#循環將消息發送到有關鍵字隊列中
for severity in severities:
    channel.queue_bind(exchange='direct_logs',
                       queue=queue_name,
                       routing_key=severity)
print('[*]waiting for logs,To exit press CTRL+C')
def callback(ch,method,properites,body):
    print('[x]%r:%r'%(method.routing_key,body))
channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)
channel.start_consuming()

模糊匹配:

exchange type = topic

在topic類型下,可以讓隊列綁定幾個模糊的關鍵字,之后發送者將數據發送到exchange,exchange將傳入”路由值“和 ”關鍵字“進行匹配,匹配成功,則將數據發送到指定隊列。

#:表示可以匹配0個或多個單詞

*:表示只能匹配一個單詞

生產者:發送消息

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time    : 2018/2/26 10:42
# @Author  : Py.qi
# @File    : rabbit_send_topic.py
# @Software: PyCharm

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='192.168.146.129'))
channel = connection.channel()
#指定exchange為模糊匹配topic
channel.exchange_declare(exchange='topic_logs',
                         exchange_type='topic')

routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='topic_logs',
                      routing_key=routing_key,
                      body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
connection.close()

消費者:接收消息

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time    : 2018/2/26 10:43
# @Author  : Py.qi
# @File    : rabbit_recve_topic.py
# @Software: PyCharm
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='192.168.146.129'))
channel = connection.channel()
#exchange模式topic
channel.exchange_declare(exchange='topic_logs',
                         exchange_type='topic')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

binding_keys = sys.argv[1:]
if not binding_keys:
    sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
    sys.exit(1)

for binding_key in binding_keys:
    channel.queue_bind(exchange='topic_logs',
                       queue=queue_name,
                       routing_key=binding_key)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM