Azure 基礎:Queue Storage


Azure Storage 是微軟 Azure 雲提供的雲端存儲解決方案,當前支持的存儲類型有 Blob、Queue、File 和 Table。

筆者在前文中介紹了 File Storage 的基本用法,本文將介紹 Queue Storage 的主要使用方法。

Queue Storage 是什么?

Azure Queue Storage 是一個存儲大量消息的存儲服務,這些消息可以在任何地方通過 HTTP/HTTPS 訪問。每條消息最大 64K,消息的數據量幾乎不受限制 (除非超出了您的 Storage Account 的總容量) 。

下面是 Queue Storage 典型的應用場景:
1.    創建未處理任務的隊列,以便異步的處理這些任務。
2.    把消息從 web role 傳遞給 worker role 進行處理。

Azure Queue Storage的結構

下圖描述了 Queue Storage 的基本組織結構:

Azure Storage Account:

Storage Account 是用來管理 Azure Storage 的一個命名空間,主要用來控制存儲數據的訪問權限和計費。對 Blob、Queue、File 和 Table 這些 Azure 提供的存儲服務的訪問控制都是通過 Storage Account 來進行的,所以要想使用 Queue Storage,需要先創建你的 Storage Account。

Queue:

每個 Queue 都是一組消息的集合。每一條消息都必須屬於一個 Queue。Queue 名稱中的字符必須是小寫。

Message:

每條 Message 的最大長度為 64KB,Message 在 Queue 中停留的最長時間為 7 天。

URL format:

Queue 的 URL 地址格式為:

http://<storage account>.queue.core.windows.net/<queuename>

下面是個更真實的例子:

http://nickstorage.queue.core.windows.net/app1tasks

如果您還不熟悉 Azure Storage Account 的使用,以及如何通過 WindowsAzure.Storage 庫訪問 Azure Storage,請參考前文《Azure Table storage 基本用法》中的介紹,這里就不重復了。

為了方便查看 C# 代碼執行的結果,本文使用了 MS 發布的一個 Azure Storage 客戶端工具:Microsoft Azure Storage Explorer,文中簡稱為 Storage Explorer。下面是 Queue Storage 的一個截圖:

接下來我們通過 C# 代碼來介紹如何操作 Queue Storage。

創建 Queue

我們先來創建一個名為 "app2tasks" 的 Queue:

// CloudStorageAccount 類表示一個 Azure Storage Account,我們需要先創建它的實例,才能訪問屬於它的資源。
// 注意連接字符串中的xxx和yyy,分別對應Access keys中的Storage account name 和 key。
CloudStorageAccount storageAccount = CloudStorageAccount.Parse("DefaultEndpointsProtocol=https;AccountName=xxx;AccountKey=yyy");

// CloudQueueClient 類是 Windows Azure Queue Service 客戶端的邏輯表示,我們需要使用它來配置和執行對 Queue Storage 的操作。
CloudQueueClient queueClient = storageAccount.CreateCloudQueueClient();
// CloudQueue 表示一個 Queue 對象, 絕大多數的操作都是通過這個對象完成的。
CloudQueue queue = queueClient.GetQueueReference("app2tasks");
// 如果不存在就創建名稱為 "app2tasks" 的 Queue。
queue.CreateIfNotExists();

執行上面的代碼,然后在 Storage Explorer 中查看結果:

把消息插入 Queue

現實的應用場景中肯定有一個或多個程序產生 Message 並插入到 Queue 中,接下來我們看看用 C# 如何實現:

string current = DateTime.Now.ToString();
// 把消息插入到隊列中。
CloudQueueMessage message = new CloudQueueMessage("Hello, World. -- " + current);
queue.AddMessage(message)

調用幾次上面的代碼看看結果如何:

我通過三次調用向 Queue 中加入了三條消息,請注意插入它們的時間,分別是 11:33:45,11:33:57,和 11.34:16。在接下來的描述中我分別稱它們為第一條消息、第二條消息和第三條消息。

查看 Queue 中的消息

既然是隊列,肯定有隊頭和隊尾,消息從隊頭出隊從隊尾入隊。那么能不能查看一下隊頭的消息 (也就是下一條要處理的消息,此處只是查看並不是要處理) 呢?當然可以:

// 總是取到隊頭的消息,沒有消息出隊。
// 消息在隊列中的位置、可見狀態也沒有發生變化。 
CloudQueueMessage peekedMessage = queue.PeekMessage();

PeekMessage 方法總是取到處於隊頭位置的那條消息,並且不改變隊列的狀態!

為了幫助小伙伴們更深刻地理解 PeekMessage 方法的內涵,筆者從網上找了一張解釋 peek 一詞的圖片,請注意圖片中的黃色線條:

(本圖片來自於互聯網,如有版權問題請與筆者聯系。)

查看 Queue 的長度

經常的查看 Queue 的長度是個不錯的注意,因為你需要避免一些由於 Queue 過長帶來的問題:

// 獲取 Queue 的屬性。
queue.FetchAttributes();
int? cachedMessageCount = queue.ApproximateMessageCount;

更新 Queue 中的消息

如果一條消息已經被添加到 Queue 中了,但是又需要更新其內容該怎么辦?我們可以找到這條消息然后更新它的內容:

CloudQueueMessage message = queue.GetMessage();
// 執行 getmessage(), 隊頭的消息會變得不可見。
message.SetMessageContent("Updated contents.");
queue.UpdateMessage(message,
    TimeSpan.FromSeconds(60.0), 
    MessageUpdateFields.Content | MessageUpdateFields.Visibility);
// 更新完消息內容的60s 之后,該消息會重新可見,但是是在隊尾。

執行上面的代碼后,我們發現在 Storage Explorer 中"第一條消息"不見了。過了 60 秒之后它又重新出現在 Storage Explorer 中,但是它的內容已經變化,位置也成了隊尾:

此時我們也只能通過 ID 認出它是之前的"第一條消息",之前"第二條消息","第三條消息"的位置也發生了相應的變化。

處理 Queue 中的消息

如何處理 Queue 中的消息呢?我們的程序大體應該遵循下面的邏輯:
使用 GetMessage 方法取出隊頭的消息,此時該消息會在 Queue 中 30 秒不可見(這個時常用戶可以設置,默認是 30 秒);
處理消息;
正常處理完成后,調用 Delete 方法刪除消息;
如果沒有正常處理消息 (沒有調用 Delete 方法),此消息會在 30 秒后重新出現在隊尾。
類似於下面的代碼邏輯:

// 執行 getmessage(), 隊頭的消息會變得不可見。
CloudQueueMessage message = queue.GetMessage();
try
{
    //處理消息
    // 如果在30s內你沒有刪除這條消息,它會重新出現在隊尾。
    // 所以正確處理一條消息的過程是,處理完成后,刪除這條消息
    queue.DeleteMessage(message);
}
catch //(消息處理異常)
{ }

刪除 Queue 中的消息

除了正常處理完消息后把消息從隊列中刪除,我們也可以找到一條消息,直接刪除它。本質上和處理完再刪除是一樣的。

總結

Queue Storage 為應用之間的解耦提供了很好的解決方式。使得消息的產生者和消息的處理者可以互相不知道彼此的存在。為我們處理這類問題添加了一件有力的武器。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM