1、RabbitMQ介紹
RabbitMQ是一個由erlang開發的AMQP(Advanced Message Queue )的開源實現。AMQP 的出現其實也是應了廣大人民群眾的需求,雖然在同步消息通訊的世界里有很多公開標准(如 COBAR的 IIOP ,或者是 SOAP 等),但是在異步消息處理中卻不是這樣,只有大企業有一些商業實現(如微軟的 MSMQ ,IBM 的 Websphere MQ 等),因此,在 2006 年的 6 月,Cisco 、Redhat、iMatix 等聯合制定了 AMQP 的公開標准。
MQ全稱為Message Queue, 消息隊列(MQ)是一種應用程序對應用程序的通信方法。應用程序通過讀寫出入隊列的消息(針對應用程序的數據)來通信,而無需專用連接來鏈接它們。消 息傳遞指的是程序之間通過在消息中發送數據進行通信,而不是通過直接調用彼此來通信,直接調用通常是用於諸如遠程過程調用的技術。排隊指的是應用程序通過 隊列來通信。隊列的使用除去了接收和發送應用程序同時執行的要求。
消息隊列技術是分布式應用間交換信息的一種技術;消息隊列可駐留在內存或磁盤上,隊列存儲消息直到它們被應用程序讀走;通過消息隊列,應用程序可獨立地執行--它們不需要知道彼此的位置、或在繼續執行前不需要等待接收程序接收此消息。
MQ主要作用是接受和轉發消息。你可以想想在生活中的一種場景:當你把信件的投進郵筒,郵遞員肯定最終會將信件送給收件人。我們可以把MQ比作 郵局和郵遞員。
MQ和郵局的主要區別是,它不處理消息,但是,它會接受數據、存儲消息數據、轉發消息。
2、安裝RabbitMQ
linux上安裝:
安裝配置epel源
$ rpm
-
ivh http:
/
/
dl.fedoraproject.org
/
pub
/
epel
/
6
/
i386
/
epel
-
release
-
6
-
8.noarch
.rpm
安裝erlang
$ yum
-
y install erlang
安裝RabbitMQ
$ yum
-
y install rabbitmq
-
server
windows上安裝:
(1)首先,您需要安裝支持的 Windows 版Erlang。運行Erlang Windows安裝程序。Erlang將出現在開始菜單中,設置erlang的環境變量(C:\erl9.2\bin;),測試erlang是否安裝正確:cmd-輸入:erl,能看到eshell版本號,說明安裝成功!
(2)下載rabbitMQ:(安裝下一步完成即可)
https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.7.3/rabbitmq-server-windows-3.7.3.zip
(3)安裝RabbitMQ-Plugins,這個相當於是一個管理界面,方便我們在瀏覽器界面查看RabbitMQ各個消息隊列以及交換的工作情況,安裝方法是:打開命令行cd進入rabbitmq的sbin目錄(我的目錄是:C:\ software \ rabbitmq \ rabbitmq_server-3.6.5 \ sbin),輸入:rabbitmq-plugins enable rabbitmq_management命令,稍等會會發現出現plugins安裝成功的提示,默認是安裝6個插件。
插件安裝完之后,在瀏覽器輸入的http://本地主機:15672進行驗證
如果不能打開頁面解決方法:首先在命令行輸入:rabbitmq-service stop,接着輸入rabbitmq-service remove,再接着輸入rabbitmq-service install,接着輸入rabbitmq-service start,最后重新輸入rabbitmq-plugins enable rabbitmq_management試試,我是這樣解決的。
創建用戶名,密碼,綁定角色:
RabbitMQ報錯解決方法:
C:\RabbitMQ Server\rabbitmq_server-3.7.3\sbin>rabbitmqctl.bat status Status of node rabbit@DESKTOP-6JT7D2H ... Error: unable to perform an operation on node 'rabbit@DESKTOP-6JT7D2H'. Please see diagnostics information and suggestions below. Most common reasons for this are: * Target node is unreachable (e.g. due to hostname resolution, TCP connection or firewall issues) * CLI tool fails to authenticate with the server (e.g. due to CLI tool's Erlang cookie not matching that of the server) * Target node is not running In addition to the diagnostics info below: * See the CLI, clustering and networking guides on http://rabbitmq.com/documentation.html to learn more * Consult server logs on node rabbit@DESKTOP-6JT7D2H DIAGNOSTICS =========== attempted to contact: ['rabbit@DESKTOP-6JT7D2H'] rabbit@DESKTOP-6JT7D2H: * connected to epmd (port 4369) on DESKTOP-6JT7D2H * epmd reports node 'rabbit' uses port 25672 for inter-node and CLI tool traffic * TCP connection succeeded but Erlang distribution failed * Authentication failed (rejected by the remote node), please check the Erlang cookie Current node details: * node name: 'rabbitmqcli57@DESKTOP-6JT7D2H' * effective user's home directory: C:\Users\Administrator.DESKTOP-6JT7D2H * Erlang cookie hash: RmzKErjVZUcsMU8wSgBGbA== 解決方法: 將C:\Users\tracyclock\.erlang.cookie 文件拷貝到C:\Windows\System32\config\systemprofile替換掉.erlang.cookie文件 重啟rabbitMQ服務:net stop RabbitMQ && net start RabbitMQ
查看用戶及用戶角色:rabbitmqctl.bat list_users
C:\RabbitMQ Server\rabbitmq_server-3.7.3\sbin>rabbitmqctl.bat list_users Listing users ... guest [administrator]
新增用戶:rabbitmqctl.bat add_user username password
C:\RabbitMQ Server\rabbitmq_server-3.7.3\sbin>rabbitmqctl.bat add_user admin 123456 Adding user "admin" ... C:\RabbitMQ Server\rabbitmq_server-3.7.3\sbin>rabbitmqctl.bat list_users Listing users ... admin [] guest [administrator]
用戶角色:
rabbitmq用戶角色可分為五類:超級管理員, 監控者, 策略制定者, 普通管理者以及其他。
(1) 超級管理員(administrator)
可登陸管理控制台(啟用management plugin的情況下),可查看所有的信息,並且可以對用戶,策略(policy)進行操作。
(2) 監控者(monitoring)
可登陸管理控制台(啟用management plugin的情況下),同時可以查看rabbitmq節點的相關信息(進程數,內存使用情況,磁盤使用情況等)
(3) 策略制定者(policymaker)
可登陸管理控制台(啟用management plugin的情況下), 同時可以對policy進行管理。
(4) 普通管理者(management)
僅可登陸管理控制台(啟用management plugin的情況下),無法看到節點信息,也無法對策略進行管理。
(5) 其他的
無法登陸管理控制台,通常就是普通的生產者和消費者。
授權用戶為超級管理員:rabbitmqctl.bat set_user_tags admin administrator
C:\RabbitMQ Server\rabbitmq_server-3.7.3\sbin>rabbitmqctl.bat set_user_tags admin administrator Setting tags for user "admin" to [administrator] ... C:\RabbitMQ Server\rabbitmq_server-3.7.3\sbin>rabbitmqctl.bat list_users Listing users ... admin [administrator] guest [administrator]
用戶可以設置多個角色:rabbitmqctl.bat set_user_tags username tag1 tag2 ...
修改用戶密碼:rabbitmqctl change_password userName newPassword
C:\RabbitMQ Server\rabbitmq_server-3.7.3\sbin>rabbitmqctl.bat change_password admin 888888 Changing password for user "admin" ...
刪掉用戶:rabbitmqctl.bat delete_user username
C:\RabbitMQ Server\rabbitmq_server-3.7.3\sbin>rabbitmqctl.bat delete_user guest Deleting user "guest" ... C:\RabbitMQ Server\rabbitmq_server-3.7.3\sbin>rabbitmqctl.bat list_users Listing users ... admin [administrator]
權限相關命令為:
(1) 設置用戶權限
rabbitmqctl set_permissions -p VHostPath User ConfP WriteP ReadP
(2) 查看(指定hostpath)所有用戶的權限信息
rabbitmqctl list_permissions [-p VHostPath]
(3) 查看指定用戶的權限信息
rabbitmqctl list_user_permissions User
(4) 清除用戶的權限信息
rabbitmqctl clear_permissions [-p VHostPath] User
3、python3使用pika python客戶端
發出消息(生產者):
#!/usr/bin/env python # -*- coding: utf-8 -*- # @Time : 2018/2/22 15:51 # @Author : Py.qi # @File : rabbitMQ_send_1.py # @Software: PyCharm import pika,sys print('send....start....') while True: inputso=input('soinsideto:') if inputso == 'quit': break #與RabbitMQ服務器建立鏈接 connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) #建立隧道 channel = connection.channel() #創建隊列名稱zhang channel.queue_declare(queue='zhang') #發送信息:exchange指定交換,routing_key指定隊列名,body指定消息內容
channel.basic_publish(exchange='',routing_key='zhang',body=inputso) #關閉鏈接 connection.close()
接收者(消費者):
#!/usr/bin/env python # -*- coding: utf-8 -*- # @Time : 2018/2/22 15:57 # @Author : Py.qi # @File : rabbitMQ_rescv_1.py # @Software: PyCharm import pika #創建鏈接 connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) #建立隧道 channel = connection.channel() #創建隊列名,此處也可省略,在找不到隊列時創建 channel.queue_declare(queue='zhang') #使用回調函數callback來接收消息並打印消息 def callback(ch,method,properties,body): print('recived:',body) #指定隊列接收消息,callback接收消息,queue指定隊列,no_ack不給發送者發送確認消息 channel.basic_consume(callback,queue='zhang',no_ack=True) print('waiting for message,to exit press ctrl+c') #持續接收消息,阻塞 channel.start_consuming()
默認情況下,RabbitMQ會按順序將每條消息發送到下一個使用者,每個消費者按順序獲得同樣數量的消息,這種分配方式稱為循環法。
(1)消息持久化:
當RabbitMQ退出或者崩潰時,它會忘記隊列和消息,需要做兩件事來確保消息不會丟失:我們需要將隊列和消息標記為持久化。
在創建隊列時指定durable為True來標記隊列持久化:
channel.queue_declare(queue='hehe',durable=True)
將消息標記為持久化,通過提供值為2的delivery_mode屬性
channel.basic_publish(exchange='', routing_key='hehe', body=inputso, properties=pika.BasicProperties( delivery_mode=2, # make message persistent ))
(2)消息公平分發
如果RabbitMQ只管按順序把消息發送到每個消費者上,不考慮消費者的負載,很可能出現一個機器配置不高的消費者那里堆積很多消息處理不完,同時配置高的機器卻很輕松。為了解決這個問題,可以在每個消費者端,配置perfetch=1,意識就是告訴RabbitMQ在我這個消費者當前消息還沒處理完的時候就不要再發新消息了。
為了解決這個問題,我們可以使用basic.qos方法和 prefetch_count = 1設置。這告訴RabbitMQ一次不要向工作人員發送多於一條消息。或者換句話說,不要向工作人員發送新消息,直到它處理並確認了前一個消息。相反,它會將其分派給不是仍然忙碌的下一個工作人員。
channel.basic_qos(prefetch_count = 1)
完整代碼:
生產者:
#!/usr/bin/env python # -*- coding: utf-8 -*- # @Time : 2018/2/22 15:51 # @Author : Py.qi # @File : rabbitMQ_send_1.py # @Software: PyCharm import pika print('send....start....') while True: inputso=input('soinsideto:') if inputso == 'quit': break connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.146.129')) channel = connection.channel() channel.queue_declare(queue='hehe',durable=True) channel.basic_publish(exchange='', routing_key='hehe', body=inputso, properties=pika.BasicProperties( delivery_mode=2, # make message persistent )) connection.close()
消費者:
#!/usr/bin/env python # -*- coding: utf-8 -*- # @Time : 2018/2/22 15:57 # @Author : Py.qi # @File : rabbitMQ_rescv_1.py # @Software: PyCharm import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.146.129')) channel = connection.channel() channel.queue_declare(queue='hehe',durable=True) def callback(ch,method,properties,body): print('recived:',body) ch.basic_ack(delivery_tag=method.delivery_tag) channel.basic_qos(prefetch_count=1) #確認發送消息個數 channel.basic_consume(callback,queue='hehe',no_ack=True) print('waiting for message,to exit press ctrl+c') channel.start_consuming()
(3)消息發布與訂閱
發布訂閱和簡單的消息隊列區別在於,發布訂閱會將消息發送給所有的訂閱者,而消息隊列中的數據被消費一次便消失。所以,RabbitMQ實現發布和訂閱時,會為每一個訂閱者創建一個隊列,而發布者發布消息時,會將消息放置在所有相關隊列中。
發布消息類似於廣播效果,需要用到exchange,在定義exchange時指定類型來決定哪些queue符合條件,可以接收消息:
有幾種可用的交換類型:direct, topic, headers 和fanout。我們將關注最后一個fanout。我們創建該類型的交換,並將其稱為logs:
channel.exchange_declare(exchange='logs', exchange_type='fanout')
fanout:所有bind到此exchange的queue都可以接收到消息
direct:通過routingKey和exchange決定的哪個唯一的queue可以接收消息
topic:所有符合routingKey(此時可以是一個表達式)的routingKey所bind的queue可以接收消息。
表達式符號:#代表一個或多個字符,*代表任何字符
fanout交換,它只是將收到的所有消息廣播到它所知道的所有隊列中。
要列出服務器上的交換,可以使用命令rabbitmqctl:
sudo rabbitmqctl list_exchanges
廣播消息,fanout實例:
生產者:
#!/usr/bin/env python # -*- coding: utf-8 -*- # @Time : 2018/2/24 17:05 # @Author : Py.qi # @File : rabbit_send_fanout.py # @Software: PyCharm import pika import sys connection=pika.BlockingConnection(pika.ConnectionParameters(host='192.168.146.129')) channel=connection.channel() #指定交換類型 channel.exchange_declare(exchange='logs',exchange_type='fanout') #message = ' '.join(sys.argv[1:]) or 'info:hello world!' for i in range(10): channel.basic_publish(exchange='logs',routing_key='',body=str(i)) print('[x]sent %r'%i) connection.close()
消費者:
#!/usr/bin/env python # -*- coding: utf-8 -*- # @Time : 2018/2/24 17:12 # @Author : Py.qi # @File : rabbit_recv_fanout.py # @Software: PyCharm import pika connection = pika.BlockingConnection(pika.ConnectionParameters( host='192.168.146.129')) channel = connection.channel() #指定交換類型 channel.exchange_declare(exchange='logs', exchange_type='fanout') #讓服務器隨機生成隊列名,一旦消費者鏈接關閉,隊列將被刪除 result = channel.queue_declare(exclusive=True) queue_name = result.method.queue #將隊列綁定到交換上 channel.queue_bind(exchange='logs', queue=queue_name) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r" % body) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
接收消息時指定隊列, exchange type = direct
RabbitMQ還支持根據關鍵字發送,即:隊列綁定關鍵字,發送者將數據根據關鍵字發送到消息exchange,exchange根據關鍵字判斷應該將數據發送至指定隊列。
生產者:發布消息
#!/usr/bin/env python # -*- coding: utf-8 -*- # @Time : 2018/2/26 9:25 # @Author : Py.qi # @File : rabbit_send_direct.py # @Software: PyCharm import pika import sys conn = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.146.129')) channel = conn.channel() #綁定exchange類型為指定接收 channel.exchange_declare(exchange='direct_logs',exchange_type='direct') #判斷參數個數 severity=sys.argv[1] if len(sys.argv) > 1 else 'info' #發送消息 message = ' '.join(sys.argv[2:]) or 'hello world' #消息發送到exchange交換隊列 channel.basic_publish(exchange='direct_logs',routing_key=severity,body=message) print('[x]sent %r:%r'%(severity,message)) conn.close()
消費者:接收消息
#!/usr/bin/env python # -*- coding: utf-8 -*- # @Time : 2018/2/26 9:54 # @Author : Py.qi # @File : rabbit_recver_direct.py # @Software: PyCharm import pika,sys conn = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.146.129')) channel = conn.channel() channel.exchange_declare(exchange='direct_logs',exchange_type='direct') #隨機隊列名,結束即刪除 result = channel.queue_declare(exclusive=True) queue_name=result.method.queue #接收隊列消息關鍵參數 severities=sys.argv[1:] if not severities: sys.stderr.write('Usage:%s[info][warning][error]\n'%sys.argv[0]) sys.exit(1) #循環將消息發送到有關鍵字隊列中 for severity in severities: channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=severity) print('[*]waiting for logs,To exit press CTRL+C') def callback(ch,method,properites,body): print('[x]%r:%r'%(method.routing_key,body)) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
模糊匹配:
exchange type = topic
在topic類型下,可以讓隊列綁定幾個模糊的關鍵字,之后發送者將數據發送到exchange,exchange將傳入”路由值“和 ”關鍵字“進行匹配,匹配成功,則將數據發送到指定隊列。
#:表示可以匹配0個或多個單詞
*:表示只能匹配一個單詞
生產者:發送消息
#!/usr/bin/env python # -*- coding: utf-8 -*- # @Time : 2018/2/26 10:42 # @Author : Py.qi # @File : rabbit_send_topic.py # @Software: PyCharm import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='192.168.146.129')) channel = connection.channel() #指定exchange為模糊匹配topic channel.exchange_declare(exchange='topic_logs', exchange_type='topic') routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info' message = ' '.join(sys.argv[2:]) or 'Hello World!' channel.basic_publish(exchange='topic_logs', routing_key=routing_key, body=message) print(" [x] Sent %r:%r" % (routing_key, message)) connection.close()
消費者:接收消息
#!/usr/bin/env python # -*- coding: utf-8 -*- # @Time : 2018/2/26 10:43 # @Author : Py.qi # @File : rabbit_recve_topic.py # @Software: PyCharm import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='192.168.146.129')) channel = connection.channel() #exchange模式topic channel.exchange_declare(exchange='topic_logs', exchange_type='topic') result = channel.queue_declare(exclusive=True) queue_name = result.method.queue binding_keys = sys.argv[1:] if not binding_keys: sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0]) sys.exit(1) for binding_key in binding_keys: channel.queue_bind(exchange='topic_logs', queue=queue_name, routing_key=binding_key) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r:%r" % (method.routing_key, body)) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()