由一篇文章引發的思考——多線程處理大數組


今天領導給我們發了一篇文章文章,讓我們學習一下。

文章鏈接:TAM - Threaded Array Manipulator

這是codeproject上的一篇文章,花了一番時間閱讀了一下。文章主要是介紹當單線程處理大量數組遇到性能瓶頸時,使用多線程的方式進行處理,可以縮短數組的處理時間。

看了這篇文章后,感覺似曾相識,很多次,當我想要處理大數組時,我就會進行構思,然后想出的解決方案,與此文章中介紹的方案非常的相似。但是說來慚愧,此文章的作者有了構思后便動手寫出了實現代碼,然后還進行了性能測試,而我每次只是構思,覺得我能想出來就可以了,等到真正用的時候再把它寫出來就行了。事實上,我大概已經構思過很多次了,但是還從來沒有寫過,直到看到這篇文章,我才下定決心,一定要將這個思路整理一遍。

當單線程處理大數組遇到性能瓶頸時應該怎樣處理

雖然科技一直在進步,CPU的處理能力也一直在提高,但是當我們進入大數據時代后,CPU每秒鍾都會面臨着大量的數據需要處理,這個時候CPU的處理能力可能就會成為性能瓶頸。這是我們就要選擇多核多CPU了,編程中也就是使用多線程進行處理。

首先看下單線程處理的例子

static void Main(string[] args)
{
    int count = 100000000;
    double[] arrayForTest = new double[count];
    Stopwatch watch = new Stopwatch();
    watch.Start();
    for (int i = 0; i < arrayForTest.Length; i++)
    {
        arrayForTest[i] = MathOperationFunc(arrayForTest[i]);
    }
    watch.Stop();
    Console.WriteLine("經過 " + arrayForTest.Length + " 次循環共消耗時間 " + (watch.ElapsedMilliseconds / 1000.0) + " s");
}

static double MathOperationFunc(double value)
{
    return Math.Sin(Math.Sqrt(Math.Sqrt((double)value * Math.PI)) * 1.01);
}

單線程處理的耗時

單線程測試

這個單線程的例子中對一個有10000000個元素的數組中的每個元素進行了數學計算,執行完畢共計耗時5.95秒。

然后看兩個線程處理的例子

static void Main(string[] args)
{
    //四線程測試
    int threadCount = 2;
    Thread[] threads = new Thread[threadCount];
    for (int i = 0; i < threadCount; i++)
    {
        threads[i] = new Thread(ForTestInThread);
        threads[i].Name = threadCount + "線程測試" + (i + 1);
        threads[i].Start();
    }
}
//工作線程
static void ForTestInThread()
{
    int count = 50000000;
    double[] arrayForTest = new double[count];
    Stopwatch watch = new Stopwatch();
    watch.Start();
    for (int i = 0; i < arrayForTest.Length; i++)
    {
        arrayForTest[i] = MathOperationFunc(arrayForTest[i]);
    }
    watch.Stop();
    Console.WriteLine("線程:" + Thread.CurrentThread.Name + ",經過 " + arrayForTest.Length + " 次循環共消耗時間 " + (watch.ElapsedMilliseconds / 1000.0) + " s");
}

//數據計算
static double MathOperationFunc(double value)
{
    return Math.Sin(Math.Sqrt(Math.Sqrt((double)value * Math.PI)) * 1.01);
}

兩個線程測試耗時

雙線程測試

我們再來看一下四個線程的例子

static void Main(string[] args)
{
    //四線程測試
    int threadCount = 4;
    Thread[] threads = new Thread[threadCount];
    for (int i = 0; i < threadCount; i++)
    {
        threads[i] = new Thread(ForTestInThread);
        threads[i].Name = threadCount + "線程測試" + (i + 1);
        threads[i].Start();
    }
}
//工作線程
static void ForTestInThread()
{
    int count = 25000000;
    double[] arrayForTest = new double[count];
    Stopwatch watch = new Stopwatch();
    watch.Start();
    for (int i = 0; i < arrayForTest.Length; i++)
    {
        arrayForTest[i] = MathOperationFunc(arrayForTest[i]);
    }
    watch.Stop();
    Console.WriteLine("線程:" + Thread.CurrentThread.Name + ",經過 " + arrayForTest.Length + " 次循環共消耗時間 " + (watch.ElapsedMilliseconds / 1000.0) + " s");
}

//數據計算
static double MathOperationFunc(double value)
{
    return Math.Sin(Math.Sqrt(Math.Sqrt((double)value * Math.PI)) * 1.01);
}

四個線程測試耗時

四線程測試

由上面的測試中可以看到,隨着線程數的增多,任務被分解后每個線程執行的任務耗時由原來的 6秒 逐漸降到 2秒 左右,由此我們可以猜想當所有線程同時執行的時候,那么總任務的耗時就會下降,接下來讓我們來進行更精確的測試。

Thread.Join方法簡介

進行多線程測試時,經常會遇到這樣的問題:主線程中如何等待所有線程執行結束后,再執行后續任務。

錯誤的做法

Thread[] threads = new Thread[threadCount];
for (int i = 0; i < threadCount; i++)
{
    int beginIndex = i*arrayForTest.Length/threadCount;
    int length = arrayForTest.Length/threadCount;
    threads[i] = new Thread(WorkerThread);
    var arg = new Tuple<double[], int, int>(arrayForTest, beginIndex, length);
    threads[i].Name = threadCount + "線程測試" + (i + 1).ToString();
    threads[i].Start(arg);
    //等待所有線程結束 
    threads[i].Join();
}

這么做實際上所有的子線程均是串行執行的,並沒有達到並行的效果。

正確的做法

Thread[] threads = new Thread[threadCount];
for (int i = 0; i < threadCount; i++)
{
    int beginIndex = i*arrayForTest.Length/threadCount;
    int length = arrayForTest.Length/threadCount;
    threads[i] = new Thread(WorkerThread);
    var arg = new Tuple<double[], int, int>(arrayForTest, beginIndex, length);
    threads[i].Name = threadCount + "線程測試" + (i + 1).ToString();
    threads[i].Start(arg);
}
//等待所有線程結束
foreach (var thread in threads)
{
    thread.Join();
}

多線程處理大數組的實現

了解了Thread.Join后,就可以進行多線程處理大數組的代碼編寫了:

class Program
{
    static void Main(string[] args)
    {
        int count = 100000000;
        double[] arrayForTest = new double[count];
        Stopwatch totalWatch = new Stopwatch();
        totalWatch.Start();
        ThreadTest(arrayForTest, 2);
        totalWatch.Stop();
        Console.WriteLine("總任務,經過 " + arrayForTest.Length + " 次循環共消耗時間 " + (totalWatch.ElapsedMilliseconds / 1000.0) + " s");
    }

    //大循環測試
    static void ForTest(double[] arrayForTest, int beingIndex, int offset, Func<double, double> func)
    {
        for (int i = beingIndex; i < beingIndex + offset; i++)
        {
            arrayForTest[i] = func(arrayForTest[i]);
        }
    }

    //數學計算
    static double MathOperationFunc(double value)
    {
        return Math.Sin(Math.Sqrt(Math.Sqrt((double)value * Math.PI)) * 1.01);
    }

    static void ThreadTest(double[] arrayForTest, int threadCount)
    {
        //啟動線程
        Thread[] threads = new Thread[threadCount];
        for (int i = 0; i < threadCount; i++)
        {
            //為每個線程分配任務
            int beginIndex = i*arrayForTest.Length/threadCount;
            int length = arrayForTest.Length/threadCount;
            threads[i] = new Thread(WorkerThread);
            var arg = new Tuple<double[], int, int>(arrayForTest, beginIndex, length);
            threads[i].Name = threadCount + "線程測試" + (i + 1).ToString();
            threads[i].Start(arg);
            threads[i].Join();
        }
        //等待所有線程結束
        foreach (var thread in threads)
        {
            thread.Join();
        }
    }

    //工作線程
    static void WorkerThread(object arg)
    {
        Stopwatch watch = new Stopwatch();
        watch.Start();
        var argArray = arg as Tuple<double[], int, int>;
        if (argArray == null)
            return;
        ForTest(argArray.Item1, argArray.Item2, argArray.Item3, MathOperationFunc);
        watch.Stop();
        Console.WriteLine("線程:" + Thread.CurrentThread.Name + ",經過 " + argArray.Item3 + " 次循環共消耗時間 " + (watch.ElapsedMilliseconds/1000.0) + " s");
    }
}

這樣多線程處理大數組的功能代碼就編寫完成了,那么性能是如何呢,用事實說話,效果如下:

多線程方法測試

由圖可以看出,將一個大任務分解到兩個線程中去執行后,大任務總體的執行時間會縮短,但是與兩個線程中耗時最長的線程的執行時間有關。

同時執行耗時由原來的6秒逐漸降到2秒左右。可見在多核的機器上,多線程是可以提高性能的。

所以當單線程處理大數組遇到性能瓶頸時可以考慮通過多線程來處理。

既然這個多線程處理大數組的功能效果非常好,那么何不把它封裝為一個類,添加到自己的類庫中,這樣就可以隨時使用了:

class BigArrayFor
{
    /// <summary>
    /// 執行任務時,使用的線程數
    /// </summary>
    public int ThreadCount { get; set; }

    /// <summary>
    /// 處理大數組中每個元素的方法
    /// </summary>
    public Func<double, double> ForFunc { get; private set; }

    /// <summary>
    /// 需要處理的大數組
    /// </summary>
    public double[] ArrayForTest { get; private set; }

    /// <summary>
    /// 實例化處理大數組的類
    /// </summary>
    /// <param name="arrayForTest">需要處理的大數組</param>
    /// <param name="forFunc">處理大數組中每個元素的方法</param>
    public BigArrayFor(double[] arrayForTest, Func<double, double> forFunc)
    {
        if (arrayForTest == null || forFunc == null)
        {
            throw new ArgumentNullException();
        }
        ThreadCount = 4;
        ForFunc = forFunc;
        ArrayForTest = arrayForTest;
    }

    /// <summary>
    /// 開始處理大數組
    /// </summary>
    public void Run()
    {
        //啟動線程
        Thread[] threads = new Thread[ThreadCount];
        for (int i = 0; i < ThreadCount; i++)
        {
            //為每個線程分配任務
            int beginIndex = i * (ArrayForTest.Length / ThreadCount);
            int length = ArrayForTest.Length / ThreadCount;
            threads[i] = new Thread(WorkerThread);
            var arg = new Tuple<double[], int, int>(ArrayForTest, beginIndex, length);
            threads[i].Name = ThreadCount + "線程測試" + (i + 1);
            threads[i].Start(arg);
        }
        //等待所有線程結束
        foreach (var thread in threads)
        {
            thread.Join();
        }
    }

    private void WorkerThread(object arg)
    {
        var argArray = arg as Tuple<double[], int, int>;
        if (argArray == null)
            return;
        ForTest(argArray.Item1, argArray.Item2, argArray.Item3, ForFunc);
    }
    //大循環測試
    private void ForTest(double[] arrayForTest, int beingIndex, int offset, Func<double, double> func)
    {
        for (int i = beingIndex; i < beingIndex + offset; i++)
        {
            arrayForTest[i] = func(arrayForTest[i]);
        }
    }

}

好了,大數組循環類完成了,到目前為止,最多也只測試過4個線程同時處理大數組的效果,那么線程數繼續增多,是不是執行時間會隨之縮短呢,萬事俱備,讓我們開始更詳細的測試吧

static void Main(string[] args)
{

    //多線程操作大數組
    int count = 100000000;
    double[] arrayForTest = new double[count];
    //一個線程
    ThreadTest(arrayForTest, 1);
    //兩個線程
    ThreadTest(arrayForTest, 2);
    //四個線程
    ThreadTest(arrayForTest, 4);
    //八個線程
    ThreadTest(arrayForTest, 8);
    //十六個線程
    ThreadTest(arrayForTest, 16);
    //二十五個線程
    ThreadTest(arrayForTest, 25);
    //三十二個線程
    ThreadTest(arrayForTest, 32);
}

static void ThreadTest(double[] arrayForTest, int threadCount)
{
    BigArrayFor bigArrayFor = new BigArrayFor(arrayForTest, MathOperationFunc);
    bigArrayFor.ThreadCount = threadCount;
    Stopwatch totalWatch = new Stopwatch();
    totalWatch.Start();
    bigArrayFor.Run();
    totalWatch.Stop();
    Console.WriteLine(bigArrayFor.ThreadCount + " 個線程,經過 " + arrayForTest.Length + " 次循環,共消耗時間 " + (totalWatch.ElapsedMilliseconds / 1000.0) + " s");
    Console.WriteLine();
}

然后看測試效果

多線程_類測試

我們可以看到,隨着線程數量的增多,處理數組所需的總體時間並不是隨着線性的縮短,這是因為當線程數量超過CPU的核數后,會增加很多的線程調度的時間,當線程超過一定數量后,性能反而會下降。

總結

在多核機器上,當單線程處理大數組遇到性能瓶頸時,可以考慮使用多線程進行處理,但是線程數量要適量,否則會因為線程調度導致性能下降。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM