Dapr-狀態管理


前言:

 前一篇對Dapr的服務調用方式進行了解,本篇繼續對狀態管理進行了解。

一、狀態管理-解決的問題

 在分布式應用程序中跟蹤狀態存在一下問題:

  • 應用程序可能需要不同類型的數據存儲
  • 訪問和更新數據時可能需要不同的一致性級別。
  • 多個用戶可以同時更新數據,需要沖突解決。
  • 在與數據存儲交互時,服務必須重試發生的任何短暫的暫時性錯誤 。

 Dapr 狀態管理構建塊解決了這些難題。 它簡化了跟蹤狀態,而無需依賴關系或第三方存儲 Sdk 上的學習曲線。

Dapr 狀態管理提供 密鑰/值 API。 此功能不支持關系數據存儲或圖形數據存儲

二、狀態管理-工作原理

 應用程序可以使用Dapr的狀態管理API,使用狀態存儲組件保存和讀取鍵/值對。通過 HTTP 或 gRPC 調用 API。 

 

 例如,通過使用HTTP POST可以保存鍵/值對,通過使用HTTP GET可以讀取一個鍵並返回它的值。

 

三、狀態管理-特點

  • 可插拔狀態存儲

  Dapr數據存儲被建模為組件,可以在不修改你的服務代碼的情況下進行替換。

  • 一致性

   CAP 定理是一組適用於存儲狀態的分布式系統的原則。下圖展示了CAP定理的三個屬性。

  

  定理指出,分布式數據系統提供一致性、可用性和分區容差之間的權衡。 而且,任何數據存儲只能 保證三個屬性中的兩個

    • 一致性 (C) 。 群集中的每個節點都將使用最新的數據做出響應,即使在所有副本更新之前,系統都必須阻止請求。 如果查詢當前正在更新的項的 "一致性系統",則在所有副本都成功更新之前,將不會收到響應。 但是,您將始終接收最新的數據。

    • 可用性 () 。 即使該響應不是最新的數據,每個節點都將返回立即響應。 如果您在 "可用系統" 中查詢正在更新的項,則您將獲得該服務在此時可以提供的最佳可能的答案。

    • 分區容差 (P) 。 即使復制的數據節點發生故障或失去與其他復制的數據節點的連接,保證系統仍可繼續運行。

  分布式應用程序必須處理 P 屬性。 隨着服務彼此間的網絡調用通信,會發生網絡中斷 (P) 。 考慮到這一點,分布式應用程序必須是 AP 或 CP。

  AP 應用程序選擇 "可用性一致性"。 Dapr 通過其 最終一致性 策略支持此選擇。 請考慮使用基礎數據存儲(例如 Azure CosmosDB)將冗余數據存儲在多個副本上。 對於最終一致性,狀態存儲會將更新寫入一個副本並完成與客戶端的寫入請求。 此時間過后,存儲將以異步方式更新其副本。 讀取請求可以返回任何副本的數據,包括尚未收到最新更新的副本。

  CP 應用程序選擇一致性和可用性。 Dapr 通過其 強一致性 策略支持此選擇。 在此方案中,狀態存儲將同步更新 所有 (或在某些情況下,在完成寫入請求 之前) 必需副本的 仲裁。 讀取操作將跨副本持續返回最新數據。

  • 並發

   在多用戶應用程序中,有可能多個用戶同時更新同一時間) (相同的數據。 Dapr 支持樂觀並發控制 (OCC) 來管理沖突。 OCC 基於一個假設,因為用戶處理數據的不同部分,所以更新沖突很少見。 更有效的方法是將更新成功,如果不成功,則重試。 實現悲觀鎖定的替代方法可能會影響長時間運行的鎖定,導致數據爭用。

  Dapr 支持使用 Etag) (OCC 的樂觀並發控制。 ETag 是與存儲的鍵/值對的特定版本相關聯的值。 鍵/值對的每次更新時,ETag 值也會更新。 當客戶端檢索鍵/值對時,響應包括當前 ETag 值。 當客戶端更新或刪除鍵/值對時,它必須在請求正文中發送回該 ETag 值。 如果其他客戶端同時更新了數據,則 Etag 不會匹配,請求將失敗。 此時,客戶端必須檢索更新的數據,重新進行更改,然后重新提交更新。 此策略稱為 第一次寫入-wins。

  Dapr 還支持 最后寫入 wins 策略。 使用此方法時,客戶端不會將 ETag 附加到寫入請求。 狀態存儲組件將始終允許更新,即使基礎值在會話期間已更改也是如此。 最后寫入-wins 對於數據爭用較少的高吞吐量寫入方案非常有用。 同樣,可以容忍偶爾的用戶更新。

  • 事務

   Dapr 可以將 多項更改 作為一個作為事務實現的操作寫入數據存儲區。 此功能僅適用於支持 ACID 事務的數據存儲。 在撰寫本文時,這些存儲包括 Redis、MongoDB、PostgreSQL、SQL Server和 Azure CosmosDB。

在下面的示例中,多項操作將發送到單個事務中的狀態存儲。 所有操作都必須成功,事務才能提交。 如果一個或多個操作失敗,則回滾整個事務。

四、.Net Core中應用

  1、在項目【DaprFrontEnd】中添加控制器-DaprStateController  用於展示狀態的各種操作

[Route("[controller]")]
[ApiController]
public class StateController : ControllerBase
{
    private readonly ILogger<DaprStateController> _logger;
    private readonly DaprClient _daprClient;
    public StateController(ILogger<StateController> logger, DaprClient daprClient)
    {
        _logger = logger;
        _daprClient = daprClient;
    }
    /// <summary>
    /// 獲取值
    /// </summary>
    /// <returns></returns>
    [HttpGet]
    public async Task<ActionResult> GetAsync()
    {
        var result = await _daprClient.GetStateAsync<string>("statestore", "guid");
        return Ok(result);
    }
    /// <summary>
    /// 保存值
    /// </summary>
    /// <returns></returns>
    [HttpPost]
    public async Task<ActionResult> PostAsync()
    {
        await _daprClient.SaveStateAsync<string>("statestore", "guid", Guid.NewGuid().ToString(), new StateOptions() { Consistency = ConsistencyMode.Strong });
        return Ok("done");
    }
    /// <summary>
    /// 刪除值
    /// </summary>
    /// <returns></returns>
    [HttpDelete]
    public async Task<ActionResult> DeleteAsync()
    {
        await _daprClient.DeleteStateAsync("statestore", "guid");
        return Ok("done");
    }
    /// <summary>
    /// 通過tag防止並發沖突,保存值
    /// </summary>
    /// <returns></returns>
    [HttpPost("withtag")]
    public async Task<ActionResult> PostWithTagAsync()
    {
        var (value, etag) = await _daprClient.GetStateAndETagAsync<string>("statestore", "guid");
        await _daprClient.TrySaveStateAsync<string>("statestore", "guid", Guid.NewGuid().ToString(), etag);
        return Ok("done");
    }
    /// <summary>
    /// 通過tag防止並發沖突,刪除值
    /// </summary>
    /// <returns></returns>
    [HttpDelete("withtag")]
    public async Task<ActionResult> DeleteWithTagAsync()
    {
        var (value, etag) = await _daprClient.GetStateAndETagAsync<string>("statestore", "guid");
        return Ok(await _daprClient.TryDeleteStateAsync("statestore", "guid", etag));
    }
    /// <summary>
    /// 從綁定獲取值,健值name從路由模板獲取
    /// </summary>
    /// <param name="state"></param>
    /// <returns></returns>
    [HttpGet("frombinding/{name}")]
    public async Task<ActionResult> GetFromBindingAsync([FromState("statestore", "name")] StateEntry<string> state)
    {
        return await Task.FromResult<ActionResult>(Ok(state.Value));
    }
    /// <summary>
    /// 根據綁定獲取並修改值,健值name從路由模板獲取
    /// </summary>
    /// <param name="state"></param>
    /// <returns></returns>
    [HttpPost("withbinding/{name}")]
    public async Task<ActionResult> PostWithBindingAsync([FromState("statestore", "name")] StateEntry<string> state)
    {
        state.Value = Guid.NewGuid().ToString();
        return Ok(await state.TrySaveAsync());
    }
    /// <summary>
    /// 獲取多個值
    /// </summary>
    /// <returns></returns>
    [HttpGet("list")]
    public async Task<ActionResult> GetListAsync()
    {
        var result = await _daprClient.GetBulkStateAsync("statestore", new List<string> { "guid" }, 10);
        return Ok(result);
    }
    /// <summary>
    /// 刪除多個值
    /// </summary>
    /// <returns></returns>
    [HttpDelete("list")]
    public async Task<ActionResult> DeleteListAsync()
    {
        var data = await _daprClient.GetBulkStateAsync("statestore", new List<string> { "guid" }, 10);
        var removeList = new List<BulkDeleteStateItem>();
        foreach (var item in data)
        {
            removeList.Add(new BulkDeleteStateItem(item.Key, item.ETag));
        }
        await _daprClient.DeleteBulkStateAsync("statestore", removeList);
        return Ok("done");
    }
}

 2、啟動程序

dapr run --dapr-http-port 3501 --app-port 8230  --app-id frontend dotnet  .\DaprFrontEnd.dll

 3、調用過程:

  

  

五、總結:

  Dapr 狀態管理構建塊提供了一個 API,用於在各種數據存儲區中存儲鍵/值數據。 API 為以下內容提供支持:

  • 批量操作
  • 強一致性和最終一致性
  • 樂觀並發控制
  • 多項事務


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM