Worker Service


一、概述

1、概念

Worker Service 是使用模板構建的 .NET 項目,在VS2019中可以找到。Worker Service可以用來編寫長時間運行的后台服務,並且能部署成windows服務或linux守護程序。Worker Service 沒有用戶界面,也不支持直接的用戶交互,它們特別適用於設計微服務架構。在微服務體系結構中,職責通常被划分為不同的、可單獨部署的、可伸縮的服務。隨着微服務架構的成長和發展,擁有大量的 Worker Service 會變得越來越常見。

2、應用場景

  • 處理來自隊列、服務總線或事件流的消息、事件
  • 響應對象、文件存儲中的文件更改
  • 聚合數據存儲中的數據
  • 豐富數據提取管道中的數據
  • 可以支持定期的批處理工作負載

二、創建Worker Service

  • 創建新項目->選擇 Worker Service

  • 項目創建成功之后,會自動創建兩個類:Program和Worker

  • Program.cs
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

namespace WorkerService1
{
    public class Program
    {
        public static void Main(string[] args)
        {
            CreateHostBuilder(args).Build().Run();
        }

        public static IHostBuilder CreateHostBuilder(string[] args) =>
            Host.CreateDefaultBuilder(args)
                .ConfigureServices((hostContext, services) =>
                {
                    services.AddHostedService<Worker>();
                });
    }
}

Program類跟ASP.NET Core Web應用程序非常類似,不同之處沒有了startup類,並且把worker服務添加到DI container中。

  • Worker.cs
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace WorkerService1
{
    public class Worker : BackgroundService
    {
        private readonly ILogger<Worker> _logger;

        public Worker(ILogger<Worker> logger)
        {
            _logger = logger;
        }
        /// <summary>
        /// 重寫BackgroundService.ExecuteAsync方法,封裝windows服務或linux守護程序中的處理邏輯
        /// </summary>
        /// <param name="stoppingToken"></param>
        /// <returns></returns>
        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            //如果服務被停止,那么下面的IsCancellationRequested會返回true,我們就應該結束循環
            while (!stoppingToken.IsCancellationRequested)
            {
                //模擬服務中的處理邏輯,這里我們僅輸出一條日志,並且等待1秒鍾時間
                _logger.LogInformation("Worker running at: {time}", DateTimeOffset.Now);
                await Task.Delay(1000, stoppingToken);
            }
        }
    }
}

worker繼承自BackgroundService ,而后者又實現IHostedService接口。worker類的構造函數中,使用了.NET Core自帶的日志組件接口對象ILogger,它是通過DI(依賴注入)注入到worker類的構造函數中的。ExecuteAsync 方法用來完成相應的邏輯,該方法實際上屬於BackgroundService類,可以在worker類中重寫(override)它。通過ExecuteAsync方法傳入的CancellationToken參數對象,來判斷是否應該結束循環,例如如果windows服務被停止,那么參數中CancellationToken類的IsCancellationRequested屬性會返回true,ExecuteAsync方法就會停止循環,來結束整個windows服務。

  • 重寫BackgroundService類的StartAsync、ExecuteAsync、StopAsync方法

  我們也可以在worker類中重寫BackgroundService.StartAsync方法和BackgroundService.StopAsync方法,注意重寫時,不要忘記在worker類中調用base.StartAsync和base.StopAsync,因為BackgroundService類的StartAsync和StopAsync會執行一些Worker Service的核心代碼,在開始和結束Worker Service服務(例如開始和停止windows服務)的時候,來執行一些處理邏輯,本例中我們分別輸出了一條日志:

using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace WorkerService1
{
    public class Worker : BackgroundService
    {
        private readonly ILogger<Worker> _logger;

        public Worker(ILogger<Worker> logger)
        {
            _logger = logger;
        }
        //重寫BackgroundService.StartAsync方法,在開始服務的時候,執行一些處理邏輯,這里我們僅輸出一條日志
        public override async Task StartAsync(CancellationToken cancellationToken)
        {
            _logger.LogInformation("Worker starting at: {time}", DateTimeOffset.Now);

            await base.StartAsync(cancellationToken);
        }

        /// <summary>
        /// 重寫BackgroundService.ExecuteAsync方法,封裝windows服務或linux守護程序中的處理邏輯
        /// </summary>
        /// <param name="stoppingToken"></param>
        /// <returns></returns>
        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            //如果服務被停止,那么下面的IsCancellationRequested會返回true,我們就應該結束循環
            while (!stoppingToken.IsCancellationRequested)
            {
                //模擬服務中的處理邏輯,這里我們僅輸出一條日志,並且等待1秒鍾時間
                _logger.LogInformation("Worker running at: {time}", DateTimeOffset.Now);
                await Task.Delay(1000, stoppingToken);
            }
        }

        //重寫BackgroundService.StopAsync方法,在結束服務的時候,執行一些處理邏輯,這里我們僅輸出一條日志
        public override async Task StopAsync(CancellationToken cancellationToken)
        {
            _logger.LogInformation("Worker stopping at: {time}", DateTimeOffset.Now);

            await base.StopAsync(cancellationToken);
        }
    }
}

由於BackgroundService類的StartAsync、ExecuteAsync、StopAsync方法返回的都是Task類型,我們可以使用async和await關鍵字將它們重寫為異步函數,來提高程序的性能。運行結果如下所示,每隔1秒循環打印運行的時間(可以在啟動的控制台中使用快捷鍵"Ctrl+C"來停止Worker Service的運行,相當於停止windows服務或linux守護程序):

 

從下圖可以看到Worker Service項目從本質上來說就是一個控制台項目,只不過當它被部署為windows服務或linux守護程序后,不會顯示控制台窗口。所以實際上在Visual Studio中進行調試的時候,完全可以用Console.WriteLine等控制台方法來替代ILogger接口的日志輸出方法,不過由於ILogger接口的日志輸出方法也可以記錄到文件等媒介上,還是更推薦使用ILogger接口來輸出調試信息。

  • 避免線程阻塞

  不要讓線程阻塞worker類中重寫的StartAsync、ExecuteAsync、StopAsync方法,因為StartAsync方法負責啟動Worker Service,如果調用StartAsync方法的線程被一直阻塞了,那么Worker Service的啟動就一直完成不了。同理StopAsync方法負責結束Worker Service,如果調用StopAsync方法的線程被一直阻塞了,那么Worker Service的結束就一直完成不了。這里主要說明下為什么ExecuteAsync方法不能被阻塞,我們嘗試把本例中的ExecuteAsync方法改為如下代碼:

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
    while (!stoppingToken.IsCancellationRequested)
    {
        _logger.LogInformation("Worker running at: {time}", DateTimeOffset.Now);
        Thread.Sleep(1000);//使用Thread.Sleep進行同步等待,調用ExecuteAsync方法的線程會一直執行這里的循環,被不停地被阻塞
    }
    await Task.CompletedTask;
}

我們將ExecuteAsync方法中的異步等待方法Task.Delay,改為了同步等待方法Thread.Sleep。由於Thread.Sleep方法是將執行線程通過阻塞的方式來進行等待,所以現在調用ExecuteAsync方法的線程會一直執行ExecuteAsync方法中的循環,被不停地被阻塞,除非ExecuteAsync方法中的循環結束,那么調用ExecuteAsync方法的線程會被一直卡在ExecuteAsync方法中。現在我們在Visual Studio中運行Worker Service,執行結果如下:

我們可以看到當我們在控制台中使用快捷鍵"Ctrl+C"試圖停止Worker Service后(上圖紅色框中輸出的日志),ExecuteAsync方法中的循環還是在不停地運行來輸出日志,這說明ExecuteAsync方法的CancellationToken參數的IsCancellationRequested屬性還是返回的false,所以這就是問題所在,如果我們直接用調用ExecuteAsync方法的線程去做循環,來執行windows服務或linux守護程序的處理邏輯,會導致Worker Service無法被正常停止,因為ExecuteAsync方法的CancellationToken參數沒有被更新。所以,那些很耗時並且要循環處理的 windows服務或linux守護程序的處理邏輯,應該要放到另外的線程中去執行,而不是由調用ExecuteAsync方法的線程去執行。所以假設我們現在有三個windows服務或linux守護程序的邏輯現在要被處理,我們可以將它們放到三個新的線程中去執行,如下代碼所示:

using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace WorkerService1
{
    public class Worker : BackgroundService
    {
        private readonly ILogger<Worker> _logger;

        public Worker(ILogger<Worker> logger)
        {
            _logger = logger;
        }
        //重寫BackgroundService.StartAsync方法,在開始服務的時候,執行一些處理邏輯,這里我們僅輸出一條日志
        public override async Task StartAsync(CancellationToken cancellationToken)
        {
            _logger.LogInformation("Worker starting at: {time}", DateTimeOffset.Now);

            await base.StartAsync(cancellationToken);
        }

        //第一個 windows服務或linux守護程序 的處理邏輯,由RunTaskOne方法內部啟動的Task任務線程進行處理,同樣可以從參數CancellationToken stoppingToken中的IsCancellationRequested屬性,得知Worker Service服務是否已經被停止
        protected Task RunTaskOne(CancellationToken stoppingToken)
        {
            return Task.Run(() =>
            {
                //如果服務被停止,那么下面的IsCancellationRequested會返回true,我們就應該結束循環
                while (!stoppingToken.IsCancellationRequested)
                {
                    _logger.LogInformation("RunTaskOne running at: {time}", DateTimeOffset.Now);
                    Thread.Sleep(1000);
                }
            }, stoppingToken);
        }

        //第二個 windows服務或linux守護程序 的處理邏輯,由RunTaskTwo方法內部啟動的Task任務線程進行處理,同樣可以從參數CancellationToken stoppingToken中的IsCancellationRequested屬性,得知Worker Service服務是否已經被停止
        protected Task RunTaskTwo(CancellationToken stoppingToken)
        {
            return Task.Run(() =>
            {
                //如果服務被停止,那么下面的IsCancellationRequested會返回true,我們就應該結束循環
                while (!stoppingToken.IsCancellationRequested)
                {
                    _logger.LogInformation("RunTaskTwo running at: {time}", DateTimeOffset.Now);
                    Thread.Sleep(1000);
                }
            }, stoppingToken);
        }

        //第三個 windows服務或linux守護程序 的處理邏輯,由RunTaskThree方法內部啟動的Task任務線程進行處理,同樣可以從參數CancellationToken stoppingToken中的IsCancellationRequested屬性,得知Worker Service服務是否已經被停止
        protected Task RunTaskThree(CancellationToken stoppingToken)
        {
            return Task.Run(() =>
            {
                //如果服務被停止,那么下面的IsCancellationRequested會返回true,我們就應該結束循環
                while (!stoppingToken.IsCancellationRequested)
                {
                    _logger.LogInformation("RunTaskThree running at: {time}", DateTimeOffset.Now);
                    Thread.Sleep(1000);
                }
            }, stoppingToken);
        }

        //重寫BackgroundService.ExecuteAsync方法,封裝windows服務或linux守護程序中的處理邏輯
        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            try
            {
                Task taskOne = RunTaskOne(stoppingToken);
                Task taskTwo = RunTaskTwo(stoppingToken);
                Task taskThree = RunTaskThree(stoppingToken);

                await Task.WhenAll(taskOne, taskTwo, taskThree);//使用await關鍵字,異步等待RunTaskOne、RunTaskTwo、RunTaskThree方法返回的三個Task對象完成,這樣調用ExecuteAsync方法的線程會立即返回,不會卡在這里被阻塞
            }
            catch (Exception ex)
            {
                //RunTaskOne、RunTaskTwo、RunTaskThree方法中,異常捕獲后的處理邏輯,這里我們僅輸出一條日志
                _logger.LogError(ex.Message);
            }
            finally
            {
                //Worker Service服務停止后,如果有需要收尾的邏輯,可以寫在這里
            }
        }

        //重寫BackgroundService.StopAsync方法,在結束服務的時候,執行一些處理邏輯,這里我們僅輸出一條日志
        public override async Task StopAsync(CancellationToken cancellationToken)
        {
            _logger.LogInformation("Worker stopping at: {time}", DateTimeOffset.Now);

            await base.StopAsync(cancellationToken);
        }
    }
}

所以現在調用ExecuteAsync方法的線程就不會被阻塞了,執行結果如下:

可以看到這次,當我們在控制台中使用快捷鍵"Ctrl+C"試圖停止Worker Service后,ExecuteAsync方法就立即停止運行了,所以這里再次強調千萬不要去阻塞調用ExecuteAsync方法的線程!另外上面代碼中,我們在worker類重寫的ExecuteAsync方法中放了一個finally代碼塊,這個代碼塊可以用來執行一些Worker Service服務停止后的一些收尾代碼邏輯(例如關閉數據庫連接、釋放資源等),我更傾向於使用ExecuteAsync方法中的finally代碼塊來做Worker Service的收尾工作,而不是在worker類重寫的StopAsync方法中來做收尾工作,從BackgroundService的源代碼,我們可以看出worker類的StopAsync方法是有可能比ExecuteAsync方法先完成的,所以Worker Service的收尾工作應該放到ExecuteAsync方法中的finally代碼塊,因為ExecuteAsync方法中的finally代碼塊,肯定是在RunTaskOne、RunTaskTwo、RunTaskThree方法返回的三個Task對象執行完畢后才執行的。 

 

  • 在Worker Service中運行多個Worker類

  在前面的例子中,可以看到我們在一個Worker類中定義了三個方法RunTaskOne、RunTaskTwo、RunTaskThree,來執行三個 windows服務或linux守護程序 的邏輯。其實我們還可以在一個Worker Service項目中,定義和執行多個Worker類,而不是把所有的代碼邏輯都放在一個Worker類中。首先我們定義第一個Worker類WorkerOne:

using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using System;
using System.Threading;
using System.Threading.Tasks;

namespace WorkerService1
{
    public class WorkerOne:BackgroundService
    {
        private readonly ILogger<WorkerOne> _logger;

        public WorkerOne(ILogger<WorkerOne> logger)
        {
            _logger = logger;
        }

        //重寫BackgroundService.StartAsync方法,在開始服務的時候,執行一些處理邏輯,這里我們僅輸出一條日志
        public override async Task StartAsync(CancellationToken cancellationToken)
        {
            _logger.LogInformation("WorkerOne starting at: {time}", DateTimeOffset.Now);

            await base.StartAsync(cancellationToken);
        }

        //重寫BackgroundService.ExecuteAsync方法,封裝windows服務或linux守護程序中的處理邏輯
        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            //如果服務被停止,那么下面的IsCancellationRequested會返回true,我們就應該結束循環
            while (!stoppingToken.IsCancellationRequested)
            {
                //模擬服務中的處理邏輯,這里我們僅輸出一條日志,並且等待1秒鍾時間
                _logger.LogInformation("WorkerOne running at: {time}", DateTimeOffset.Now);
                await Task.Delay(1000, stoppingToken);
            }
        }

        //重寫BackgroundService.StopAsync方法,在結束服務的時候,執行一些處理邏輯,這里我們僅輸出一條日志
        public override async Task StopAsync(CancellationToken cancellationToken)
        {
            _logger.LogInformation("WorkerOne stopping at: {time}", DateTimeOffset.Now);

            await base.StopAsync(cancellationToken);
        }
    }
}

接着我們定義第二個Worker類WorkerTwo:

using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using System;
using System.Threading;
using System.Threading.Tasks;

namespace WorkerService1
{
    public class WorkerTwo : BackgroundService
    {
        private readonly ILogger<WorkerTwo> _logger;

        public WorkerTwo(ILogger<WorkerTwo> logger)
        {
            _logger = logger;
        }

        //重寫BackgroundService.StartAsync方法,在開始服務的時候,執行一些處理邏輯,這里我們僅輸出一條日志
        public override async Task StartAsync(CancellationToken cancellationToken)
        {
            _logger.LogInformation("WorkerTwo starting at: {time}", DateTimeOffset.Now);

            await base.StartAsync(cancellationToken);
        }

        //重寫BackgroundService.ExecuteAsync方法,封裝windows服務或linux守護程序中的處理邏輯
        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            //如果服務被停止,那么下面的IsCancellationRequested會返回true,我們就應該結束循環
            while (!stoppingToken.IsCancellationRequested)
            {
                //模擬服務中的處理邏輯,這里我們僅輸出一條日志,並且等待1秒鍾時間
                _logger.LogInformation("WorkerTwo running at: {time}", DateTimeOffset.Now);
                await Task.Delay(1000, stoppingToken);
            }
        }

        //重寫BackgroundService.StopAsync方法,在結束服務的時候,執行一些處理邏輯,這里我們僅輸出一條日志
        public override async Task StopAsync(CancellationToken cancellationToken)
        {
            _logger.LogInformation("WorkerTwo stopping at: {time}", DateTimeOffset.Now);

            await base.StopAsync(cancellationToken);
        }
    }
}

然后我們在Program類中,將WorkerOne和WorkerTwo服務添加到DI container中:

using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;

namespace WorkerService1
{
    public class Program
    {
        public static void Main(string[] args)
        {
            CreateHostBuilder(args).Build().Run();
        }

        public static IHostBuilder CreateHostBuilder(string[] args) =>
            Host.CreateDefaultBuilder(args)
                .ConfigureServices((hostContext, services) =>
                {
                    services.AddHostedService<WorkerOne>();
                    services.AddHostedService<WorkerTwo>();
                    //services.AddHostedService<Worker>();
                });
    }
}

然后在Visual Studio中運行Worker Service,執行結果如下:

 

可以看到WorkerOne和WorkerTwo類都被執行了,並且都輸出了日志信息

三、部署為Windows服務運行

  • 在program.cs內部,將UseWindowsService()添加到CreateHostBuilder
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;

namespace WorkerService1
{
    public class Program
    {
        public static void Main(string[] args)
        {
            CreateHostBuilder(args).Build().Run();
        }

        public static IHostBuilder CreateHostBuilder(string[] args) =>
            Host.CreateDefaultBuilder(args)
                .UseWindowsService()
                .ConfigureServices((hostContext, services) =>
                {
                    services.AddHostedService<Worker>();
                });
    }
}

注意在非 Windows 平台上調用 UseWindowsService 方法也是不會報錯的,非 Windows 平台會忽略此調用。

  • 執行一下命令發布項目
dotnet publish -c Release -o D:\PersonalInfo\windowsservices\WindowsService1\WorkerService1\Release

在Powershell中執行:

也可以在Visual Studio中用項目自身的發布向導來將Worker Service項目發布到文件夾"bin\Release\net5.0\publish\"中:

  • 默認情況下Worker Service項目會被發布為一個exe文件:

 

  • 使用sc.exe工具來管理服務,輸入命令創建為windows服務(Run as administrator)啟動Powershell:
sc.exe create NETCoreWorkerService1 binPath=D:\PersonalInfo\windowsservices\WindowsService1\WorkerService1\bin\Release\net5.0\publish\WorkerService1.exe

  • 查看服務狀態,在powershell中執行(Run as administrator):
sc.exe query NETCoreWorkerService1

  • 啟動命令,在powershell中執行(Run as administrator):
sc.exe start NETCoreWorkerService1

  • 在windows服務列表查看,NETCoreWorkerService1已安裝成功:

  • 停用 、刪除命令:
sc.exe stop NETCoreWorkerService1
sc.exe delete NETCoreWorkerService1

四、部署作為Linux守護程序運行

using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
namespace WorkerService1
{
    public class Program
    {
        public static void Main(string[] args)
        {
            CreateHostBuilder(args).Build().Run();
        }

        public static IHostBuilder CreateHostBuilder(string[] args) =>           
            Host.CreateDefaultBuilder(args)
                //.UseWindowsService()
                .UseSystemd()
                .ConfigureServices((hostContext, services) =>
                {
                    services.AddHostedService<Worker>();
                });
    }
}

在 Windows 平台上調用 UseSystemd 方法也是不會報錯的,Windows 平台會忽略此調用。具體如何添加守護進程可以參考https://www.cnblogs.com/qtiger/p/13853828.html

五、參考資料


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM