消息隊列,RabbitMQ、Kafka、RocketMQ


1、消息列隊概述

1.1消息隊列MQ

MQ全稱為Messages Queue ,消息隊列是應用程序和應用程序之間的通信方法
為什么使用MQ?
在項目中,可將一些無需耗時的操作提取出來,進行異步操作,而這種異步處理方式大大的姐生了服務器的請求響應時間,從而提高了系統的吞吐量。

開發中消息列隊通常有以下應用場景

1.任務異步處理
	將不需要同步處理的斌且好使長的操作由消息隊列通知消息接收方進行異步處理。提高了應用的響應時間
2.應用程序解耦合
	MQ相當於一個中介,生產放通過MQ與消費方交互,它將應用程序進行解耦合

1.2AMQP和JMS

MQ是消息通信模型;實現MQ的大致有兩種主流方式:AMQP、JMS

1.2.1AMQP

六種消息模式

AMQP是高級消息隊列協議,是一個進程間傳遞異步消息的網絡協議,更准確的說是一種binary wire-level protocol(鏈接協議)。這是和JMS的本質區別,AMQP不從API層進行限定,而是直接定義網絡交換的數據格式

1.2.2JMS

兩種消息模式

JMS即JAVA消息服務應用程序接口,是一個java平台中關於面向消息中間件的API,用於在兩個應用程序之間,或分布系統中發送消息,進行異步通信

1.2.3AMOP 與 JMS 區別

· JMS 是定義了統的接口,來對消息攝作進行統一,AMQP是通過規定協議來統一統一數據交互的格式 
· JMS 限定了必須使用 Java 語宮: AMQP只是協議,不規定實現方式.因此是跨語宮的. 
· JMS 規定了兩種消息模式(點對點模式,訂閱模式):而 AMQP的消息模式更加豐富

1.3消息隊列產品

市場上常見的息隊列有如下:
目前市面上成熟主流的MQ有Kafka、RocketMQ、RabbitMQ,我們這里對每款款MQ做一個簡介紹。

1.3.1 Kafka

所有開源的MQ,吞吐量最強的,天然支持集群,消息堆積能力非常強悍
Apache下的一個子項目,使用scala語言實現的一個高性能分布式Publish/Subscribe消息隊列系統
1.快速持久化:通過磁盤順序讀寫與零拷貝機制,可以在0(1)的系統開銷下進行消息持久化
2.高吞吐:在一台普通的服務器上即可以達到10W/s的吞吐速率。
3.高堆積:支持topoc下消費者較長時間離線,消息堆積量大
4.完全的分布式系統:Brocker、Producer、Consumer都原生自動支持分布式,依賴zookeeper自動實現復雜均衡
5.支持Hadoop數據並行加載:對於像Hadoop的一樣的日志數據和離線分系統,但又要求實時處理的限制,這是一個可行的解決方案

1.3.2 RocketMQ

RocketMQ國產阿里的,經過雙十一的檢驗。也非常強悍,基於java語言寫的
RocketMQ的前身是Metaq,當Metaq3.0發布時,產品名稱改為RocketMQ.RocketMQ是一款分布式、隊列模型的消息中間件
    1.能夠保證嚴格的消息順序
    2.提供豐富的消息拉取模式
    3.高效的訂閱水平擴展能力
    4.實時的消息訂閱機制
    5.支持事務消息
    6.億級消息堆積能力

1.3.3RabbitMQ

使用Erlang編寫的一個開源的消息隊列,本身支持很多的協議:AMQP,XMPP,SMTP,STOMP,正是如此使它變得非常重量級,更適合於企業級的開發。同時實現了Broker架構,核心思想是生產者不會將消息直接發送給隊列,消息在發送給客戶端時先在中心隊列排隊。對路由,負載均衡、數據持久化都有很好的支持。多用於進行企業級的ESB整合。

1.4RabbitMQ

RabbitMQ是erlang語言開發,基於AMQP(Advanced Message Queue高級消息隊列協議)協議實現的消息隊列,它是一種應用程序之間的涌信方法,消息隊列在分布式系統開發中應用非常廣乏
RabbitMQ官方地址:http://www.rabbitmq.com/
RabbitMQ提供了6種模式:
	1.簡單模式
	2.work工作模式
	3.publish/Subscribe發布與訂閱模式
	4.Routing路由模式
	5.Topics主題模式
	6.RPC遠調用模式(遠程調用,不太算MQ;不作介紹)

簡單模式

一個生產者,一個消費者

工作模式

一個生產者、2個消費者。
一個消息只能被一個消費者獲取。

訂閱模式

解讀:
1、1個生產者,多個消費者
2、每一個消費者都有自己的一個隊列
3、生產者沒有將消息直接發送到隊列,而是發送到了交換機。交換機分別都發送給隊列
4、每個隊列都要綁定到交換機
5、生產者發送的消息,經過交換機,到達隊列,實現,一個消息被多個消費者獲取的目的
注意:一個消費者隊列可以有多個消費者實例,只有其中一個消費者實例會消費

交換機(圖中X):只負責轉發消息,不具備儲備消息的能力。一方面接收生產者P的消息,另一方面處理消息(發送給隊列,或者特定隊列,或者丟棄),取決於交換機的類別
交換機的類別:
	Fanout:廣播,將消息交給所有綁定到交換機的隊列
	Direct:定向,把消息交給符合指定的隊列中
	Topic:通配符,把消息交給符合routing pattern(路由模式)的隊列

路由模式

1.隊列與交換機的綁定,不能是任意綁定了,而是要指定一個routingkey(路由key)
2.消息的發送方在 向 Exchange(交換機圖中X)發送消息時,也必須指定消息的RoutingKey
3.交換機不再把消息交給每一個綁定的隊列,而是根據消息的Routing Key進行判斷,只有隊列的Routing與消息的Routing Key 完全一致,才會接收到消息

圖解:
P:生產者
X:交換機
C1:消費者,指定所在隊列需要routing key為error的消息
C2:消費者,指定所在隊列需要routing key為info,erroe,warning的消息

通配符模式(主題模式)

可以根據Routingkey把消息路由到不同的隊列。只不過通配符類型交換機可以讓隊列在綁定routing key的時候使用通配符
routingkey一般都是由一個或多個單詞組成,多個單詞之間以‘.’點分割,例如:item.add.hello

通配符規則:
	#:匹配一個或多個詞
	*:匹配一個詞
	
舉例:
	item.#:能夠匹配item.index.add或者item.index
	itrm.*:只能匹配item.index或者item.xxx

2.安裝及配置RabbitMQ

用戶角色

1、超級管理員(administrator)
可登陸管理控制台,可查看所有的信息,並且可以對用戶,策略(policy)進行操作。
2、監控者(monitoring)
可登陸管理控制台,同時可以查看rabbitmq節點的相關信息(進程數,內存使用情況,磁盤使用情況等)
3、策略制定者(policymaker)
可登陸管理控制台, 同時可以對policy進行管理。但無法查看節點的相關信息(上圖紅框標識的部分)。
4、普通管理者(management)
僅可登陸管理控制台,無法看到節點信息,也無法對策略進行管理。
5、其他
無法登陸管理控制台,通常就是普通的生產者和消費者。

3.python使用RabbitMQ

輪詢消費模式

此模式下,發送隊列的一方把消息存入mq的指定隊列后,若有消費者端聯入相應隊列,即會獲取到消息,並且隊列中的消息會被消費掉。

若有多個消費端同時連接着隊列,則會已輪詢的方式將隊列中的消息消費掉。

接下來是代碼實例:

producer生產者

# !/usr/bin/env python
import pika
credentials = pika.PlainCredentials('admin','123456')
connection = pika.BlockingConnection(pika.ConnectionParameters(
    '192.168.56.19',5672,'/',credentials))
channel = connection.channel()

# 聲明queue
channel.queue_declare(queue='balance')

# n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange.
channel.basic_publish(exchange='',
                      routing_key='balance',
                      body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()

發送過隊列后,可在MQ服務器中查看隊列狀態

[root@localhost ~]# rabbitmqctl list_queues
Listing queues ...
hello    1

consumer消費者

# _*_coding:utf-8_*_
__author__ = 'Alex Li'
import pika

credentials = pika.PlainCredentials('admin','123456')
connection = pika.BlockingConnection(pika.ConnectionParameters(
    '192.168.56.19',5672,'/',credentials))
channel = connection.channel()

# You may ask why we declare the queue again ‒ we have already declared it in our previous code.
# We could avoid that if we were sure that the queue already exists. For example if send.py program
# was run before. But we're not yet sure which program to run first. In such cases it's a good
# practice to repeat declaring the queue in both programs.
channel.queue_declare(queue='balance')


def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)


channel.basic_consume(callback,
                      queue='balance',
                      no_ack=True)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

接收隊列后,查看一下隊列狀態

[root@localhost ~]#  rabbitmqctl list_queues
Listing queues ...
hello    0

隊列持久化

當rabbitMQ意外宕機時,可能會有持久化保存隊列的需求(隊列中的消息不消失)。

producer

# Cheng
# !/usr/bin/env python
import pika

credentials = pika.PlainCredentials('admin','123456')
connection = pika.BlockingConnection(pika.ConnectionParameters(
    '192.168.56.19',5672,'/',credentials))
channel = connection.channel()

# 聲明queue
channel.queue_declare(queue='durable',durable=True)

# n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange.
channel.basic_publish(exchange='',
                      routing_key='durable',
                      body='Hello cheng!',
                      properties=pika.BasicProperties(
                          delivery_mode=2,  # make message persistent
                      )
                      )
print(" [x] Sent 'Hello cheng!'")
connection.close()

執行后查看隊列,記下隊列名字與隊列中所含消息的數量

[root@localhost ~]# rabbitmqctl list_queues
Listing queues ...
durable    1
#重啟rabbitmq
[root@localhost ~]# systemctl restart rabbitmq-server
#重啟完畢后再次查看
[root@localhost ~]# rabbitmqctl list_queues
Listing queues ...
durable   #隊列以及消息並未消失

執行消費者代碼

cunsumer

# Cheng
# _*_coding:utf-8_*_
__author__ = 'Alex Li'
import pika

credentials = pika.PlainCredentials('admin','123456')
connection = pika.BlockingConnection(pika.ConnectionParameters(
    '192.168.56.19',5672,'/',credentials))
channel = connection.channel()

# You may ask why we declare the queue again ‒ we have already declared it in our previous code.
# We could avoid that if we were sure that the queue already exists. For example if send.py program
# was run before. But we're not yet sure which program to run first. In such cases it's a good
# practice to repeat declaring the queue in both programs.
channel.queue_declare(queue='durable',durable=True)


def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(callback,
                      queue='durable',
                      #no_ack=True
                      )

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

可正確接收到信息。

再次查看隊列的情況。

[root@localhost ~]# rabbitmqctl list_queues
Listing queues ...
durable    0

廣播模式

當producer發送消息到隊列后,所有的consumer都會收到消息,需要注意的是,此模式下producer與concerned之間的關系類似與廣播電台與收音機,如果廣播后收音機沒有接受到,那么消息就會丟失。

建議先執行concerned

concerned

# _*_coding:utf-8_*_
__author__ = 'Alex Li'
import pika

credentials = pika.PlainCredentials('admin','123456')
connection = pika.BlockingConnection(pika.ConnectionParameters(
    '192.168.56.19',5672,'/',credentials))
channel = connection.channel()

channel.exchange_declare(exchange='Clogs',
                         type='fanout')

result = channel.queue_declare(exclusive=True)  # 不指定queue名字,rabbit會隨機分配一個名字,exclusive=True會在使用此queue的消費者斷開后,自動將queue刪除
queue_name = result.method.queue

channel.queue_bind(exchange='Clogs',
                   queue=queue_name)

print(' [*] Waiting for logs. To exit press CTRL+C')


def callback(ch, method, properties, body):
    print(" [x] %r" % body)


channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()

producer

import pika
import sys

credentials = pika.PlainCredentials('admin','123456')
connection = pika.BlockingConnection(pika.ConnectionParameters(
    '192.168.56.19',5672,'/',credentials))
channel = connection.channel()

channel.exchange_declare(exchange='Clogs',
                         type='fanout')

message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='Clogs',
                      routing_key='',
                      body=message)
print(" [x] Sent %r" % message)
connection.close()


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM