消息存储:
message主要存储在RAM和disk里面。
所有队列中的消息都以append的方式写到一个文件中,当这个文件的大小超过指定的限制大小后,关闭这个文件再创建一个新的文件供消息的写入。文件名(*.rdq)从0开始然后依次累加。
在进行消息的存储时,rabbitmq会在ets表中记录消息在文件中的映射,以及文件的相关信息。消息读取时,根据消息ID找到该消息所存储的文件,在文件中的偏移量,然后打开文件进行读取。
rabbitmq在启动时会创建msg_store_persistent,msg_store_transient两个进程,一个用于持久消息的存储,一个用于内存不够时,将存储在内存中的非持久化数据转存到磁盘中。所有队列的消息的写入和删除最终都由这两个进程负责处理,而消息的读取则可能是队列本身直接打开文件进行读取,也可能是发送请求由msg_store_persisteng/msg_store_transient进程进行处理。
Ets数据结构:
-record(msg_location,{ msg_id, //消息ID ref_count, //引用计数 file, //消息存储的文件名 offset, //消息在文件中的偏移量 total_size //消息的大小 }).
GC过程
消息的删除只是从flying_ets表删除指定消息的相关信息,同时更新消息对应存储的文件的相关信息、更新文件有效数据大小。当垃圾数据超过一定比例后(默认比例为40%),rabbitmq触发垃圾回收。垃圾回收会先找到符合要求的两个文件(根据#file_summary{}中left,right找逻辑上相邻的两个文件,并且两个文件的有效数据可在一个文件中存储),然后锁定这两个文件,并先对左边文件的有效数据进行整理,再将右边文件的有效数据写入到左边文件,同时更新消息的相关信息(存储的文件,文件中的偏移量)、文件的相关信息(文件的有效数据,左边文件,右边文件),最后将右边的文件删除。
消息队列结构:
通常队列由两部分组成:一部分是AMQQueue,负责AMQP协议相关的消息处理,即接收生产者发布的消息、向消费者投递消息、处理消息confirm、acknowledge等等;另一部分是BackingQueue,它提供了相关的接口供AMQQueue调用,完成消息的存储以及可能的持久化工作等。
基本经历RAM->DISK->RAM这样的过程。这样设计的好处是:当队列负载很高的情况下,能够通过将一部分消息由磁盘保存来节省内存空间,当负载降低的时候,这部分消息又渐渐回到内存,被消费者获取,使得整个队列具有很好的弹性。
RabbitMQ高可用时采用镜像队列:
集群原理
1. 基本概念
1) 镜像队列(Mirrored Queue)
RabbitMQ集群的队列(Queue)在默认的情况下只存在单一节点(node)上。我们也可以把队列配置成同时存在在多个节点上,也就是说队列可以被镜像到多个节点上。发布(publish)到镜像队列上的消息(message)会被复制(replicated)到所有的节点上。一个镜像队列包含一个主(master)和多个从(slave)。
2) 非同步的Slave(unsynchronised slave)
在rabbitmq中同步(synchronised)是用来描述master和slave之间的数据状态是否一致的。如果slave包含master中的所有message,则这个slave是synchronised,如果这个slave并没有包含master中所有的message,则这个slave是unsynchronised。
3) 在什么情况下会出现unsynchronisedslave?
当一个新slave加入到一个镜像队列时,这时这个新slave是空的,而master中这时可能包含之前接收到的消息。假设这时master包含了N条消息,这是第N+1条消息被添加到这个镜像队列中,这个新slave会从这个第N+1条消息开始接收。此时这个slave就是unsynchronised slave。随着前5条消息从镜像队列中被消费掉(consumed), 这个slave变成了synchronised。
slave 重新加入(rejoin)到镜像队列时,也会出现非同步的情况。一个slave要重新加入镜像队列之前,slave可能已经接收了一些消息,要重新加入镜像队列,就要清空自己之前已经接收的所有消息,好像自己是第一次加入队列一样。(slave在很多情况下会需要重新加入镜像队列,例如:网络分区(networkpartition))