Redis之上的分布式Java隊列


最近學習的勢頭大漲,碼了很多干貨。分享給大家參考學習!

通過優銳課的java學習筆記中,了解到關於讓我們使用Redisson Java框架討論六種不同類型的基於Redis的分布式隊列。

 

1、在Redis中使用隊列

Redis是一個功能強大的工具,支持從字符串和列表到映射和流的許多不同類型的數據結構。 開發人員將Redis用於多種目的,包括用於數據庫,緩存和消息代理。

像任何消息代理一樣,Redis需要以正確的順序發送消息。 可以根據消息的年齡或某些其他預定義的優先級等級發送消息。

為了存儲這些未決消息,Redis開發人員需要隊列數據結構。 Redisson是使用Redis和Java進行分布式編程的框架,它提供了許多分布式數據結構(包括隊列)的實現。

Redisson通過提供Java API使Redis開發更加容易。 Redisson不需要開發人員學習Redis命令,而是包括所有眾所周知的Java接口,例如Queue和BlockingQueue。 Redisson還處理Redis中繁瑣的幕后工作,例如連接管理,故障轉移處理和數據序列化。

2、基於Redis的分布式Java隊列

Redisson提供了Java中基本隊列數據結構的多個基於Redis的實現,每種實現都有不同的功能。 這使可以選擇最適合目的的隊列類型。

下面,我們將使用Redisson Java框架討論六種不同類型的基於Redis的分布式隊列。

3、隊列

Redisson中的RQueue對象實現了java.util.Queue接口。 隊列用於需要從最早的最早的元素開始處理(也稱為“先進先出”或FIFO)的情況。

與普通Java一樣,可以使用peek()方法檢查RQueue的第一個元素,或者使用poll()方法檢查和刪除RQueue的第一個元素:

1 RQueue<SomeObject> queue = redisson.getQueue("anyQueue");
2 
3 queue.add(new SomeObject());
4 
5 SomeObject obj = queue.peek();
6 
7 SomeObject someObj = queue.poll();

 

4、阻塞隊列

Redisson中的RBlockingQueue對象實現了java.util.BlockingQueue接口。

BlockingQueues是阻塞線程的隊列,這些線程試圖從空隊列中進行輪詢,或者試圖在已滿的隊列中插入元素。 該線程將被阻塞,直到另一個線程將一個元素插入到空隊列中,或從完整隊列中輪詢第一個元素為止。

下面的示例代碼演示了RBlockingQueue的正確實例化和使用。 特別是,可以使用參數指定對象將等待線程變得可用的時間來調用poll()方法:

1 RBlockingQueue<SomeObject> queue = redisson.getBlockingQueue("anyQueue");
2 
3 queue.offer(new SomeObject());
4 
5 SomeObject obj = queue.peek();
6 
7 SomeObject someObj = queue.poll();
8 
9 SomeObject ob = queue.poll(10, TimeUnit.MINUTES);

 

在故障轉移或重新連接到Redis服務器的過程中,將自動重新預訂poll(),pollFromAny(),pollLastAndOfferFirstTo()和take()Java方法。

5、BoundedBlockingQueue

Redisson中的RBoundedBlockingQueue對象實現了有界的阻塞隊列結構。 有界阻塞隊列是容量已受限制(即有限)的阻塞隊列。

以下代碼演示了如何在Redisson中實例化和使用RBoundedBlockingQueue。 trySetCapacity()方法用於嘗試設置阻塞隊列的容量。 trySetCapacity()返回布爾值“ true”或“ false”,這取決於是否成功設置了容量或是否已經設置了容量:

 1 RBoundedBlockingQueue<SomeObject> queue = redisson.getBoundedBlockingQueue("anyQueue");
 2 
 3 queue.trySetCapacity(2);
 4 
 5 queue.offer(new SomeObject(1));
 6 
 7 queue.offer(new SomeObject(2));
 8 
 9 // will be blocked until free space available in queue
10 
11 queue.put(new SomeObject());
12 
13 SomeObject obj = queue.peek();
14 
15 SomeObject someObj = queue.poll();
16 
17 SomeObject ob = queue.poll(10, TimeUnit.MINUTES);

 

6、延遲排隊

Redisson中的RDelayedQueue對象允許Redis中實現延遲隊列。 當使用諸如指數補償的策略將消息傳遞給消費者時,這可能會很有用。 每次嘗試發送郵件失敗后,重試之間的時間將成倍增加。

在與元素一起指定的延遲之后,延遲隊列中的每個元素將被轉移到目標隊列。 此目標隊列可以是實現RQueue接口的任何隊列,例如RBlockingQueue或RBoundedBlockingQueue。

 1 RQueue<String> destinationQueue = redisson.getQueue("anyQueue");
 2 
 3 RDelayedQueue<String> delayedQueue = getDelayedQueue(destinationQueue);
 4 
 5 // move object to destinationQueue in 10 seconds
 6 
 7 delayedQueue.offer("msg1", 10, TimeUnit.SECONDS);
 8 
 9 // move object to destinationQueue in 1 minute
10 
11 delayedQueue.offer("msg2", 1, TimeUnit.MINUTES);

 

在不再需要隊列之后,通過使用destroy()方法銷毀延遲的隊列是一個好主意。 但是,如果要關閉Redisson,則沒有必要。

7、PriorityQueue

Redisson中的RPriorityQueue對象實現了java.util.Queue接口。 優先級隊列是不是按元素的使用期限而是按照與每個元素相關聯的優先級排序的隊列。

如下面的示例代碼所示,RPriorityQueue使用比較器對隊列中的元素進行排序:

 1 RPriorityQueue<Integer> queue = redisson.getPriorityQueue("anyQueue");
 2 
 3 queue.trySetComparator(new MyComparator()); // set object comparator
 4 
 5 queue.add(3);
 6 
 7 queue.add(1);
 8 
 9 queue.add(2);
10 
11 queue.removeAsync(0);
12 
13 queue.addAsync(5);
14 
15 queue.poll();

 

8、PriorityBlockingQueue

Redisson中的RPriorityBlockingQueue對象結合了RPriorityQueue和RBlockingQueue的功能。 與RPriorityQueue一樣,RPriorityBlockingQueue也使用Comparator對隊列中的元素進行排序。

 1 RPriorityBlockingQueue<Integer> queue = redisson.getPriorityBlockingQueue("anyQueue");
 2 
 3 queue.trySetComparator(new MyComparator()); // set object comparator
 4 
 5 queue.add(3);
 6 
 7 queue.add(1);
 8 
 9 queue.add(2);
10 
11 queue.removeAsync(0);
12 
13 queue.addAsync(5);
14 
15 queue.take();

 

在故障轉移或重新連接到Redis服務器的過程中,將自動重新預訂poll(),pollLastAndOfferFirstTo()和take()Java方法。

 文章分享到這里,如有不足之處,歡迎補充評論!

抽絲剝繭,細說架構那些事!


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM