Dyno-queues 分布式延遲隊列 之 基本功能
0x00 摘要
本系列我們會以設計分布式延遲隊列時重點考慮的模塊為主線,穿插灌輸一些消息隊列的特性實現方法,通過分析Dyno-queues 分布式延遲隊列的源碼來具體看看設計實現一個分布式延遲隊列的方方面面。
0x01 Dyno-queues分布式延遲隊列
Dyno-queues 是 Netflix 實現的基於 Dynomite 和 Redis 構建的隊列。
Dynomite是一種通用的實現,可以與許多不同的 key-value 存儲引擎一起使用。目前它提供了對Redis序列化協議(RESP)和Memcached寫協議的支持。
1.1 設計目標
具體設計目標依據業務系統不同而不同。
Dyno-queues 的業務背景是:在 Netflix 的平台上運行着許多的業務流程,這些流程的任務是通過異步編排進行驅動,現在要實現一個分布式延遲隊列,這個延遲隊列具有如下特點:
- 分布式;
- 不用外部的鎖機制;
- 高並發;
- 至少一次語義交付;
- 不遵循嚴格的FIFO;
- 延遲隊列(消息在將來某個時間之前不會從隊列中取出);
- 優先級;
1.2 選型思路
Netflix 選擇 Dynomite,是因為:
- 其具有性能,多數據中心復制和高可用性的特點;
- Dynomite提供分片和可插拔的數據存儲引擎,允許在數據需求增加垂直和水平擴展;
Netflix選擇Redis作為構建隊列的存儲引擎是因為:
- Redis架構通過提供構建隊列所需的數據結構很好地支持了隊列設計,同時Redis的性能也非常優秀,具備低延遲的特性;
- Dynomite在Redis之上提供了高可用性、對等復制以及一致性等特性,用於構建分布式集群隊列;
0x02 總體設計
2.1 系統假設
查詢模型:基於Key-Value模型,而不是SQL,即關系模型。存儲對象比較小。
ACID屬性:傳統的關系數據庫中,用ACID(A原子性、C一致性、I 隔離性、D持久性)來保證事務,在保證ACID的前提下往往有很差的可用性。Dynamo用弱一致性C來達到高可用,不提供數據隔離 I,只允許單Key更新。
2.2 高可用
其實所有的高可用,是可以依賴於RPC和存儲的高可用來實現的。
- 先來看RPC的高可用,比如美團的基於MTThrift的RPC框架,阿里的Dubbo等,其本身就具有服務自動發現,負載均衡等功能。
- 而消息隊列的高可用,只要保證broker接受消息和確認消息的接口是冪等的,並且consumer的幾台機器處理消息是冪等的,這樣就把消息隊列的可用性,轉交給RPC框架來處理了。
Netflix 選擇 Dynomite,是因為:
- 其具有高性能,多數據中心復制和高可用性的特點;
- Dynomite 提供分片和可插拔的數據存儲引擎,允許在數據需求增加垂直和水平擴展;
所以 Dyno-queues 的高可用就自動解決了。
2.3 冪等
怎么保證冪等呢?最簡單的方式莫過於共享存儲。broker多機器共享一個DB或者一個分布式文件/kv系統,則處理消息自然是冪等的。就算有單點故障,其他節點可以立刻頂上。
對於不共享存儲的隊列,如Kafka使用分區加主備模式,就略微麻煩一些。需要保證每一個分區內的高可用性,也就是每一個分區至少要有一個主備且需要做數據的同步。
Dynomite 使用 redis 集群這個共享存儲 做了冪等保證。
2.4 承載消息堆積
消息到達服務端后,如果不經過任何處理就到接收者,broker就失去了它的意義。為了滿足我們錯峰/流控/最終可達等一系列需求,把消息存儲下來,然后選擇時機投遞就顯得是順理成章的了。
這個存儲可以做成很多方式。比如存儲在內存里,存儲在分布式KV里,存儲在磁盤里,存儲在數據庫里等等。但歸結起來,主要有持久化和非持久化兩種。
持久化的形式能更大程度地保證消息的可靠性(如斷電等不可抗外力),並且理論上能承載更大限度的消息堆積(外存的空間遠大於內存)。
但並不是每種消息都需要持久化存儲。很多消息對於投遞性能的要求大於可靠性的要求,且數量極大(如日志)。這時候,消息不落地直接暫存內存,嘗試幾次failover,最終投遞出去也未嘗不可。
Dynomite 使用 redis 集群這個共享存儲 在一定程度上緩解了消息堆積問題。
2.5 存儲子系統
我們來看看如果需要數據落地的情況下各種存儲子系統的選擇。理論上,從速度來看,文件系統 > 分布式KV(持久化)> 分布式文件系統 > 數據庫,而可靠性卻截然相反。還是要從支持的業務場景出發作出最合理的選擇。
如果你們的消息隊列是用來支持支付/交易等對可靠性要求非常高,但對性能和量的要求沒有這么高,而且沒有時間精力專門做文件存儲系統的研究,DB是最好的選擇。
但是DB受制於IOPS,如果要求單broker 5位數以上的QPS性能,基於文件的存儲是比較好的解決方案。整體上可以采用數據文件 + 索引文件的方式處理。
分布式KV(如MongoDB,HBase)等,或者持久化的Redis,由於其編程接口較友好,性能也比較可觀,如果在可靠性要求不是那么高的場景,也不失為一個不錯的選擇。
因為 場景是 可靠性要求不那么高,所以 Dynomite 使用 redis 集群這個存儲子系統 也是可以的。
2.6 消費關系解析
下一個重要的事情就是解析發送接收關系,進行正確的消息投遞了。拋開現象看本質,發送接收關系無外乎是單播與廣播的區別。所謂單播,就是點到點;而廣播,是一點對多點。
一般比較通用的設計是支持組間廣播,不同的組注冊不同的訂閱。組內的不同機器,如果注冊一個相同的ID,則單播;如果注冊不同的ID(如IP地址+端口),則廣播。
至於廣播關系的維護,一般由於消息隊列本身都是集群,所以都維護在公共存儲上,如 config server、zookeeper等。維護廣播關系所要做的事情基本是一致的:
- 發送關系的維護。
- 發送關系變更時的通知。
本文后續會介紹如何維護發送關系。
2.7 數據分片
數據分片的邏輯既可以實現在客戶端,也可以實現在 Proxy 層,取決於你的架構如何設計。
傳統的數據庫中間件大多將分片邏輯實現在客戶端,通過改寫物理 SQL 訪問不同的 MySQL 庫;而在 NewSQL 數據庫倡導的計算存儲分離架構中,通常將分片邏輯實現在計算層,即 Proxy 層,通過無狀態的計算節點轉發用戶請求到正確的存儲節點。
在 Dynomite 之中,隊列根據可用區域進行分片,將數據推送到隊列時,通過輪訓機制確定分片,這種機制可以確保所有分片的數據是平衡的,每個分片都代表Redis中的有序集合,有序集中的 key 是 queueName 和 AVAILABILITY _ZONE 的組合。
public class RoundRobinStrategy implements ShardingStrategy {
private final AtomicInteger nextShardIndex = new AtomicInteger(0);
/**
* Get shard based on round robin strategy.
* @param allShards
*/
@Override
public String getNextShard(List<String> allShards, Message message) {
int index = nextShardIndex.incrementAndGet();
if (index >= allShards.size()) {
nextShardIndex.set(0);
index = 0;
}
return allShards.get(index);
}
}
0x03 Dynomite 特性
3.1 可用分區和機架
Dyno-queues 隊列是在 Dynomite 的JAVA客戶端 Dyno 之上建立的,Dyno 為持久連接提供連接池,並且可以配置為拓撲感知。關於 Dyno 具體可以參見前文:
源碼分析] Dynomite 分布式存儲引擎 之 DynoJedisClient(1)
源碼分析] Dynomite 分布式存儲引擎 之 DynoJedisClient(2)
3.1.1 機架
Dyno為應用程序提供特定的本地機架(在AWS中,機架是一個區域,例如 us-east-1a、us-east-1b等),us-east-1a的客戶端將連接到相同區域的Dynomite/Redis節點,除非該節點不可用,在這種情況下該客戶端將進行故障轉移。這個屬性被用於通過區域划分隊列。
3.1.2 分片
隊列根據可用區域進行分片,將數據推送到隊列時,通過輪訓機制確定分片,這種機制可以確保所有分片的數據是平衡的,每個分片都代表Redis中的有序集合,有序集中的key是queueName和AVAILABILITY _ZONE的組合。
具體機制舉例如下:
public class RoundRobinStrategy implements ShardingStrategy {
private final AtomicInteger nextShardIndex = new AtomicInteger(0);
/**
* Get shard based on round robin strategy.
* @param allShards
*/
@Override
public String getNextShard(List<String> allShards, Message message) {
int index = nextShardIndex.incrementAndGet();
if (index >= allShards.size()) {
nextShardIndex.set(0);
index = 0;
}
return allShards.get(index);
}
}
3.2 Quorum
在分布式系統中有個CAP理論,對於P(分區容忍性)而言,是實際存在 從而無法避免的。因為分布系統中的處理不是在本機,而是網絡中的許多機器相互通信,故網絡分區、網絡通信故障問題無法避免。因此,只能盡量地在C 和 A 之間尋求平衡。
對於數據存儲而言,為了提高可用性(Availability),采用了副本備份,比如對於HDFS,默認每塊數據存三份。某數據塊所在的機器宕機了,就去該數據塊副本所在的機器上讀取(從這可以看出,數據分布方式是按“數據塊”為單位分布的)
但是問題來了,當需要修改數據時,就需要更新所有的副本數據,這樣才能保證數據的一致性(Consistency)。因此,就需要在 C(Consistency) 和 A(Availability) 之間權衡。
而Quorum機制,就是這樣的一種權衡機制,一種將“讀寫轉化”的模型。
3.2.1 數據一致性
- 強一致性:在任意時刻,從任意不同副本取出的值都是一樣的。
- 弱一致性:有時泛指最終一致性,是指在任意時刻,可能由於網絡延遲或者設備異常等原因,不同副本中的值可能會不一樣,但經過一段時間后,最終會變成一樣。
顯然,我們更想要做到強一致性的這種效果,那么有哪些方式可以實現呢,其中最為簡單直接的就是 WARO,也就是Write All Read one。
3.2.1.1 WARO 協議
WARO 是一種簡單的副本控制協議,當 Client 請求向某副本寫數據時(更新數據),只有當所有的副本都更新成功之后,這次寫操作才算成功,否則視為失敗。這樣的話,只需要讀任何一個副本上的數據即可。但是WARO帶來的影響是寫服務的可用性較低,因為只要有一個副本更新失敗,此次寫操作就視為失敗了。
3.2.1.2 Quorum機制
Quorum 的定義如下:假設有 N 個副本,更新操作 wi 在 W 個副本中更新成功之后,則認為此次更新操作 wi 成功,把這次成功提交的更新操作對應的數據叫做:“成功提交的數據”。對於讀操作而言,至少需要讀 R 個副本,其中,W+R>N ,即 W 和 R 有重疊,一般,W+R=N+1。
- N = 存儲數據副本的數量;
- W = 更新成功所需的副本;
- R = 一次數據對象讀取要訪問的副本的數量;
Quorum機制認為每次寫入的機器數目達到大多數(W)時,就認為本次寫操作成功了。即Quorum機制能夠不需要更新完全部的數據,但又保證返回給用戶的是有效數據的解決方案。
3.2.2 ES 的quorum
我們以 ES 為例。
3.2.2.1 寫一致性
我們在發送任何一個增刪改操作的時候,都可以帶上一個consistency參數,指明我們想要的寫一致性是什么。
- one:要求寫操作只要primay shard是active可用的,就可以執行;
- all:要求寫操作必須所有的shard和replica都是active,才可以執行;
- quorum(默認):所有shard中必須是大部分是可用的(一半及以上),才可以執行;
3.2.2.2 quorum機制
quorum = int((primary shard+number_of_replicas)/2)+1
如果節點數少於quorum,可能導致querum不齊全,進而導致無法執行任何寫操作。quorum不齊全時,會進行等待。默認等待時間為1分鍾,期待活躍的shard數量可以增加,最后實在不行,就會timeout。
3.3 DC_QUORUM
3.3.1 配置
Dynomite 能夠將最終一致性(eventual consistency)擴展為協調一致性(tunable consistency)。
關於QUORUM,Dynomite有如下配置:
- DC_ONE 本節點讀寫入完成及請求完成,其他的rack異步寫入。使用DC_ONE模式,讀寫行為在local Availability Zone(AZ)下是同步的;
- DC_QUORUM 同步寫入到指定個數的rack中,其他的節點異步寫入。使用DC_QUORUM模式,本地區域特定數量結點下的操作是同步的。
- DC_SAFE_QUORUM 和DC_QUORUM類似,不過這個參數讀寫都要在指定個數的rack中成功並且數據校驗同步,才算請求成功,不然會報錯。
由測試得到的結果,Dynomite能從3,6,12,24一路擴展到48個節點,在DC_ONE和DC_QUORUM模式下,吞吐率都能線性地增長。與此同時,Dynomite在延遲方面只增加了很少的開支,即便在DC_QUORUM模式下,(延遲)也只有幾毫秒。DC_QUORUM模式在延遲和吞吐量方面處於劣勢,但是能為客戶提供更好的讀寫保證。
3.3.2 實現
對於Dyno-queues來說,則是在實現中有所體現。比如在 RedisQueues 中,有如下成員變量:
private final JedisCommands quorumConn;
private final JedisCommands nonQuorumConn;
在構建 RedisQueues 時,就需要注明使用哪一種。
而從注釋中我們可知,
@param quorumConnDyno connection with dc_quorum enabled,就是 采用了Quorum的Redis;@param nonQuorumConnDyno connection to local Redis,就是本地Redis;
生成 RedisQueues 的代碼如下(注意其中注釋):
/**
* @param quorumConn Dyno connection with dc_quorum enabled
* @param nonQuorumConn Dyno connection to local Redis
*/
public RedisQueues(JedisCommands quorumConn, JedisCommands nonQuorumConn, String redisKeyPrefix, ShardSupplier shardSupplier, int unackTime, int unackHandlerIntervalInMS, ShardingStrategy shardingStrategy) {
this(Clock.systemDefaultZone(), quorumConn, nonQuorumConn, redisKeyPrefix, shardSupplier, unackTime, unackHandlerIntervalInMS, shardingStrategy);
}
3.3.3 使用
在有分片時,就從nonQuorumConn(就是本地Redis)提取。
使用nonQuorumConn來預取的原因是:最終一致性(eventual consistency)。
因為 replication lag,在某一時刻不同分片的數據可能不一樣,所以需要先預取。這就需要使用 nonQuorumConn 來預取,因為本地 redis 的數據才是正確的。
private Set<String> doPeekIdsFromShardHelper(final String queueShardName, final double peekTillTs, final int offset,final int count) {
return nonQuorumConn.zrangeByScore(queueShardName, 0, peekTillTs, offset, count);
}
再比如處理沒有 ack 的消息時,先從 nonQuorumConn 讀取信息ID,再從 quorumConn 讀取消息內容。
這就是因為一致性導致的,所以如下:
@Override
public void processUnacks() {
execute("processUnacks", keyName, () -> {
Set<Tuple> unacks = nonQuorumConn.zrangeByScoreWithScores(unackShardName, 0, now, 0, batchSize);
for (Tuple unack : unacks) {
double score = unack.getScore();
String member = unack.getElement();
String payload = quorumConn.hget(messageStoreKey, member);
long added_back = quorumConn.zadd(localQueueShard, score, member);
}
});
}
再比如從本地提取消息就使用了 nonQuorumConn。
@Override
public Message localGet(String messageId) {
try {
return execute("localGet", messageStoreKey, () -> {
String json = nonQuorumConn.hget(messageStoreKey, messageId);
Message msg = om.readValue(json, Message.class);
return msg;
});
}
}
再比如 popWithMsgIdHelper 也是先讀取 nonQuorumConn,再從 quorumConn 讀取其他內容。
public Message popWithMsgIdHelper(String messageId, String targetShard, boolean warnIfNotExists) {
try {
return execute("popWithMsgId", targetShard, () -> {
String queueShardName = getQueueShardKey(queueName, targetShard);
double unackScore = Long.valueOf(clock.millis() + unackTime).doubleValue();
String unackShardName = getUnackKey(queueName, targetShard);
ZAddParams zParams = ZAddParams.zAddParams().nx();
Long exists = nonQuorumConn.zrank(queueShardName, messageId);
// If we get back a null type, then the element doesn't exist.
if (exists == null) {
// We only have a 'warnIfNotExists' check for this call since not all messages are present in
// all shards. So we want to avoid a log spam. If any of the following calls return 'null' or '0',
// we may have hit an inconsistency (because it's in the queue, but other calls have failed),
// so make sure to log those.
monitor.misses.increment();
return null;
}
String json = quorumConn.hget(messageStoreKey, messageId);
if (json == null) {
monitor.misses.increment();
return null;
}
long added = quorumConn.zadd(unackShardName, unackScore, messageId, zParams);
if (added == 0) {
monitor.misses.increment();
return null;
}
long removed = quorumConn.zrem(queueShardName, messageId);
if (removed == 0) {
monitor.misses.increment();
return null;
}
Message msg = om.readValue(json, Message.class);
return msg;
});
}
}
0x04 外層封裝
RedisQueues是為用戶提供的外部接口,從其成員變量可以看出來其內部機制,比如各種策略。
public class RedisQueues implements Closeable {
private final Clock clock;
private final JedisCommands quorumConn;
private final JedisCommands nonQuorumConn;
private final Set<String> allShards;
private final String shardName;
private final String redisKeyPrefix;
private final int unackTime;
private final int unackHandlerIntervalInMS;
private final ConcurrentHashMap<String, DynoQueue> queues;
private final ShardingStrategy shardingStrategy;
private final boolean singleRingTopology;
}
用戶通過get方法來得到DynoQueue:DynoQueue V1Queue = queues.get("simpleQueue")。
public DynoQueue get(String queueName) {
String key = queueName.intern();
return queues.computeIfAbsent(key, (keyToCompute) -> new RedisDynoQueue(clock, redisKeyPrefix, queueName, allShards, shardName, unackHandlerIntervalInMS, shardingStrategy, singleRingTopology)
.withUnackTime(unackTime)
.withNonQuorumConn(nonQuorumConn)
.withQuorumConn(quorumConn));
}
0x05 數據結構
我們看看 Dyno-queues 中幾種數據結構。
5.1 消息結構
一個完整的消息隊列應該定義清楚自己可以投遞的消息類型,如事務型消息,本地非持久型消息,以及服務端不落地的非可靠消息等。對不同的業務場景做不同的選擇。
Dyno-queues 只有服務端落地的可靠消息。每個延時消息必須包括以下參數:
- id:唯一標示;
- payload:消息過期之后發送mq的body,提供給消費這做具體的消息處理;
- timeout:延時發送時間;
- priority:優先級,與timeout一起決定消息如何發布,即同一 timeout 時間的消息中,哪個優先使用。
- shard:分區;
public class Message {
private String id;
private String payload;
private long timeout;
private int priority;
private String shard;
}
5.2 存儲結構
Dyno-queues 關於存儲的總體思路是:hash 記錄消息內容, zset 實現按到期時間排序的隊列,即:
- 利用hash 記錄消息內容;
- 使用hset存儲消息;
- 使用hget提取消息;
- 通過Redis中的zset來實現一個延遲隊列,主要利用它的score屬性,Redis通過score來為集合中的成員進行從小到大的排序;
- 使用zadd key score1 value1命令生產消息;
- 使用zrem消費消息;
具體邏輯如圖,這里的虛線指的是兩者通過 msg id 來進行邏輯上的管理,物理沒有關聯:
+----------+----------+----------+-----+----------+
| | | | | |
zset | msg id 1 | msg id 2 | msg id 3 | ... | msg id n |
| | | | | |
+---+------+----+-----+----+-----+-----+----+-----+
| | | |
| | | |
v v v v
+---+---+ +---+---+ +--+----+ +--+--+
hash | msg 1 | | msg 2 | | msg 3 | |msg n|
+-------+ +-------+ +-------+ +-----+
具體到代碼,則是:
- Message 的id作為key,Message整體被打包成json String作為value:
quorumConn.hset(messageStoreKey, message.getId(), json); - 用Message 的超時時間,優先級以及當前時間戳構建出zset的分數:
double score = Long.valueOf(clock.millis() + message.getTimeout()).doubleValue() + priority;
具體參見如下:
for (Message message : messages) {
String json = om.writeValueAsString(message);
quorumConn.hset(messageStoreKey, message.getId(), json);
double priority = message.getPriority() / 100.0;
double score = Long.valueOf(clock.millis() + message.getTimeout()).doubleValue() + priority;
String shard = shardingStrategy.getNextShard(allShards, message);
String queueShard = getQueueShardKey(queueName, shard);
quorumConn.zadd(queueShard, score, message.getId());
}
0x06 隊列
RedisDynoQueue是 Dyno-queues 延遲隊列的主要實現。
6.1 Redis相關
從Redis角度來看,對於每個隊列,維護三組Redis數據結構:
- 包含隊列元素和分數的有序集合 zset;
- 包含消息內容的Hash集合,其中key為消息ID;
- 包含客戶端已經消費但尚未確認的消息有序集合,Un-ack集合 zset;
這三組Redis數據結構在RedisDynoQueue內部其實沒有對應的成員變量,對於RedisDynoQueue 來說,看起來是邏輯概念,而事實上它們存在於Redis的內部存儲中,由Dynomite負責高可用等等。
具體如下:
message list
zset +----------+----------+----------+-----+----------+
| | | | | |
| msg id 1 | msg id 2 | msg id 3 | ... | msg id 9 |
| | | | | |
+---+------+----+-----+----+-----+-----+----+-----+
| | | |
| | | |
v v v v
hash +---+---+ +---+---+ +--+----+ +--+--+
| msg 1 | | msg 2 | | msg 3 | |msg 9|
+-------+ +-------+ +-------+ +-----+
unack list
+------------+-------------+--------------+
zset | | | |
| msg id 11 | msg id 12 | msg id 13 |
| | | |
+------------+-------------+--------------+
6.2 成員變量
RedisDynoQueue 的成員變量可以分類如下:
6.2.1 總體
- String queueName:本Queue名字;
- String shardName:分區名字;
6.2.2 Redis連接相關
- JedisCommands quorumConn:采用 quorum 的連接;
- JedisCommands nonQuorumConn:非Quorum的連接;
6.2.3 Redis操作相關
-
ObjectMapper om:用來把消息序列化,寫到redis中;
-
Clock clock:用以為分數生成時間戳;
-
String redisKeyPrefix:每個queue的用戶會給自己定義key;
-
String messageStoreKey:對於每個Redis hash來說,可以設定自己的field(字段),比如:
this.messageStoreKey = redisKeyPrefix + ".MESSAGE." + queueName; quorumConn.hget(messageStoreKey, messageId) -
List
allShards:所有分區; -
String localQueueShard:本地分區;
-
ShardingStrategy shardingStrategy:分區策略;
-
ConcurrentLinkedQueue
prefetchedIds:Prefetch message IDs from the local shard;本地分區優先的消息; -
Map<String, ConcurrentLinkedQueue
> unsafePrefetchedIdsAllShardsMap; this.unsafePrefetchedIdsAllShardsMap = new HashMap<>(); for (String shard : allShards) { unsafePrefetchedIdsAllShardsMap.put(getQueueShardKey(queueName, shard), new ConcurrentLinkedQueue<>()); } -
int retryCount = 2:重試次數;
6.2.4 Ack相關
-
int unackTime = 60:用以生成ack隊列的分數。
double unackScore = Long.valueOf(clock.millis() + unackTime).doubleValue(); long added = quorumConn.zadd(unackShardName, unackScore, messageId, zParams); -
ScheduledExecutorService schedulerForUnacksProcessing:用以生成線程,來定期ack
schedulerForUnacksProcessing = Executors.newScheduledThreadPool(1);
if (this.singleRingTopology) {
schedulerForUnacksProcessing.scheduleAtFixedRate(() -> atomicProcessUnacks(), unackScheduleInMS, unackScheduleInMS, TimeUnit.MILLISECONDS);
} else {
schedulerForUnacksProcessing.scheduleAtFixedRate(() -> processUnacks(), unackScheduleInMS, unackScheduleInMS, TimeUnit.MILLISECONDS);
}
- boolean singleRingTopology:
6.2.5 監控與統計
QueueMonitor monitor:監控與統計;
6.2.6 具體定義
具體代碼如下:
public class RedisDynoQueue implements DynoQueue {
private final Clock clock;
private final String queueName;
private final List<String> allShards;
private final String shardName;
private final String redisKeyPrefix;
private final String messageStoreKey;
private final String localQueueShard;
private volatile int unackTime = 60;
private final QueueMonitor monitor;
private final ObjectMapper om;
private volatile JedisCommands quorumConn;
private volatile JedisCommands nonQuorumConn;
private final ConcurrentLinkedQueue<String> prefetchedIds;
private final Map<String, ConcurrentLinkedQueue<String>> unsafePrefetchedIdsAllShardsMap;
private final ScheduledExecutorService schedulerForUnacksProcessing;
private final int retryCount = 2;
private final ShardingStrategy shardingStrategy;
private final boolean singleRingTopology;
}
至此,Dyno-queues 基本功能初步分析完畢,我們下期繼續介紹消息產生,消費。
0xFF 參考
消息隊列的理解,幾種常見消息隊列對比,新手也能看得懂!----分布式中間件消息隊列
http://blog.mikebabineau.com/2013/02/09/delay-queues-in-redis/
http://stackoverflow.com/questions/17014584/how-to-create-a-delayed-queue-in-rabbitmq
http://activemq.apache.org/delay-and-schedule-message-delivery.html
源碼分析] Dynomite 分布式存儲引擎 之 DynoJedisClient(1)
源碼分析] Dynomite 分布式存儲引擎 之 DynoJedisClient(2)
