使用redis作為消息隊列的用法


背景

最近項目有個需求需要動態更新規則,當時腦中想到的第一個方案是利用zk的監聽機制,管理人員更新完規則將狀態寫入zk,集群中的機器監聽zk的狀態,當有狀態變更后,集群中的機器開始拉取最新的配置。但由於公司技術選型,沒有專門搭建zk集群,因此也不可能為這一個小需求去搭建zk集群。圖為使用zk監聽狀態變化的流程。

最后只好退而求其次,想到了使用redis的隊列來做規則的更新

消息隊列

首先做簡單的引入。

  1. 隊列(來自百度百科):是一種特殊的線性表,特殊之處在於它只允許在表的前端(front)進行刪除操作,而在表的后端(rear)進行插入操作,和棧一樣,隊列是一種操作受限制的線性表。進行插入操作的端稱為隊尾,進行刪除操作的端稱為隊頭。
  2. 消息隊列(來自百度百科):是在消息的傳輸過程中保存消息的容器。

從隊列和消息隊列的定義看來,看不出什么相似之處。但我理解它們的作用是相似的,只是使用環境不同。隊列和消息隊列 本質上都可以用於解決“生產者”和“消費者”問題,在二者這間建立橋梁,it中專業術語是對“生產者”和“消費者”進行解耦。可以動態的通過調整“生產者”和“消費者”線程數或服務器實例數,在正常情況使消費和生產到達一個平衡;在高峰情況下(生產者大於消費者)可以保護消費者不被拖垮的同時,還可以對把積壓的數據保存下來,消費者可以延遲消費這些數據進行處理。

隊列 一般指的是單個服務實例內部使用,比如,在java中的一個jvm實例內部可以使用Queue的子類(Deque:雙端隊列,是Queue的子接口),比如:單線程情況下使用LinkedList(無界)、PriorityQueue(優先隊列);多線程情況下可以阻塞隊列ArrayBlockingQueue(有界)、LinkedBlockingQueue(無界)、DelayQueue(延遲隊列 無界)、PriorityBlockingQueue(優先 無界)、SynchronousQueue(沒有容量的隊列)。可以看到java的api已經很強大了,可以根據自己的業務需求選擇使用。使用方法:生產者從一端放入消息,消費者從另一端取出消息進行處理,消息放到隊列里(感覺是不是有點像“消息隊列”的定義)。

MQ主要是用來:

  1. 解耦應用、
  2. 異步化消息
  3. 流量削峰填谷

目前使用的較多的有ActiveMQ、RabbitMQ、ZeroMQ、Kafka、MetaMQ、RocketMQ等。

另外上面提到的“有界”和“無界”,指的是隊列的容量大小。有界 指的是創建隊列時必須指定隊列的容量;無界 創建隊列時無需指定隊列的容量,容量大小取決於jvm實例分配的內存空間大小。在海量業務場景里,我們期望隊列的容量是無限的,但單個jvm實例 即便是使用“無界”隊列 由於單個實例內存是有限的,最終無法容納下海量的消息數據。聰明的程序員就想 能不能使用一個第三方的隊列來存儲這些數據呢?當然是可以的,這就產生了“消息隊列”。

消息隊列 一般是采用一個獨立的集群專門用於消息存儲,可以存儲在內存里 也可以直接存儲在磁盤中。比如常見的:RabbitMQ、kafka、rocketMQ、ActiveMQ、zeromq等等,它們有不同的特性,以及采用了各種不同的實現,適用於各種場景的消息任務分發。但他們本質作用跟上面講的單實例環境中java“隊列”沒什么兩樣:在消息的傳輸過程中保存消息的容器。只是這里轉換到“分布式”環境中而已。

可以看到這里這里提到的“傳統”消息隊列,都是一個很重型的集群。如果這個分布式環境中的消息數量有限,我們可以不必引入這種重型的mq框架。比如:本次分享的主題 如何使用redis實現“消息隊列”。

redis 實現消息隊列

redis有一個數據類型叫list(列表),它的每個子元素都是 string 類型的雙向鏈表。我們可以通過 push,pop 操作從鏈表的頭部或者尾部添加刪除元素。這使得 list 既可以用作棧,也可以用作隊列。

假如,我們有一個隊列系統,把一個個任務放到隊列中,另一個進程就把隊列中的任務取出來執行。

放到隊列我們使用LPUSH,也就是往雙向鏈表的尾部填充一個元素,這一端也叫生產者,是產生內容的一端。

另一個進程使用RPOP往頭部取出元素來執行,這一端也叫消費者。

如果僅僅是這種方式來實現隊列,它就是需要進程不斷地循環隊列,判斷隊列是不是有新元素,有的話就取出來執行,沒有的話,就繼續循環,但是這個總有一個時間間隔,你總得規定每隔一段時間去循環,雖然這個時間很小,但總有延遲,這種方式叫作輪循。有沒有一種方式就是讓不斷執行一個redis命令,而redis中的列隊有值就會通過命令通知程序呢?有的,那就是阻塞操作的RPOP,它叫作BRPOP。

我們來演示一下它是如何實現的。

$ redis-cli

127.0.0.1:6379> BRPOP list1 0

先執行BRPOP,假如隊列list1沒有值,它會返回nil,並且阻塞在那,在等另一個程序或進程往list1中填值。

我們開啟另一個redis端終。

$ redis-cli

127.0.0.1:6379> LPUSH list1 a

(integer) 1

我們再來看之前的結果。

127.0.0.1:6379> BRPOP list1 0

1) "list1"

2) "a"

(16.99s)

這樣就能把列表的值給取到了。

優點

  1. 能夠實現持久化
  2. 采用 Master-Slave 數據復制模式。隊列操作都是寫操作,Master任務繁重,能讓Slave分擔的持久化工作,就不要Master做。RDB和AOF兩種方法都用上,多重保險。
  3. 支持集群
  4. 接口使用簡單

不足

  1. Redis上消息只會被一個消費者消費,不會有多個訂閱者消費同一個消息,簡單一對一
  2. 生產者或者消費者崩潰后的處理機制,需要自己實現
  3. 生產者寫入太快,消費者消費太慢,導致Redis的內存問題,處理機制需要自己實現

通過pub/sub來實現

實現機制

訂閱,取消訂閱和發布實現了發布/訂閱消息范式,發布者不是計划發送消息給特定的訂閱者。而是發布的消息分到不同的頻道,不需要知道什么樣的訂閱者訂閱。訂閱者對一個或多個頻道感興趣,只需接收感興趣的消息,不需要知道什么樣的發布者發布的。

這是一種基於非持久化的消息機制,消息發布者和訂閱者必須同時在線,否則一旦消息訂閱者由於各種異常情況而被迫斷開連接,在其重新連接后,其離線期間的消息是無法被重新通知的(即發即棄)。

Redis中的消息可以提供兩種不同的功能。一類是基於Channel的消息,這一類消息和Redis中存儲的Keys沒有太多關聯,也就是說即使不在Redis中存儲任何Keys信息,這類消息也可以獨立使用。另一類消息可以對(也可以不對)Redis中存儲的Keys信息的變化事件進行通知,可以用來向訂閱者通知Redis中符合訂閱條件的Keys的各種事件。

通過springboot 構建redis消息隊列

首先springboot配置文件配置如下:

spring.redis.host=localhost

spring.redis.port=6379

消息生產者,注入redisTemplate,用convertAndSend發送消息

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.data.redis.core.StringRedisTemplate;

import org.springframework.stereotype.Service;

@Service

public class PublishService {

@Autowired

private StringRedisTemplate stringRedisTemplate;

public void sendMsg(String channel, String msg) {

stringRedisTemplate.convertAndSend(channel, msg);

}

}

消費者:創建一個接收消息的類,繼承MessageListener,也可以不繼承

import lombok.extern.slf4j.Slf4j;

import org.springframework.stereotype.Service;

@Slf4j

@Service

public class RedisReceiver {

public void receiveMessage(String message) {

log.info("receive message is {}",message);

}

}

消息訂閱者配置類:

import com.wuzy.queue.RedisReceiver;

import org.springframework.context.annotation.Bean;

import org.springframework.data.redis.connection.RedisConnectionFactory;

import org.springframework.data.redis.core.StringRedisTemplate;

import org.springframework.data.redis.listener.PatternTopic;

import org.springframework.data.redis.listener.RedisMessageListenerContainer;

import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;

import org.springframework.stereotype.Component;

/**

* redis 監聽配置

*/

@Configuration

public class RedisSubListenerConfig {

/**

* 初始化監聽器

*

* @param connectionFactory

* @param listenerAdapter

* @return

*/

@Bean

RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,

MessageListenerAdapter listenerAdapter) {

RedisMessageListenerContainer container = new RedisMessageListenerContainer();

container.setConnectionFactory(connectionFactory);

container.addMessageListener(listenerAdapter, new PatternTopic("channel_1")); // new PatternTopic("這里是監聽的通道的名字") 通道要和發布者發布消息的通道一致

return container;

}

/**

* 綁定消息監聽者和接收監聽的方法

*

* @param redisReceiver

* @return

*/

@Bean

MessageListenerAdapter listenerAdapter(RedisReceiver redisReceiver) {

// redisReceiver 消息接收者

// receiveMessage 消息接收后的方法

MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter();

messageListenerAdapter.setDefaultListenerMethod("receiveMessage");

messageListenerAdapter.setDelegate(redisReceiver);

return messageListenerAdapter;

}

@Bean

StringRedisTemplate template(RedisConnectionFactory connectionFactory) {

return new StringRedisTemplate(connectionFactory);

}

}

優點

  1. 一個生產者能夠對應多個消費者
  2. 支持集群
  3. 接口使用簡單

不足

 

  1. Redis提供的訂閱/發布功能並不完美,更不能和ActiveMQ/RabbitMQ提供的訂閱/發布功能相提並論。
  2. 首先這些消息並沒有持久化機制,屬於即發即棄模式。也就是說它們不能像ActiveMQ中的消息那樣保證持久化消息訂閱者不會錯過任何消息,無論這些消息訂閱者是否隨時在線。
  3. 由於本來就是即發即棄的消息模式,所以Redis也不需要專門制定消息的備份和恢復機制。
  4. 也是由於即發即棄的消息模式,所以Redis也沒有必要專門對使用訂閱/發布功能的客戶端連接進行識別,用來明確該客戶端連接的ID是否在之前已經連接過Redis服務了。ActiveMQ中保持持續通知的功能的前提,就是能夠識別客戶端連接ID的歷史連接情況,以便確定哪些訂閱消息這個客戶端還沒有處理。
  5. Redis也沒有為發布者和訂閱者准備保證消息性能的任何方案,例如在大量消息同時到達Redis服務是,如果消息訂閱者來不及完成消費,就可能導致消息堆積。而ActiveMQ中有專門針對這種情況的慢消息機制。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM