在學習異步之前先來說說異步的好處,例如對於不需要CPU參數的輸入輸出操作,可以將實際的處理步驟分為以下三步:
- 啟動處理;
- 實際的處理,此時不需要CPU參數;
- 任務完成后的處理;
以上步驟如果僅僅使用一個線程,當線程正在處理UI操作時就會出現“卡”的現象。
如果使用異步的處理方式,則這三步處理過程涉及到兩個線程,主線程中啟動第一步;第一步啟動后,主線程結束(如果不結束,只會讓該線程處於無作為的等待狀態);第二步不需要CPU參與;第二步完成之后,在第二個線程上啟動第三步;完成之后第二個線程結束。這樣的處理過程中沒有一個線程需要處於等待狀態,使得運行的線程得到充分利用。
一、CLR線程池的I/O線程
上一篇學習的都是CLR線程池的輔助線程,這次要學習的是CLR線程池的I/O線程。
I/O線程是.NET專為訪問外部資源所設置的一種線程,因為訪問外部資源常常要受到外界因素的影響,為了防止讓主線程受影響而長期處於阻塞狀態,.NET為多個I/O操作都建立起了異步方法。例如:FileStream、TCP/IP、WebRequest、WebService等等,而且每個異步方法的使用方式都非常類似,
都是以Beginxxx開始,以Endxxx結束(APM)。對於APM來說,必須使用Endxxx結束異步,否則可能會造成資源泄露。Beginxxx實際是將線程排入線程池。
另外還有一種基於事件的異步編程模式(EPM),支持基於事件的異步模式的類將有一個或多個后綴為Async的方法,同時還會有一個相應名為Completed后綴的事件,Async方法用於啟動異步處理,而Completed事件將在異步處理完成之后通過事件來宣告異步處理的完成。注意,在使用EPM模式的時候,不管是完成了異步請求,還是在處理中出現異常,或者是終止異步處理,都必須要調用Compeleted處理程序。如:
OpenReadAsync
OpenReadCompleted
二、異步讀寫FileStream
需要在FileStream中異步調用I/O線程,必須使用以下構造函數建立FileStream對象,並把useAsync設置為true。
FileStream stream = new FileStream(string path,FileMode mode,FileAccess access,FileShare share,int bufferSize,bool useAsync);
參數說明:
- path是文件的相對路徑或絕對路徑;
- mode確定如何打開或創建文件;
- access確認訪問文件的方式;
- share確定文件如何進程共享;
- bufferSize是代表緩沖區大小,一般默認最小值為8,在啟動異步讀取或寫入時,文件大小一般大於緩沖大小;
- userAsync代表是否啟動異步I/O線程。
注意:當使用BeginRead和BeginWrite方法在執行大量讀或寫時效果更好,但對於少量讀/寫,這些方法速度可能比同步還要慢,因為進行線程間的切換需要大量時間。
1、異步寫入
FileStream中包含BeginWrite、EndWrite方法可以啟動I/O線程進行異步寫入。
public override IAsyncResult BeginWrite(byte[] array,int offset,int numBytes,AsyncCallback,Object stateObject) public override void EndWrite(IAsyncResult asyncResult)
BeginWrite返回值為IAsyncResult,使用方式與委托的BeginInvoke方法相似,最好就是使用回調函數,避免線程阻塞。
最后兩個參數還是同樣的套路:
- AsyncCallback用於綁定回調函數;
- Object用於傳遞外部數據。
要注意一點:AsyncCallback所綁定的回調函數必須是帶單個IAsyncResult參數的無返回值方法。
在例子中,把FileStream作為外部數據傳遞到回調函數當中,然后再回調函數中利用IAsyncResult.AsyncState獲取FileStream對象,最后通過FileStream.EndWrite(IAsyncResult)結束寫入。
下面是一個異步寫入的例子:
class Program { static void Main(string[] args) { int a, b; ThreadPool.GetMaxThreads(out a, out b); Console.WriteLine("原有輔助線程數" + a + " " + "原有I/O線程數" + b); //文件名 文件創建方式 文件權限 文件進程共享 緩沖區大小為1024 是否啟動異步I/O線程為true FileStream stream = new FileStream(@"D:\123.txt", FileMode.OpenOrCreate, FileAccess.ReadWrite, FileShare.ReadWrite, 1024, true); //這里要注意,如果寫入的字符串很小,則.Net會使用輔助線程寫,因為這樣比較快 byte[] bytes = Encoding.UTF8.GetBytes("你在他鄉還好嗎?"); //異步寫入開始,倒數第二個參數指定回調函數,最后一個參數將自身傳到回調函數里,用於結束異步線程 stream.BeginWrite(bytes, 0, (int)bytes.Length, new AsyncCallback(Callback), stream); ThreadPool.GetAvailableThreads(out a, out b); Console.WriteLine("現有輔助線程數" + a + " " + "現有有I/O線程數" + b); Console.WriteLine("主線程繼續干其他活!"); Console.ReadKey(); } static void Callback(IAsyncResult result) { //顯示線程池現狀 Thread.Sleep(2000); //通過result.AsyncState再強制轉換為FileStream就能夠獲取FileStream對象,用於結束異步寫入 FileStream stream = (FileStream)result.AsyncState; stream.EndWrite(result); stream.Flush(); stream.Close(); } }
輸出結果如下:
對於結束異步線程的方法,還是玩IAsyncResult的這一套,在啟動異步寫時將自身對象傳遞到回調函數中,在回調函數中獲得自身去結束異步線程。
這就是C#中的異步操作,從剩余線程數我們看到,異步實際上是調用線程池的線程來實現異步的。
2、異步讀取
FileStream中可以通過使用BeginRead和EndRead調用異步I/O線程讀取:
public override IAsyncResult BeginRead(byte[] array,int offset,int numBytes,AsyncCallback userCallback,Object stateObject) public override int EndRead(IAsyncResult asyncResult)
BeginRead與EndRead方法與寫相似,AsyncCallback用於綁定回調函數;Object用於傳遞外部數據。在回調函數只需要使用IAsyncResult.AsyncState就可以獲取外部數據。EndRead方法會返回從流中讀取到的字節數量。
首先定義FileData類,里面包含FileStream對象,byte[]數組和長度。然后把FileData對象作為外部數據傳到回調函數,在回調函數中,把IAsyncResult.AsyncState強制轉換為FileData。然后通過FileStream.EndRead(IAsyncResult)結束讀取。
最后比較一下長度,如果讀取到的長度與輸入的數據長度不一致,則拋出異常。
class Program { static void Main(string[] args) { int a, b; ThreadPool.GetAvailableThreads(out a, out b); Console.WriteLine("原有輔助線程:" + a + "原有I/O線程:" + b); byte[] byteData = new byte[1024]; FileStream stream = new FileStream(@"D:\123.txt", FileMode.OpenOrCreate, FileAccess.ReadWrite, FileShare.ReadWrite, 1024, true); //把FileStream對象,byte[]對象,長度等有關數據綁定到FileDate對象中,以附帶屬性方式送到回調函數 Hashtable ht = new Hashtable(); ht.Add("Length", (int)stream.Length); ht.Add("Stream", stream); ht.Add("ByteData", byteData); //啟動異步讀取,倒數第二個參數是指定回調函數,倒數第一個參數是傳入回調函數中的參數 stream.BeginRead(byteData, 0, (int)ht["Length"], new AsyncCallback(Completed), ht); ThreadPool.GetAvailableThreads(out a, out b); Console.WriteLine("現有輔助線程:" + a + "現有I/O線程:" + b); Console.ReadKey(); } //實際參數就是回調函數 static void Completed(IAsyncResult result) { Thread.Sleep(2000); //參數result實際上就是Hashtable對象,以FileStream.EndRead完成異步讀取 Hashtable ht = (Hashtable)result.AsyncState; FileStream stream = (FileStream)ht["Stream"]; int length = stream.EndRead(result); stream.Close(); string str = Encoding.UTF8.GetString(ht["ByteData"] as byte[]); Console.WriteLine(str); stream.Close(); } }
輸出如下:
注意,如果文件過小,小於緩沖區1024,那么可能會調用工作者線程而非I/O線程操作。但是根據我的觀察,只是讀取文件時文件過小可能會調用輔助線程操作,但是寫入時不會。
像上面就是直接用輔助線程處理了。
IAsyncResult的作用主要有兩點:
- AsyncState屬性,用來傳遞參數到回調函數;
- Endxxx方法,結束異步操作方法需要此對象作為參數;
三、異步WebRequest
System.Net.WebRequest是.NET為實現Internet的"請求/響應模型"而開發的一個abstract基類。它主要有三個子類:
- FtpWebRequest,FileWebRequest使用"file://路徑"的URI方式實現對本地資源和內部文件的請求/響應;
- HttpWebRequest,FtpWebRequest使用FTP文件傳輸協議實現文件請求/響應;
- FileWebRequest,HttpWebRequest用於處理HTTP的頁面請求/響應;
當使用WebRequest.Create(string uri)創建對象時,應用程序就可以根據請求協議判斷實現類來進行操作FileWebRequest、FtpWebRequest、HttpWebRequest各有其作用。由於使用方法類似,下面就用常用的HttpWebRequest為例子介紹一下異步WebRequest的使用方法。
HttpWebRequest包含由一下幾個常用方法處理請求/響應:
public override Stream GetRequest() public override WebResponse GetResponse() public override IAsyncResult BeginGetRequestStream(AsyncCallback callback,Object state) public override Stream EndGetRequestStream(IAsyncResult asyncResult) public override IAsyncResult BeginGetResponse(AsyncCallback callback,Object state) public override WebResponse EndGetResponse(IAsyncResult asyncResult)
- BeginGetRequestStream、EndGetRequestStream用於異步向HttpWebRequest對象寫入請求信息;
- BeginGetResponse、EndGetResponse用於異步發送頁面請求並獲取返回信;
使用異步方式操作Internet的"請求/響應",避免主線程長期處於等待狀態,而操作期間異步線程是來自CLR線程池的I/O線程。
注意:請求與響應不能使用同步與異步混合開發模式,即當請求寫入使用GetRequestStream同步模式,即使響應使用BeginGetResponse異步方法,操作也與GetRequestStream方法在於同一線程內。
class Program { static void Main(string[] args) { int a, b; ThreadPool.GetAvailableThreads(out a, out b); Console.WriteLine("原有輔助線程:" + a + "原有I/O線程:" + b); //使用WebRequest.Create方法建立HttpWebRequest對象 HttpWebRequest webRequest = (HttpWebRequest)WebRequest.Create("http://www.baidu.com"); webRequest.Method = "post"; //對寫入數據的RequestStream對象進行異步請求 IAsyncResult result = webRequest.BeginGetResponse(new AsyncCallback(EndGetResponse), webRequest); Thread.Sleep(1000); ThreadPool.GetAvailableThreads(out a, out b); Console.WriteLine("現有輔助線程:" + a + "現有I/O線程:" + b); Console.WriteLine("主線程繼續干其他事!"); Console.ReadKey(); } static void EndGetResponse(IAsyncResult result) { Thread.Sleep(2000); //結束異步請求,獲取結果 HttpWebRequest webRequest = (HttpWebRequest)result.AsyncState; WebResponse webResponse = webRequest.EndGetResponse(result); Stream stream = webResponse.GetResponseStream(); StreamReader sr = new StreamReader(stream); string html = sr.ReadToEnd(); Console.WriteLine(html.Substring(0,50)); } }
顯示結果如下:
四、異步SqlCommand
使用異步SqlCommand的時候,請注意把ConnectionString 的 Asynchronous Processing 設置為 true 。
class Program { static void Main(string[] args) { int a, b; ThreadPool.GetMaxThreads(out a, out b); Console.WriteLine("原有輔助線程數" + a + " " + "原有I/O線程數" + b); string str = "server=.;database=Test;uid=sa;pwd=123;Asynchronous Processing=true"; SqlConnection conn = new SqlConnection(str); SqlCommand cmd = conn.CreateCommand(); cmd.CommandText = "INSERT INTO Person VALUES(15,'郭嘉',22)"; conn.Open(); cmd.BeginExecuteNonQuery(new AsyncCallback(EndCallback), cmd); Thread.Sleep(1000); ThreadPool.GetAvailableThreads(out a, out b); Console.WriteLine("現有輔助線程數" + a + " " + "現有I/O線程數" + b); Console.WriteLine("主線程繼續執行!"); Console.ReadKey(); } public static void EndCallback(IAsyncResult result) { Thread.Sleep(2000); SqlCommand cmd = result.AsyncState as SqlCommand; //獲得異步傳入的參數 Console.WriteLine("成功執行命令:" + cmd.CommandText); Console.WriteLine("本次執行影響行數為:" + cmd.EndExecuteNonQuery(result)); cmd.Connection.Close(); } }
輸出如下: