c#執行並行任務之Parallel與TaskFactory


任務:幾千條(大量)數據往服務器數據庫填寫。要求單開線程執行,分割成小數據包,多線程運行。

實現方法:Parallel與TaskFactory都可以。

主要代碼:

Parallel:

Barrier _bar;
int _maxLength = 20, _maxChannel = 2;//同時最多2條通道,每條通道最多20個數據
bool _isCancel = false;
private void btnWrite_Click(object sender, EventArgs e)
{
    var tmpEmails = _emails.Where(x => !x.Value).Select(x => x.Key).ToList();
    var state = 0;

    _isCancel = false;
    SetControlEnable(false);
    lblProgress.Text = "* 已完成 0%";
    var channels = (tmpEmails.Count / _maxLength) + ((tmpEmails.Count % _maxLength > 0) ? 1 : 0);//總共多少條通道

    var times = (channels / _maxChannel) + ((channels % _maxChannel > 0) ? 1 : 0);//單服務器分多次
    new Action(() =>
    {
        for (int j = 0; j < times; j++)
        {
            if (_isCancel)
            {
                MessageBox.Show("任務取消!");
                break;
            }
            var currChannel = Math.Min(_maxChannel, (channels - j * _maxChannel));//兩者取其小的
            _bar = new Barrier(currChannel);//根據次數設置柵欄
            var tasks = new Action[currChannel];
            for (int i = 0; i < currChannel; i++)
            {
                var subData = tmpEmails.Skip((i + j * _maxChannel) * _maxLength).Take(_maxLength).ToList();
                tasks[i] = () =>
                {
                    if (_isCancel) return;
                    var resMsg = 0;
                    Connect2WCF.RunSync(sc => resMsg = sc.UpdateMailState(subData, state));
                    if (resMsg == -1)
                        MessageBox.Show("保存失敗了?詳情可以查數據庫日志表");
                    else if (resMsg == 0)
                        subData.ForEach(one => _emails[one] = true);//標記已經完成的。
                    new Action(() => txtEmails.Text = string.Join("\r\n", _emails.Where(x => !x.Value).Select(x => x.Key))).InvokeRun(this);
                    _bar.SignalAndWait();
                };
            }
            Parallel.Invoke(tasks);
            new Action(() => lblProgress.Text = "* 已完成 " + ((100 * (j + 1) / times)) + "%").InvokeRun(this);
        }
        new Action(() => SetControlEnable(true)).InvokeRun(this);
    }).RunThread();
}

用Barrier和Parallel.Invoke結合來實現分割小數據包,每次用兩個線程,每個線程傳遞20條數據,兩個線程的數據都完成后,刷新完成的進度。isCancel作為取消操作的開關。實現的效果較下面的TaskFactory好。

TaskFactory:

CancellationTokenSource cts = new CancellationTokenSource();
int maxLength = 20, maxChannel = 2;//同時最多2條通道,每條通道最多20個數據
private void btnWrite_Click(object sender, EventArgs e)
{
    cts = new CancellationTokenSource();
    var tmpEmails = _emails.Where(x => !x.Value).Select(x => x.Key).ToList();
    var state = 0;

    SetControlEnable(false);
    lblProgress.Text = "* 已完成 0%";
    var channels = (tmpEmails.Count / maxLength) + ((tmpEmails.Count % maxLength > 0) ? 1 : 0);//總共多少條通道

    var times = (channels / maxChannel) + ((channels % maxChannel > 0) ? 1 : 0);//單服務器分多次
    Action<List<string>, CancellationToken> doSave = (data, ct) =>
    {
        if (ct.IsCancellationRequested) return;
        var msg = 0;
        Connect2WCF.RunSync(sc => msg = sc.UpdateMailState(data, state));
        if (msg == -1)
            MessageBox.Show("保存失敗了?詳情可以查數據庫日志表");
        else if (msg == 0)
            data.ForEach(one => _emails[one] = true);//標記已經完成的。
        new Action(() => txtEmails.Text = string.Join("\r\n", _emails.Where(x => !x.Value).Select(x => x.Key))).InvokeRun(this);
    };

    for (int j = 0; j < times; j++)
    {
        int k = j;
        if (cts.Token.IsCancellationRequested)
        {
            MessageBox.Show("任務取消!");
            break;
        }
        var currChannel = Math.Min(maxChannel, (channels - j * maxChannel));//兩者取其小的

        TaskFactory taskFactory = new TaskFactory();
        Task[] tasks = new Task[currChannel];
        for (int i = 0; i < currChannel; i++)
        {
            var subData = tmpEmails.Skip((i + j * maxChannel) * maxLength).Take(maxLength).ToList();
            tasks[i] = new Task(() => doSave(subData, cts.Token), cts.Token);
        }
        taskFactory.ContinueWhenAll(tasks,
            x => new Action(() => lblProgress.Text = "* 已完成 " + ((100 * (k + 1) / times)) + "%").InvokeRun(this), CancellationToken.None);
        Array.ForEach(tasks, x => x.Start());
    }
    SetControlEnable(true);
}

用TaskFactory和CancellationTokenSource結合來實現,在保存修改數據上,實現的效果和上面的方法差不多,但是在中間取消的效果上差很多,取消后,不會有“任務取消”的彈框。后台的執行邏輯猜測是這樣:由於Task是單開線程跑,所以在btn的事件中, 所有Tasks和TaskFactory的聲明基本上是很快就執行完成了的(電腦執行速度來看可能是一瞬間)。至於保存數據的代碼,則在每個Task的后台線程中各自執行,此時操作的時間早已經跳出了btn的事件函數,於是,點擊取消之后,由於btn的事件函數早已執行完,因此不會出現"任務取消"的彈框。而每個Task的執行受到線程個數的限制以及每個TaskFactory的ContinueWhenAll函數的監視,它們是有先后順序但是卻又無序地執行。點擊取消后,可能有幾個線程正在執行保存數據的任務,已經跳過了cancel的判斷,所以取消的命令不會立刻反應到后台執行中,會有一部分任務在取消后,仍然在運行。而剩下的其他任務會判斷cancel之后取消。由於線程的執行速度不是固定的,因此,小數據包保存執行的順序雖然大概按照增序執行,但是細節的排序可能有些插隊。

所以,總體而言TaskFactory的執行順序不可控。斷點不可控。而parallel.Invoke函數只有在傳入的Action[]全部執行完之后,才會返回,所以有效的保證了大層面的執行順序。至於Action[]這個隊列執行的順序,在Parallel里面也是不可控的。

 

補充:4092條數據,開啟一個通道時,TaskFactory:Parallel = 19:25;

開啟5個通道時,多次測試的結果為TaskFactory:Parallel = {18,16,15}:{19,16,15},速度差不多。

一個明顯的現象:在數據很多的時候,可以清晰的看到TaskFactory中已完成的百分數出現忽大忽小的情況。例如:1,4,7,12,17,6,12,19,23...

另外,Parallel剛開始執行時,有明顯的停頓感,猜測可能是啟動並行時產生的效率損耗。

 

如果希望能夠操作過程中能暫停處理,可以使用Parallel,它有一個執行主線程,方便隨時停止。如果沒有暫停需要,而且電腦的核心數不多(只有一個)時,可以考慮用TaskFactory,效率要明顯高於Parallel。

代碼文件


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM