C#中異步多線程的常見用法


先來看幾個基本概念(純屬個人見解,可能不准確):

進程:程序運行時,占用的全部運行資源的總和。

線程:線程是隸屬於操作系統管理的,也可以有自己的計算資源,是程序執行流的最小單位。任何的操作都是由線程來完成的。

每個線程都在操作系統的進程內執行,而操作系統進程提供了程序運行的獨立環境。

多線程:多核cpu協同工作,多個執行流同時運行,是用資源換時間。(單核cpu,不存在所謂的多線程)。

單線程應用:在進程的獨立環境中只跑一個線程,所以該線程擁有獨立權。

多線程應用:單個進程中會跑多個線程,它們會共享當前的執行環境(尤其是內存)。

在單核計算機上,操作系統必須為每個線程分配“時間片”來模擬並發。而在多核或多處理器計算機上,多個線程可以真正的並行執行。(可能會受到計算機上其他活動進程的競爭)。

win10上的時間片(使用微軟官方小工具測得):

Thread

  Thread的對象是非線程池中的線程,有自己的生命周期(有創建和銷毀的過程),所以不可以被重復利用(一個操作中,不會出現二個相同Id的線程)。

Thread的常見屬性:

  • 線程一旦開始執行,IsAlive就是true,線程結束就變成false。
  • 線程結束的條件是:線程構造函數傳入的委托結束了執行。
  • 線程一旦結束,就無法再重啟。
  • 每個線程都有一個Name屬性,通常用於調試,線程的Name屬性只能設置一次,以后更改會拋出異常。
  • 靜態的Thread.CurrentThread屬性,會返回當前線程。

Thread的常見用法:

join

調用join方法可以等待另一個線程結束。

private void button5_Click(object sender, EventArgs e) {     
Console.WriteLine($
"===============Method start time is {DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff")},Thread ID is {Thread.CurrentThread.ManagedThreadId},is back ground: {Thread.CurrentThread.IsBackground}==================="); //開啟一個線程,構造方法可重載兩種委托,一個是無參無返回值,一個是帶參無返回值 Thread thread = new Thread(a => DoSomeThing("Thread")); //當前線程狀態 Console.WriteLine($"thread's state is {thread.ThreadState},thread's priority is {thread.Priority} ,thread is alived :{thread.IsAlive},thread is background:{thread.IsBackground},thread is pool threads: {thread.IsThreadPoolThread}"); //告知操作系統,當前線程可以被執行了。 thread.Start(); //阻塞當前執行線程,等待此thread線程實例執行完成。無返回值 thread.Join(); //最大等待的時間是5秒(不管是否執行完成,不再等待),返回一個bool值,如果是true,表示執行完成並終止。如果是false,表示已到指定事件但未執行完成。 thread.Join(5000); Console.WriteLine($"===============Method end time is {DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff")},,Thread ID is {Thread.CurrentThread.ManagedThreadId},is back ground: {Thread.CurrentThread.IsBackground}==================="); }
private void DoSomeThing(string name) { Console.WriteLine($"do some thing start time is {DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff")},Thread ID is {Thread.CurrentThread.ManagedThreadId},is back ground: {Thread.CurrentThread.IsBackground}"); long result = 0; for (long i = 0; i < 10000 * 10000; i++) { result += i; } Console.WriteLine($"do some thing end time is {DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff")},Thread ID is {Thread.CurrentThread.ManagedThreadId},is back ground: {Thread.CurrentThread.IsBackground}"); }

注意 :thread 默認是前台線程,啟動后一定要完成任務的,即使程序關掉(進程退出)也要執行完。可以把thread 指定為后台線程,隨着進程的退出而終止。

//false,默認是前台線程,啟動后一定要完成任務的,即使程序關掉(進程退出)也要執行完。
Console.WriteLine(thread.IsBackground); 
thread.IsBackground = true;//指定為后台線程。(隨着進程的退出而退出)

Sleep

Thread.Sleep()會暫停當前線程,,並等待一段時間。其實,Thread.Sleep只是放棄時間片的剩余時間,讓系統重新調度並選擇一個合適的線程。

在沒有其他活動線程的情況下,使用Thread.Sleep(0)還是會選上自身,即連任,系統不會對其做上下文切換。

static void Main(string[] args)
{
    Stopwatch stopwatch=new Stopwatch();
    stopwatch.Start();
    Thread.Sleep(0);
    stopwatch.Stop();
    System.Console.WriteLine(stopwatch.ElapsedMilliseconds); //返回0
}

而Thread.Sleep(大於0)卻讓當前線程沉睡了,即使只有1ms也是沉睡了,也就是說當前線程放棄下次的競選,所以不能連任,系統上下文必然發生切換。

阻塞

如果線程的執行由於某種原因導致暫停,那么就認為該線程被阻塞了。例如在Sleep或者Join等待其他線程結束。被阻塞的線程會立即將其處理器的時間片生成給其他線程,從此就不再消耗處理器時間。

 

 

 

 

Thread的回調用法:

Thread沒有像Framework中的delegate的回調用法,如果需要回調得自動動手改造:

private void CallBack(Action action, Action calback)
{
    Thread thread = new Thread(() => { action(); calback(); });
    thread.Start();
}
//無參無返回值
CallBack(() => Console.WriteLine("好嗎?"), () => Console.WriteLine("好的!"));
private Func<T> CallBackReturn<T>(Func<T> func)
{
    T t = default(T);
    Thread thread = new Thread(() =>
    {
        t = func();
    });
    thread.Start();
    return () =>
    {
        thread.Join();
        return t;
    };
}
//帶返回值得用法
Func<int> func = CallBackReturn<int>(() => DateTime.Now.Second);
Console.WriteLine("線程未阻塞");
int result = func.Invoke();
Console.WriteLine("result:" + result);

ThreadPool 線程池

Thread的功能太過強大,像我這樣的小白是用不好的(之前在項目中大量使用Thread的API,出現了許多意想不到的bug)。線程池中的線程在同一操作中可以被重復利用。

 //開啟多線程
 ThreadPool.QueueUserWorkItem(n => DoSomeThing("ThreadPool"));

 個人覺得盡量不要阻塞線程池的線程,因為線程池里的線程數量是有限的,當線程池中沒有線程可用時,會出現死鎖。如果非要等待,用法如下:

ManualResetEvent manualResetEvent = new ManualResetEvent(false);
ThreadPool.QueueUserWorkItem(n =>
{
    DoSomethingLong("ThreadPool");
    manualResetEvent.Set();
});
//等待線程完成
manualResetEvent.WaitOne();

 Task

Task是基於ThreadPool的基礎上做的封裝,屬於線程池中的線程。

Task啟動多線程的方式:

方式一:指定任務的開始時機

/// <summary>
/// 使用Task或Task<T>創建任務,需指定任務的開始時機(任務調度)。
/// </summary>
public static void Demo1()
{
    Task task = new Task(() =>
    {
        Thread.Sleep(3000); 
Console.WriteLine($"Current thread id is {Thread.CurrentThread.ManagedThreadId}"); }); task.Start();//任務調度(開始任務) Console.WriteLine($"Current thread name is {Thread.CurrentThread.ManagedThreadId}"); Console.WriteLine($"當前任務狀態:{task.Status}"); task.Wait(); //等待任務執行完成 Console.WriteLine($"當前任務狀態:{task.Status}"); }

 

 

方式二:一步完成多線程的創建和啟動

/// <summary>
/// 使用Task.Run()方法一步完成多線程的創建和啟動(當前線程立即准備啟動任務)。
/// <remark>
/// 如果不需要對任務的創建和調度做更多操作,Task.Run()方法是創建和啟動任務的首選方式。
/// </remark>
/// </summary>
public static void Demo2()
{
    Task task = Task.Run(() => { Thread.Sleep(3000); Console.WriteLine($"Current thread id is {Thread.CurrentThread.ManagedThreadId}"); });
    task.Wait(); //等待,直到任務完成
}

 

方式三:需要想多線程任務傳遞狀態參數

/// <summary>
/// Task和Task<TResult>都有靜態屬性Factory,它返回默認的實例TaskFactory.
/// 使用Task.Factory.StartNew()方法也可以一步完成任務的創建和啟動。
/// 當前需要向任務傳遞一個狀態(參數)。可以使用此方法。
/// </summary>
public static void Demo3()
{
    Task[] tasks = new Task[10];
    for (int i = 0; i < tasks.Length; i++)
    {
        tasks[i] = Task.Factory.StartNew((obj) =>
        {
            CustomData data = obj as CustomData;
            data.ThreadId = Thread.CurrentThread.ManagedThreadId;
        }, new CustomData { CreationTime = DateTime.Now.Ticks, Index = i});
    }
//以阻塞當前線程的方式,等待所以子線程的完成 Task.WaitAll(tasks);
foreach (var task in tasks) { //通過任務的AsyncState屬性,可以獲取任務狀態(提供給任務的參數). var data = task.AsyncState as CustomData; Console.WriteLine(JsonConvert.SerializeObject(data)); } } //Task.Factory.StartNew() 調用無返回值的任務 //Task<TResult>.Factory.StartNew() 調用有返回值的任務

 

Task<TResult>

public static void Demo4()
{
    Task<Double>[] tasks = {
Task<Double>.Factory.StartNew(() => DoComputation(1.0)),
Task<Double>.Factory.StartNew(() => DoComputation(100.0)),
Task<Double>.Factory.StartNew(() => DoComputation(1000.0)) };
    var results = new Double[tasks.Length];
    Double sum = 0;
    for (int i = 0; i < tasks.Length; i++)
    {
        //Task<TResult>.Result屬性包含任務的計算結果,如果在任務完成之前調用,則會阻塞線程直到任務完成
        results[i] = tasks[i].Result; 
        Console.Write("{0:N1} {1}", results[i],
                          i == tasks.Length - 1 ? "= " : "+ ");
        sum += results[i];
    }
    Console.WriteLine("{0:N1}", sum);
}
private static Double DoComputation(Double start)
{
    Double sum = 0;
    for (var value = start; value <= start + 10; value += .1)
        sum += value;
    return sum;
}

 

Task的常用API

WaitAny和WaitAll,會阻塞當前線程(主線程)的執行:

List<Task> tasks = new List<Task>();
tasks.Add(Task.Run(() => DoSomeThing("Task1")));
tasks.Add(Task.Run(() => DoSomeThing("Task2")));
tasks.Add(Task.Run(() => DoSomeThing("Task3")));
//阻塞當前線程的執行,等待任意一個子線程任務完成后繼續往下執行
Task.WaitAny(tasks.ToArray());
//阻塞當前線程的執行,等待所有子線程任務完成后繼續往下執行
Task.WaitAll(tasks.ToArray());

 

WhenAll和WhenAny,是通過返回一個Task 對象的方式,來達到非阻塞式的等待

//不阻塞當前線程的執行,等待所有子線程任務完成后,異步執行后續的操作
Task.WhenAll(tasks).ContinueWith(t =>
{
    Console.WriteLine($"不阻塞,{Thread.CurrentThread.ManagedThreadId}");
});

//工廠模式的實現
Task.Factory.ContinueWhenAll(tasks.ToArray(), s =>
{
Console.WriteLine("不阻塞" + s.Length);
});

ContinueWith,是一個實例方式,並且返回Task實例,所以可以使用這種鏈式結構來完成按順序執行。

public static void Demo8()
{
    var task = Task.Factory
        .StartNew(() => { Console.WriteLine("1"); return 10; })
        .ContinueWith(i => { Console.WriteLine("2"); return i.Result + 1; })
        .ContinueWith(i => { Console.WriteLine("3"); return i.Result + 1; });
    Console.WriteLine(task.Result);
}

控制線程數量的使用,(核心思想來自別人,我感覺控制的很好):

/// <summary>
/// 線程數量的控制
/// </summary>
/// <param name="sender"></param>
/// <param name="e"></param>
private void Test(object sender, EventArgs e)
{
    //完成10000個任務,但只要11個線程。
    List<int> intList = new List<int>();
    for (int i = 0; i < 10000; i++)
    {
        intList.Add(i);
    }
    Action<int> action = i =>
    {
        Console.WriteLine(Thread.CurrentThread.ManagedThreadId);
        Thread.Sleep(new Random(i).Next(100, 300));
    };
    List<Task> tasks = new List<Task>();
    foreach (var item in intList)
    {
        int i = item;
        tasks.Add(Task.Run(() => action(i)));
        //當已使用了11個線程的時候,即時釋放已完成的線程。
        if (tasks.Count > 10)
        {
            Task.WaitAny(tasks.ToArray());
            tasks = tasks.Where(n => n.Status != TaskStatus.RanToCompletion).ToList();
        }
    }
    Task.WaitAll(tasks.ToArray());
}

 注意:應當避免在子線程委托的內部直接使用主線程變量(閉包的弊端問題)

 public static void Demo5()
 {
     Task[] taskArray = new Task[10];
     for (int i = 0; i < taskArray.Length; i++)
     {
         taskArray[i] = Task.Factory.StartNew(() =>
         {
             //當您使用lambda表達式創建委托時,雖然可以訪問變量范圍內可見的所有變量。
             //但是在某些情況下(最明顯的是在循環中),lambda不能像預期的那樣捕獲變量
             //(本例中,它只能捕獲最后一個值,而不每次迭代的值)。 
             //因為任務的運行時機不確定。可以通過傳遞參數的方式,避免此問題的發生。
             Console.WriteLine(i);//輸出10個10
         });
     }
     Task.WaitAll(taskArray);
 }
創建分離的子任務

  在父任務中創建子任務,如果未指定AttachedToParent選項時,子任務不會與父任務同步。

public static void Demo9()
{
    //創建父任務
    var outer = Task.Run(() =>
    {
        Console.WriteLine("父任務開始啟動!");
        //創建子任務
        var child = Task.Run(() =>
        {
            Thread.SpinWait(5000000);
            Console.WriteLine("分離的任務完成");
        });
    });
    outer.Wait(); //父任務不會等待子任務的完成
    Console.WriteLine("父任務完成.");
}

當在任務中運行的代碼使用AttachedToParent選項創建新任務時,新任務稱為父任務的附加子任務。可以使用AttachedToParent選項來表達結構化任務並行性,因為父任務隱式等待所有附加的子任務完成。

 public static void Demo10()
 {
     var parent = Task.Factory.StartNew(() => {
         Console.WriteLine("Parent task beginning.");
         for (int i = 0; i < 10; i++)
         {
             Task.Factory.StartNew((x) => {
                 Thread.SpinWait(5000000);
                 Console.WriteLine("Attached child #{0} completed.",x);
             }, i, TaskCreationOptions.AttachedToParent);  
         }
     });
     parent.Wait();
     Console.WriteLine("Parent task completed.");
 }

注意:如果父任務啟動DenyChildAttach選項,子任務即時啟用AttachedToParent選項也不會附加到父任務。

Parallel
parallel為並行計算,主線程也參與計算
 //Parallel.For:
public static void Main(string[] args)
 {
     //計算目錄的大小
     long totalSize = 0;
     String[] files =Directory.GetFiles(@"C:\Users\Administrator\Desktop");
     Parallel.For(0, files.Length,
                  index => {
                      FileInfo fi = new FileInfo(files[index]);
                      long size = fi.Length;
                      Interlocked.Add(ref totalSize, size);  //將兩個64位整數相加,並用和替換第一個整數,作為
                  });
     Console.WriteLine("{0:N0} files, {1:N0} bytes", files.Length, totalSize); 
 }
//旋轉圖片 
static void Main(string[] args)
 {
     // A simple source for demonstration purposes. Modify this path as necessary.
     string[] files = Directory.GetFiles(@"C:\Users\Administrator\Desktop\test");
     string newDir = @"C:\Users\Administrator\Desktop\test\Modified";
     if (!Directory.Exists(newDir))
         Directory.CreateDirectory(newDir);
     // Method signature: Parallel.ForEach(IEnumerable<TSource> source, Action<TSource> body)
     Parallel.ForEach(files, (currentFile) =>
     {
         // The more computational work you do here, the greater 
         // the speedup compared to a sequential foreach loop.
         string filename = Path.GetFileName(currentFile);
         var bitmap = new Bitmap(currentFile);
         bitmap.RotateFlip(RotateFlipType.Rotate180FlipNone);
         bitmap.Save(Path.Combine(newDir, filename));
         // Peek behind the scenes to see how work is parallelized.
         // But be aware: Thread contention for the Console slows down parallel loops!!!
         Console.WriteLine($"Processing {filename} on thread {Thread.CurrentThread.ManagedThreadId}");
         //close lambda expression and method invocation
     });
 }

 

 分區局部變量

 static void TestParallForeach()
 {
     int[] array = Enumerable.Range(1, 100).ToArray();
     long totalNum = 0;
     //int 為集合元素類型
     //long 為分區局部變量類型
     Parallel.ForEach<int, long>(array, //源集合
         () => 0, //初始化局部分區變量,每個分區執行一次
         (index, state, subtotal) => //每次迭代的時候執行
         {
             subtotal += index; //修改分區局部變量
             return subtotal; //傳遞給當前分區的下一次迭代
         },
         //每個分區結束的時間執行,並將該分區最后一次迭代的局部分區變量傳遞過來。
         (finalTotal) => Interlocked.Add(ref totalNum, finalTotal)
         );
     /*重載方式:public static ParallelLoopResult ForEach<TSource, TLocal>(IEnumerable<TSource> source, 
     Func<TLocal> localInit, Func<TSource, ParallelLoopState, TLocal, TLocal> body, Action<TLocal> localFinally);
     TSource:源數據類型。 source:源數據,必須實現 IEnumerable<T>接口。
     TLocal:局部分區變量類型。localInit:初始化局部分區變量的函數。每個分區都是執行此函數一次。
     body:並行循環的每次迭代都是調用此方法。
     body.TSource:當前元素。body.ParallelLoopState:ParallelLoopState類型的變量,可用來檢索循環的狀態。
     body.TLocal.1:局部分區變量。
     body.TLocal.2:返回值。將其傳遞給特定分區循環的下一個迭代。
     localFinally:每個分區的循環完成時調用此委托。*/
     Console.WriteLine(totalNum);

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM