C# 多線程小試牛刀


2019年6月28日更新

采用預先生成隨機數 + bitarray 來判斷重復 + 數組分段插入隔離進行插入 1000w的數據的不重復隨機數插入可以壓縮到 3 s 內。

前言

昨天在上班時瀏覽博問,發現了一個問題,雖然自己在 C# 多線程上沒有怎么嘗試過,看了幾遍 CLR 中關於 線程的概念和講解(后面三章)。也想拿來實踐實踐。問題定義是這樣的:

對於多線程不是很懂,面試的時候遇到一個多線程的題,不會做,分享出來,懂的大佬指點一下,謝謝

建一個winform窗體,在窗體中放上一個開始按鈕,一個停止按鈕,一個文本框,在窗體中聲明一個List 類型的屬性,點擊開始按鈕后開啟10個線程,所有線程同時不間斷的給List 集合中添加1-10000之間的隨機數,要求添加List 集合中的數字不能重復,並且實時在文本框中顯示集合的長度,當集合List 的長度等於1000時自動停止所有線程,如果中途點擊停止按鈕也停止所有線程,點擊開始又繼續執行。

我其實沒有完全實現了這位博問中提問的同學的需求,具體問題的來源可查看該地址 問題來源

開始嘗試

剛拿到這個需求的時候,映入我腦海里的是 Task, Threadpool,Concurrent,和 Lock 等概念,接下來就是組裝和編碼的過程了,首先理一理頭緒,

  • 生成隨機數
  • 插入到 List 中,且不能重復
  • 開啟多個線程同時插入。

首先是生成 隨機數,使用 System.Random 類來生成偽隨機數(這個其實性能和效率賊低,后面再敘述)

private int GenerateInt32Num()
{
    var num = random.Next(0, TOTAL_NUM);
    return num;
}

然后是插入到 List<Int32> 中的代碼,判斷是否 已經達到了 我們需要的 List 長度,如果已滿足,則退出程序。

private void AddToList(int num)
{
    if (numList.Count == ENDNUM)
    {
        return;
    }

    numList.Add(num);
}

如果是個 單線程的,按照上面那樣 while(true) 然后一直插入即可,可這個是個 多線程,那么需要如何處理呢?

我思考了一下,想到了之前在 CLR 中學到的 可以用 CancellationTokenSource 中的 Cancel 來通知 Task 來取消操作。所以現在的邏輯是,用線程池來實現多線程。然后傳入 CancellationTokenSource.Token 來取消任務。

最后用 Task.WhanAny() 來獲取到第一個到達此 Task 的 ID。

首先是建立 Task[] 的數組

internal void DoTheCompeteSecond()
{
    Task[] tasks = new Task[10];

    for (int i = 0; i < 10; ++i)
    {
        int num = i;
        tasks[i] = Task.Factory.StartNew(() => AddNumToList(num, cts), cts.Token);
    }

    Task.WaitAny(tasks);
}

然后 AddNumToList 方法是這樣定義的,

private void AddNumToList(object state, CancellationTokenSource cts)
{-
    Console.WriteLine("This is the {0} thread,Current ThreadId={1}",
                      state,
                      Thread.CurrentThread.ManagedThreadId);

    while (!cts.Token.IsCancellationRequested)
    {
        if (GetTheListCount() == ENDNUM)
        {
            cts.Cancel();
            Console.WriteLine("Current Thread Id={0},Current Count={1}",
                              Thread.CurrentThread.ManagedThreadId,
                              GetTheListCount());

            break;
        }
        var insertNum = GenerateInt32Num();
        if (numList.Contains(insertNum))
        {
            insertNum = GenerateInt32Num();
        }

        AddToList(insertNum);
    }
}

看起來是沒有什么問題的,運行了一下。得到了如下結果,

這應該是昨晚運行時得到的數據,當時也沒有多想,就貼了上去,回答了那位提問同學的問題。但是心里有一個疑惑,為什么會同時由 兩個 Thread 同時達到了該目標呢?

發現問題

今天早上到公司時,我又打開了這個 代碼,發現確實有點不對勁,於是就和我邊上 做 Go 語言開發的同學,問了問他,哪里出現了問題,他和我說:“你加了讀寫鎖了嗎?” 你這里有數據臟讀寫。心里面有了點眉目。

按照他說的,修改了一下 AddToList 里面的邏輯,這時候,確實解決了上面的問題,

private void AddToList(int num)
{
    rwls.EnterReadLock();
    if (numList.Count == ENDNUM)
        return;
    rwls.ExitReadLock();

    rwls.EnterWriteLock();
    numList.Add(num);
    rwls.ExitWriteLock();
}

得到的結果如下:

完整的代碼如下所示:

using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

namespace CSharpFundamental
{
    class MultipleThreadCompete
    {
        List<int> numList = new List<int>();
        Random random = new Random();
        CancellationTokenSource cts = new CancellationTokenSource();
        private const int ENDNUM = 1000000;

        ReaderWriterLockSlim rwls = new ReaderWriterLockSlim();

        internal void DoTheCompeteSecond()
        {
            Stopwatch sw = new Stopwatch();
            sw.Start();
            Task[] tasks = new Task[100];

            for (int i = 0; i < 100; ++i)
            {
                int num = i;
                tasks[i] = Task.Run(() => AddNumToList(num, cts), cts.Token);
            }

            Task.WaitAny(tasks);

            Console.WriteLine("ExecuteTime={0}", sw.ElapsedMilliseconds / 1000);
        }

        private int GetTheListCount()
        {
            return numList.Count;
        }

        private void AddToList(int num)
        {
            rwls.EnterReadLock();
            if (numList.Count == ENDNUM)
                return;
            rwls.ExitReadLock();

            rwls.EnterWriteLock();
            numList.Add(num);
            rwls.ExitWriteLock();
        }

        private void AddNumToList(object state, CancellationTokenSource cts)
        {
            Console.WriteLine("This is the {0} thread,Current ThreadId={1}",
                state,
                Thread.CurrentThread.ManagedThreadId);

            while (!cts.Token.IsCancellationRequested)
            {
                try
                {
                    rwls.EnterReadLock();
                    if (numList.Count == ENDNUM)
                    {
                        cts.Cancel();
                        Console.WriteLine("Current Thread Id={0},Current Count={1}",
                            Thread.CurrentThread.ManagedThreadId,
                            GetTheListCount());
                        break;
                    }
                }
                finally
                {
                    rwls.ExitReadLock();
                }

                var insertNum = GenerateInt32Num();
                if (numList.Contains(insertNum))
                {
                    insertNum = GenerateInt32Num();
                }

                AddToList(insertNum);
            }
        }

        private int GenerateInt32Num()
        {
            return random.Next(1, ENDNUM);
        }
    }
}

這時候,那位 Go 語言的同學和我說,我們試試 1000w 的數據插入,看看需要多少時間?於是我讓他用 Go 語言實現了一下上面的邏輯,1000w數據用了 三分鍾,我讓他看看總共生成了多少隨機數,他查看了一下生成了 1億4千多萬的數據。

最開始我用上面的代碼來測,發現我插入 1000w 的數據,CPU 到100% 而且花了挺長時間,程序根本沒反應,查看了一下我判斷重復的語句numList.Contains()

底層實現的代碼為:

[__DynamicallyInvokable]
    public bool Contains(T item)
    {
        if ((object) item == null)
        {
            for (int index = 0; index < this._size; ++index)
            {
                if ((object) this._items[index] == null)
                    return true;
            }
            return false;
        }
        EqualityComparer<T> equalityComparer = EqualityComparer<T>.Default;
        for (int index = 0; index < this._size; ++index)
        {
            if (equalityComparer.Equals(this._items[index], item))
                return true;
        }
        return false;
    }

可想而知,如果數據量很大的話,這個循環不就 及其緩慢嗎?

我於是請教了那位 GO 的同學,判斷重復的邏輯用什么來實現的,他和我說了一個位圖 bitmap 的概念,

我用其重寫了一下判斷重復的邏輯,代碼如下:

int[] bitmap = new int[MAX_SIZE];

var index = num % TOTAL_NUM;
bitMap[index] = 1;

return bitMap[num] == 1;

在添加到 List 的時候,順便插入到 bitmap 中,判斷重復只需要根據當前元素的位置是否 等於 1 即可,

我修改代碼后,跑了一下 1000w 的數據用來 3000+ ms。

這時候,引起了他的極度懷疑,一向以高性能並發 著稱的 Go 速度竟然這么慢嗎?他一度懷疑我的邏輯有問題。

下午結束了一個階段的工作后,我又拾起了我上午寫的代碼,果不其然,發現了邏輯錯誤:

如下:

var insertNum = GenerateInt32Num();
if (numList.Contains(insertNum))
{
    insertNum = GenerateInt32Num();
}

生成隨機數這里,這里有個大問題,就是其實只判斷了一次,導致速度那么快,正確的寫法應該是

while (ContainsNum(currentNum))
{
    currentNum = GenerateInt32Num();
}

private int GenerateInt32Num()
{
    var num = random.Next(0, TOTAL_NUM);
    //Console.WriteLine(num);

    return num;
}

最后的代碼如下:

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

namespace CSharpFundamental
{
    class MultipleThreadCompete
    {
        List<int> numList = new List<int>();
        Random random = new Random();
        CancellationTokenSource cts = new CancellationTokenSource();
        private const int TOTAL_NUM = 1000000;
        private const int CURRENT_THREAD_COUNT = 35;

        ReaderWriterLockSlim rwls = new ReaderWriterLockSlim();

        int[] bitMap = new int[TOTAL_NUM];

        internal void DoTheCompete()
        {
            //ThreadPool.SetMinThreads(CURRENT_THREAD_COUNT, CURRENT_THREAD_COUNT);
            Stopwatch sw = new Stopwatch();
            sw.Start();
            Task[] tasks = new Task[CURRENT_THREAD_COUNT];

            for (int i = 0; i < CURRENT_THREAD_COUNT; ++i)
            {
                int num = i;
                tasks[i] = Task.Run(() => ExecuteTheTask(num, cts), cts.Token);
            }

            Task.WaitAny(tasks);

            Console.WriteLine("ExecuteTime={0}", sw.ElapsedMilliseconds);
        }

        private int GetTheListCount()
        {
            return numList.Count;
        }

        private void AddToList(int num)
        {
            if (numList.Count == TOTAL_NUM)
                return;
            numList.Add(num);

            var index = num % TOTAL_NUM;
            bitMap[index] = 1;
        }

        private void ExecuteTheTask(object state, CancellationTokenSource cts)
        {
            Console.WriteLine("This is the {0} thread,Current ThreadId={1}",
                state,
                Thread.CurrentThread.ManagedThreadId);

            while (!cts.Token.IsCancellationRequested)
            {
                try
                {
                    rwls.EnterReadLock();
                    if (numList.Count == TOTAL_NUM)
                    {
                        cts.Cancel();
                        Console.WriteLine("Current Thread Id={0},Current Count={1}",
                            Thread.CurrentThread.ManagedThreadId,
                            GetTheListCount());
                        break;
                    }
                }
                finally
                {
                    rwls.ExitReadLock();
                }

                var currentNum = GenerateInt32Num();

                while (ContainsNum(currentNum))
                {
                    currentNum = GenerateInt32Num();
                }

                rwls.EnterWriteLock();
                AddToList(currentNum);
                rwls.ExitWriteLock();
            }
        }

        private int GenerateInt32Num()
        {
            var num = random.Next(0, TOTAL_NUM);
            //Console.WriteLine(num);

            return num;
        }

        private bool ContainsNum(int num)
        {
            rwls.EnterReadLock();
            var contains = bitMap[num] == 1;
            rwls.ExitReadLock();

            return contains;
        }
    }
}

結果如下:

但是這個代碼執行 1000w的數據需要好久。 這個問題繼續研究。

源碼地址:https://github.com/doublnt/dotnetcore/tree/master/CSharpFundamental

歡迎大佬指點,還望不吝賜教。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM