使用new Thread來新建一個線程,但新建線程需要內存和CPU上下文切換的開銷,200,000個周期,銷毀線程也需要100,000個周期


C#並行庫(TaskParallelLibrary)用法小結

.NET 4.5並行庫(TaskParallelLibrary)

 

 也許C和C++的程序員剛剛開始寫C#還習慣於new Thread來新建一個線程,但新建線程需要內存和CPU上下文切換的開銷,200,000個周期,銷毀線程也需要100,000個周期;所以還需要實現一個線程池Threadpool。自從有了並行庫(TaskParallelLibrary),這些都不需要了。使用Task.Factory.StartNew(() => DoSomething(item));可以創建一個線程並自動由線程池管理。寫法非常簡單,但其實里面誤區很多:

1. Task.Factory.StartNew(() => DoSomeWork())不是阻塞的

下面的代碼會先輸出ddd,因為Task.Factory.Startnew不阻塞:

1
2
var  task = Task.Factory.StartNew(() => Console.WriteLine( "eee" ));
Console.WriteLine( "ddd" );

如果你想阻塞,應該加上wait,改為這樣:

1
2
var  task = Task.Factory.StartNew(() => Console.WriteLine( "eee" )).Wait();
Console.WriteLine( "ddd" );

同樣,Task.Factory.StartNew(() => DoSomeWork()).ContinueWith…也是是異步的,想讓它阻塞,應該加上wait,這樣寫:

1
2
var  task = Task.Factory.StartNew(() => return  "" ).ContinueWith( s => { Console.WriteLine(s.Result);  }).Wait();
Console.WriteLine( "ddd" );

2. Task.Factory.StartNew(() => DoSomeWork()).ContinueWith…沒有運行在新的線程里

1
2
3
4
5
var  task = Task.Factory.StartNew(() => return  "" ).ContinueWith( s =>
  {
      DoSomething2(s.Result);
   }).Wait();
Console.WriteLine( "ddd" );

注意上面的DoSomething2()是運行在主線程,而不是在新的線程里

3. Parallel.ForEach為何導致內存溢出

如果對一個10000個item的collection使用Parallel.ForEach,可以想象會發生什么。TPL默認是Parallel.ForEach使用場景是對CPU敏感的,TPL會持續創建線程,直到你的CPU利用率達100%;問題是你的使用場景如果不是CPU敏感的,例如是I/O敏感的,TPL想盡可能的利用你的CPU,所以檢測你的CPU利用率,如果還不是100%就會一直創建線程....直到內存耗盡。所以,使用要注意使用場景十分CPU敏感的,另外可以加一個參數來限制TPL線程的創建:

1
2
3
4
5
6
Parallel.ForEach(items,
                new  ParallelOptions
                {
                    MaxDegreeOfParallelism = 4
                },
                 item => DoSomething(item));

ParallelOptions.MaxDegreeOfParallelism參數含義:

 

If your task is CPU-bound then you should see a pattern like this on a quad-core system:

 

  • ParallelOptions.MaximumDegreeOfParallelism = 1: use one full CPU or 25% CPU utilization
  • ParallelOptions.MaximumDegreeOfParallelism = 2: use two CPUs or 50% CPU utilization
  • ParallelOptions.MaximumDegreeOfParallelism = 4: use all CPUs or 100% CPU utilization

4. 如何等待Parallel.ForEach運行都結束

1
2
Parallel.ForEach<Item>(items, item => DoSomething(item));
Console.WriteLine( "ddd" );

是阻塞的,所以以上代碼會在最后輸出ddd。

如果是等多個Task,可以這樣寫:

1
2
3
4
var  task1 = Task.Factory.StartNew(() => DoSomeWork());
var  task2 = Task.Factory.StartNew(() => DoSomeWork());
var  task3 = Task.Factory.StartNew(() => DoSomeWork());
Task.WaitAll(task1, task2, task3);

或者這樣寫:

1
2
3
4
5
6
7
Task.Factory.ContinueWhenAll( new [] { task1, task2, task3 }, tasks =>
{
     foreach  (Task< string > task in  tasks)
     {
         Console.WriteLine(task.Result);
     }
});

 

5. Task.Factory.StartNew和Parallel.ForEach可以嵌套使用嗎

都可以嵌套使用,例如:

1
2
3
var  task1 = Task.Factory.StartNew( () => Parallel.ForEach<Item>(items, item => DoSomething(item)));
var  task2 = Task.Factory.StartNew( () => Parallel.ForEach<Item>(items2, item => DoSomething2(item)));
Task.WaitAll(task1, task2);

6. Thread.Sleep還需要嗎

以前,我們輪詢的時候常常喜歡這樣的寫法:

1
2
3
4
5
6
while ( true )
{
   doSomework();
   
   Thread.Sleep(1000);
}

 

這是一種代碼的壞味道,Stackoverflow的討論在這兒,解決方法是用WaitEvent替代,當然在C#中還是推薦用BlockingCollection替代。

6. TPL中閉包的陷阱

例如在下面的代碼中 counter++存在線程不安全的問題。

1
2
3
4
5
6
7
8
9
10
11
12
13
int  counter = 0;
 
Task.Factory.StartNew( () =>
  Parallel.ForEach(items,
                new  ParallelOptions
                {
                    MaxDegreeOfParallelism = 4
                },
                 item => {
                    DoSomething(item);
                    counter++;
                    });
  );

應該改為:

1
Interlocked.Increment( ref  successCount);

7. Lock鎖帶來的性能問題

性能問題首先要診斷,例如用條件編譯打印出線程id和運行時序,可以知道所有線程的運行先后次序和等待情況。還可以借助工具來調試多線程問題。這里要說的鎖的問題。如果你的程序用Parallel.ForEach貌似是並發的,但如果有用到Lock,那可能你所有的線程都在等待,性能將是一塌糊塗的。所以最好的方法是避免鎖,保證Parallel.ForEach里面每一個對象不會用到競爭的資源/例如修改同一個對象。退而求其次的是用鎖,但要非常小心。例如,lock(this),lock(typeof(mytype)),lock(“mylock”),如果lock的是public訪問的,或者鎖名字一樣,將會造成問題。還有的人干脆來個大括號,一整段全都鎖住。死鎖有時候很難調試發現診斷,下面的代碼有死鎖:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// thread 1
lock ( typeof ( int )) {
   Thread.Sleep(1000);
   lock ( typeof ( float )) {
     Console.WriteLine( "Thread 1 got both locks" );
   }
 
}
 
// thread 2
lock ( typeof ( float )) {
   Thread.Sleep(1000);
   lock ( typeof ( int )) {
     Console.WriteLine( "Thread 2 got both locks" );
   }
}

 

8. TaskFactory.Startnew和異步async/await的不同

1
2
3
var  Data = await Task.WhenAll(WebService1.Call(),
     WebService2.Call(),
     WebService3.Call());

關於TaskFactory.Startnew和異步async/await的不同,下面兩文章已經講的非常清楚了:

在下面的情況下,推薦使用Task.Factory.FromAsync()因為異步I/O比同步的CPU等待等有效,特別是對於獲取I/O的高伸縮性。

1
2
3
4
5
6
7
8
9
10
11
12
NetworkStream stream;
byte [] data;
int  bytesRead;
 
//using FromAsync
Task< int > readChunk = Task< int >.Factory.FromAsync (
       stream.BeginRead, stream.EndRead,
       data, bytesRead, data.Length - bytesRead, null );
 
//using StartNew with blocking version
Task< int > readChunk2 = Task< int >.Factory.StartNew(() =>
       stream.Read(data, bytesRead, data.Length - bytesRead));

9. 其它資源

 
分類:  C#


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM