線程池 異步I/O線程 <第三篇>


  在學習異步之前先來說說異步的好處,例如對於不需要CPU參數的輸入輸出操作,可以將實際的處理步驟分為以下三步:

  1. 啟動處理;
  2. 實際的處理,此時不需要CPU參數;
  3. 任務完成后的處理;

  以上步驟如果僅僅使用一個線程,當線程正在處理UI操作時就會出現“卡”的現象。

  如果使用異步的處理方式,則這三步處理過程涉及到兩個線程,主線程中啟動第一步;第一步啟動后,主線程結束(如果不結束,只會讓該線程處於無作為的等待狀態);第二步不需要CPU參與;第二步完成之后,在第二個線程上啟動第三步;完成之后第二個線程結束。這樣的處理過程中沒有一個線程需要處於等待狀態,使得運行的線程得到充分利用。

一、CLR線程池的I/O線程

    上一篇學習的都是CLR線程池的輔助線程,這次要學習的是CLR線程池的I/O線程。

    I/O線程是.NET專為訪問外部資源所設置的一種線程,因為訪問外部資源常常要受到外界因素的影響,為了防止讓主線程受影響而長期處於阻塞狀態,.NET為多個I/O操作都建立起了異步方法。例如:FileStream、TCP/IP、WebRequest、WebService等等,而且每個異步方法的使用方式都非常類似,

  都是以Beginxxx開始,以Endxxx結束(APM)。對於APM來說,必須使用Endxxx結束異步,否則可能會造成資源泄露。Beginxxx實際是將線程排入線程池。

  另外還有一種基於事件的異步編程模式(EPM),支持基於事件的異步模式的類將有一個或多個后綴為Async的方法,同時還會有一個相應名為Completed后綴的事件,Async方法用於啟動異步處理,而Completed事件將在異步處理完成之后通過事件來宣告異步處理的完成。注意,在使用EPM模式的時候,不管是完成了異步請求,還是在處理中出現異常,或者是終止異步處理,都必須要調用Compeleted處理程序。如:

  OpenReadAsync
  OpenReadCompleted

二、異步讀寫FileStream

    需要在FileStream中異步調用I/O線程,必須使用以下構造函數建立FileStream對象,並把useAsync設置為true。

FileStream stream = new FileStream(string path,FileMode mode,FileAccess access,FileShare share,int bufferSize,bool useAsync);

  參數說明:

  1. path是文件的相對路徑或絕對路徑;
  2. mode確定如何打開或創建文件;
  3. access確認訪問文件的方式;
  4. share確定文件如何進程共享;
  5. bufferSize是代表緩沖區大小,一般默認最小值為8,在啟動異步讀取或寫入時,文件大小一般大於緩沖大小;
  6. userAsync代表是否啟動異步I/O線程。

  注意:當使用BeginRead和BeginWrite方法在執行大量讀或寫時效果更好,但對於少量讀/寫,這些方法速度可能比同步還要慢,因為進行線程間的切換需要大量時間。

  1、異步寫入

  FileStream中包含BeginWrite、EndWrite方法可以啟動I/O線程進行異步寫入。

  public override IAsyncResult BeginWrite(byte[] array,int offset,int numBytes,AsyncCallback,Object stateObject)
  public override void EndWrite(IAsyncResult asyncResult)

  BeginWrite返回值為IAsyncResult,使用方式與委托的BeginInvoke方法相似,最好就是使用回調函數,避免線程阻塞。

  最后兩個參數還是同樣的套路:

  • AsyncCallback用於綁定回調函數;
  • Object用於傳遞外部數據。

  要注意一點:AsyncCallback所綁定的回調函數必須是帶單個IAsyncResult參數的無返回值方法。

  在例子中,把FileStream作為外部數據傳遞到回調函數當中,然后再回調函數中利用IAsyncResult.AsyncState獲取FileStream對象,最后通過FileStream.EndWrite(IAsyncResult)結束寫入。

  下面是一個異步寫入的例子:

    class Program
    {
        static void Main(string[] args)
        {
            int a, b;
            ThreadPool.GetMaxThreads(out a, out b);
            Console.WriteLine("原有輔助線程數" + a + "   " + "原有I/O線程數" + b);

            //文件名 文件創建方式 文件權限 文件進程共享 緩沖區大小為1024 是否啟動異步I/O線程為true
            FileStream stream = new FileStream(@"D:\123.txt", FileMode.OpenOrCreate, FileAccess.ReadWrite, FileShare.ReadWrite, 1024, true);
            //這里要注意,如果寫入的字符串很小,則.Net會使用輔助線程寫,因為這樣比較快
            byte[] bytes = Encoding.UTF8.GetBytes("你在他鄉還好嗎?");
            //異步寫入開始,倒數第二個參數指定回調函數,最后一個參數將自身傳到回調函數里,用於結束異步線程
            stream.BeginWrite(bytes, 0, (int)bytes.Length, new AsyncCallback(Callback), stream);

            ThreadPool.GetAvailableThreads(out a, out b);
            Console.WriteLine("現有輔助線程數" + a + "   " + "現有有I/O線程數" + b);

            Console.WriteLine("主線程繼續干其他活!");
            Console.ReadKey();
        }

        static void Callback(IAsyncResult result)
        {
            //顯示線程池現狀
            Thread.Sleep(2000);
            //通過result.AsyncState再強制轉換為FileStream就能夠獲取FileStream對象,用於結束異步寫入
            FileStream stream = (FileStream)result.AsyncState;
            stream.EndWrite(result);
            stream.Flush();
            stream.Close();
        }
    }

  輸出結果如下:

  

  對於結束異步線程的方法,還是玩IAsyncResult的這一套,在啟動異步寫時將自身對象傳遞到回調函數中,在回調函數中獲得自身去結束異步線程。

  這就是C#中的異步操作,從剩余線程數我們看到,異步實際上是調用線程池的線程來實現異步的。

  2、異步讀取

  FileStream中可以通過使用BeginRead和EndRead調用異步I/O線程讀取:

  public override IAsyncResult BeginRead(byte[] array,int offset,int numBytes,AsyncCallback userCallback,Object stateObject)
  public override int EndRead(IAsyncResult asyncResult)

  BeginRead與EndRead方法與寫相似,AsyncCallback用於綁定回調函數;Object用於傳遞外部數據。在回調函數只需要使用IAsyncResult.AsyncState就可以獲取外部數據。EndRead方法會返回從流中讀取到的字節數量。

  首先定義FileData類,里面包含FileStream對象,byte[]數組和長度。然后把FileData對象作為外部數據傳到回調函數,在回調函數中,把IAsyncResult.AsyncState強制轉換為FileData。然后通過FileStream.EndRead(IAsyncResult)結束讀取。

  最后比較一下長度,如果讀取到的長度與輸入的數據長度不一致,則拋出異常。

    class Program
    {
        static void Main(string[] args)
        {
            int a, b;
            ThreadPool.GetAvailableThreads(out a, out b);
            Console.WriteLine("原有輔助線程:" + a + "原有I/O線程:" + b);

            byte[] byteData = new byte[1024];
            FileStream stream = new FileStream(@"D:\123.txt", FileMode.OpenOrCreate, FileAccess.ReadWrite, FileShare.ReadWrite, 1024, true);
            //把FileStream對象,byte[]對象,長度等有關數據綁定到FileDate對象中,以附帶屬性方式送到回調函數
            Hashtable ht = new Hashtable();
            ht.Add("Length", (int)stream.Length);
            ht.Add("Stream", stream);
            ht.Add("ByteData", byteData);

            //啟動異步讀取,倒數第二個參數是指定回調函數,倒數第一個參數是傳入回調函數中的參數
            stream.BeginRead(byteData, 0, (int)ht["Length"], new AsyncCallback(Completed), ht);

            ThreadPool.GetAvailableThreads(out a, out b);
            Console.WriteLine("現有輔助線程:" + a + "現有I/O線程:" + b);

            Console.ReadKey();
        }

        //實際參數就是回調函數
        static void Completed(IAsyncResult result)
        {
            Thread.Sleep(2000);
            //參數result實際上就是Hashtable對象,以FileStream.EndRead完成異步讀取
            Hashtable ht = (Hashtable)result.AsyncState;
            FileStream stream = (FileStream)ht["Stream"];
            int length = stream.EndRead(result);
            stream.Close();
            string str = Encoding.UTF8.GetString(ht["ByteData"] as byte[]);
            Console.WriteLine(str);
            stream.Close();
        }
    }

  輸出如下:

  

  注意,如果文件過小,小於緩沖區1024,那么可能會調用工作者線程而非I/O線程操作。但是根據我的觀察,只是讀取文件時文件過小可能會調用輔助線程操作,但是寫入時不會。

  像上面就是直接用輔助線程處理了。

  IAsyncResult的作用主要有兩點:

  • AsyncState屬性,用來傳遞參數到回調函數;
  • Endxxx方法,結束異步操作方法需要此對象作為參數;

三、異步WebRequest

  System.Net.WebRequest是.NET為實現Internet的"請求/響應模型"而開發的一個abstract基類。它主要有三個子類:

  • FtpWebRequest,FileWebRequest使用"file://路徑"的URI方式實現對本地資源和內部文件的請求/響應;
  • HttpWebRequest,FtpWebRequest使用FTP文件傳輸協議實現文件請求/響應;
  • FileWebRequest,HttpWebRequest用於處理HTTP的頁面請求/響應;

  當使用WebRequest.Create(string uri)創建對象時,應用程序就可以根據請求協議判斷實現類來進行操作FileWebRequest、FtpWebRequest、HttpWebRequest各有其作用。由於使用方法類似,下面就用常用的HttpWebRequest為例子介紹一下異步WebRequest的使用方法。

  HttpWebRequest包含由一下幾個常用方法處理請求/響應:

  public override Stream GetRequest()
  public override WebResponse GetResponse()
  public override IAsyncResult BeginGetRequestStream(AsyncCallback callback,Object state)
  public override Stream EndGetRequestStream(IAsyncResult asyncResult)
  public override IAsyncResult BeginGetResponse(AsyncCallback callback,Object state)
  public override WebResponse EndGetResponse(IAsyncResult asyncResult)
  • BeginGetRequestStream、EndGetRequestStream用於異步向HttpWebRequest對象寫入請求信息;
  • BeginGetResponse、EndGetResponse用於異步發送頁面請求並獲取返回信;

  使用異步方式操作Internet的"請求/響應",避免主線程長期處於等待狀態,而操作期間異步線程是來自CLR線程池的I/O線程。

  注意:請求與響應不能使用同步與異步混合開發模式,即當請求寫入使用GetRequestStream同步模式,即使響應使用BeginGetResponse異步方法,操作也與GetRequestStream方法在於同一線程內。

    class Program
    {
        static void Main(string[] args)
        {
            int a, b;
            ThreadPool.GetAvailableThreads(out a, out b);
            Console.WriteLine("原有輔助線程:" + a + "原有I/O線程:" + b);

            //使用WebRequest.Create方法建立HttpWebRequest對象
            HttpWebRequest webRequest = (HttpWebRequest)WebRequest.Create("http://www.baidu.com");
            webRequest.Method = "post";

            //對寫入數據的RequestStream對象進行異步請求
            IAsyncResult result = webRequest.BeginGetResponse(new AsyncCallback(EndGetResponse), webRequest);
            Thread.Sleep(1000);
            ThreadPool.GetAvailableThreads(out a, out b);
            Console.WriteLine("現有輔助線程:" + a + "現有I/O線程:" + b);

            Console.WriteLine("主線程繼續干其他事!");
            Console.ReadKey();
        }

        static void EndGetResponse(IAsyncResult result)
        {
            Thread.Sleep(2000);
            //結束異步請求,獲取結果
            HttpWebRequest webRequest = (HttpWebRequest)result.AsyncState;
            WebResponse webResponse = webRequest.EndGetResponse(result);

            Stream stream = webResponse.GetResponseStream();
            StreamReader sr = new StreamReader(stream);
            string html = sr.ReadToEnd();
            Console.WriteLine(html.Substring(0,50));
        }
    }

  顯示結果如下:

  

四、異步SqlCommand

  使用異步SqlCommand的時候,請注意把ConnectionString 的 Asynchronous Processing 設置為 true 。

    class Program
    {
        static void Main(string[] args)
        {
            int a, b;
            ThreadPool.GetMaxThreads(out a, out b);
            Console.WriteLine("原有輔助線程數" + a + "   " + "原有I/O線程數" + b);

            string str = "server=.;database=Test;uid=sa;pwd=123;Asynchronous Processing=true";
            SqlConnection conn = new SqlConnection(str);            
            SqlCommand cmd = conn.CreateCommand();                  
            cmd.CommandText = "INSERT INTO Person VALUES(15,'郭嘉',22)";    
            conn.Open();
            cmd.BeginExecuteNonQuery(new AsyncCallback(EndCallback), cmd);
            Thread.Sleep(1000);
            ThreadPool.GetAvailableThreads(out a, out b);
            Console.WriteLine("現有輔助線程數" + a + "   " + "現有I/O線程數" + b);

            Console.WriteLine("主線程繼續執行!");

            Console.ReadKey();
        }

        public static void EndCallback(IAsyncResult result)
        {
            Thread.Sleep(2000);
            SqlCommand cmd = result.AsyncState as SqlCommand;   //獲得異步傳入的參數
            Console.WriteLine("成功執行命令:" + cmd.CommandText);
            Console.WriteLine("本次執行影響行數為:" + cmd.EndExecuteNonQuery(result));
            cmd.Connection.Close(); 
        }
    }

  輸出如下:

  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM