本文主要從線程的基礎用法,CLR線程池當中工作者線程與I/O線程的開發,並行操作PLINQ等多個方面介紹多線程的開發。
其中委托的BeginInvoke方法以及回調函數最為常用。
而 I/O線程可能容易遭到大家的忽略,其實在開發多線程系統,更應該多留意I/O線程的操作。特別是在ASP.NET開發當中,可能更多人只會留意在客戶端使用Ajax或者在服務器端使用UpdatePanel。其實合理使用I/O線程在通訊項目或文件下載時,能盡量降低IIS的壓力。
並行編程是Framework4.0中極力推廣的異步操作方式,更值得更深入地學習。
希望本篇文章能對各位的學習研究有所幫助,當中有所錯漏的地方敬請點評。
目錄
五、CLR線程池的I/O線程
在前一節所介紹的線程都屬於CLR線程池的工作者線程,這一節開始為大家介紹一下CLR線程池的I/O線程
I/O 線程是.NET專為訪問外部資源所設置的一種線程,因為訪問外部資源常常要受到外界因素的影響,為了防止讓主線程受影響而長期處於阻塞狀態,.NET為多個I/O操作都建立起了異步方法,例如:FileStream、TCP/IP、WebRequest、WebService等等,而且每個異步方法的使用方式都非常類似,都是以BeginXXX為開始,以EndXXX結束,下面為大家一一解說。
5.1 異步讀寫 FileStream
需要在 FileStream 異步調用 I/O線程,必須使用以下構造函數建立 FileStream 對象,並把useAsync設置為 true。
FileStream stream = new FileStream ( string path, FileMode mode, FileAccess access, FileShare share, int bufferSize,bool useAsync ) ;
其中 path 是文件的相對路徑或絕對路徑; mode 確定如何打開或創建文件; access 確定訪問文件的方式; share 確定文件如何進程共享; bufferSize 是代表緩沖區大小,一般默認最小值為8,在啟動異步讀取或寫入時,文件大小一般大於緩沖大小; userAsync代表是否啟動異步I/O線程。
5.1.1 異步寫入
FileStream中包含BeginWrite、EndWrite 方法可以啟動I/O線程進行異步寫入。
public override IAsyncResult BeginWrite ( byte[] array, int offset, int numBytes, AsyncCallback userCallback, Object stateObject )
public override void EndWrite (IAsyncResult asyncResult )
BeginWrite 返回值為IAsyncResult, 使用方式與委托的BeginInvoke方法相似,最好就是使用回調函數,避免線程阻塞。在最后兩個參數中,參數AsyncCallback用於綁定回調函數; 參數Object用於傳遞外部數據。要注意一點:AsyncCallback所綁定的回調函數必須是帶單個 IAsyncResult 參數的無返回值方法。
在例子中,把FileStream作為外部數據傳遞到回調函數當中,然后在回調函數中利用IAsyncResult.AsyncState獲取FileStream對象,最后通過FileStream.EndWrite(IAsyncResult)結束寫入。
1 class Program
2 {
3 static void Main(string[] args)
4 {
5 //把線程池的最大值設置為1000
6 ThreadPool.SetMaxThreads(1000, 1000);
7 ThreadPoolMessage("Start");
8
9 //新立文件File.sour
10 FileStream stream = new FileStream("File.sour", FileMode.OpenOrCreate,
11 FileAccess.ReadWrite,FileShare.ReadWrite,1024,true);
12 byte[] bytes = new byte[16384];
13 string message = "An operating-system ThreadId has no fixed relationship........";
14 bytes = Encoding.Unicode.GetBytes(message);
15
16 //啟動異步寫入
17 stream.BeginWrite(bytes, 0, (int)bytes.Length,new AsyncCallback(Callback),stream);
18 stream.Flush();
19
20 Console.ReadKey();
21 }
22
23 static void Callback(IAsyncResult result)
24 {
25 //顯示線程池現狀
26 Thread.Sleep(200);
27 ThreadPoolMessage("AsyncCallback");
28 //結束異步寫入
29 FileStream stream = (FileStream)result.AsyncState;
30 stream.EndWrite(result);
31 stream.Close();
32 }
33
34 //顯示線程池現狀
35 static void ThreadPoolMessage(string data)
36 {
37 int a, b;
38 ThreadPool.GetAvailableThreads(out a, out b);
39 string message = string.Format("{0}\n CurrentThreadId is {1}\n "+
40 "WorkerThreads is:{2} CompletionPortThreads is :{3}",
41 data, Thread.CurrentThread.ManagedThreadId, a.ToString(), b.ToString());
42 Console.WriteLine(message);
43 }
44 }
由輸出結果可以看到,在使用FileStream.BeginWrite方法后,系統將自動啟動CLR線程池中I/O線程。
5.1.2 異步讀取
FileStream 中包含 BeginRead 與 EndRead 可以異步調用I/O線程進行讀取。
public override IAsyncResult BeginRead ( byte[] array,int offset,int numBytes, AsyncCallback userCallback,Object stateObject)
public override int EndRead(IAsyncResult asyncResult)
其使用方式與BeginWrite和EndWrite相似,AsyncCallback用於綁定回調函數; Object用於傳遞外部數據。在回調函數只需要使用IAsyncResut.AsyncState就可獲取外部數據。EndWrite 方法會返回從流讀取到的字節數量。
首先定義 FileData 類,里面包含FileStream對象,byte[] 數組和長度。然后把FileData對象作為外部數據傳到回調函數,在回調函數中,把IAsyncResult.AsyncState強制轉換為FileData,然后通過FileStream.EndRead(IAsyncResult)結束讀取。最后比較一下長度,若讀取到的長度與輸入的數據長度不一至,則拋出異常。
1 class Program
2 {
3 public class FileData
4 {
5 public FileStream Stream;
6 public int Length;
7 public byte[] ByteData;
8 }
9
10 static void Main(string[] args)
11 {
12 //把線程池的最大值設置為1000
13 ThreadPool.SetMaxThreads(1000, 1000);
14 ThreadPoolMessage("Start");
15 ReadFile();
16
17 Console.ReadKey();
18 }
19
20 static void ReadFile()
21 {
22 byte[] byteData=new byte[80961024];
23 FileStream stream = new FileStream("File1.sour", FileMode.OpenOrCreate,
24 FileAccess.ReadWrite, FileShare.ReadWrite, 1024, true);
25
26 //把FileStream對象,byte[]對象,長度等有關數據綁定到FileData對象中,以附帶屬性方式送到回調函數
27 FileData fileData = new FileData();
28 fileData.Stream = stream;
29 fileData.Length = (int)stream.Length;
30 fileData.ByteData = byteData;
31
32 //啟動異步讀取
33 stream.BeginRead(byteData, 0, fileData.Length, new AsyncCallback(Completed), fileData);
34 }
35
36 static void Completed(IAsyncResult result)
37 {
38 ThreadPoolMessage("Completed");
39
40 //把AsyncResult.AsyncState轉換為FileData對象,以FileStream.EndRead完成異步讀取
41 FileData fileData = (FileData)result.AsyncState;
42 int length=fileData.Stream.EndRead(result);
43 fileData.Stream.Close();
44
45 //如果讀取到的長度與輸入長度不一致,則拋出異常
46 if (length != fileData.Length)
47 throw new Exception("Stream is not complete!");
48
49 string data=Encoding.ASCII.GetString(fileData.ByteData, 0, fileData.Length);
50 Console.WriteLine(data.Substring(2,22));
51 }
52
53 //顯示線程池現狀
54 static void ThreadPoolMessage(string data)
55 {
56 int a, b;
57 ThreadPool.GetAvailableThreads(out a, out b);
58 string message = string.Format("{0}\n CurrentThreadId is {1}\n "+
59 "WorkerThreads is:{2} CompletionPortThreads is :{3}",
60 data, Thread.CurrentThread.ManagedThreadId, a.ToString(), b.ToString());
61 Console.WriteLine(message);
62 }
63
64 }
由輸出結果可以看到,在使用FileStream.BeginRead方法后,系統將自動啟動CLR線程池中I/O線程。
5.2 異步操作TCP/IP套接字
在介紹 TCP/IP 套接字前先簡單介紹一下 NetworkStream 類,它是用於網絡訪問的基礎數據流。 NetworkStream 提供了好幾個方法控制套接字數據的發送與接收, 其中BeginRead、EndRead、BeginWrite、EndWrite 能夠實現異步操作,而且異步線程是來自於CLR線程池的I/O線程。
public override int ReadByte ()
public override int Read (byte[] buffer,int offset, int size)
public override void WriteByte (byte value)
public override void Write (byte[] buffer,int offset, int size)
public override IAsyncResult BeginRead (byte [] buffer, int offset, int size, AsyncCallback callback, Object state )
public override int EndRead(IAsyncResult result)
public override IAsyncResult BeginWrite (byte [] buffer, int offset, int size, AsyncCallback callback, Object state )
public override void EndWrite(IAsyncResult result)
若要創建 NetworkStream,必須提供已連接的 Socket。而在.NET中使用TCP/IP套接字不需要直接與Socket打交道,因為.NET把Socket的大部分操作都放在System.Net.TcpListener和System.Net.Sockets.TcpClient里面,這兩個類大大地簡化了Socket的操作。一般套接字對象Socket包含一個Accept()方法,此方法能產生阻塞來等待客戶端的請求,而在TcpListener類里也包含了一個相似的方法 public TcpClient AcceptTcpClient()用於等待客戶端的請求。此方法將會返回一個TcpClient 對象,通過 TcpClient 的 public NetworkStream GetStream()方法就能獲取NetworkStream對象,控制套接字數據的發送與接收。
下面以一個例子說明異步調用TCP/IP套接字收發數據的過程。
首先在服務器端建立默認地址127.0.0.1用於收發信息,使用此地址與端口500新建TcpListener對象,調用TcpListener.Start 偵聽傳入的連接請求,再使用一個死循環來監聽信息。
在ChatClient類包括有接收信息與發送信息兩個功能:當接收到客戶端請求時,它會利用 NetworkStream.BeginRead 讀取客戶端信息,並在回調函數ReceiveAsyncCallback中輸出信息內容,若接收到的信息的大小小於1時,它將會拋出一個異常。當信息成功接收后,再使用 NetworkStream.BeginWrite 方法回饋信息到客戶端
1 class Program
2 {
3 static void Main(string[] args)
4 {
5 //設置CLR線程池最大線程數
6 ThreadPool.SetMaxThreads(1000, 1000);
7
8 //默認地址為127.0.0.1
9 IPAddress ipAddress = IPAddress.Parse("127.0.0.1");
10 TcpListener tcpListener = new TcpListener(ipAddress, 500);
11 tcpListener.Start();
12
13 //以一個死循環來實現監聽
14 while (true)
15 { //調用一個ChatClient對象來實現監聽
16 ChatClient chatClient = new ChatClient(tcpListener.AcceptTcpClient());
17 }
18 }
19 }
20
21 public class ChatClient
22 {
23 static TcpClient tcpClient;
24 static byte[] byteMessage;
25 static string clientEndPoint;
26
27 public ChatClient(TcpClient tcpClient1)
28 {
29 tcpClient = tcpClient1;
30 byteMessage = new byte[tcpClient.ReceiveBufferSize];
31
32 //顯示客戶端信息
33 clientEndPoint = tcpClient.Client.RemoteEndPoint.ToString();
34 Console.WriteLine("Client's endpoint is " + clientEndPoint);
35
36 //使用NetworkStream.BeginRead異步讀取信息
37 NetworkStream networkStream = tcpClient.GetStream();
38 networkStream.BeginRead(byteMessage, 0, tcpClient.ReceiveBufferSize,
39 new AsyncCallback(ReceiveAsyncCallback), null);
40 }
41
42 public void ReceiveAsyncCallback(IAsyncResult iAsyncResult)
43 {
44 //顯示CLR線程池狀態
45 Thread.Sleep(100);
46 ThreadPoolMessage("\nMessage is receiving");
47
48 //使用NetworkStream.EndRead結束異步讀取
49 NetworkStream networkStreamRead = tcpClient.GetStream();
50 int length=networkStreamRead.EndRead(iAsyncResult);
51
52 //如果接收到的數據長度少於1則拋出異常
53 if (length < 1)
54 {
55 tcpClient.GetStream().Close();
56 throw new Exception("Disconnection!");
57 }
58
59 //顯示接收信息
60 string message = Encoding.UTF8.GetString(byteMessage, 0, length);
61 Console.WriteLine("Message:" + message);
62
63 //使用NetworkStream.BeginWrite異步發送信息
64 byte[] sendMessage = Encoding.UTF8.GetBytes("Message is received!");
65 NetworkStream networkStreamWrite=tcpClient.GetStream();
66 networkStreamWrite.BeginWrite(sendMessage, 0, sendMessage.Length,
67 new AsyncCallback(SendAsyncCallback), null);
68 }
69
70 //把信息轉換成二進制數據,然后發送到客戶端
71 public void SendAsyncCallback(IAsyncResult iAsyncResult)
72 {
73 //顯示CLR線程池狀態
74 Thread.Sleep(100);
75 ThreadPoolMessage("\nMessage is sending");
76
77 //使用NetworkStream.EndWrite結束異步發送
78 tcpClient.GetStream().EndWrite(iAsyncResult);
79
80 //重新監聽
81 tcpClient.GetStream().BeginRead(byteMessage, 0, tcpClient.ReceiveBufferSize,
82 new AsyncCallback(ReceiveAsyncCallback), null);
83 }
84
85 //顯示線程池現狀
86 static void ThreadPoolMessage(string data)
87 {
88 int a, b;
89 ThreadPool.GetAvailableThreads(out a, out b);
90 string message = string.Format("{0}\n CurrentThreadId is {1}\n " +
91 "WorkerThreads is:{2} CompletionPortThreads is :{3}\n",
92 data, Thread.CurrentThread.ManagedThreadId, a.ToString(), b.ToString());
93
94 Console.WriteLine(message);
95 }
96 }
而在客戶端只是使用簡單的開發方式,利用TcpClient連接到服務器端,然后調用NetworkStream.Write方法發送信息,最后調用NetworkStream.Read方法讀取回饋信息
1 static void Main(string[] args)
2 {
3 //連接服務端
4 TcpClient tcpClient = new TcpClient("127.0.0.1", 500);
5
6 //發送信息
7 NetworkStream networkStream = tcpClient.GetStream();
8 byte[] sendMessage = Encoding.UTF8.GetBytes("Client request connection!");
9 networkStream.Write(sendMessage, 0, sendMessage.Length);
10 networkStream.Flush();
11
12 //接收信息
13 byte[] receiveMessage=new byte[1024];
14 int count=networkStream.Read(receiveMessage, 0,1024);
15 Console.WriteLine(Encoding.UTF8.GetString(receiveMessage));
16 Console.ReadKey();
17 }
注意觀察運行結果,服務器端的異步操作線程都是來自於CLR線程池的I/O線程
5.3 異步WebRequest
System.Net.WebRequest 是 .NET 為實現訪問 Internet 的 “請求/響應模型” 而開發的一個 abstract 基類, 它主要有三個子類:FtpWebRequest、HttpWebRequest、FileWebRequest。當使用WebRequest.Create(string uri)創建對象時,應用程序就可以根據請求協議判斷實現類來進行操作。FileWebRequest、FtpWebRequest、HttpWebRequest 各有其作用:FileWebRequest 使用 “file://路徑” 的URI方式實現對本地資源和內部文件的請求/響應、FtpWebRequest 使用FTP文件傳輸協議實現文件請求/響應、HttpWebRequest 用於處理HTTP的頁面請求/響應。由於使用方法相類似,下面就以常用的HttpWebRequest為例子介紹一下異步WebRequest的使用方法。
在使用ASP.NET開發網站的時候,往往會忽略了HttpWebRequest的使用,因為開發都假設客戶端是使用瀏覽器等工具去閱讀頁面的。但如果你對REST開發方式有所了解,那對 HttpWebRequest 就應該非常熟悉。它可以在路徑參數、頭文件、頁面主體、Cookie 等多處地方加入請求條件,然后對回復數據進行適當處理。HttpWebRequest 包含有以下幾個常用方法用於處理請求/響應:
public override Stream GetRequestStream ()
public override WebResponse GetResponse ()
public override IAsyncResult BeginGetRequestStream ( AsyncCallback callback, Object state )
public override Stream EndGetRequestStream ( IAsyncResult asyncResult )
public override IAsyncResult BeginGetResponse ( AsyncCallback callback, Object state )
public override WebResponse EndGetResponse ( IAsyncResult asyncResult )
其中BeginGetRequestStream、EndGetRequestStream 用於異步向HttpWebRequest對象寫入請求信息; BeginGetResponse、EndGetResponse 用於異步發送頁面請求並獲取返回信息。使用異步方式操作Internet的“請求/響應”,避免主線程長期處於等待狀態,而操作期間異步線程是來自CLR線程池的I/O線程。
下面以簡單的例子介紹一下異步請求的用法。
首先為Person類加上可序列化特性,在服務器端建立Hanlder.ashx,通過Request.InputStream 獲取到請求數據並把數據轉化為String對象,此實例中數據是以 “Id:1” 的形式實現傳送的。然后根據Id查找對應的Person對象,並把Person對象寫入Response.OutStream 中返還到客戶端。
在客戶端先把 HttpWebRequird.Method 設置為 "post",使用異步方式通過BeginGetRequireStream獲取請求數據流,然后寫入請求數據 “Id:1”。再使用異步方法BeginGetResponse 獲取回復數據,最后把數據反序列化為Person對象顯示出來。
注意:HttpWebRequire.Method默認為get,在寫入請求前必須把HttpWebRequire.Method設置為post,否則在使用BeginGetRequireStream 獲取請求數據流的時候,系統就會發出 “無法發送具有此謂詞類型的內容正文" 的異常。
Model
1 namespace Model
2 {
3 [Serializable]
4 public class Person
5 {
6 public int ID
7 {
8 get;
9 set;
10 }
11 public string Name
12 {
13 get;
14 set;
15 }
16 public int Age
17 {
18 get;
19 set;
20 }
21 }
22 }
服務器端
1 public class Handler : IHttpHandler {
2
3 public void ProcessRequest(HttpContext context)
4 {
5 //把信息轉換為String,找出輸入條件Id
6 byte[] bytes=new byte[1024];
7 int length=context.Request.InputStream.Read(bytes,0,1024);
8 string condition = Encoding.Default.GetString(bytes);
9 int id = int.Parse(condition.Split(new string[] { ":" },
10 StringSplitOptions.RemoveEmptyEntries)[1]);
11
12 //根據Id查找對應Person對象
13 var person = GetPersonList().Where(x => x.ID == id).First();
14
15 //所Person格式化為二進制數據寫入OutputStream
16 BinaryFormatter formatter = new BinaryFormatter();
17 formatter.Serialize(context.Response.OutputStream, person);
18 }
19
20 //模擬源數據
21 private IList<Person> GetPersonList()
22 {
23 var personList = new List<Person>();
24
25 var person1 = new Person();
26 person1.ID = 1;
27 person1.Name = "Leslie";
28 person1.Age = 30;
29 personList.Add(person1);
30 ...........
31 return personList;
32 }
33
34 public bool IsReusable
35 {
36 get { return true;}
37 }
38 }
客戶端
1 class Program
2 {
3 static void Main(string[] args)
4 {
5 ThreadPool.SetMaxThreads(1000, 1000);
6 Request();
7 Console.ReadKey();
8 }
9
10 static void Request()
11 {
12 ThreadPoolMessage("Start");
13 //使用WebRequest.Create方法建立HttpWebRequest對象
14 HttpWebRequest webRequest = (HttpWebRequest)WebRequest.Create(
15 "http://localhost:5700/Handler.ashx");
16 webRequest.Method = "post";
17
18 //對寫入數據的RequestStream對象進行異步請求
19 IAsyncResult result=webRequest.BeginGetRequestStream(
20 new AsyncCallback(EndGetRequestStream),webRequest);
21 }
22
23 static void EndGetRequestStream(IAsyncResult result)
24 {
25 ThreadPoolMessage("RequestStream Complete");
26 //獲取RequestStream
27 HttpWebRequest webRequest = (HttpWebRequest)result.AsyncState;
28 Stream stream=webRequest.EndGetRequestStream(result);
29
30 //寫入請求條件
31 byte[] condition = Encoding.Default.GetBytes("Id:1");
32 stream.Write(condition, 0, condition.Length);
33
34 //異步接收回傳信息
35 IAsyncResult responseResult = webRequest.BeginGetResponse(
36 new AsyncCallback(EndGetResponse), webRequest);
37 }
38
39 static void EndGetResponse(IAsyncResult result)
40 {
41 //顯出線程池現狀
42 ThreadPoolMessage("GetResponse Complete");
43
44 //結束異步請求,獲取結果
45 HttpWebRequest webRequest = (HttpWebRequest)result.AsyncState;
46 WebResponse webResponse = webRequest.EndGetResponse(result);
47
48 //把輸出結果轉化為Person對象
49 Stream stream = webResponse.GetResponseStream();
50 BinaryFormatter formatter = new BinaryFormatter();
51 var person=(Person)formatter.Deserialize(stream);
52 Console.WriteLine(string.Format("Person Id:{0} Name:{1} Age:{2}",
53 person.ID, person.Name, person.Age));
54 }
55
56 //顯示線程池現狀
57 static void ThreadPoolMessage(string data)
58 {
59 int a, b;
60 ThreadPool.GetAvailableThreads(out a, out b);
61 string message = string.Format("{0}\n CurrentThreadId is {1}\n " +
62 "WorkerThreads is:{2} CompletionPortThreads is :{3}\n",
63 data, Thread.CurrentThread.ManagedThreadId, a.ToString(), b.ToString());
64
65 Console.WriteLine(message);
66 }
67 }
從運行結果可以看到,BeginGetRequireStream、BeginGetResponse方法是使用CLR線程池的I/O線程。
5.4 異步調用WebService
相比TCP/IP套接字,在使用WebService的時候,服務器端需要更復雜的操作處理,使用時間往往會更長。為了避免客戶端長期處於等待狀態,在配置服務引用時選擇 “生成異步操作”,系統可以自動建立異步調用的方式。
以.NET 2.0以前,系統都是使用ASMX來設計WebService,而近年來WCF可說是火熱登場,下面就以WCF為例子簡單介紹一下異步調用WebService的例子。
由於系統可以自動生成異步方法,使用起來非常簡單,首先在服務器端建立服務ExampleService,里面包含方法Method。客戶端引用此服務時,選擇 “生成異步操作”。然后使用 BeginMethod 啟動異步方法, 在回調函數中調用EndMethod結束異步調用。
服務端
1 [ServiceContract]
2 public interface IExampleService
3 {
4 [OperationContract]
5 string Method(string name);
6 }
7
8 public class ExampleService : IExampleService
9 {
10 public string Method(string name)
11 {
12 return "Hello " + name;
13 }
14 }
15
16 class Program
17 {
18 static void Main(string[] args)
19 {
20 ServiceHost host = new ServiceHost(typeof(ExampleService));
21 host.Open();
22 Console.ReadKey();
23 host.Close();
24 }
25 }
26
27 <configuration>
28 <system.serviceModel>
29 <services>
30 <service name="Example.ExampleService">
31 <endpoint address="" binding="wsHttpBinding" contract="Example.IExampleService">
32 <identity>
33 <dns value="localhost" />
34 </identity>
35 </endpoint>
36 <endpoint address="mex" binding="mexHttpBinding" contract="IMetadataExchange" />
37 <host>
38 <baseAddresses>
39 <add baseAddress="http://localhost:7200/Example/ExampleService/" />
40 </baseAddresses>
41 </host>
42 </service>
43 </services>
44 </system.serviceModel>
45 </configuration>
客戶端
1 class Program
2 {
3 static void Main(string[] args)
4 {
5 //設置最大線程數
6 ThreadPool.SetMaxThreads(1000, 1000);
7 ThreadPoolMessage("Start");
8
9 //建立服務對象,異步調用服務方法
10 ExampleServiceReference.ExampleServiceClient exampleService = new
11 ExampleServiceReference.ExampleServiceClient();
12 exampleService.BeginMethod("Leslie",new AsyncCallback(AsyncCallbackMethod),
13 exampleService);
14 Console.ReadKey();
15 }
16
17 static void AsyncCallbackMethod(IAsyncResult result)
18 {
19 Thread.Sleep(1000);
20 ThreadPoolMessage("Complete");
21 ExampleServiceReference.ExampleServiceClient example =
22 (ExampleServiceReference.ExampleServiceClient)result.AsyncState;
23 string data=example.EndMethod(result);
24 Console.WriteLine(data);
25 }
26
27 //顯示線程池現狀
28 static void ThreadPoolMessage(string data)
29 {
30 int a, b;
31 ThreadPool.GetAvailableThreads(out a, out b);
32 string message = string.Format("{0}\n CurrentThreadId is {1}\n " +
33 "WorkerThreads is:{2} CompletionPortThreads is :{3}\n",
34 data, Thread.CurrentThread.ManagedThreadId, a.ToString(), b.ToString());
35
36 Console.WriteLine(message);
37 }
38 }
39
40 <configuration>
41 <system.serviceModel>
42 <bindings>
43 <wsHttpBinding>
44 <binding name="WSHttpBinding_IExampleService" closeTimeout="00:01:00"
45 openTimeout="00:01:00" receiveTimeout="00:10:00" sendTimeout="00:01:00"
46 bypassProxyOnLocal="false" transactionFlow="false"
47 hostNameComparisonMode="StrongWildcard" maxBufferPoolSize="524288"
48 maxReceivedMessageSize="65536" messageEncoding="Text" textEncoding="utf-8"
49 useDefaultWebProxy="true" allowCookies="false">
50 <readerQuotas maxDepth="32" maxStringContentLength="8192" maxArrayLength="16384"
51 maxBytesPerRead="4096" maxNameTableCharCount="16384" />
52 <reliableSession ordered="true" inactivityTimeout="00:10:00" enabled="false" />
53 <security mode="Message">
54 <transport clientCredentialType="Windows" proxyCredentialType="None"
55 realm="" />
56 <message clientCredentialType="Windows" negotiateServiceCredential="true"
57 algorithmSuite="Default" />
58 </security>
59 </binding>
60 </wsHttpBinding>
61 </bindings>
62 <client>
63 <endpoint address="http://localhost:7200/Example/ExampleService/"
64 binding="wsHttpBinding" bindingConfiguration="WSHttpBinding_IExampleService"
65 contract="ExampleServiceReference.IExampleService"
66 name="WSHttpBinding_IExampleService">
67 <identity>
68 <dns value="localhost" />
69 </identity>
70 </endpoint>
71 </client>
72 </system.serviceModel>
73 </configuration>
注意觀察運行結果,異步調用服務時,回調函數都是運行於CLR線程池的I/O線程當中。
六、異步 SqlCommand
從ADO.NET 2.0開始,SqlCommand就新增了幾個異步方法執行SQL命令。相對於同步執行方式,它使主線程不需要等待數據庫的返回結果,在使用復雜性查詢或批量插入時將有效提高主線程的效率。使用異步SqlCommand的時候,請注意把ConnectionString 的 Asynchronous Processing 設置為 true 。
注意:SqlCommand異步操作的特別之處在於線程並不依賴於CLR線程池,而是由Windows內部提供,這比使用異步委托更有效率。但如果需要使用回調函數的時候,回調函數的線程依然是來自於CLR線程池的工作者線程。
SqlCommand有以下幾個方法支持異步操作:
public IAsyncResult BeginExecuteNonQuery (......)
public int EndExecuteNonQuery(IAsyncResult)
public IAsyncResult BeginExecuteReader(......)
public SqlDataReader EndExecuteReader(IAsyncResult)
public IAsyncResult BeginExecuteXmlReader (......)
public XmlReader EndExecuteXmlReader(IAsyncResult)
由於使用方式相似,此處就以 BeginExecuteNonQuery 為例子,介紹一下異步SqlCommand的使用。首先建立connectionString,注意把Asynchronous Processing設置為true來啟動異步命令,然后把SqlCommand.CommandText設置為 WAITFOR DELAY "0:0:3" 來虛擬數據庫操作。再通過BeginExecuteNonQuery啟動異步操作,利用輪詢方式監測操作情況。最后在操作完成后使用EndExecuteNonQuery完成異步操作。
1 class Program
2 {
3 //把Asynchronous Processing設置為true
4 static string connectionString = "Data Source=LESLIE-PC;Initial Catalog=Business;“+
5 "Integrated Security=True;Asynchronous Processing=true";
6
7 static void Main(string[] args)
8 {
9 //把CLR線程池最大線程數設置為1000
10 ThreadPool.SetMaxThreads(1000, 1000);
11 ThreadPoolMessage("Start");
12
13 //使用WAITFOR DELAY命令來虛擬操作
14 SqlConnection connection = new SqlConnection(connectionString);
15 SqlCommand command = new SqlCommand("WAITFOR DELAY '0:0:3';", connection);
16 connection.Open();
17
18 //啟動異步SqlCommand操作,利用輪詢方式監測操作
19 IAsyncResult result = command.BeginExecuteNonQuery();
20 ThreadPoolMessage("BeginRead");
21 while (!result.AsyncWaitHandle.WaitOne(500))
22 Console.WriteLine("Main thread do work........");
23
24 //結束異步SqlCommand
25 int count= command.EndExecuteNonQuery(result);
26 ThreadPoolMessage("\nCompleted");
27 Console.ReadKey();
28 }
29
30 //顯示線程池現狀
31 static void ThreadPoolMessage(string data)
32 {
33 int a, b;
34 ThreadPool.GetAvailableThreads(out a, out b);
35 string message = string.Format("{0}\n CurrentThreadId is {1}\n "+
36 "WorkerThreads is:{2} CompletionPortThreads is :{3}\n",
37 data, Thread.CurrentThread.ManagedThreadId, a.ToString(), b.ToString());
38 Console.WriteLine(message);
39 }
40 }
注意運行結果,SqlCommand的異步執行線程並不屬於CLR線程池。
如果覺得使用輪詢方式過於麻煩,可以使用回調函數,但要注意當調用回調函數時,線程是來自於CLR線程池的工作者線程。
1 class Program
2 {
3 //把Asynchronous Processing設置為true
4 static string connectionString = "Data Source=LESLIE-PC;Initial Catalog=Business;”+
5 “Integrated Security=True;Asynchronous Processing=true";
6 static void Main(string[] args)
7 {
8 //把CLR線程池最大線程數設置為1000
9 ThreadPool.SetMaxThreads(1000, 1000);
10 ThreadPoolMessage("Start");
11
12 //使用WAITFOR DELAY命令來虛擬操作
13 SqlConnection connection = new SqlConnection(connectionString);
14 SqlCommand command = new SqlCommand("WAITFOR DELAY '0:0:3';", connection);
15 connection.Open();
16
17 //啟動異步SqlCommand操作,並把SqlCommand對象傳遞到回調函數
18 IAsyncResult result = command.BeginExecuteNonQuery(
19 new AsyncCallback(AsyncCallbackMethod),command);
20 Console.ReadKey();
21 }
22
23 static void AsyncCallbackMethod(IAsyncResult result)
24 {
25 Thread.Sleep(200);
26 ThreadPoolMessage("AsyncCallback");
27 SqlCommand command = (SqlCommand)result.AsyncState;
28 int count=command.EndExecuteNonQuery(result);
29 command.Connection.Close();
30 }
31
32 //顯示線程池現狀
33 static void ThreadPoolMessage(string data)
34 {
35 int a, b;
36 ThreadPool.GetAvailableThreads(out a, out b);
37 string message = string.Format("{0}\n CurrentThreadId is {1}\n "+
38 "WorkerThreads is:{2} CompletionPortThreads is :{3}\n",
39 data, Thread.CurrentThread.ManagedThreadId, a.ToString(), b.ToString());
40
41 Console.WriteLine(message);
42 }
43 }
運行結果:
七、並行編程與PLINQ
要使用多線程開發,必須非常熟悉Thread的使用,而且在開發過程中可能會面對很多未知的問題。為了簡化開發,.NET 4.0 特別提供一個並行編程庫System.Threading.Tasks,它可以簡化並行開發,你無需直接跟線程或線程池打交道,就可以簡單建立多線程應用程序。此外,.NET還提供了新的一組擴展方法PLINQ,它具有自動分析查詢功能,如果並行查詢能提高系統效率,則同時運行,如果查詢未能從並行查詢中受益,則按原順序查詢。下面將詳細介紹並行操作的方式。
7.1 泛型委托
使用並行編程可以同時操作多個委托,在介紹並行編程前先簡單介紹一下兩個泛型委托System.Func<>與System.Action<>。
Func<>是一個能接受多個參數和一個返回值的泛型委托,它能接受0個到16個輸入參數, 其中 T1,T2,T3,T4......T16 代表自定的輸入類型,TResult為自定義的返回值。
public delegate TResult Func<TResult>()
public delegate TResult Func<T1,TResult>(T1 arg1)
public delegate TResult Func<T1,T2, TResult>(T1 arg1,T2 arg2)
public delegate TResult Func<T1,T2, T3, TResult>(T1 arg1,T2 arg2,T3 arg3)
public delegate TResult Func<T1,T2, T3, ,T4, TResult>(T1 arg1,T2 arg2,T3 arg3,T4 arg4)
..............
public delegate TResult Func<T1,T2, T3, ,T4, ...... ,T16,TResult>(T1 arg1,T2 arg2,T3 arg3,T4 arg4,...... ,T16 arg16)
Action<>與Func<>十分相似,不同在於Action<>的返回值為void,Action能接受0~16個參數
public delegate void Action<T1>()
public delegate void Action<T1,T2>(T1 arg1,T2 arg2)
public delegate void Action<T1,T2, T3>(T1 arg1,T2 arg2, T3 arg3)
.............
public delegate void Action<T1,T2, T3, ,T4, ...... ,T16>(T1 arg1,T2 arg2,T3 arg3,T4 arg4,...... ,T16 arg16)
7.2 任務並行庫(TPL)
System.Threading.Tasks中的類被統稱為任務並行庫(Task Parallel Library,TPL),TPL使用CLR線程池把工作分配到CPU,並能自動處理工作分區、線程調度、取消支持、狀態管理以及其他低級別的細節操作,極大地簡化了多線程的開發。
TPL包括常用的數據並行與任務並行兩種執行方式:
7.2.1 數據並行
數據並行的核心類就是System.Threading.Tasks.Parallel,它包含兩個靜態方法 Parallel.For 與 Parallel.ForEach, 使用方式與for、foreach相仿。通過這兩個方法可以並行處理System.Func<>、System.Action<>委托。
以下一個例子就是利用 public static ParallelLoopResult For( int from, int max, Action<int>) 方法對List<Person>進行並行查詢。
假設使用單線程方式查詢3個Person對象,需要用時大約6秒,在使用並行方式,只需使用2秒就能完成查詢,而且能夠避開Thread的繁瑣處理。
1 class Program
2 {
3 static void Main(string[] args)
4 {
5 //設置最大線程數
6 ThreadPool.SetMaxThreads(1000, 1000);
7 //並行查詢
8 Parallel.For(0, 3,n =>
9 {
10 Thread.Sleep(2000); //模擬查詢
11 ThreadPoolMessage(GetPersonList()[n]);
12 });
13 Console.ReadKey();
14 }
15
16 //模擬源數據
17 static IList<Person> GetPersonList()
18 {
19 var personList = new List<Person>();
20
21 var person1 = new Person();
22 person1.ID = 1;
23 person1.Name = "Leslie";
24 person1.Age = 30;
25 personList.Add(person1);
26 ...........
27 return personList;
28 }
29
30 //顯示線程池現狀
31 static void ThreadPoolMessage(Person person)
32 {
33 int a, b;
34 ThreadPool.GetAvailableThreads(out a, out b);
35 string message = string.Format("Person ID:{0} Name:{1} Age:{2}\n" +
36 " CurrentThreadId is {3}\n WorkerThreads is:{4}" +
37 " CompletionPortThreads is :{5}\n",
38 person.ID, person.Name, person.Age,
39 Thread.CurrentThread.ManagedThreadId, a.ToString(), b.ToString());
40
41 Console.WriteLine(message);
42 }
43 }
觀察運行結果,對象並非按照原排列順序進行查詢,而是使用並行方式查詢。
若想停止操作,可以利用ParallelLoopState參數,下面以ForEach作為例子。
public static ParallelLoopResult ForEach<TSource>( IEnumerable<TSource> source, Action<TSource, ParallelLoopState> action)
其中source為數據集,在Action<TSource,ParallelLoopState>委托的ParallelLoopState參數當中包含有Break()和 Stop()兩個方法都可以使迭代停止。Break的使用跟傳統for里面的使用方式相似,但因為處於並行處理當中,使用Break並不能保證所有運行能立即停止,在當前迭代之前的迭代會繼續執行。若想立即停止操作,可以使用Stop方法,它能保證立即終止所有的操作,無論它們是處於當前迭代的之前還是之后。
1 class Program
2 {
3 static void Main(string[] args)
4 {
5 //設置最大線程數
6 ThreadPool.SetMaxThreads(1000, 1000);
7
8 //並行查詢
9 Parallel.ForEach(GetPersonList(), (person, state) =>
10 {
11 if (person.ID == 2)
12 state.Stop();
13 ThreadPoolMessage(person);
14 });
15 Console.ReadKey();
16 }
17
18 //模擬源數據
19 static IList<Person> GetPersonList()
20 {
21 var personList = new List<Person>();
22
23 var person1 = new Person();
24 person1.ID = 1;
25 person1.Name = "Leslie";
26 person1.Age = 30;
27 personList.Add(person1);
28 ..........
29 return personList;
30 }
31
32 //顯示線程池現狀
33 static void ThreadPoolMessage(Person person)
34 {
35 int a, b;
36 ThreadPool.GetAvailableThreads(out a, out b);
37 string message = string.Format("Person ID:{0} Name:{1} Age:{2}\n" +
38 " CurrentThreadId is {3}\n WorkerThreads is:{4}" +
39 " CompletionPortThreads is :{5}\n",
40 person.ID, person.Name, person.Age,
41 Thread.CurrentThread.ManagedThreadId, a.ToString(), b.ToString());
42
43 Console.WriteLine(message);
44 }
45 }
觀察運行結果,當Person的ID等於2時,運行將會停止。
當要在多個線程中調用本地變量,可以使用以下方法:
public static ParallelLoopResult ForEach<TSource, TLocal>(IEnumerable<Of TSource>, Func<Of TLocal>, Func<Of TSource,ParallelLoopState,TLocal,TLocal>, Action<Of TLocal>)
其中第一個參數為數據集;
第二個參數是一個Func委托,用於在每個線程執行前進行初始化;
第 三個參數是委托Func<Of T1,T2,T3,TResult>,它能對數據集的每個成員進行迭代,當中T1是數據集的成員,T2是一個ParallelLoopState對 象,它可以控制迭代的狀態,T3是線程中的本地變量;
第四個參數是一個Action委托,用於對每個線程的最終狀態進行最終操作。
在以下例子中,使用ForEach計算多個Order的總體價格。在ForEach方法中,首先把參數初始化為0f,然后用把同一個Order的多個OrderItem價格進行累加,計算出Order的價格,最后把多個Order的價格進行累加,計算出多個Order的總體價格。
1 public class Order
2 {
3 public int ID;
4 public float Price;
5 }
6
7 public class OrderItem
8 {
9 public int ID;
10 public string Goods;
11 public int OrderID;
12 public float Price;
13 public int Count;
14 }
15
16 class Program
17 {
18 static void Main(string[] args)
19 {
20 //設置最大線程數
21 ThreadPool.SetMaxThreads(1000, 1000);
22 float totalPrice = 0f;
23 //並行查詢
24 var parallelResult = Parallel.ForEach(GetOrderList(),
25 () => 0f, //把參數初始值設為0
26 (order, state, orderPrice) =>
27 {
28 //計算單個Order的價格
29 orderPrice = GetOrderItem().Where(item => item.OrderID == order.ID)
30 .Sum(item => item.Price * item.Count);
31 order.Price = orderPrice;
32 ThreadPoolMessage(order);
33
34 return orderPrice;
35 },
36 (finallyPrice) =>
37 {
38 totalPrice += finallyPrice;//計算多個Order的總體價格
39 }
40 );
41
42 while (!parallelResult.IsCompleted)
43 Console.WriteLine("Doing Work!");
44
45 Console.WriteLine("Total Price is:" + totalPrice);
46 Console.ReadKey();
47 }
48 //虛擬數據
49 static IList<Order> GetOrderList()
50 {
51 IList<Order> orderList = new List<Order>();
52 Order order1 = new Order();
53 order1.ID = 1;
54 orderList.Add(order1);
55 ............
56 return orderList;
57 }
58 //虛擬數據
59 static IList<OrderItem> GetOrderItem()
60 {
61 IList<OrderItem> itemList = new List<OrderItem>();
62
63 OrderItem orderItem1 = new OrderItem();
64 orderItem1.ID = 1;
65 orderItem1.Goods = "iPhone 4S";
66 orderItem1.Price = 6700;
67 orderItem1.Count = 2;
68 orderItem1.OrderID = 1;
69 itemList.Add(orderItem1);
70 ...........
71 return itemList;
72 }
73
74 //顯示線程池現狀
75 static void ThreadPoolMessage(Order order)
76 {
77 int a, b;
78 ThreadPool.GetAvailableThreads(out a, out b);
79 string message = string.Format("OrderID:{0} OrderPrice:{1}\n" +
80 " CurrentThreadId is {2}\n WorkerThreads is:{3}" +
81 " CompletionPortThreads is:{4}\n",
82 order.ID, order.Price,
83 Thread.CurrentThread.ManagedThreadId, a.ToString(), b.ToString());
84
85 Console.WriteLine(message);
86 }
87 }
運行結果
7.2.2 任務並行
在TPL當中還可以使用Parallel.Invoke方法觸發多個異步任務,其中 actions 中可以包含多個方法或者委托,parallelOptions用於配置Parallel類的操作。
public static void Invoke(Action[] actions )
public static void Invoke(ParallelOptions parallelOptions, Action[] actions )
下面例子中利用了Parallet.Invoke並行查詢多個Person,actions當中可以綁定方法、lambda表達式或者委托,注意綁定方法時必須是返回值為void的無參數方法。
1 class Program
2 {
3 static void Main(string[] args)
4 {
5 //設置最大線程數
6 ThreadPool.SetMaxThreads(1000, 1000);
7
8 //任務並行
9 Parallel.Invoke(option,
10 PersonMessage,
11 ()=>ThreadPoolMessage(GetPersonList()[1]),
12 delegate(){
13 ThreadPoolMessage(GetPersonList()[2]);
14 });
15 Console.ReadKey();
16 }
17
18 static void PersonMessage()
19 {
20 ThreadPoolMessage(GetPersonList()[0]);
21 }
22
23 //顯示線程池現狀
24 static void ThreadPoolMessage(Person person)
25 {
26 int a, b;
27 ThreadPool.GetAvailableThreads(out a, out b);
28 string message = string.Format("Person ID:{0} Name:{1} Age:{2}\n" +
29 " CurrentThreadId is {3}\n WorkerThreads is:{4}" +
30 " CompletionPortThreads is :{5}\n",
31 person.ID, person.Name, person.Age,
32 Thread.CurrentThread.ManagedThreadId, a.ToString(), b.ToString());
33
34 Console.WriteLine(message);
35 }
36
37 //模擬源數據
38 static IList<Person> GetPersonList()
39 {
40 var personList = new List<Person>();
41
42 var person1 = new Person();
43 person1.ID = 1;
44 person1.Name = "Leslie";
45 person1.Age = 30;
46 personList.Add(person1);
47 ..........
48 return personList;
49 }
50 }
運行結果
7.3 Task簡介
以Thread創建的線程被默認為前台線程,當然你可以把線程IsBackground屬性設置為true,但TPL為此提供了一個更簡單的類Task。
Task存在於System.Threading.Tasks命名空間當中,它可以作為異步委托的簡單替代品。
通過Task的Factory屬性將返回TaskFactory類,以TaskFactory.StartNew(Action)方法可以創建一個新線程,所創建的線程默認為后台線程。
1 class Program
2 {
3 static void Main(string[] args)
4 {
5 ThreadPool.SetMaxThreads(1000, 1000);
6 Task.Factory.StartNew(() => ThreadPoolMessage());
7 Console.ReadKey();
8 }
9
10 //顯示線程池現狀
11 static void ThreadPoolMessage()
12 {
13 int a, b;
14 ThreadPool.GetAvailableThreads(out a, out b);
15 string message = string.Format("CurrentThreadId is:{0}\n" +
16 "CurrentThread IsBackground:{1}\n" +
17 "WorkerThreads is:{2}\nCompletionPortThreads is:{3}\n",
18 Thread.CurrentThread.ManagedThreadId,
19 Thread.CurrentThread.IsBackground.ToString(),
20 a.ToString(), b.ToString());
21 Console.WriteLine(message);
22 }
23 }
運行結果
若要取消處理,可以利用CancellationTakenSource對象,在TaskFactory中包含有方法
public Task StartNew( Action action, CancellationToken cancellationToken )
在方法中加入CancellationTakenSource對象的CancellationToken屬性,可以控制任務的運行,調用CancellationTakenSource.Cancel時任務就會自動停止。下面以圖片下載為例子介紹一下TaskFactory的使用。
服務器端頁面
1 <html xmlns="http://www.w3.org/1999/xhtml">
2 <head runat="server">
3 <title></title>
4 <script type="text/C#" runat="server">
5 private static List<string> url=new List<string>();
6
7 protected void Page_Load(object sender, EventArgs e)
8 {
9 if (!Page.IsPostBack)
10 {
11 url.Clear();
12 Application["Url"] = null;
13 }
14 }
15
16 protected void CheckBox_CheckedChanged(object sender, EventArgs e)
17 {
18 CheckBox checkBox = (CheckBox)sender;
19 if (checkBox.Checked)
20 url.Add(checkBox.Text);
21 else
22 url.Remove(checkBox.Text);
23 Application["Url"]= url;
24 }
25 </script>
26 </head>
27 <body>
28 <form id="form1" runat="server" >
29 <div align="left">
30 <div align="center" style="float: left;">
31 <asp:Image ID="Image1" runat="server" ImageUrl="~/Images/A.jpg" /><br />
32 <asp:CheckBox ID="CheckBox1" runat="server" AutoPostBack="True"
33 oncheckedchanged="CheckBox_CheckedChanged" Text="A.jpg" />
34 </div>
35 <div align="center" style="float: left">
36 <asp:Image ID="Image2" runat="server" ImageUrl="~/Images/B.jpg" /><br />
37 <asp:CheckBox ID="CheckBox2" runat="server" AutoPostBack="True"
38 oncheckedchanged="CheckBox_CheckedChanged" Text="B.jpg" />
39 </div>
40 <div align="center" style="float: left">
41 <asp:Image ID="Image3" runat="server" ImageUrl="~/Images/C.jpg" /><br />
42 <asp:CheckBox ID="CheckBox3" runat="server" AutoPostBack="True"
43 oncheckedchanged="CheckBox_CheckedChanged" Text="C.jpg" />
44 </div>
45 <div align="center" style="float: left">
46 <asp:Image ID="Image4" runat="server" ImageUrl="~/Images/D.jpg" /><br />
47 <asp:CheckBox ID="CheckBox4" runat="server" AutoPostBack="True"
48 oncheckedchanged="CheckBox_CheckedChanged" Text="D.jpg" />
49 </div>
50 <div align="center" style="float: left">
51 <asp:Image ID="Image5" runat="server" ImageUrl="~/Images/E.jpg" /><br />
52 <asp:CheckBox ID="CheckBox5" runat="server" AutoPostBack="True"
53 oncheckedchanged="CheckBox_CheckedChanged" Text="E.jpg" />
54 </div>
55 </div>
56 </form>
57 </body>
58 </html>
首先在服務器頁面中顯示多個*.jpg圖片,每個圖片都有對應的CheckBox檢測其選擇情況。
所選擇圖片的路徑會記錄在Application["Url"]當中傳遞到Handler.ashx當中。
Handler.ashx 處理圖片的下載,它從 Application["Url"] 當中獲取所選擇圖片的路徑,並把圖片轉化成byte[]二進制數據。
再把圖片的數量,每副圖片的二進制數據的長度記錄在OutputStream的頭部。
最后把圖片的二進制數據記入 OutputStream 一並輸出。
1 public class Handler : IHttpHandler
2 {
3 public void ProcessRequest(HttpContext context)
4 {
5 //獲取圖片名,把圖片數量寫OutputStream
6 List<String> urlList = (List<string>)context.Application["Url"];
7 context.Response.OutputStream.Write(BitConverter.GetBytes(urlList.Count), 0, 4);
8
9 //把圖片轉換成二進制數據
10 List<string> imageList = GetImages(urlList);
11
12 //把每副圖片長度寫入OutputStream
13 foreach (string image in imageList)
14 {
15 byte[] imageByte=Convert.FromBase64String(image);
16 context.Response.OutputStream.Write(BitConverter.GetBytes(imageByte.Length),0,4);
17 }
18
19 //把圖片寫入OutputStream
20 foreach (string image in imageList)
21 {
22 byte[] imageByte = Convert.FromBase64String(image);
23 context.Response.OutputStream.Write(imageByte,0,imageByte.Length);
24 }
25 }
26
27 //獲取多個圖片的二進制數據
28 private List<string> GetImages(List<string> urlList)
29 {
30 List<string> imageList = new List<string>();
31 foreach (string url in urlList)
32 imageList.Add(GetImage(url));
33 return imageList;
34 }
35
36 //獲取單副圖片的二進制數據
37 private string GetImage(string url)
38 {
39 string path = "E:/My Projects/Example/WebSite/Images/"+url;
40 FileStream stream = new FileStream(path, FileMode.Open, FileAccess.Read);
41 byte[] imgBytes = new byte[10240];
42 int imgLength = stream.Read(imgBytes, 0, 10240);
43 return Convert.ToBase64String(imgBytes,0,imgLength);
44 }
45
46 public bool IsReusable
47 {
48 get{ return false;}
49 }
50 }
客戶端
建立一個WinForm窗口,里面加入一個WebBrowser連接到服務器端的Default.aspx頁面。
當按下Download按鍵時,系統就會利用TaskFactory.StartNew的方法建立異步線程,使用WebRequest方法向Handler.ashx發送請求。
接收到回傳流時,就會根據頭文件的內容判斷圖片的數量與每副圖片的長度,把二進制數據轉化為*.jpg文件保存。
系統利用TaskFactory.StartNew(action,cancellationToken) 方式異步調用GetImages方法進行圖片下載。
當用戶按下Cancel按鈕時,異步任務就會停止。值得注意的是,在圖片下載時調用了CancellationToken.ThrowIfCancellationRequested方法,目的在檢查並行任務的運行情況,在並行任務被停止時釋放出OperationCanceledException異常,確保用戶按下Cancel按鈕時,停止所有並行任務。
1 public partial class Form1 : Form
2 {
3 private CancellationTokenSource tokenSource = new CancellationTokenSource();
4
5 public Form1()
6 {
7 InitializeComponent();
8 ThreadPool.SetMaxThreads(1000, 1000);
9 }
10
11 private void downloadToolStripMenuItem_Click(object sender, EventArgs e)
12 {
13 Task.Factory.StartNew(GetImages,tokenSource.Token);
14 }
15
16 private void cancelToolStripMenuItem_Click(object sender, EventArgs e)
17 {
18 tokenSource.Cancel();
19 }
20
21 private void GetImages()
22 {
23 //發送請求,獲取輸出流
24 WebRequest webRequest = HttpWebRequest.Create("Http://localhost:5800/Handler.ashx");
25 Stream responseStream=webRequest.GetResponse().GetResponseStream();
26
27 byte[] responseByte = new byte[81960];
28 IAsyncResult result=responseStream.BeginRead(responseByte,0,81960,null,null);
29 int responseLength = responseStream.EndRead(result);
30
31 //獲取圖片數量
32 int imageCount = BitConverter.ToInt32(responseByte, 0);
33
34 //獲取每副圖片的長度
35 int[] lengths = new int[imageCount];
36 for (int n = 0; n < imageCount; n++)
37 {
38 int length = BitConverter.ToInt32(responseByte, (n + 1) * 4);
39 lengths[n] = length;
40 }
41 try
42 {
43 //保存圖片
44 for (int n = 0; n < imageCount; n++)
45 {
46 string path = string.Format("E:/My Projects/Example/Test/Images/pic{0}.jpg", n);
47 FileStream file = new FileStream(path, FileMode.Create, FileAccess.ReadWrite);
48
49 //計算字節偏移量
50 int offset = (imageCount + 1) * 4;
51 for (int a = 0; a < n; a++)
52 offset += lengths[a];
53
54 file.Write(responseByte, offset, lengths[n]);
55 file.Flush();
56
57 //模擬操作
58 Thread.Sleep(1000);
59
60 //檢測CancellationToken變化
61 tokenSource.Token.ThrowIfCancellationRequested();
62 }
63 }
64 catch (OperationCanceledException ex)
65 {
66 MessageBox.Show("Download cancel!");
67 }
68 }
69 }
7.4 並行查詢(PLINQ)
並行 LINQ (PLINQ) 是 LINQ 模式的並行實現,主要區別在於 PLINQ 嘗試充分利用系統中的所有處理器。 它利用所有處理器的方法,把數據源分成片段,然后在多個處理器上對單獨工作線程上的每個片段並行執行查詢, 在許多情況下,並行執行意味着查詢運行速度顯著提高。但這並不說明所有PLINQ都會使用並行方式,當系統測試要並行查詢會對系統性能造成損害時,那將自動化地使用同步執行。
在System.Linq.ParallelEnumerable類中,包含了並行查詢的大部分方法。
方法成員 |
說明 |
AsParallel |
PLINQ 的入口點。 指定如果可能,應並行化查詢的其余部分。 |
AsSequential(Of TSource) |
指定查詢的其余部分應像非並行 LINQ 查詢一樣按順序運行。 |
AsOrdered |
指定 PLINQ 應保留查詢的其余部分的源序列排序,直到例如通過使用 orderby(在 Visual Basic 中為 Order By)子句更改排序為止。 |
AsUnordered(Of TSource) |
指定查詢的其余部分的 PLINQ 不需要保留源序列的排序。 |
WithCancellation(Of TSource) |
指定 PLINQ 應定期監視請求取消時提供的取消標記和取消執行的狀態。 |
WithDegreeOfParallelism(Of TSource) |
指定 PLINQ 應當用來並行化查詢的處理器的最大數目。 |
WithMergeOptions(Of TSource) |
提供有關 PLINQ 應當如何(如果可能)將並行結果合並回到使用線程上的一個序列的提示。 |
WithExecutionMode(Of TSource) |
指定 PLINQ 應當如何並行化查詢(即使默認行為是按順序運行查詢)。 |
ForAll(Of TSource) |
多線程枚舉方法,與循環訪問查詢結果不同,它允許在不首先合並回到使用者線程的情況下並行處理結果。 |
Aggregate 重載 |
對於 PLINQ 唯一的重載,它啟用對線程本地分區的中間聚合以及一個用於合並所有分區結果的最終聚合函數。 |
7.4.1 AsParallel
通常想要實現並行查詢,只需向數據源添加 AsParallel 查詢操作即可。
1 class Program
2 {
3 static void Main(string[] args)
4 {
5 var personList=GetPersonList().AsParallel()
6 .Where(x=>x.Age>30);
7 Console.ReadKey();
8 }
9
10 //模擬源數據
11 static IList<Person> GetPersonList()
12 {
13 var personList = new List<Person>();
14
15 var person1 = new Person();
16 person1.ID = 1;
17 person1.Name = "Leslie";
18 person1.Age = 30;
19 personList.Add(person1);
20 ...........
21 return personList;
22 }
23 }
7.4.2 AsOrdered
若要使查詢結果必須保留源序列排序方式,可以使用AsOrdered方法。
AsOrdered依然使用並行方式,只是在查詢過程加入額外信息,在並行結束后把查詢結果再次進行排列。
1 class Program
2 {
3 static void Main(string[] args)
4 {
5 var personList=GetPersonList().AsParallel().AsOrdered()
6 .Where(x=>x.Age<30);
7 Console.ReadKey();
8 }
9
10 static IList<Person> GetPersonList()
11 {......}
12 }
7.4.3 WithDegreeOfParallelism
默認情況下,PLINQ 使用主機上的所有處理器,這些處理器的數量最多可達 64 個。
通過使用 WithDegreeOfParallelism(Of TSource) 方法,可以指示 PLINQ 使用不多於指定數量的處理器。
1 class Program
2 {
3 static void Main(string[] args)
4 {
5 var personList=GetPersonList().AsParallel().WithDegreeOfParallelism(2)
6 .Where(x=>x.Age<30);
7 Console.ReadKey();
8 }
9
10 static IList<Person> GetPersonList()
11 {.........}
12 }
7.4.4 ForAll
如果要對並行查詢結果進行操作,一般會在for或foreach中執行,執行枚舉操作時會使用同步方式。
有見及此,PLINQ中包含了ForAll方法,它可以使用並行方式對數據集進行操作。
1 class Program
2 {
3 static void Main(string[] args)
4 {
5 ThreadPool.SetMaxThreads(1000, 1000);
6 GetPersonList().AsParallel().ForAll(person =>{
7 ThreadPoolMessage(person);
8 });
9 Console.ReadKey();
10 }
11
12 static IList<Person> GetPersonList()
13 {.......}
14
15 //顯示線程池現狀
16 static void ThreadPoolMessage(Person person)
17 {
18 int a, b;
19 ThreadPool.GetAvailableThreads(out a, out b);
20 string message = string.Format("Person ID:{0} Name:{1} Age:{2}\n" +
21 " CurrentThreadId is {3}\n WorkerThreads is:{4}" +
22 " CompletionPortThreads is :{5}\n",
23 person.ID, person.Name, person.Age,
24 Thread.CurrentThread.ManagedThreadId, a.ToString(), b.ToString());
25 Console.WriteLine(message);
26 }
27 }
運行結果
7.4.5 WithCancellation
如果需要停止查詢,可以使用 WithCancellation(Of TSource) 運算符並提供 CancellationToken 實例作為參數。
與第三節Task的例子相似,如果標記上的 IsCancellationRequested 屬性設置為 true,則 PLINQ 將會注意到它,並停止所有線程上的處理,然后引發 OperationCanceledException。這可以保證並行查詢能夠立即停止。
1 class Program
2 {
3 static CancellationTokenSource tokenSource = new CancellationTokenSource();
4
5 static void Main(string[] args)
6 {
7 Task.Factory.StartNew(Cancel);
8 try
9 {
10 GetPersonList().AsParallel().WithCancellation(tokenSource.Token)
11 .ForAll(person =>
12 {
13 ThreadPoolMessage(person);
14 });
15 }
16 catch (OperationCanceledException ex)
17 { }
18 Console.ReadKey();
19 }
20
21 //在10~50毫秒內發出停止信號
22 static void Cancel()
23 {
24 Random random = new Random();
25 Thread.Sleep(random.Next(10,50));
26 tokenSource.Cancel();
27 }
28
29 static IList<Person> GetPersonList()
30 {......}
31
32 //顯示線程池現狀
33 static void ThreadPoolMessage(Person person)
34 {
35 int a, b;
36 ThreadPool.GetAvailableThreads(out a, out b);
37 string message = string.Format("Person ID:{0} Name:{1} Age:{2}\n" +
38 " CurrentThreadId is {3}\n WorkerThreads is:{4}" +
39 " CompletionPortThreads is :{5}\n",
40 person.ID, person.Name, person.Age,
41 Thread.CurrentThread.ManagedThreadId, a.ToString(), b.ToString());
42 Console.WriteLine(message);
43 }
44 }
45
八、定時器與鎖
8.1定時器
若要長期定時進行一些工作,比如像郵箱更新,實時收聽信息等等,可以利用定時器Timer進行操作。
在System.Threading命名空間中存在Timer類與對應的TimerCallback委托,它可以在后台線程中執行一些長期的定時操作,使主線程不受干擾。
Timer類中最常用的構造函數為 public Timer( timerCallback , object , int , int )
timerCallback委托可以綁定執行方法,執行方法必須返回void,它可以是無參數方法,也可以帶一個object參數的方法。
第二個參數是為 timerCallback 委托輸入的參數對象。
第三個參數是開始執行前等待的時間。
第四個參數是每次執行之間的等待時間。
開發實例
1 class Program
2 {
3 static void Main(string[] args)
4 {
5 ThreadPool.SetMaxThreads(1000, 1000);
6
7 TimerCallback callback = new TimerCallback(ThreadPoolMessage);
8 Timer t = new Timer(callback,"Hello Jack! ", 0, 1000);
9 Console.ReadKey();
10 }
11
12 //顯示線程池現狀
13 static void ThreadPoolMessage(object data)
14 {
15 int a, b;
16 ThreadPool.GetAvailableThreads(out a, out b);
17 string message = string.Format("{0}\n CurrentThreadId is:{1}\n" +
18 " CurrentThread IsBackground:{2}\n" +
19 " WorkerThreads is:{3}\n CompletionPortThreads is:{4}\n",
20 data + "Time now is " + DateTime.Now.ToLongTimeString(),
21 Thread.CurrentThread.ManagedThreadId,
22 Thread.CurrentThread.IsBackground.ToString(),
23 a.ToString(), b.ToString());
24 Console.WriteLine(message);
25 }
26 }
注意觀察運行結果,每次調用Timer綁定的方法時不一定是使用同一線程,但線程都會是來自工作者線程的后台線程。
8.2 鎖
在使用多線程開發時,存在一定的共用數據,為了避免多線程同時操作同一數據,.NET提供了lock、Monitor、Interlocked等多個鎖定數據的方式。
8.2.1 lock
lock的使用比較簡單,如果需要鎖定某個對象時,可以直接使用lock(this)的方式。
1 private void Method()
2 {
3 lock(this)
4 {
5 //在此進行的操作能保證在同一時間內只有一個線程對此對象操作
6 }
7 }
如果操作只鎖定某段代碼,可以事先建立一個object對象,並對此對象進行操作鎖定,這也是.net提倡的鎖定用法。
1 class Control
2 {
3 private object obj=new object();
4
5 public void Method()
6 {
7 lock(obj)
8 {.......}
9 }
10 }
8.2.2 Montior
Montior存在於System.Thread命名空間內,相比lock,Montior使用更靈活。
它存在 Enter, Exit 兩個方法,它可以對對象進行鎖定與解鎖,比lock使用更靈活。
1 class Control
2 {
3 private object obj=new object();
4
5 public void Method()
6 {
7 Monitor.Enter(obj);
8 try
9 {......}
10 catch(Excetion ex)
11 {......}
12 finally
13 {
14 Monitor.Exit(obj);
15 }
16 }
17 }
18
使用try的方式,能確保程序不會因死鎖而釋放出異常!
而且在finally中釋放obj對象能夠確保無論是否出現死鎖狀態,系統都會釋放obj對象。
而且Monitor中還存在Wait方法可以讓線程等待一段時間,然后在完成時使用Pulse、PulseAll等方法通知等待線程。
8.2.3 Interlocked
Interlocked存在於System.Thread命名空間內,它的操作比Monitor使用更簡單。
它存在CompareExchange、Decrement、Exchange、Increment等常用方法讓參數在安全的情況進行數據交換。
Increment、Decrement 可以使參數安全地加1或減1並返回遞增后的新值。
1 class Example
2 {
3 private int a=1;
4
5 public void AddOne()
6 {
7 int newA=Interlocked.Increment(ref a);
8 }
9 }
Exchange可以安全地變量賦值。
1 public void SetData()
2 {
3 Interlocked.Exchange(ref a,100);
4 }
CompareExchange使用特別方便,它相當於if的用法,當a等於1時,則把100賦值給a。
1 public void CompareAndExchange()
2 {
3 Interlocked.CompareExchange(ref a,100,1);
4 }
結束語
熟悉掌握多線程開發,對提高系統工作效率非常有幫助,尤其是回調方法與最近火熱的並行編程更應該引起各位的重視。
在下用了較長的時間總結所用過的多線程開發方式,希望本篇文章能對各位的學習研究有所幫助,當中有所錯漏的地方敬請點評。
對 .NET 開發有興趣的朋友歡迎加入QQ群:230564952 共同探討 !
C#綜合揭秘
通過修改注冊表建立Windows自定義協議
Entity Framework 並發處理詳解
細說進程、應用程序域與上下文
細說多線程(上)
細說多線程(下)
細說事務
深入分析委托與事件