發布xxl-job executor dotnet core 執行器的實現


DotXxlJob

[github][https://github.com/xuanye/DotXxlJob]
xxl-job的dotnet core 執行器實現,支持XXL-JOB 2.0+

1 XXL-JOB概述

[XXL-JOB][1]是一個輕量級分布式任務調度平台,其核心設計目標是開發迅速、學習簡單、輕量級、易擴展。現已開放源代碼並接入多家公司線上產品線,開箱即用。以下是它的架構圖
架構圖

2. 關於DotXxlJob產生

在工作中調研過多個任務調度平台,如Hangfire、基於Quatz.NET的第三方擴展,都與實際的需求有一點差距。 之前一直使用Hangfire,Hangfire的執行器在同步調用業務服務時,如果即時業務服務正在重新部署或者重啟,有一定概率會出現死鎖,導致CPU100%,后來全部調整為異步,但是這樣就無法獲得執行結果,這樣的設計有蠻大問題,XxlJob的回調機制很好的解決了這個問題。本身如果通過http的方式調用,只要部署springbootd的一個執行器就可以解決問題,但是擴展性較差。所以萌生了實現DotNet版本的執行器的想法,為避免重復造輪子,開始之前也進行過調研,以下倉庫[https://github.com/yuniansheng/xxl-job-dotnet][2]給了較大的啟發,但是該庫只支持1.9版本的xxljob,還有一些其他小問題,所以還是自力更生。

3. 如何使用

目前只實現了BEAN的方式,即直接實現IJobHandler調用的方式,Glue源碼的方式實際上實現起來也並不復雜(有需求再說把),或者各位有需求Fork 實現一下

可參考sample

安裝:

dotnet add package DotXxlJob.Core

3.1 在AspNetCore中使用

  1. 生命一個AspNet的Middleware中間件,並擴展ApplicationBuilder,本質是攔截Post請求,解析Body中的流信息
public class XxlJobExecutorMiddleware
{
    private readonly IServiceProvider _provider;
    private readonly RequestDelegate _next;

    private readonly XxlRpcServiceHandler _rpcService;
    public XxlJobExecutorMiddleware(IServiceProvider provider, RequestDelegate next)
    {
        this._provider = provider;
        this._next = next;
        this._rpcService = _provider.GetRequiredService<XxlRpcServiceHandler>();
    }

    public async Task Invoke(HttpContext context)
    {

        if ("POST".Equals(context.Request.Method, StringComparison.OrdinalIgnoreCase) && 
            "application/octet-stream".Equals(context.Request.ContentType, StringComparison.OrdinalIgnoreCase))
        {
            var rsp =  await _rpcService.HandlerAsync(context.Request.Body);
            context.Response.StatusCode = (int) HttpStatusCode.OK;
            context.Response.ContentType = "text/plain;utf-8";
            await context.Response.Body.WriteAsync(rsp,0,rsp.Length);
            return;
        }
        await _next.Invoke(context);
    }
}

擴展ApplicationBuilderExtensions,可根據實際情況綁定在特殊的Url Path上

public static class ApplicationBuilderExtensions
{
    public static IApplicationBuilder UseXxlJobExecutor(this IApplicationBuilder @this)
    {
       return @this.UseMiddleware<XxlJobExecutorMiddleware>();
    }
}

在Startup中添加必要的引用,其中自動注冊。

public class Startup
{
    public Startup(IConfiguration configuration)
    {
        Configuration = configuration;
    }

    private IConfiguration Configuration { get; }
    
    public void ConfigureServices(IServiceCollection services)
    {
      
        services.AddXxlJobExecutor(Configuration);
        services.AddSingleton<IJobHandler, DemoJobHandler>(); // 添加自定義的jobHandler
        services.AddAutoRegistry(); // 自動注冊
    }


    public void Configure(IApplicationBuilder app,IHostingEnvironment env)
    {
        //啟用XxlExecutor
        app.UseXxlJobExecutor();
    }
}

編寫JobHandler,繼承AbstractJobHandler或者直接實現接口IJobHandler,通過context.JobLogger 記錄執行過程和結果,在AdminWeb上可查看的哦

[JobHandler("demoJobHandler")]
public class DemoJobHandler:AbstractJobHandler
{
    public override Task<ReturnT> Execute(JobExecuteContext context)
    {
        context.JobLogger.Log("receive demo job handler,parameter:{0}",context.JobParameter);

        return Task.FromResult(ReturnT.SUCCESS);
    }
}

3.2 配置信息

管理端地址和端口是必填信息,其他根據實際情況,選擇配置,配置項說明見下代碼中的注釋

 public class XxlJobExecutorOptions
{
   
    /// <summary>
    /// 管理端地址,多個以;分隔
    /// </summary>
    public string AdminAddresses { get; set; }
    /// <summary>
    /// appName自動注冊時要去管理端配置一致
    /// </summary>
    public string AppName { get; set; } = "xxl-job-executor-dotnet";
    /// <summary>
    /// 自動注冊時提交的地址,為空會自動獲取內網地址
    /// </summary>
    public string SpecialBindAddress { get; set; }
    /// <summary>
    /// 綁定端口
    /// </summary>
    public int Port { get; set; }
    /// <summary>
    /// 是否自動注冊
    /// </summary>
    public bool AutoRegistry { get; set; }
    /// <summary>
    /// 認證票據
    /// </summary>
    public string AccessToken { get; set; }
    /// <summary>
    /// 日志目錄,默認為執行目錄的logs子目錄下,請配置絕對路徑
    /// </summary>
    public string LogPath { get; set; } = Path.Combine(AppContext.BaseDirectory, "./logs");
    /// <summary>
    /// 日志保留天數
    /// </summary>
    public int LogRetentionDays { get; set; } = 30;
}

在其他Http服務中使用

只需要實現Http請求的攔截,並判斷post請求中content-Type="application/octet-stream",並使用XxlRpcServiceHandler來處理流 即可。

其他說明

XXL-JOB內置的RPC是使用Hessian協議,這個有點坑。很多都是java特有的屬性和標識,比如類名什么的。在本項目中,並沒有實現完整的Hessian2協議,只實現了使用到的類型,當然擴展起來也非常方便。如果有人要單獨使用Hessian 這個類庫的話,要特別注意這個問題。

有任何問題,可Issue反饋 ,最后感謝 xxl-job


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM