本文將詳解C#類當中的Task,以及異步函數async await和Task的關系
一.Task的前世今生
1.Thread
一開始我們需要創建線程的時候一般是通過Thread創建線程,一般常用創建線程方式有以下幾種:
static void Main(string[] args)
{
Console.WriteLine("begin");
Thread thread = new Thread(() => TestMethod(2));
thread.IsBackground = true;//設置為后台線程,默認前台線程
thread.Start();
Thread thread1 = new Thread(() => TestMethod1());
//設置thread1優先級為最高,系統盡可能單位時間內調度該線程,默認為Normal
thread1.Priority = ThreadPriority.Highest;
thread1.Start();
Thread thread2 = new Thread((state) => TestMethod2(state));
thread2.Start("data");
thread2.Join();//等待thread2執行完成
Console.WriteLine("end");
}
static void TestMethod(int a)
{
Thread.Sleep(1000);
Console.WriteLine($"TestMethod: run on Thread id :{Thread.CurrentThread.ManagedThreadId},is threadPool:{Thread.CurrentThread.IsThreadPoolThread}" +
$",is Backgound:{Thread.CurrentThread.IsBackground}, result:{a}");
}
static void TestMethod1()
{
Thread.Sleep(1000);
Console.WriteLine($"TestMethod1: run on Thread id :{Thread.CurrentThread.ManagedThreadId},is threadPool:{Thread.CurrentThread.IsThreadPoolThread}" +
$",is Backgound:{Thread.CurrentThread.IsBackground},no result ");
}
static void TestMethod2(object state)
{
Thread.Sleep(2000);
Console.WriteLine($"TestMethod2 :run on Thread id :{Thread.CurrentThread.ManagedThreadId},is threadPool:{Thread.CurrentThread.IsThreadPoolThread}" +
$",is Backgound:{Thread.CurrentThread.IsBackground},result:{state}");
}
輸出結果:
begin
TestMethod: run on Thread id :4,is threadPool:False,is Backgound:True, result:2
TestMethod1: run on Thread id :5,is threadPool:False,is Backgound:False,no result
TestMethod2 :run on Thread id :7,is threadPool:False,is Backgound:False,result:data
end
or
begin
TestMethod1: run on Thread id :5,is threadPool:False,is Backgound:False,no result
TestMethod: run on Thread id :4,is threadPool:False,is Backgound:True, result:2
TestMethod2 :run on Thread id :7,is threadPool:False,is Backgound:False,result:data
end
由於我的PC是多核CPU,那么TestMethod和TestMethod1所在兩個線程是真正並行的,所以有可能輸出結果先后不確定,雖然TestMethod1所在線程設置優先級為Highest最高,但可能系統不會優先調度,其實目前不怎么推薦用Thread.Start去創建線程,缺點大概如下:
- 因為在大量需要創建線程情況下,用Thread.Start去創建線程是會浪費線程資源,因為線程用完就沒了,不具備重復利用能力
- 現在一個進程中的CLR默認會創建線程池和一些工作線程(不要浪費),且線程池的工作線程用完會回到線程池,能夠重復利用,
除非是以下原因:
-
真的需要操作線程優先級
-
需要創建一個前台線程,由於類似於控制台程序當初始前台線程執行完就會退出進程,那么創建前台線程可以保證進程退出前該前台線程正常執行成功
例如在原來的例子注釋掉thread2.Join();,我們會發現輸出完控制台初始的前台線程輸出完end沒退出進程,只有在TestMethod2(該線程凍結2秒最久)執行完才退出
static void Main(string[] args) { Console.WriteLine("begin"); Thread thread = new Thread(() => TestMethod(2)); thread.IsBackground = true;//設置為后台線程,默認前台線程 thread.Start(); Thread thread1 = new Thread(() => TestMethod1()); //設置thread1優先級為最高,系統盡可能單位時間內調度該線程,默認為Normal thread1.Priority = ThreadPriority.Highest; thread1.Start(); Thread thread2 = new Thread((state) => TestMethod2(state)); thread2.Start("data"); //thread2.Join();//等待thread2執行完成 Console.WriteLine("end"); }
輸出:
begin end TestMethod1: run on Thread id :5,is threadPool:False,is Backgound:False,no result TestMethod: run on Thread id :4,is threadPool:False,is Backgound:True, result:2 TestMethod2 :run on Thread id :7,is threadPool:False,is Backgound:False,result:data
-
需要創建一個后台線程,長時間執行的,其實一個Task的TaskScheduler在Default情況下,設置TaskCreationOptions.LongRunning內部也是創建了一個后台線程Thread,而不是在ThreadPool執行,在不需要Task的一些其他功能情況下,Thread更輕量
Thread longTask = new Thread(() => Console.WriteLine("doing long Task...")); longTask.IsBackground = true; longTask.Start(); //等價於 new Task(() => Console.WriteLine("doing long Task..."), TaskCreationOptions.LongRunning).Start(); //OR Task.Factory.StartNew(() => Console.WriteLine("doing long Task..."), TaskCreationOptions.LongRunning);
2.ThreadPool
一個.NET進程中的CLR在進程初始化時,CLR會開辟一塊內存空間給ThreadPool,默認ThreadPool默認沒有線程,在內部會維護一個任務請求隊列,當這個隊列存在任務時,線程池則會通過開辟工作線程(都是后台線程)去請求該隊列執行任務,任務執行完畢則回返回線程池,線程池盡可能會用返回的工作線程去執行(減少開辟),如果沒返回線程池,則會開辟新的線程去執行,而后執行完畢又返回線程池,大概線程池模型如下:
我們通過代碼來看:
static void Main(string[] args)
{
//獲取默認線程池允許開辟的最大工作線程樹和最大I/O異步線程數
ThreadPool.GetMaxThreads(out int maxWorkThreadCount,
out int maxIOThreadCount);
Console.WriteLine($"maxWorkThreadCount:{maxWorkThreadCount},
maxIOThreadCount:{maxIOThreadCount}");
//獲取默認線程池並發工作線程和I/O異步線程數
ThreadPool.GetMinThreads(out int minWorkThreadCount,
out int minIOThreadCount);
Console.WriteLine($"minWorkThreadCount:{minWorkThreadCount},
minIOThreadCount:{minIOThreadCount}");
for (int i = 0; i < 20; i++)
{
ThreadPool.QueueUserWorkItem(s =>
{
var workThreadId = Thread.CurrentThread.ManagedThreadId;
var isBackground = Thread.CurrentThread.IsBackground;
var isThreadPool = Thread.CurrentThread.IsThreadPoolThread;
Console.WriteLine($"work is on thread {workThreadId},
Now time:{DateTime.Now.ToString("ss.ff")}," +
$" isBackground:{isBackground}, isThreadPool:{isThreadPool}");
Thread.Sleep(5000);//模擬工作線程運行
});
}
Console.ReadLine();
}
輸出如下:
maxWorkThreadCount:32767,maxIOThreadCount:1000
minWorkThreadCount:16,minIOThreadCount:16
work is on thread 18, Now time:06.50, isBackground:True, isThreadPool:True
work is on thread 14, Now time:06.50, isBackground:True, isThreadPool:True
work is on thread 16, Now time:06.50, isBackground:True, isThreadPool:True
work is on thread 5, Now time:06.50, isBackground:True, isThreadPool:True
work is on thread 13, Now time:06.50, isBackground:True, isThreadPool:True
work is on thread 12, Now time:06.50, isBackground:True, isThreadPool:True
work is on thread 10, Now time:06.50, isBackground:True, isThreadPool:True
work is on thread 4, Now time:06.50, isBackground:True, isThreadPool:True
work is on thread 15, Now time:06.50, isBackground:True, isThreadPool:True
work is on thread 7, Now time:06.50, isBackground:True, isThreadPool:True
work is on thread 19, Now time:06.50, isBackground:True, isThreadPool:True
work is on thread 17, Now time:06.50, isBackground:True, isThreadPool:True
work is on thread 8, Now time:06.50, isBackground:True, isThreadPool:True
work is on thread 11, Now time:06.50, isBackground:True, isThreadPool:True
work is on thread 9, Now time:06.50, isBackground:True, isThreadPool:True
work is on thread 6, Now time:06.50, isBackground:True, isThreadPool:True
work is on thread 20, Now time:07.42, isBackground:True, isThreadPool:True
work is on thread 21, Now time:08.42, isBackground:True, isThreadPool:True
work is on thread 22, Now time:09.42, isBackground:True, isThreadPool:True
work is on thread 23, Now time:10.42, isBackground:True, isThreadPool:True
由於我CPU為8核16線程,默認線程池給我分配了16條工作線程和I/O線程,保證在該進程下實現真正的並行,可以看到前16條工作線程的啟動時間是一致的,到最后四條,線程池嘗試去用之前的工作線程去請求那個任務隊列執行任務,由於前16條還在運行沒返回到線程池,則每相隔一秒,創建新的工作線程去請求執行,而且該開辟的最多線程數是和線程池允許開辟的最大工作線程樹和最大I/O異步線程數有關的
我們可以通過ThreadPool.SetMaxThreads 將工作線程數設置最多只有16,在執行任務前新增幾行代碼:
var success = ThreadPool.SetMaxThreads(16, 16);//只能設置>=最小並發工作線程數和I/O線程數
Console.WriteLine($"SetMaxThreads success:{success}");
ThreadPool.GetMaxThreads(out int maxWorkThreadCountNew, out int maxIOThreadCountNew);
Console.WriteLine($"maxWorkThreadCountNew:{maxWorkThreadCountNew},
maxIOThreadCountNew:{maxIOThreadCountNew}");
輸出如下:
maxWorkThreadCount:32767,maxIOThreadCount:1000
minWorkThreadCount:16,minIOThreadCount:16
SetMaxThreads success:True
maxWorkThreadCountNew:16,maxIOThreadCountNew:16
work is on thread 6, Now time:01.71, isBackground:True, isThreadPool:True
work is on thread 12, Now time:01.71, isBackground:True, isThreadPool:True
work is on thread 7, Now time:01.71, isBackground:True, isThreadPool:True
work is on thread 8, Now time:01.71, isBackground:True, isThreadPool:True
work is on thread 16, Now time:01.71, isBackground:True, isThreadPool:True
work is on thread 10, Now time:01.71, isBackground:True, isThreadPool:True
work is on thread 15, Now time:01.71, isBackground:True, isThreadPool:True
work is on thread 13, Now time:01.71, isBackground:True, isThreadPool:True
work is on thread 11, Now time:01.71, isBackground:True, isThreadPool:True
work is on thread 4, Now time:01.71, isBackground:True, isThreadPool:True
work is on thread 9, Now time:01.71, isBackground:True, isThreadPool:True
work is on thread 19, Now time:01.71, isBackground:True, isThreadPool:True
work is on thread 17, Now time:01.71, isBackground:True, isThreadPool:True
work is on thread 5, Now time:01.71, isBackground:True, isThreadPool:True
work is on thread 14, Now time:01.71, isBackground:True, isThreadPool:True
work is on thread 18, Now time:01.71, isBackground:True, isThreadPool:True
work is on thread 8, Now time:06.72, isBackground:True, isThreadPool:True
work is on thread 5, Now time:06.72, isBackground:True, isThreadPool:True
work is on thread 19, Now time:06.72, isBackground:True, isThreadPool:True
work is on thread 10, Now time:06.72, isBackground:True, isThreadPool:True
可以很清楚知道,由於線程池最多只允許開辟16條工作線程和I/O線程,那么在線程池再開辟了16條線程之后,將不會再開辟新線程,新的任務也只能等前面的工作線程執行完回線程池后,再用返回的線程去執行新任務,導致新任務的開始執行時間會在5秒后
ThreadPool的優點如下:
- 默認線程池已經根據自身CPU情況做了配置,在需要復雜多任務並行時,智能在時間和空間上做到均衡,在CPU密集型操作有一定優勢,而不是像Thread.Start那樣,需要自己去判斷和考慮
- 同樣可以通過線程池一些方法,例如ThreadPool.SetMaxThreads手動配置線程池情況,很方便去模擬不同電腦硬件的執行情況
- 有專門的I/O線程,能夠實現非阻塞的I/O,I/O密集型操作有優勢(后續Task會提到)
但同樣,缺點也很明顯:
- ThreadPool原生不支持對工作線程取消、完成、失敗通知等交互性操作,同樣不支持獲取函數返回值,靈活度不夠,Thread原生有Abort (同樣不推薦)、Join等可選擇
- 不適合LongTask,因為這類會造成線程池多創建線程(上述代碼可知道),這時候可以單獨去用Thread去執行LongTask
3.Task
在.NET 4.0時候,引入了任務並行庫,也就是所謂的TPL(Task Parallel Library),帶來了Task
類和支持返回值的Task<TResult>
,同時在4.5完善優化了使用,Task解決了上述Thread和ThreadPool的一些問題,Task究竟是個啥,我們來看下代碼:
以下是一個WPF的應用程序,在Button的Click事件:
private void Button_Click(object sender, RoutedEventArgs e)
{
Task.Run(() =>
{
var threadId = Thread.CurrentThread.ManagedThreadId;
var isBackgound = Thread.CurrentThread.IsBackground;
var isThreadPool = Thread.CurrentThread.IsThreadPoolThread;
Thread.Sleep(3000);//模擬耗時操作
Debug.WriteLine($"task1 work on thread:{threadId},isBackgound:{isBackgound},isThreadPool:{isThreadPool}");
});
new Task(() =>
{
var threadId = Thread.CurrentThread.ManagedThreadId;
var isBackgound = Thread.CurrentThread.IsBackground;
var isThreadPool = Thread.CurrentThread.IsThreadPoolThread;
Thread.Sleep(3000);//模擬耗時操作
Debug.WriteLine($"task2 work on thread:{threadId},isBackgound:{isBackgound},isThreadPool:{isThreadPool}");
}).Start(TaskScheduler.FromCurrentSynchronizationContext());
Task.Factory.StartNew(() =>
{
var threadId = Thread.CurrentThread.ManagedThreadId;
var isBackgound = Thread.CurrentThread.IsBackground;
var isThreadPool = Thread.CurrentThread.IsThreadPoolThread;
Thread.Sleep(3000);//模擬耗時操作
Debug.WriteLine($"task3 work on thread:{threadId},isBackgound:{isBackgound},isThreadPool:{isThreadPool}");
}, TaskCreationOptions.LongRunning);
}
輸出:
main thread id :1
//由於是並行,輸出結果的前后順序可能每次都不一樣
task1 work on thread:4,isBackgound:True,isThreadPool:True
task3 work on thread:10,isBackgound:True,isThreadPool:False
task2 work on thread:1,isBackgound:False,isThreadPool:False
我用三種不同的Task開辟運行任務的方式,可以看到,Task運行在三種不同的線程:
- task1是運行在線程池上,是沒進行任何對Task的設置
- task2通過設置TaskScheduler為TaskScheduler.FromCurrentSynchronizationContext()是沒有開辟線程,利用主線程運行
- task3通過設置TaskCreationOptions為LongRunning和默認TaskScheduler情況下,實際是開辟了一個后台Thread去運行
因此,其實Task不一定代表開辟了新線程,可為在線程池上運行,又或是開辟一個后台Thread,又或者沒有開辟線程,通過主線程運行任務,這里提一句TaskScheduler.FromCurrentSynchronizationContext(),假設在控制台或者ASP.NET Core程序運行,會發生報錯,原因是主線程的SynchronizationContext為空,可通過TaskScheduler源碼得知:
public static TaskScheduler FromCurrentSynchronizationContext()
{
return new SynchronizationContextTaskScheduler();
}
internal SynchronizationContextTaskScheduler()
{
m_synchronizationContext = SynchronizationContext.Current ??
throw new InvalidOperationException
(SR.TaskScheduler_FromCurrentSynchronizationContext_NoCurrent);
}
大致對於Task在通過TaskScheduler和TaskCreationOptions設置后對於將任務分配在不同的線程情況,如下圖:
原生支持延續、取消、異常(失敗通知)
1.延續
Task其實有兩種延續任務的方式,一種通過ContinueWith方法,這是Task在.NET Framework4.0就支持的,一種則是通過GetAwaiter方法,則是在.NET Framework4.5開始支持,而且該方法也是async await異步函數所用到
控制台代碼:
static void Main(string[] args)
{
Task.Run(() =>
{
Console.WriteLine($"ContinueWith:threadId:{Thread.CurrentThread.ManagedThreadId},isThreadPool:{Thread.CurrentThread.IsThreadPoolThread}");
return 25;
}).ContinueWith(t =>
{
Console.WriteLine($"ContinueWith Completed:threadId:{Thread.CurrentThread.ManagedThreadId},isThreadPool:{Thread.CurrentThread.IsThreadPoolThread}");
Console.WriteLine($"ContinueWith Completed:{t.Result}");
});
//等價於
var awaiter = Task.Run(() =>
{
Console.WriteLine($"GetAwaiter:threadId:{Thread.CurrentThread.ManagedThreadId},isThreadPool:{Thread.CurrentThread.IsThreadPoolThread}");
return 25;
}).GetAwaiter();
awaiter.OnCompleted(() =>
{
Console.WriteLine($"GetAwaiter Completed:threadId:{Thread.CurrentThread.ManagedThreadId},isThreadPool:{Thread.CurrentThread.IsThreadPoolThread}");
Console.WriteLine($"GetAwaiter Completed:{awaiter.GetResult()}");
});
Console.ReadLine();
}
輸出結果:
ContinueWith:threadId:4,isThreadPool:True
GetAwaiter:threadId:5,isThreadPool:True
GetAwaiter Completed:threadId:5,isThreadPool:True
GetAwaiter Completed:25
ContinueWith Completed:threadId:4,isThreadPool:True
ContinueWith Completed:25
//事實上,運行的代碼線程,可能和延續的線程有可能不是同一線程,取決於線程池本身的調度
可以手動設置TaskContinuationOptions.ExecuteSynchronously(同一線程)
或者 TaskContinuationOptions.RunContinuationsAsynchronously(不同線程)
默認RunContinuationsAsynchronously優先級大於ExecuteSynchronously
但有意思的是,同樣的代碼,在WPF/WinForm等程序,運行的輸出是不一樣的:
WPF程序代碼:
private void Button_Click(object sender, RoutedEventArgs e)
{
Task.Run(() =>
{
Debug.WriteLine($"ContinueWith:threadId:{Thread.CurrentThread.ManagedThreadId},isThreadPool:{Thread.CurrentThread.IsThreadPoolThread}");
}).ContinueWith(t =>
{
Debug.WriteLine($"ContinueWith Completed:threadId:{Thread.CurrentThread.ManagedThreadId},isThreadPool:{Thread.CurrentThread.IsThreadPoolThread}");
}, TaskContinuationOptions.ExecuteSynchronously);
Task.Run(() =>
{
Debug.WriteLine($"GetAwaiter:threadId:{Thread.CurrentThread.ManagedThreadId},isThreadPool:{Thread.CurrentThread.IsThreadPoolThread}");
}).GetAwaiter().OnCompleted(() =>
{
Debug.WriteLine($"GetAwaiter Completed:threadId:{Thread.CurrentThread.ManagedThreadId},isThreadPool:{Thread.CurrentThread.IsThreadPoolThread}");
});
}
輸出:
ContinueWith:threadId:7,isThreadPool:True
GetAwaiter:threadId:9,isThreadPool:True
ContinueWith Completed:threadId:7,isThreadPool:True
GetAwaiter Completed:threadId:1,isThreadPool:False
原因就是GetAwaiter().OnCompleted()會去檢測有沒有SynchronizationContext,因此其實就是相當於以下代碼:
Task.Run(() =>
{
Debug.WriteLine($"GetAwaiter:threadId:{Thread.CurrentThread.ManagedThreadId},isThreadPool:{Thread.CurrentThread.IsThreadPoolThread}");
}).ContinueWith(t =>
{
Debug.WriteLine($"GetAwaiter Completed:threadId:{Thread.CurrentThread.ManagedThreadId},isThreadPool:{Thread.CurrentThread.IsThreadPoolThread}");
},TaskScheduler.FromCurrentSynchronizationContext());
如果在WPF程序中要獲得控制台那樣效果,只需要修改為ConfigureAwait(false),延續任務不在SynchronizationContext即可,如下:
Task.Run(() =>
{
Debug.WriteLine($"GetAwaiter:threadId:{Thread.CurrentThread.ManagedThreadId},isThreadPool:{Thread.CurrentThread.IsThreadPoolThread}");
}).ConfigureAwait(false).GetAwaiter().OnCompleted(() =>
{
Debug.WriteLine($"GetAwaiter Completed:threadId:{Thread.CurrentThread.ManagedThreadId},isThreadPool:{Thread.CurrentThread.IsThreadPoolThread}");
});
2.取消
在.NET Framework4.0帶來Task的同時,同樣帶來了與取消任務有關的類CancellationTokenSource和CancellationToken,下面我們將大致演示下其用法
WPF程序代碼如下:
CancellationTokenSource tokenSource;
private void BeginButton_Click(object sender, RoutedEventArgs e)
{
tokenSource = new CancellationTokenSource();
LongTask(tokenSource.Token);
}
private void CancelButton_Click(object sender, RoutedEventArgs e)
{
tokenSource?.Cancel();
}
private void LongTask(CancellationToken cancellationToken)
{
Task.Run(() =>
{
for (int i = 0; i < 10; i++)
{
Dispatcher.Invoke(() =>
{
this.tbox.Text += $"now is {i} \n";
});
Thread.Sleep(1000);
if (cancellationToken.IsCancellationRequested)
{
MessageBox.Show("取消了該操作");
return;
}
}
}, cancellationToken);
}
效果如下:
其實上述代碼,也可以適用於Thread和ThreadPool,等價於如下代碼:
//當TaskCreationOptions為LongRunning和默認TaskScheduler情況下
new Thread(() =>
{
for (int i = 0; i < 10; i++)
{
Dispatcher.Invoke(() =>
{
this.tbox.Text += $"now is {i} \n";
});
Thread.Sleep(1000);
if (cancellationToken.IsCancellationRequested)
{
MessageBox.Show("取消了該操作");
return;
}
}
}).Start();
//默認TaskScheduler情況下
ThreadPool.QueueUserWorkItem(t =>
{
for (int i = 0; i < 10; i++)
{
Dispatcher.Invoke(() =>
{
this.tbox.Text += $"now is {i} \n";
});
Thread.Sleep(1000);
if (cancellationToken.IsCancellationRequested)
{
MessageBox.Show("取消了該操作");
return;
}
}
});
因此,.NET Framework4.0后Thread和ThreadPool也同樣能夠通過CancellationTokenSource和CancellationToken類支持取消功能,只是一般這兩者都可以用Task通過設置,底層同樣調用的Thread和ThreadPool,所以一般沒怎么這么使用,而且關於Task的基本很多方法都默認支持了,例如,Task.Wait、Task.WaitAll、Task.WaitAny、Task.WhenAll、Task.WhenAny、Task.Delay等等
3.異常(失敗通知)
下面控制台代碼:
static void Main(string[] args)
{
var parent = Task.Factory.StartNew(() =>
{
int[] numbers = { 0 };
var childFactory = new TaskFactory(TaskCreationOptions.AttachedToParent, TaskContinuationOptions.None);
childFactory.StartNew(() => 5 / numbers[0]); // Division by zero
childFactory.StartNew(() => numbers[1]); // Index out of range
childFactory.StartNew(() => { throw null; }); // Null reference
});
try
{
parent.Wait();
}
catch (AggregateException aex)
{
foreach (var item in aex.InnerExceptions)
{
Console.WriteLine(item.InnerException.Message.ToString());
}
}
Console.ReadLine();
}
輸出如下:
嘗試除以零。
索引超出了數組界限。
未將對象引用設置到對象的實例。
這里面parent任務有三個子任務,三個並行子任務分別都拋出不同異常,返回到parent任務中,而當你對parent任務Wait或者獲取其Result屬性時,那么將會拋出異常,而使用AggregateException則能將全部異常放在其InnerExceptions異常列表中,我們則可以分別對不同異常進行處理,這在多任務並行時候是非常好用的,而且AggregateException的功能異常強大,遠遠不止上面的功能,但是如果你只是單任務,使用AggregateException比普通則其實會有浪費性能,也可以這樣做;
try
{
var task = Task.Run(() =>
{
string str = null;
str.ToLower();
return str;
});
var result = task.Result;
}
catch (Exception ex)
{
Console.WriteLine(ex.Message.ToString());
}
//或者通過async await
try
{
var result = await Task.Run(() =>
{
string str = null;
str.ToLower();
return str;
});
catch (Exception ex)
{
Console.WriteLine(ex.Message.ToString());
}
輸出:
未將對象引用設置到對象的實例。
二.異步函數async await
async await是C#5.0,也就是.NET Framework 4.5時期推出的C#語法,通過與.NET Framework 4.0時引入的任務並行庫,也就是所謂的TPL(Task Parallel Library)構成了新的異步編程模型,也就是TAP(Task-based asynchronous pattern),基於任務的異步模式
語法糖async await
我們先來寫下代碼,看看async await的用法:
下面是個控制台的代碼:
static async Task Main(string[] args)
{
var result = await Task.Run(() =>
{
Console.WriteLine($"current thread:{Thread.CurrentThread.ManagedThreadId}," +
$"isThreadPool:{Thread.CurrentThread.IsThreadPoolThread}");
Thread.Sleep(1000);
return 25;
});
Console.WriteLine($"current thread:{Thread.CurrentThread.ManagedThreadId}," +
$"isThreadPool:{Thread.CurrentThread.IsThreadPoolThread}");
Console.WriteLine(result);
Console.ReadLine();
}
輸出結果:
current thread:4,isThreadPool:True
current thread:4,isThreadPool:True
25
換成在WPF/WinForm程序執行,結果如下:
current thread:4,isThreadPool:True
current thread:1,isThreadPool:false
25
是不是感覺似曾相識?上面埋下的彩蛋在這里揭曉了,在講Task的延續的時候我們講到.NET Framework4.5的一種通過GetAwaiter延續方法,事實上,async await就是上面的一種語法糖,編譯的時候大致會編譯成那樣,所以我們一般不手動寫GetAwaiter的延續方法,而是通過async await,大大簡化了編程方式,說它是語法糖,那么有啥證據呢?
我們再寫一些代碼來驗證:
class Program
{
static void Main(string[] args)
{
ShowResult(classType: typeof(Program), methodName: nameof(AsyncTaskResultMethod));
ShowResult(classType: typeof(Program), methodName: nameof(AsyncTaskMethod));
ShowResult(classType: typeof(Program), methodName: nameof(AsyncVoidMethod));
ShowResult(classType: typeof(Program), methodName: nameof(RegularMethod));
Console.ReadKey();
}
public static async Task<int> AsyncTaskResultMethod()
{
return await Task.FromResult(5);
}
public static async Task AsyncTaskMethod()
{
await new TaskCompletionSource<int>().Task;
}
public static async void AsyncVoidMethod()
{
}
public static int RegularMethod()
{
return 5;
}
private static bool IsAsyncMethod(Type classType, string methodName)
{
MethodInfo method = classType.GetMethod(methodName);
Type attType = typeof(AsyncStateMachineAttribute);
var attrib = (AsyncStateMachineAttribute)method.GetCustomAttribute(attType);
return (attrib != null);
}
private static void ShowResult(Type classType, string methodName)
{
Console.Write((methodName + ": ").PadRight(16));
if (IsAsyncMethod(classType, methodName))
Console.WriteLine("Async method");
else
Console.WriteLine("Regular method");
}
}
輸出:
AsyncTaskResultMethod: Async method
AsyncTaskMethod: Async method
AsyncVoidMethod: Async method
RegularMethod: Regular method
在這其中,其實async在方法名的時候,只允許,返回值為void、Task
和Task<TResult>
,否則會發生編譯報錯,事實上,這和其編譯后的結果有關,我們通過ILSpy反編譯這段代碼,截圖關鍵代碼:
internal class Program
{
[CompilerGenerated]
private sealed class <AsyncTaskResultMethod>d__1 : IAsyncStateMachine
{
public int <>1__state;
public AsyncTaskMethodBuilder<int> <>t__builder;
private int <>s__1;
private TaskAwaiter<int> <>u__1;
void IAsyncStateMachine.MoveNext()
{
int num = this.<>1__state;
int result;
try
{
TaskAwaiter<int> awaiter;
if (num != 0)
{
awaiter = Task.FromResult<int>(5).GetAwaiter();
if (!awaiter.IsCompleted)
{
this.<>1__state = 0;
this.<>u__1 = awaiter;
Program.<AsyncTaskResultMethod>d__1 <AsyncTaskResultMethod>d__ = this;
this.<>t__builder.AwaitUnsafeOnCompleted<TaskAwaiter<int>, Program.<AsyncTaskResultMethod>d__1>(ref awaiter, ref <AsyncTaskResultMethod>d__);
return;
}
}
else
{
awaiter = this.<>u__1;
this.<>u__1 = default(TaskAwaiter<int>);
this.<>1__state = -1;
}
this.<>s__1 = awaiter.GetResult();
result = this.<>s__1;
}
catch (Exception exception)
{
this.<>1__state = -2;
this.<>t__builder.SetException(exception);
return;
}
this.<>1__state = -2;
this.<>t__builder.SetResult(result);
}
[DebuggerHidden]
void IAsyncStateMachine.SetStateMachine(IAsyncStateMachine stateMachine)
{
}
}
[CompilerGenerated]
private sealed class <AsyncTaskMethod>d__2 : IAsyncStateMachine
{
public int <>1__state;
public AsyncTaskMethodBuilder <>t__builder;
private TaskAwaiter<int> <>u__1;
void IAsyncStateMachine.MoveNext()
{
int num = this.<>1__state;
try
{
TaskAwaiter<int> awaiter;
if (num != 0)
{
awaiter = new TaskCompletionSource<int>().Task.GetAwaiter();
if (!awaiter.IsCompleted)
{
this.<>1__state = 0;
this.<>u__1 = awaiter;
Program.<AsyncTaskMethod>d__2 <AsyncTaskMethod>d__ = this;
this.<>t__builder.AwaitUnsafeOnCompleted<TaskAwaiter<int>, Program.<AsyncTaskMethod>d__2>(ref awaiter, ref <AsyncTaskMethod>d__);
return;
}
}
else
{
awaiter = this.<>u__1;
this.<>u__1 = default(TaskAwaiter<int>);
this.<>1__state = -1;
}
awaiter.GetResult();
}
catch (Exception exception)
{
this.<>1__state = -2;
this.<>t__builder.SetException(exception);
return;
}
this.<>1__state = -2;
this.<>t__builder.SetResult();
}
[DebuggerHidden]
void IAsyncStateMachine.SetStateMachine(IAsyncStateMachine stateMachine)
{
}
}
private sealed class <AsyncVoidMethod>d__3 : IAsyncStateMachine
{
public int <>1__state;
public AsyncVoidMethodBuilder <>t__builder;
void IAsyncStateMachine.MoveNext()
{
int num = this.<>1__state;
try
{
}
catch (Exception exception)
{
this.<>1__state = -2;
this.<>t__builder.SetException(exception);
return;
}
this.<>1__state = -2;
this.<>t__builder.SetResult();
}
[DebuggerHidden]
void IAsyncStateMachine.SetStateMachine(IAsyncStateMachine stateMachine)
{
}
}
[DebuggerStepThrough, AsyncStateMachine(typeof(Program.<AsyncTaskResultMethod>d__1))]
public static Task<int> AsyncTaskResultMethod()
{
Program.<AsyncTaskResultMethod>d__1 <AsyncTaskResultMethod>d__ = new Program.<AsyncTaskResultMethod>d__1();
<AsyncTaskResultMethod>d__.<>t__builder = AsyncTaskMethodBuilder<int>.Create();
<AsyncTaskResultMethod>d__.<>1__state = -1;
<AsyncTaskResultMethod>d__.<>t__builder.Start<Program.<AsyncTaskResultMethod>d__1>(ref <AsyncTaskResultMethod>d__);
return <AsyncTaskResultMethod>d__.<>t__builder.Task;
}
[DebuggerStepThrough, AsyncStateMachine(typeof(Program.<AsyncTaskMethod>d__2))]
public static Task AsyncTaskMethod()
{
Program.<AsyncTaskMethod>d__2 <AsyncTaskMethod>d__ = new Program.<AsyncTaskMethod>d__2();
<AsyncTaskMethod>d__.<>t__builder = AsyncTaskMethodBuilder.Create();
<AsyncTaskMethod>d__.<>1__state = -1;
<AsyncTaskMethod>d__.<>t__builder.Start<Program.<AsyncTaskMethod>d__2>(ref <AsyncTaskMethod>d__);
return <AsyncTaskMethod>d__.<>t__builder.Task;
}
[DebuggerStepThrough, AsyncStateMachine(typeof(Program.<AsyncVoidMethod>d__3))]
public static void AsyncVoidMethod()
{
Program.<AsyncVoidMethod>d__3 <AsyncVoidMethod>d__ = new Program.<AsyncVoidMethod>d__3();
<AsyncVoidMethod>d__.<>t__builder = AsyncVoidMethodBuilder.Create();
<AsyncVoidMethod>d__.<>1__state = -1;
<AsyncVoidMethod>d__.<>t__builder.Start<Program.<AsyncVoidMethod>d__3>(ref <AsyncVoidMethod>d__);
}
public static int RegularMethod()
{
return 5;
}
}
我們大致來捋一捋,事實上,從反編譯后的代碼可以看出來一些東西了,編譯器大致是這樣的,以AsyncTaskResultMethod方法為例子:
- 將標識async的方法,打上AsyncStateMachine 特性
- 根據AsyncStateMachine 該特性,編譯器為該方法新增一個以該方法名為名的類AsyncTaskMethodClass,並且實現接口IAsyncStateMachine,其中最主要的就是其MoveNext方法
- 該方法去除標識async,在內部實例化新增的類AsyncTaskMethodClass,用AsyncTaskMethodBuilder
的Create方法創建一個狀態機對象賦值給對象的該類型的build字段,並且將狀態state設置為-1.即初始狀態,然后通過build字段啟動狀態機
實際上,上述只是編譯器為async做的事情,我們可以看到通過AsyncVoidMethod方法編譯器生成的東西和其他方法大致一樣,那么await為編譯器做的就是MoveNext方法里面try那段,這也是AsyncVoidMethod方法和其他方法不一致的地方:
private TaskAwaiter<int> <>u__1;
try
{
TaskAwaiter<int> awaiter;
if (num != 0)
{
awaiter = new TaskCompletionSource<int>().Task.GetAwaiter();
if (!awaiter.IsCompleted)
{
this.<>1__state = 0;
this.<>u__1 = awaiter;
Program.<AsyncTaskMethod>d__2 <AsyncTaskMethod>d__ = this;
this.<>t__builder.AwaitUnsafeOnCompleted<TaskAwaiter<int>, Program.<AsyncTaskMethod>d__2>(ref awaiter, ref <AsyncTaskMethod>d__);
return;
}
}
else
{
awaiter = this.<>u__1;
this.<>u__1 = default(TaskAwaiter<int>);
this.<>1__state = -1;
}
awaiter.GetResult();
}
我們再看看this.<>t__builder.AwaitUnsafeOnCompleted內部:
public void AwaitUnsafeOnCompleted<TAwaiter, TStateMachine>(ref TAwaiter awaiter, ref TStateMachine stateMachine) where TAwaiter : ICriticalNotifyCompletion where TStateMachine : IAsyncStateMachine
{
try
{
AsyncMethodBuilderCore.MoveNextRunner runner = null;
Action completionAction = this.m_coreState.GetCompletionAction(AsyncCausalityTracer.LoggingOn ? this.Task : null, ref runner);
if (this.m_coreState.m_stateMachine == null)
{
Task<TResult> task = this.Task;
this.m_coreState.PostBoxInitialization(stateMachine, runner, task);
}
awaiter.UnsafeOnCompleted(completionAction);
}
catch (Exception exception)
{
AsyncMethodBuilderCore.ThrowAsync(exception, null);
}
}
GetCompletionAction方法內部:
[SecuritySafeCritical]
internal Action GetCompletionAction(Task taskForTracing, ref AsyncMethodBuilderCore.MoveNextRunner runnerToInitialize)
{
Debugger.NotifyOfCrossThreadDependency();
ExecutionContext executionContext = ExecutionContext.FastCapture();
Action action;
AsyncMethodBuilderCore.MoveNextRunner moveNextRunner;
if (executionContext != null && executionContext.IsPreAllocatedDefault)
{
action = this.m_defaultContextAction;
if (action != null)
{
return action;
}
moveNextRunner = new AsyncMethodBuilderCore.MoveNextRunner(executionContext, this.m_stateMachine);
action = new Action(moveNextRunner.Run);
if (taskForTracing != null)
{
action = (this.m_defaultContextAction = this.OutputAsyncCausalityEvents(taskForTracing, action));
}
else
{
this.m_defaultContextAction = action;
}
}
else
{
moveNextRunner = new AsyncMethodBuilderCore.MoveNextRunner(executionContext, this.m_stateMachine);
action = new Action(moveNextRunner.Run);
if (taskForTracing != null)
{
action = this.OutputAsyncCausalityEvents(taskForTracing, action);
}
}
if (this.m_stateMachine == null)
{
runnerToInitialize = moveNextRunner;
}
return action;
}
void moveNextRunner.Run()
{
if (this.m_context != null)
{
try
{
ContextCallback contextCallback = AsyncMethodBuilderCore.MoveNextRunner.s_invokeMoveNext;
if (contextCallback == null)
{
contextCallback = (AsyncMethodBuilderCore.MoveNextRunner.s_invokeMoveNext = new ContextCallback(AsyncMethodBuilderCore.MoveNextRunner.InvokeMoveNext));
}
ExecutionContext.Run(this.m_context, contextCallback, this.m_stateMachine, true);
return;
}
finally
{
this.m_context.Dispose();
}
}
this.m_stateMachine.MoveNext();
}
從上面的代碼可以看出,其實this.<>t__builder.AwaitUnsafeOnCompleted內部就做了以下:
- 從GetCompletionAction方法獲取要給awaiter.UnsafeOnCompleted的action
- GetCompletionAction內部先用ExecutionContext.FastCapture()捕獲了當前線程的執行上下文,在用執行上下文執行了那個回調方法MoveNext,也就是又一次回到那個一開始那個MoveNext方法
大致執行流程圖如下:
因此,我們驗證了async await確實是語法糖,編譯器為其在背后做了太多的事情,簡化了我們編寫異步代碼的方式,我們也注意到了其中一些問題:
- 方法標識async,方法內部沒使用await實際就是同步方法,但是會編譯出async有關的東西,會浪費一些性能
- 能await Task,事實上能await Task是因為后面編譯器有用到了awaiter的一些東西,例如:
- !awaiter.IsCompleted
- awaiter.GetResult()
- awaiter.UnsafeOnCompleted
確實如猜想的,像await Task.Yield()等等,被await的對象,它必須包含以下條件:
-
有一個GetAwaiter方法,為實例方法或者擴展方法
-
GetAwaiter方法的返回值類,必須包含以下條件
-
直接或者間接實現INotifyCompletion接口,ICriticalNotifyCompletion也繼承自ICriticalNotifyCompletion接口,也就是實現了其UnsafeOnCompleted或者OnCompleted方法
-
有個布爾屬性IsCompleted,且get開放
-
有個GetResult方法,返回值為void或者TResult
因此可以自定義一些能被await的類,關於如何自定義的細節,可以參考林德熙大佬的這篇文章:C# await 高級用法
-
async await的正確用途
事實上,我們在線程池上還埋下一個彩蛋,線程池上有工作線程適合CPU密集型操作,還有I/O完成端口線程適合I/O密集型操作,而async await異步函數實際上的主場是在I/O密集型這里,我們先通過一段代碼
static void Main(string[] args)
{
ThreadPool.SetMaxThreads(8, 8);//設置線程池最大工作線程和I/O完成端口線程數量
Read();
Console.ReadLine();
}
static void Read()
{
byte[] buffer;
byte[] buffer1;
FileStream fileStream = new FileStream("E:/test1.txt", FileMode.Open, FileAccess.Read, FileShare.Read, 10000, useAsync: true);
buffer = new byte[fileStream.Length];
var state = Tuple.Create(buffer, fileStream);
FileStream fileStream1 = new FileStream("E:/test2.txt", FileMode.Open, FileAccess.Read, FileShare.Read, 10000, useAsync: true);
buffer1 = new byte[fileStream1.Length];
var state1 = Tuple.Create(buffer1, fileStream1);
fileStream.BeginRead(buffer, 0, (int)fileStream.Length, EndReadCallback, state);
fileStream1.BeginRead(buffer, 0, (int)fileStream1.Length, EndReadCallback, state1);
}
static void EndReadCallback(IAsyncResult asyncResult)
{
Console.WriteLine("Starting EndWriteCallback.");
Console.WriteLine($"current thread:{Thread.CurrentThread.ManagedThreadId},isThreadPool:{Thread.CurrentThread.IsThreadPoolThread}");
try
{
var state = (Tuple<byte[], FileStream>)asyncResult.AsyncState;
ThreadPool.GetAvailableThreads(out int workerThreads, out int portThreads);
Console.WriteLine($"AvailableworkerThreads:{workerThreads},AvailableIOThreads:{portThreads}");
state.Item2.EndRead(asyncResult);
}
finally
{
Console.WriteLine("Ending EndWriteCallback.");
}
}
輸出結果:
Starting EndWriteCallback.
current thread:3,isThreadPool:True
AvailableworkerThreads:8,AvailableIOThreads:7
Ending EndWriteCallback.
Starting EndWriteCallback.
current thread:3,isThreadPool:True
AvailableworkerThreads:8,AvailableIOThreads:7
Ending EndWriteCallback.
我們看到,事實上,兩個回調方法都調用了相同的線程,且是線程池的I/O完成端口線程,假如將兩個實例化FileStream時的參數改下,改為useAsync: false,輸出結果如下:
Starting EndWriteCallback.
current thread:4,isThreadPool:True
AvailableworkerThreads:6,AvailableIOThreads:8
Ending EndWriteCallback.
Starting EndWriteCallback.
current thread:5,isThreadPool:True
AvailableworkerThreads:7,AvailableIOThreads:8
Ending EndWriteCallback.
我們會發現這次用到的是線程池的兩條工作線程了,其實這就是同步I/O和異步I/O的區別,我們可以大概看下最底層BeginRead代碼:
private unsafe int ReadFileNative(SafeFileHandle handle, byte[] bytes, int offset, int count, NativeOverlapped* overlapped, out int hr)
{
if (bytes.Length - offset < count)
{
throw new IndexOutOfRangeException(Environment.GetResourceString("IndexOutOfRange_IORaceCondition"));
}
if (bytes.Length == 0)
{
hr = 0;
return 0;
}
int num = 0;
int numBytesRead = 0;
fixed (byte* ptr = bytes)
{
num = ((!_isAsync) ? Win32Native.ReadFile(handle, ptr + offset, count, out numBytesRead, IntPtr.Zero) : Win32Native.ReadFile(handle, ptr + offset, count, IntPtr.Zero, overlapped));
}
if (num == 0)
{
hr = Marshal.GetLastWin32Error();
if (hr == 109 || hr == 233)
{
return -1;
}
if (hr == 6)
{
_handle.Dispose();
}
return -1;
}
hr = 0;
return numBytesRead;
}
實際上底層是Pinvoke去調用win32api ,Win32Native.ReadFile,關於該win32函數細節可參考MSDN:ReadFile,是否異步的關鍵就是判斷是否傳入overlapped對象,而該對象會關聯到一個window內核對象,IOCP(I/O Completion Port),也就是I/O完成端口,事實上進程創建的時候,創建線程池的同時就會創建這么一個I/O完成端口內核對象,大致流程如下:
- 我們兩個I/O請求,事實上對應着我們傳入的兩個IRP(I/O request packet)數據結構,其中包括文件句柄和文件中偏移量,會在Pinvoke去調用win32api進入win32用戶模式
- 然后通過win32api函數進入window內核模式,我們兩個請求之后會放在一個IRP隊列
- 之后系統就會從該IRP隊列,根據文件句柄和偏移量等信息去對應請求處理不同的I/O設備,完成后會放入到一個完成IRP隊列中
- 然后線程池的I/O完成端口線程通過線程池的I/O完成端口對象去拿取那些已經完成IRP隊列
那么在多請求的時候,IOCP模型異步的這種情況,少量的I/O完成端口線程就能做到這一切,而同步則要因為一條線程要等待該請求處理的完成,那么會大大浪費線程,正如上面一樣,兩個請求卻要兩個工作線程完成通知,而在async await時期,上面的一些方法已經被封裝以Task
和Task<TResult>
對象來代表完成讀取了,那么上面可以簡化為:
static async Task Main(string[] args)
{
ThreadPool.SetMaxThreads(8, 8);//設置線程池最大工作線程和I/O完成端口線程數量
await ReadAsync();
Console.ReadLine();
}
static async Task<int> ReadAsync()
{
FileStream fileStream = new FileStream("E:/test1.txt", FileMode.Open, FileAccess.Read, FileShare.Read, 10000, useAsync: true);
var buffer = new byte[fileStream.Length];
var result = await fileStream.ReadAsync(buffer, 0, (int)fileStream.Length);
return result;
}
底層沒變,只是回調的時候I/O完成端口線程再通過工作線程進行回調(這能避免之前回調的時候阻塞I/O完成端口線程的操作),但是大大的簡化了異步I/O編程,而async await並非不適合CPU密集型,只是I/O操作一般比較耗時,如果用線程池的工作線程,就會有可能創建更多線程來應付更多的請求,CPU密集型的任務並行庫 (TPL)有很多合適的api
總結
我們了解了Task是.NET 編寫多線程的一個非常方便的高層抽象類,你可以不用擔心底層線程處理,通過對Task不同的配置,能寫出較高性能的多線程並發程序,然后探尋了.NET 4.5引入了的async await異步函數內部做了些啥,知道async await通過和TPL的配合,簡化了編寫異步編程的方式,特別適合I/O密集型的異步操作,本文只是起到對於Task和async await有個快速的理解作用,而關於微軟圍繞Task做的事情遠遠不止如此,例如通過ValueTask優化Task,還有更利於CPU密集型操作的TPL中的Parallel和PLINQ api等等,可以參考其他書籍或者msdn更深入了解
參考
Asynchronous programming patterns
Async in depth
ThreadPool 類
Understanding C# async / await
《CLR Via C# 第四版》
《Window核心編程第五版》