關於並發下內存及CPU使用情況的思考


鑒於昨天的文章<<使用Interlocked在多線程下進行原子操作,無鎖無阻塞的實現線程運行狀態判斷>>里面有一個封裝好的無鎖的類庫可以判斷並發下的結束狀況,我們可以完成並發時,以及並發的同時做一些事,因此,今天我做了個小demo:

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApp6
{
    
    public enum CoordinationStatus
    {
        AllDone,
        Timeout,
        Cancel
    };

    public sealed class AsyncCoordinator
    {
        private Int32 m_opCount = 1;        // Decremented when AllBegun calls JustEnded
        private Int32 m_statusReported = 0; // 0=false, 1=true
        private Action<CoordinationStatus> m_callback;
        private Timer m_timer;

        // This method MUST be called BEFORE initiating an operation
        public void AboutToBegin(Int32 opsToAdd = 1)
        {
            Interlocked.Add(ref m_opCount, opsToAdd);
        }

        // This method MUST be called AFTER an operations result has been processed
        public void JustEnded()
        {
            if (Interlocked.Decrement(ref m_opCount) == 0)
                ReportStatus(CoordinationStatus.AllDone);
        }

        // This method MUST be called AFTER initiating ALL operations
        public void AllBegun(Action<CoordinationStatus> callback, Int32 timeout = Timeout.Infinite)
        {
            m_callback = callback;
            // 是否需要永遠運行
            if (timeout != Timeout.Infinite)
            {
                // 在指定的時間點(dueTime) 調用回調函數,隨后在指定的時間間隔(period)調用回調函數
                m_timer = new Timer(TimeExpired, null, timeout, Timeout.Infinite);
            }
            JustEnded();
        }

        // 處理過時的線程
        private void TimeExpired(Object o)
        {
            ReportStatus(CoordinationStatus.Timeout);
        }

        public void Cancel()
        {
            if (m_callback == null)
                throw new InvalidOperationException("Cancel cannot be called before AllBegun");
            ReportStatus(CoordinationStatus.Cancel);
        }

        private void ReportStatus(CoordinationStatus status)
        {
            if (m_timer != null)
            {  // If timer is still in play, kill it
                Timer timer = Interlocked.Exchange(ref m_timer, null);
                if (timer != null) timer.Dispose();
            }

            // If status has never been reported, report it; else ignore it
            if (Interlocked.Exchange(ref m_statusReported, 1) == 0)
                m_callback(status);
        }
    }

    

    class Program
    {
        static AsyncCoordinator m_ac = new AsyncCoordinator();
        static void Main(string[] args)
        {
            Console.BufferHeight = Int16.MaxValue - 10;
            Console.BufferWidth = Int16.MaxValue - 10;

            ConcurrentQueue<int> concurrentQueue = new ConcurrentQueue<int>();

            for(int i =0;i<10000; i++)
            {
                concurrentQueue.Enqueue(i);
            }

            Console.WriteLine("添加完畢....");

            var t = new Task[50];
            
            for (int i=0; i<50; i++)
            {
                m_ac.AboutToBegin(1);
                t[i] = Task.Factory.StartNew((param) =>
                {
                    while (concurrentQueue.Count>0)
                    {
                        int x;
                        if(concurrentQueue.TryDequeue(out x))
                        {
                            Console.WriteLine(x + "   線程Id: {0},   線程數: {1}", Task.CurrentId, param.ToString());
                        }
                        //Thread.Sleep(150);
                    }
                    m_ac.JustEnded();
                }, i);
            }
            
            m_ac.AllBegun(AllDone, Timeout.Infinite);
            Console.ReadKey();
        }

        public static void AllDone(CoordinationStatus status)
        {
            switch (status)
            {
                case CoordinationStatus.Cancel:
                    Console.WriteLine("Operation canceled.");
                    break;

                case CoordinationStatus.Timeout:
                    Console.WriteLine("Operation timed-out.");
                    break;

                case CoordinationStatus.AllDone:
                    Console.WriteLine("處理完畢....");
                    break;
            }
        }
    }
}

但是發現了一個問題:

這CPU使用率....
然后我看了下輸出結果:

可以看到線程數才只有5個(我的線程數是從0開始算的),這不會啊,明明我們就開了50個線程啊,不過不管開多少個線程,這CPU扛不住啊,要是說在項目中的某個模塊需要用到並發,這CPU使用率你扛得住?服務器本來配置就不會太好,網站的其余模塊不要用CPU了?而且,我明明開了50個線程跑啊,為什么只有五個線程?其實很簡單,因此並發下,代碼只用了五個線程就跑完了這一萬個數據,剩下的線程開了沒有用武之地。找到只有五個線程開着的原因了之后,要想想怎么解決啊,多的45個線程也是要占內存的,盡管是線程池線程,但也是要占用內存啊,既然是因為並發下運行太快,只要五個線程就能跑滿一萬個數據,那我就阻塞一會線程就可以了,這樣讓剩下的45個線程能夠有機會運行。改代碼!
將上面的Thread.Sleep(150)的注釋給去掉哦!

嗯,這個結果還是可以的,但是有個Console host占用內存高啊,占就占唄,反正該用的內存還是要用。我們睡眠了一段時間的線程,那么與不睡眠相比,並發的CPU使用率是不是下降了?我們開線程最好的期待不就是跑滿CPU么?其實不然,開線程不過就是為了更快的運行程序,將耗時的程序分批次運行,但是如果期間占用CPU太高,我這里是個demo,占用CPU時間很短,也就幾十秒。但是真的項目中會允許么?具體情況具體分析吧,如果不介意的話,可以這么跑,大不了另外弄個服務器專門跑並發,然后將數據存儲到數據庫中(如果你的業務是: 並發調用第三方接口,然后將接口獲取的數據做處理,完全可以采用這種設計)。但是請注意,還是不要太耗費CPU的好。
並發線程的睡眠時間,我們也可以自己調節下,建議是100-200ms吧。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM