在移動交通流調查項目的一個算法分析程序中,碰到一個業務問題:用戶采集上傳的基站定位數據需要進行分析預處理,方案是先按預定格式解析文件並從中提取出成百上千個基站定位數據記錄,並合並相同的基站點,根據獲取到的基站位置信息作為參數,去請求google 基站定位 api,從而得到對應的基站定位經緯度等信息,接下來再加上華工的算法分析。
在執行華工算法分析邏輯之前,調用谷歌api這一步必需全部完成;網絡請求是個耗時的過程,故對每一個請求開啟單獨的線程(同時請求可能數百個,這里通過Semaphore信號量來控制每次發出請求的最大數,該部分的討論不再本話題之類)。
問題出來了,那么如何知道所有的網絡請求全部完成了,可以進行下一步算法分析呢?答案是利用前面講的ManualResetEvent來處理;於是有下面的寫法
1
2
3
4
5
|
//針對每個線程 綁定初始化一個ManualResetEvent實例
ManualResetEvent doneEvent =
new
ManualResetEvent(
false
);
//通過ThreadPool.QueueUserWorkItem(網絡請求方法HttpRequest,doneEvent ) 來開啟多線程
//將等待事件一一加入事件列表
|
1
2
3
4
5
6
7
8
9
10
11
12
|
List<ManualResetEvent> listEvent =
new
List<ManualResetEvent>();
for
(
int
i=0;i<請求線程數;i++){
listEvent.Add(doneEvent);
}
//主線程等待網絡請求全部完成
WaitHandle.WaitAll(listEvent.ToArray());
//....接下去的算法分析
//在網絡請求方法HttpRequest的子線程中調用
doneEvent.Set();
//通知主線程 本網絡請求已經完成
|
運行好像沒有問題,程序按原定計划執行;但是當線程數大於64個之后拋出異常
WaitHandles must be less than or equal to 64
原來WaitHandle.WaitAll(listEvent.ToArray()); 這里listEvent線程數不能超過64個
以前解決方法:
下面是吳建飛以前的方案:既然WaitHandle.WaitAll方法只能喚醒64個ManualResetEvent對象,那么就采用
1
|
List<List<ManualResetEvent>> _listLocEventList =
new
List<List<ManualResetEvent>>();
|
采用這種復雜集合;集合的每個元素也是一個集合(內部每個集合包含最大64個ManualResetEvent對象);和上面一樣 把每個線程相關的ManualResetEvent對象添加到該集合;
//主線程等待網絡請求全部完成
1
2
3
4
|
foreach
(List<ManualResetEvent> listEvent
in
_listLocEventList)
{
WaitHandle.WaitAll(listEvent.ToArray());
}
|
該方案運用起來比較復雜,而且會導致創建大量的ManualResetEvent對象;
現在的設計目標是這種對文件的分析是多任務同時進行的,也就是說會產生的ManualResetEvent對象List<List<ManualResetEvent>>.Size() * 任務數(N個文件上傳)
改進的解決方法:
原理:封裝一個ManualResetEvent對象,一個計數器current,提供SetOne和WaitAll方法;
主線程調用WaitAll方法使ManualResetEvent對象等待喚醒信號;
各個子線程調用setOne方法 ,setOne每執行一次current減1,直到current等於0時表示所有子線程執行完畢 ,調用ManualResetEvent的set方法,這時主線程可以執行WaitAll之后的步驟。
目標:減少ManualResetEvent對象的大量產生和使用的簡單性。
在這里我寫了個封裝類:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
|
/********************************************************************************
* Copyright © 2001 - 2010Comit. All Rights Reserved.
* 文件:MutipleThreadResetEvent.cs
* 作者:楊柳
* 日期:2010年11月13日
* 描述:封裝 ManualResetEvent ,該類允許一次等待N(N>64)個事件執行完畢
*
* 解決問題:WaitHandle.WaitAll(evetlist)方法最大只能等待64個ManualResetEvent事件
* *********************************************************************************/
using
System;
using
System.Collections.Generic;
using
System.Linq;
using
System.Text;
using
System.Threading;
namespace
TestMutipleThreadRestEvent
{
/// <summary>
/// 封裝ManualResetEvent
/// </summary>
public
class
MutipleThreadResetEvent : IDisposable
{
private
readonly
ManualResetEvent done;
private
readonly
int
total;
private
long
current;
/// <summary>
/// 構造函數
/// </summary>
/// <param name="total">需要等待執行的線程總數</param>
public
MutipleThreadResetEvent(
int
total)
{
this
.total = total;
current = total;
done =
new
ManualResetEvent(
false
);
}
/// <summary>
/// 喚醒一個等待的線程
/// </summary>
public
void
SetOne()
{
// Interlocked 原子操作類 ,此處將計數器減1
if
(Interlocked.Decrement(
ref
current) == 0)
{
//當所以等待線程執行完畢時,喚醒等待的線程
done.Set();
}
}
/// <summary>
/// 等待所以線程執行完畢
/// </summary>
public
void
WaitAll()
{
done.WaitOne();
}
/// <summary>
/// 釋放對象占用的空間
/// </summary>
public
void
Dispose()
{
((IDisposable)done).Dispose();
}
}
}
|
注釋寫的很清楚了:本質就是只通過1個ManualResetEvent 對象就可以實現同步N(N可以大於64)個線程
下面是測試用例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
|
using
System;
using
System.Collections.Generic;
using
System.Linq;
using
System.Text;
using
System.Threading;
namespace
TestMutipleThreadRestEvent
{
/// <summary>
/// 測試MutipleThreadResetEvent
/// </summary>
class
Program
{
static
int
i = 0;
/// <summary>
/// 主方法
/// </summary>
/// <param name="args">參數</param>
static
void
Main(
string
[] args)
{
//假設有100個請求線程
int
num = 100;
//使用 MutipleThreadResetEvent
using
(var countdown =
new
MutipleThreadResetEvent(num))
{
for
(
int
i=0;i<num;i++)
{
//開啟N個線程,傳遞MutipleThreadResetEvent對象給子線程
ThreadPool.QueueUserWorkItem(MyHttpRequest, countdown);
}
//等待所有線程執行完畢
countdown.WaitAll();
}
Console.WriteLine(
"所有的網絡請求以及完畢,可以繼續下面的分析..."
);
Console.ReadKey();
}
/// <summary>
/// 假設的網絡請求
/// </summary>
/// <param name="state">參數</param>
private
static
void
MyHttpRequest(
object
state)
{
// Thread.Sleep(1000);
Console.WriteLine(String.Format(
"哈哈:{0}"
,++i));
MutipleThreadResetEvent countdown = state
as
MutipleThreadResetEvent;
//發送信號量 本線程執行完畢
countdown.SetOne();
}
}
}
|
輸出: