Unity Job System


  參考鏈接 : 

  http://esprog.hatenablog.com/entry/2018/05/19/150313

  https://blogs.unity3d.com/2018/10/22/what-is-a-job-system/

 

  Job系統作為一個多線程系統, 它因為跟ECS有天生的融合關系所以比較重要的樣子, 我也按照使用類型的分類來看看Job System到底怎么樣.

  Job說實話就是一套封裝的多線程系統, 我相信所有開發人員都能自己封裝一套, 所以Unity推出這個的時候跟着ECS一起推出, 因為單獨推出來的話肯定推不動, 多線程, 線程安全, 線程鎖, 線程共享資源, 這些都沒什么區別, 我從一個簡單列表的功能來說吧.

  先來一個普通的多線程 :  

using System.Collections;
using System.Collections.Generic;
using UnityEngine;

using System;
using System.Threading;

public class NormalListAccessTest01 : MonoBehaviour
{
    public class RunData
    {
        public List<int> datas = new List<int>();
        public float speed;
        public float deltaTime;
    }

    public static void RunOnThread<T>(System.Action<T> call, T obj, System.Action endCall = null)
    {
        System.Threading.ThreadPool.QueueUserWorkItem((_obj) =>
        {
            call.Invoke(obj);
            if(endCall != null)
            { ThreadMaster.Instance.CallFromMainThread(endCall); }
        });
    }

    private void OnGUI()
    {
        if(GUI.Button(new Rect(100, 100, 100, 50), "Run Test"))
        {
            ThreadMaster.GetOrCreate();
            var data = new RunData();
            data.deltaTime = Time.deltaTime;
            data.speed = 100.0f;
            for(int i = 0; i < 10000; i++)
            {
                data.datas.Add(i);
            }
            RunOnThread<RunData>((_data) =>
            {
                // 這是在工作線程里
                Debug.Log("Start At : " + System.DateTime.Now.ToString("HH:mm:ss fff"));
                var move = _data.deltaTime * _data.speed;
                for(int i = 0; i < _data.datas.Count; i++)
                {
                    var val = _data.datas[i] + 1;
                    _data.datas[i] = val;
                }
            }, data, () =>
            {
                // 這是在主線程里
                Debug.Log(data.datas[0]);
                Debug.Log("End At : " + System.DateTime.Now.ToString("HH:mm:ss fff"));
            });
        }
    }
}

  線程轉換的一個簡單封裝ThreadMaster : 

using System.Collections;
using System.Collections.Generic;
using UnityEngine;

public class ThreadMaster : MonoBehaviour
{
    private static ThreadMaster _instance;
    public static ThreadMaster Instance
    {
        get
        {
            return GetOrCreate();
        }
    }

    private volatile List<System.Action> _calls = new List<System.Action>();

    public static ThreadMaster GetOrCreate()
    {
        if(_instance == false)
        {
            _instance = new GameObject("ThreadMaster").AddComponent<ThreadMaster>();
        }
        return _instance;
    }
    public void CallFromMainThread(System.Action call)
    {
        _calls.Add(call);
    }
    void Update()
    {
        if(_calls.Count > 0)
        {
            for(int i = 0; i < _calls.Count; i++)
            {
                var call = _calls[i];
                call.Invoke();
            }
            _calls.Clear();
        }
    }
}

  沒有加什么鎖, 簡單運行沒有問題, 下面來個Job的跑一下:  

using UnityEngine;
using Unity.Collections;
using Unity.Jobs;

public class JobSystemSample00 : MonoBehaviour
{
    struct VelocityJob : IJob
    {
        public NativeArray<int> datas;

        public void Execute()
        {
            for(var i = 0; i < datas.Length; i++)
            {
                datas[i] = datas[i] + 1;
            }
        }
    }

    public void Test()
    {
        var datas = new NativeArray<int>(100, Allocator.Persistent);

        var job = new VelocityJob()
        {
            datas = datas
        };

        JobHandle jobHandle = job.Schedule();
        JobHandle.ScheduleBatchedJobs();

        //Debug.Log(datas[0]);     // Error : You must call JobHandle.Complete()
        jobHandle.Complete();
        Debug.Log(datas[0]);

        datas.Dispose();
    }

    private void OnGUI()
    {
        if(GUI.Button(new Rect(100, 100, 100, 50), "Start Test"))
        {
            Test();
        }
    }
}

  這里就有一個大問題了, 在有注釋的地方 // Error : You must call JobHandle.Complete(), 是說在Job沒有調用Complete()時, 去獲取相關數組內容是非法的! 而這個jobHandle.Complete(); 無法通過工作線程去調用, 也就是說Job的運行它是無法自行結束的, 無法發出運行結束的通知的, 對比上面封裝的普通多線程弱爆了.  而這個Complete()函數如果在工作線程執行完成前調用, 會強制立即執行(文檔也是寫 Wait for the job to complete), 也就是說它只能在主線程調用並且會阻塞主線程, 這樣就可以定性了, 它的Job System不是為了提供一般使用的多線程封裝給我們用的, 可是它又是很強大的, 因為它能使用高效的內存結構, 能保證數據訪問安全, 能在需要的時候調用Complete方法強制等待工作線程執行完畢(如果沒猜錯的話, 引擎對這個做了很大優化, 並不是簡單等待), 還有BurstCompile等, 如果我們封裝成功了的話, 就是很好的多線程庫了.

  PS : 打個比方一個mesh的渲染, 在渲染之前必須計算完所有坐標轉換, Job的好處就是可以進行多線程並行的計算, 然后還能被主線程強制執行完畢, 比在主線程中單獨計算強多了. 而這個強制執行才是核心邏輯.

  經過幾次測試, 幾乎沒有辦法簡單擴展Job系統來讓它成為像上面一樣擁有自動完成通知的系統, 如下 : 

  1. 添加JobHandle變量到IJob中, 在Execute結束時調用  

    struct VelocityJob : IJob
    {
        public NativeArray<int> datas;

        [Unity.Collections.LowLevel.Unsafe.NativeDisableUnsafePtrRestriction]
        public JobHandle selfHandle;    // 是這個IJob調用Schedule的句柄

        public void Execute()
        {
            for(var i = 0; i < datas.Length; i++)
            {
                datas[i] = datas[i] + 1;
            }
            selfHandle.Complete();
        }
    }

  報錯, InvalidOperationException: VelocityJob.selfHandle.jobGroup uses unsafe Pointers which is not allowed. 無法解決, 直接就無法在IJob結構體中添加JobHandle變量. 並且無法在工作線程中調用Complete方法.

  2. 添加回調函數進去

    struct VelocityJob : IJob
    {
        public NativeArray<int> datas;

        public System.Action endCall;

        public void Execute()
        {
            for(var i = 0; i < datas.Length; i++)
            {
                datas[i] = datas[i] + 1;
            }
            if(endCall != null)
            {
                endCall.Invoke();
            }
        }
    }

  報錯, Job系統的struct里面只能存在值類型的變量 !!-_-

  3. 使用全局的引用以及線程轉換邏輯來做成自動回調的形式, 雖然可以使用了可是非常浪費資源 :

using UnityEngine;
using Unity.Collections;
using Unity.Jobs;
using System.Collections.Generic;

public class JobSystemSample01 : MonoBehaviour
{
    private static int _id = 0;
    public static int NewID => _id++;
    public static Dictionary<int, IJobCall> ms_handleRef = new Dictionary<int, IJobCall>();

    public class IJobCall
    {
        public JobHandle jobHandle;
        public System.Action endCall;
    }
    struct VelocityJob : IJob
    {
        public NativeArray<int> datas;

        public int refID;
        public void Execute()
        {
            for(var i = 0; i < datas.Length; i++)
            {
                datas[i] = datas[i] + 1;
            }
            var handle = ms_handleRef[refID];
            ThreadMaster.Instance.CallFromMainThread(() =>
            {
                handle.jobHandle.Complete();
                if(handle.endCall != null)
                {
                    handle.endCall.Invoke();
                }
            });
        }
    }

    public void Test()
    {
        ThreadMaster.GetOrCreate();
        var datas = new NativeArray<int>(100, Allocator.Persistent);
        int id = NewID;
        var job = new VelocityJob() { refID = id, datas = datas };
        ms_handleRef[id] = new IJobCall()
        {
            jobHandle = job.Schedule(),
            endCall = () => { Debug.Log(datas[0]); datas.Dispose(); }
        };
    }

    private void OnGUI()
    {
        if(GUI.Button(new Rect(100, 100, 100, 50), "Start Test"))
        {
            Test();
        }
    }
}

  通過上面封裝就可以作為一般多線程使用了, 並且我們獲得了引擎提供的數據安全和高效邏輯性, 再加上利用BurstCpmpile和只讀屬性, 能夠提升一些計算效率吧. ECS on Job已經在另外一篇中說過了, 這里忽略了.

  ----------------------------------------------

  當我測試到IJobParallelFor的時候, 發現並行並不像GPU那樣的並行那么美好, 因為GPU它本身就是全並行的, 像卷積之類的, 它跟像素的處理順序本身就沒有關系, 可是我們的邏輯有些會受順序的影響. 先看看下面的代碼 : 

using UnityEngine;
using Unity.Collections;
using Unity.Jobs;


public class IJobParallelForSample01 : MonoBehaviour
{
    struct VelocityJob : IJobParallelFor
    {
        public NativeArray<int> datas;

        public void Execute(int index)
        {
            if(index == 0)
            {
                index = datas.Length - 1;
            }
            datas[index] = datas[index - 1] + 1;
        }
    }

    public void Test()
    {
        var datas = new NativeArray<int>(100, Allocator.Persistent);
        for(int i = 0; i < datas.Length; i++)
        {
            datas[i] = i;
        }
        var job = new VelocityJob()
        {
            datas = datas
        };

        var jobHandle = job.Schedule(datas.Length, 20);
        JobHandle.ScheduleBatchedJobs();
        
        jobHandle.Complete();
        Debug.Log(datas[0]);

        datas.Dispose();
    }

    private void OnGUI()
    {
        if(GUI.Button(new Rect(100, 100, 100, 50), "Start Test"))
        {
            Test();
        }
    }
}

  主要的是Schedule的方法上 : public static JobHandle Schedule<T>(this T jobData, int arrayLength, int innerloopBatchCount, JobHandle dependsOn = default) where T : struct, IJobParallelFor;

  第二個參數innerloopBatchCount表示的是分塊的大小, 比如我們數組長度是100,  每20個元素分成一塊, 一共可以分5塊, 如果你的CPU核心數大於等於5它就能開5個線程來處理, 可是你不能去獲取這個塊之外的Index的數據:

  顯然這里數據每20個一組被分為了5組, 在5個線程里, 然后跨組獲取數據就報錯了.

  測試一下線程數是否5個 : 

    struct VelocityJob : IJobParallelFor
    {
        public NativeArray<int> datas;

        public void Execute(int index)
        {
            throw new System.Exception(index + " ERROR");
        }
    }

   5個線程報錯, 應該每個線程內的處理也是按照for的順序來的.

  把每個塊改成5的大小, 看看它能開幾個線程:

 var jobHandle = job.Schedule(datas.Length, 5);

  恩開了8個, 我的機器確實是8核的, 不過它的分塊不是我想的0-5-10-15, 或者0-12-24-36 而是整10的, 不知道為什么, 因為按照我設定每個分組是5, 而整體平均100/8=12.5而不應該是整10的, 具體不詳.

  如果我們要跟其它元素進行交互, 就只能把處理單元設置到跟數組一樣大, 才能在一個塊中處理:

using UnityEngine;
using Unity.Collections;
using Unity.Jobs;


public class IJobParallelForSample01 : MonoBehaviour
{
    struct VelocityJob : IJobParallelFor
    {
        public NativeArray<int> datas;

        public void Execute(int index)
        {
            if(index > 0 && index < datas.Length - 1)
            {
                datas[index] = datas[datas.Length - 1];
            }
        }
    }

    public void Test()
    {
        var datas = new NativeArray<int>(10, Allocator.Persistent);
        for(int i = 0; i < datas.Length; i++)
        {
            datas[i] = i;
        }
        var job = new VelocityJob()
        {
            datas = datas
        };

        var jobHandle = job.Schedule(datas.Length, datas.Length);
        JobHandle.ScheduleBatchedJobs();

        jobHandle.Complete();
        Debug .Log(datas[0]);

        datas.Dispose();
    }

    private void OnGUI()
    {
        if(GUI.Button(new Rect(100, 100, 100, 50), "Start Test"))
        {
            Test();
        }
    }
}

 

  順便測試一下各個線程的分配情況:

    private volatile static Dictionary<int, List<int>> ms_threads = new Dictionary<int, List<int>>();

    struct VelocityJob : IJobParallelFor
    {
        public NativeArray<int> datas;

        public void Execute(int index)
        {
            Debug.Log(index + " : " + System.Threading.Thread.CurrentThread.ManagedThreadId);
            lock(ms_threads)
            {
                List<int> val = null;
                ms_threads.TryGetValue(System.Threading.Thread.CurrentThread.ManagedThreadId, out val);
                if(val == null)
                {
                    val = new List<int>();
                    ms_threads[System.Threading.Thread.CurrentThread.ManagedThreadId] = val;
                }
                val.Add(index);
            }
        }
    }
        var jobHandle = job.Schedule(100, 5);

  結果是分為8個線程, 4個線程的塊為10, 4個為15

  所以不能想當然的去獲取其它Index的內容, 畢竟分塊邏輯不一定.

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM