.Net Core 3.0使用BackgroundService運行多個后台服務


.Net Core 3.0使用BackgroundService運行多個后台服務

前言

由於業務系統需要向京東推送支付狀態,在接口層接收到京東的支付json之后推送給阿里的MNS隊列,然后后台程序監聽MNS隊列進行支付狀態同步等。之前的實現是基於控制台程序,開啟了兩個異步Task,而這些Task應該會隨着應用程序池回收而消失,也有可能是由於隊列消費出錯而被回收,導致后續消息都不再被消費。所以改為基於dotnet core 3.0的BackgroundService來運行后台程序。

創建HostBuilder

在微軟官方文檔里有關於HostBuilder還有BackgroundService的使用說明,這里就不深入了

/// <summary>
/// Creates the generic host builder and wires up the dependency-injection container:
/// configuration, cache, the MNS and MQTT clients, the database connection strings,
/// the Elasticsearch client and finally the two hosted background listeners.
/// </summary>
/// <param name="args">Command-line arguments, passed through to <c>Host.CreateDefaultBuilder</c>.</param>
/// <returns>The host builder with all services registered.</returns>
public static IHostBuilder CreateHostBuilder (string[] args) {
    return Host.CreateDefaultBuilder (args).ConfigureServices ((hostContext, services) => {
        // appsettings.json is read from the current directory; it is optional and reloaded on change.
        var jsonSource = new ConfigurationBuilder ()
            .SetBasePath (Directory.GetCurrentDirectory ())
            .AddJsonFile ("appsettings.json", optional: true, reloadOnChange: true);

        services.AddConfig (configOptions => {
            configOptions.ConfigurationBuilder = jsonSource;
            configOptions.Namespace = "Dev001.ThirdService";
        });

        // An intermediate provider is built here so that settings can be read
        // while the remaining registrations are still being made.
        IServiceProvider configProvider = services.BuildServiceProvider ();
        IConfig settings = configProvider.GetService<IConfig> ();

        string emqServer = settings.Get ("EMQServer");
        int emqPort = int.Parse (settings.Get ("EMQPort"));
        var mqttOptions = new MqttConfig {
            Server = emqServer,
            Port = emqPort,
            ClientIdPre = "dyIotJDExpressListenerTaskCloud"
        };

        services.AddCache (redisOptions => {
            redisOptions.RedisEndPoints = settings.Get ("RedisEndPoints");
            redisOptions.RedisServer = settings.Get ("RedisServer");
            redisOptions.RedisPwd = settings.Get ("RedisPwd");
            redisOptions.RedisDBId = settings.Get ("RedisDBId");
        }, dbOptions => {
            dbOptions.AssemblyName = "ExpressOrder.Dao";
        });

        // A second provider is built after the cache registration and ICache is resolved from it.
        // NOTE(review): the resolved instance is not used further in this method.
        IServiceProvider cacheProvider = services.BuildServiceProvider ();
        ICache resolvedCache = cacheProvider.GetService<ICache> ();

        var mnsHelper = new AliMNSHelper (
            settings.Get ("MNSSH_AccessKey"),
            settings.Get ("MNSSH_SecretKey"),
            settings.Get ("MNSSH_EndPoint"));
        services.AddSingleton (mnsHelper);

        // The MQTT client gets its own RedisCache instance rather than the registered ICache.
        var mqttCache = new RedisCache (settings.Get ("RedisServer"), int.Parse (settings.Get ("RedisDBId")));
        var mqttClient = new MqttNetClient (mqttOptions, null, mqttCache);
        services.AddSingleton (mqttClient);

        // Connection strings keyed by logical database name.
        var connectionStrings = new Dictionary<DbConnectionNameEnum, string> {
            { DbConnectionNameEnum.MainConnection, settings.Get ("MySqlServer") },
            { DbConnectionNameEnum.AccountConnection, settings.Get ("MySqlServerAcc") }
        };
        services.AddSingleton (connectionStrings);

        // The Elasticsearch client is created lazily by the container:
        // 5 second request timeout, field names used exactly as declared.
        services.AddSingleton<IElasticClient> (es => {
            var esSettings = new ConnectionSettings (new Uri (settings.Get ("ElasticSearchUrl")))
                .RequestTimeout (TimeSpan.FromSeconds (5))
                .DefaultFieldNameInferrer (fieldName => fieldName);
            return new ElasticClient (esSettings);
        });

        // Everything above registers instances in the IoC container;
        // the two lines below register the background services run by the host.
        services.AddHostedService<SyncMessageReplyListenerServiceImpl> ();
        services.AddHostedService<JDPayNotifyListenerServiceImpl> ();
    });
}

京東支付后台運行服務

/// <summary>
/// Hosted background service that listens on the MNS queue named by the
/// <c>JDPayNotifyQueueName</c> setting for JD pay callbacks and marks the matching
/// recharge order as paid.
/// </summary>
public class JDPayNotifyListenerServiceImpl : BackgroundService {
    private readonly AliMNSHelper _aliMNSHelper;
    private readonly IConfig _config;
    private readonly IKernelorderChargeDao _kernelorderChargeDao;
    private readonly IKernelPostOrderDao _kernelPostOrderDao;
    private readonly IUtilService _utilService;
    private readonly IMQTTDeviceService _deviceService;
    private readonly ICache _cache;
    private readonly IKernelDeviceGroupDao _kernelDeviceGroupDao;
    private readonly IKernelPostorderPayInfoDao _kernelPostorderPayInfoDao;
    private readonly IKernelSmartboxBoxactInfoDao _kernelSmartboxBoxactInfoDao;

    /// <summary>
    /// Stores the collaborators supplied by the dependency-injection container.
    /// </summary>
    public JDPayNotifyListenerServiceImpl (AliMNSHelper aliMNSHelper, IConfig config, IKernelorderChargeDao kernelorderChargeDao,
        IUtilService utilService, IMQTTDeviceService deviceService, IKernelPostOrderDao kernelPostOrderDao, ICache cache, IKernelDeviceGroupDao kernelDeviceGroupDao, IKernelPostorderPayInfoDao kernelPostorderPayInfoDao, IKernelSmartboxBoxactInfoDao kernelSmartboxBoxactInfoDao) {
        _aliMNSHelper = aliMNSHelper;
        _config = config;
        _kernelorderChargeDao = kernelorderChargeDao;
        _utilService = utilService;
        _deviceService = deviceService;
        _kernelPostOrderDao = kernelPostOrderDao;
        _cache = cache;
        _kernelPostorderPayInfoDao = kernelPostorderPayInfoDao;
        _kernelDeviceGroupDao = kernelDeviceGroupDao;
        _kernelSmartboxBoxactInfoDao = kernelSmartboxBoxactInfoDao;
    }

    /// <summary>
    /// Starts the blocking receive loop on a thread-pool thread.
    /// </summary>
    /// <param name="stoppingToken">Signalled by the host when the service should stop.</param>
    /// <returns>
    /// The task running <see cref="Process"/>. Returning it (instead of an already
    /// completed task) lets <c>BackgroundService</c> track the loop: host start-up is
    /// still not blocked because the task is incomplete, <c>StopAsync</c> can wait for
    /// the loop to finish, and a fault in the loop is no longer silently dropped.
    /// </returns>
    protected override Task ExecuteAsync (CancellationToken stoppingToken) {
        return Task.Run (() => Process (stoppingToken));
    }

    /// <summary>
    /// Receives messages from the JD pay notify queue until cancellation is requested.
    /// An exception thrown while handling one message is logged and the loop continues.
    /// </summary>
    /// <param name="stoppingToken">Checked before each receive; ends the loop once cancelled.</param>
    protected void Process (CancellationToken stoppingToken) {
        Console.WriteLine ("開啟監聽京東支付回調");
        string queueName = _config.Get ("JDPayNotifyQueueName");
        while (!stoppingToken.IsCancellationRequested) {
            // NOTE(review): a failure in ReceiveMsg itself is not caught here and ends the loop.
            Message mqMsg = _aliMNSHelper.ReceiveMsg (queueName);
            if (mqMsg == null) {
                continue;
            }

            try {
                HandleMessage (queueName, mqMsg);
            } catch (Exception ex) {
                // Log the failure and keep consuming.
                NLogger.Default.Error ("京東支付回調異常" + ex.ToString ());
            }
        }
    }

    /// <summary>
    /// Handles one queue message: deletes it, verifies the signature, and marks the
    /// referenced recharge order as paid if it exists and is still unpaid.
    /// </summary>
    private void HandleMessage (string queueName, Message mqMsg) {
        if (string.IsNullOrEmpty (mqMsg.Body)) {
            return;
        }

        // The message is deleted as soon as it is picked up, before any processing,
        // so a message whose handling fails is not delivered again.
        _aliMNSHelper.DeleteMsg (queueName, mqMsg.ReceiptHandle, out string _);
        NLogger.Default.Info ("JDPayNotifyQueue支付回調接收信息:" + mqMsg.Body);
        // "HH" = 24-hour clock; "hh" would print an ambiguous 12-hour value with no AM/PM marker.
        Console.WriteLine ($"JDPayNotifyQueue接收信息:{mqMsg.Body}:當前時間:{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")}");

        JDPayReturnModel payNotifyInModel = JsonConvert.DeserializeObject<JDPayReturnModel> (mqMsg.Body);
        string signData = JDEncryptionUtil.SignReturnData (payNotifyInModel, _config.Get ("JDPayMd5Key"));
        if (!signData.EqualIgnoreCase (payNotifyInModel.sign)) {
            // Signature mismatch: log the payload and drop the message.
            NLogger.Default.Info ("京東支付異步回調非法請求:" + mqMsg.Body);
            return;
        }

        // The payload carried in "data" is Base64-encoded JSON.
        string payNotifyInModelStr = EncryptUtil.Base64Decode (payNotifyInModel.data);
        JDPayNotifyModel jDPayNotifyModel = JsonConvert.DeserializeObject<JDPayNotifyModel> (payNotifyInModelStr);

        // Look the order up by its order number; only an existing, unpaid order is updated.
        var rechargeOrderInfo = _kernelorderChargeDao.GetRechargeOrder (jDPayNotifyModel.orderNo).Result;
        if (rechargeOrderInfo == null || rechargeOrderInfo.IsPaid) {
            return;
        }

        // Map the JD pay type onto the internal code: "WXPAY" -> 2, anything else -> 10.
        int payType = jDPayNotifyModel.payType.Equals ("WXPAY") ? 2 : 10;
        bool success = _kernelorderChargeDao.SetRechargeOrderPaid (rechargeOrderInfo.OrderId, payType, Convert.ToDecimal (jDPayNotifyModel.payAmount), Convert.ToDateTime (jDPayNotifyModel.payTime), jDPayNotifyModel.orderType, jDPayNotifyModel.orderSource, jDPayNotifyModel.externalId, jDPayNotifyModel.businessType, jDPayNotifyModel.outBizNo, jDPayNotifyModel.termNo).Result;
        NLogger.Default.Info ("訂單號{0},更新支付結果{1}", rechargeOrderInfo.OrderId, success ? "成功" : "失敗");
    }
}

如何支持多個后台運行服務

主要是在每個后台服務中重寫ExecuteAsync方法:

// ExecuteAsync is invoked when the host starts the service, so the listener
// shares the host's lifetime.
protected override Task ExecuteAsync(CancellationToken stoppingToken)
{
    // Process is a blocking loop, so it must be moved onto a thread-pool thread;
    // running it inline would keep the host from starting the other background services.
    // The Task.Run task is returned (rather than Task.CompletedTask) so that
    // BackgroundService tracks the loop: StopAsync can wait for it and a fault
    // in the loop is not silently dropped.
    return Task.Run(() => Process(stoppingToken));
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM