C#異步案例一則


場景

  生產者和消費者隊列, 生產者有多個, 消費者也有多個, 生產到消費需要異步.

下面用一個Asp.NetCore Web-API項目來模擬

  創建兩個API, 一個Get(), 一個Set(), Get返回一個字符串, Set放入一個字符串, Get返回的就是Set進去的字符串.

  實現如下:  

[Route("api/[controller]/[action]")]
public class FooController : Control
{
    IMessageQueue _mq;
    public FooController(IMessageQueue mq)
    {
        _mq = mq;
    }

    [HttpGet]
    public string Get()
    {
        string str = _mq.ReadOne<string>();
        return str;
    }

    [HttpGet]
    public void Set(string v)
    {
        _mq.WriteOne(v);
    }
}

public interface IMessageQueue
{
    T ReadOne<T>();
    void WriteOne<T>(T value);
}

public class MessageQueue: IMessageQueue
{
    private object _value;

    public T ReadOne<T>()
    {
        return (T)_value;
    }

    public void WriteOne<T>(T value)
    {
        _value = value;

    }
}

接着在StartUp中把IMessageQueue給注入了.

services.AddSingleton<IMessageQueue, MessageQueue>();

運行后, 先調用/api/foo/set/?v=xxx, 再調用/api/foo/get/

可以看到成功返回了xxx

第二步, value字段改為隊列:

使set進去的值不會被下一個覆蓋, get取隊列最前的值

為了線程安全, 這里使用了ConcurrentQueue<T>

代碼如下:

public class MessageQueue: IMessageQueue
{
    private readonly ConcurrentQueue<object> _queue = new ConcurrentQueue<object>();

    public T ReadOne<T>()
    {
        _queue.TryDequeue(out object str);
        return (T)str ;
    }

    public void WriteOne<T>(Tvalue)
    {
        _queue.Enqueue(value);
    }
}

那么此時, 只要get不斷地輪詢, 就可以取到set生產出來的數據了.

調用/api/foo/set/

三, 異步阻塞

再增加需求, 調換get和set的順序,先get后set模擬異步, (我這里的demo是個web-api會有http請求超時之類的...假裝不存在)我想要get調用等待有數據時才返回.

也就是說我想要在瀏覽器地址欄輸入http://localhost:5000/api/foo/get/之后會不斷地轉圈直到我用set接口放入一個值

方案A: while(true), 根本無情簡直無敵, 死等Read() != null時break; 為防單核滿轉加個Thread.Sleep();

方案B: Monitor, 一個Wait()一個Exit/Release();

但是以上兩個方案都是基於Thread的, .Net4.0之后伴隨ConcurrentQueue一起來的還有個BlockingCollection<T>相當好用

方案C: 修改后代碼如下:

public class MessageQueue : IMessageQueue
{
    private readonly BlockingCollection<object> _queue = new BlockingCollection<object>(new ConcurrentQueue<object>());

    public T ReadOne<T>()
    {
        var obj = _queue.Take();
        return (T)obj;
    }

    public void WriteOne<T>(T value)
    {
        _queue.Add(value);
    }
}

此時, 如果先get, 會阻塞等待set; 如果已經有set過數據就會直接返回隊列中的數據. get不會無功而返了. 基於這個類型, 可以實現更像樣的訂閱模型.

擴展RPC

這里的set是生產者, get是消費者, 那如果我的這個生產者並不單純產生數據返回void而是需要等待一個結果的呢? 此時訂閱模型不夠用了, 我需要一個異步的RPC .

比如有個Ask請求會攜帶參數發起請求, 並等待, 知道另外有個地方處理了這個任務產生結果, ask結束等待返回這個結果answer. 

我可以回頭繼續用方案A或B, 但連.net4.0都已經過去很久了, 所以應該用更好的基於Task的異步方案.

代碼如下, 首先新增兩個接口:

public interface IMessageQueue
{
    void Respond<TRequest, TResponse>(Func<TRequest, TResponse> func); Task<TResponse> Rpc<TRequest, TResponse>(TRequest req);

    T ReadOne<T>();
    void WriteOne<T>(T data);
}

接着定義一個特殊的任務類:

public class RpcTask<TRequest, TResponse>
{
    public TaskCompletionSource<TResponse> Tcs { get; set; }
    public TRequest Request { get; set; }
}

實現剛才新加的兩個接口:

public Task<TResponse> Rpc<TRequest, TResponse>(TRequest req)
{
    TaskCompletionSource<TResponse> tcs = new TaskCompletionSource<TResponse>();
    _queue.Add(new RpcTask<TRequest, TResponse> { Request = req, Tcs = tcs});
    return tcs.Task;
}

public void Respond<TRequest, TResponse>(Func<TRequest, TResponse> func)
{
    var obj = _queue.Take();
    if(obj is RpcTask<TRequest, TResponse> t)
    {
        var response = func(t.Request);
        t.Tcs.SetResult(response);
    }
}

同樣的, 寫兩個Web API接口, 一個請求等待結果 一個負責處理工作

[HttpGet]
public async Task<string> Ask(string v)
{
    var response = await _mq.Rpc<MyRequest, MyResponse>(new MyRequest { Id = v });
    return $"[{response.DoneTime}] {response.Id}";
}

[HttpGet]
public void Answer()
{
    _mq.Respond<MyRequest, MyResponse>((req)=> new MyResponse { Id = req.Id, DoneTime = DateTime.Now });
}

上面還隨便寫了兩個class作為請求和返回

public class MyRequest
{
    public string Id { get; set; }
}
public class MyResponse
{
    public string Id { get; set; }
    public DateTime DoneTime { get; set; }
}

測試一下, 用瀏覽器或postman打開三個選項卡, 各發起一個Ask接口的請求, 參數v分別為1 2 3, 三個選項卡都開始轉圈等待

然后再打開一個選項卡訪問answer接口, 處理剛才放進隊列的任務, 發起一次之前的三個選項卡之中就有一個停止等待並顯示返回數據. 需求實現.

這里用到的關鍵類型是TaskCompletionSource<T>.

再擴展

如果是個分布式系統, 請求和處理邏輯不是在一個程序里呢? 那么這個隊列可能也是一個單獨的服務. 此時就要再加個返回隊列了, 給隊列中傳輸的每一個任務打上Id, 返回隊列中取出返回之后再找到Id對於的TCS.SetResult()

 

 

 

 

  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM