C# 讀寫鎖


讀寫鎖

    /// <summary>
    /// 提供異步獨占和並發執行支持
    /// </summary>
    public sealed class AsyncReaderWriter
    {
        /// <summary>
        /// 在當前實例中保護所有共享狀態的鎖
        /// </summary>
        private readonly object _lock = new object();

        /// <summary>
        /// 並發讀任務等待執行的隊列
        /// </summary>
        private readonly Queue<Task> _waitingCocurrent = new Queue<Task>();

        /// <summary>
        /// 獨占寫任務等待執行的隊列
        /// </summary>
        private readonly Queue<Task> _waitingExclusive = new Queue<Task>();

        /// <summary>
        /// 並發讀任務正在執行的數量
        /// </summary>
        private int _currentConcurrent = 0;

        /// <summary>
        /// 獨占寫任務是否正在執行
        /// </summary>
        private bool _currentlyExclusive = false;

        /// <summary>
        /// 非泛型的任務創建工廠
        /// </summary>
        private TaskFactory _factory;

        /// <summary>
        /// 初始化
        /// </summary>
        public AsyncReaderWriter()
        {
            this._factory = Task.Factory;
        }

        /// <summary>
        /// 使用指定的<see cref="TaskFactory"/>初始化<see cref="AsyncReaderWriter"/>, 為我們創建所有任務
        /// </summary>
        /// <param name="factory">用來創建所有任務的<see cref="TaskFactory"/></param>
        public AsyncReaderWriter(TaskFactory factory)
        {
            if (factory == null)
                throw new ArgumentNullException("factory");
            this._factory = factory;
        }

        /// <summary>
        /// 獲取當前隊列中正在等待的獨占寫任務數量
        /// </summary>
        public int WaitingExclusive { get { lock (_lock) return this._waitingExclusive.Count; } }

        /// <summary>
        /// 獲取當前隊列中正在等待的並發讀任務數量
        /// </summary>
        public int WaitingConcurrent { get { lock (_lock) return this._waitingCocurrent.Count; } }

        /// <summary>
        /// 獲取並發讀任務正在執行的數量
        /// </summary>
        public int CurrentConcurrent { get { lock (_lock) return this._currentConcurrent; } }

        /// <summary>
        /// 獲取當前獨占寫任務是否正在執行
        /// </summary>
        public bool CurrentlyExclusive { get { lock (_lock) return this._currentlyExclusive; } }

        /// <summary>
        /// 將獨占寫<see cref="Action"/>入隊到<see cref="AsyncReaderWriter"/>
        /// </summary>
        /// <param name="action">將要被以獨占寫方式執行的<see cref="Action"/></param>
        /// <returns>表示執行提供的<see cref="Action"/>的任務</returns>
        public Task QueueExclusiveWriter(Action action)
        {
            var task = new Task(state =>
            {
                try
                {
                    //運行提供的action
                    ((Action)state)();
                }
                finally
                {
                    //確保我們完成后清理
                    FinishExclusiveWriter();
                }
            }, action, this._factory.CancellationToken, this._factory.CreationOptions);

            lock (_lock)
            {
                //如果當前有任務正在運行,或者其他的獨占寫任務需要運行, 入隊
                //否則,沒有其他正在運行或將要運行的任務,現在就執行當前任務
                if (this._currentlyExclusive || this._currentConcurrent > 0 || this._waitingExclusive.Count > 0)
                {
                    this._waitingExclusive.Enqueue(task);
                }
                else
                {
                    RunExclusive_RequiresLock(task);
                }
            }
            return task;
        }

        /// <summary>
        /// 將獨占寫<see cref="Func{TResult}"/>入隊到<see cref="AsyncReaderWriter"/>
        /// </summary>
        /// <typeparam name="TResult"><see cref="Func{TResult}"/>委托封裝的方法的返回值類型。</typeparam>
        /// <param name="fun">將要被以獨占寫方式指定的<see cref="Func{TResult}"/></param>
        /// <returns>表示執行提供的<see cref="Func{TResult}"/>的任務</returns>
        public Task<TResult> QueueExclusiveWriter<TResult>(Func<TResult> fun)
        {
            var task = new Task<TResult>(state =>
            {
                try
                {
                    return ((Func<TResult>)state)();
                }
                finally
                {
                    FinishExclusiveWriter();
                }
            }, fun, this._factory.CancellationToken, this._factory.CreationOptions);

            lock (_lock)
            {
                //如果當前有任務正在運行,或者其他的獨占寫任務需要運行, 入隊
                //否則,沒有其他正在運行或將要運行的任務,現在就執行當前任務
                if (this._currentlyExclusive || this._currentConcurrent > 0 || this._waitingExclusive.Count > 0)
                {
                    this._waitingExclusive.Enqueue(task);
                }
                else
                {
                    RunExclusive_RequiresLock(task);
                }
            }
            return task;
        }

        /// <summary>
        /// 將並發讀<see cref="Action"/>入隊到<see cref="AsyncReaderWriter"/>
        /// </summary>
        /// <param name="action">將要被以並發讀方式執行的<see cref="Action"/></param>
        /// <returns>表示執行提供的<see cref="Action"/>的任務</returns>
        public Task QueueConcurrentReader(Action action)
        {
            var task = new Task(state =>
            {
                try
                {
                    ((Action)state)();
                }
                finally
                {
                    FinishConcurrentReader();
                }
            }, action, this._factory.CancellationToken, this._factory.CreationOptions);

            lock (_lock)
            {
                //如果現在有獨占寫任務正在運行或者等待
                //將當前任務入隊
                if (this._currentlyExclusive || this._waitingExclusive.Count > 0)
                {
                    this._waitingCocurrent.Enqueue(task);
                }
                else
                {
                    //否則立即調度
                    RunConcurrent_RequiresLock(task);
                }
            }
            return task;
        }

        /// <summary>
        /// 將並發讀<see cref="Func{TResult}"/>入隊到<see cref="AsyncReaderWriter"/>
        /// </summary>
        /// <typeparam name="TResult"><see cref="Func{TResult}"/>委托封裝的方法返回值類型</typeparam>
        /// <param name="fun">將要被以並發讀方式執行的<see cref="Func{TResult}"/></param>
        /// <returns>表示執行提供的<see cref="Func{TResult}"/>的任務</returns>
        public Task<TResult> QueueConcurrentReader<TResult>(Func<TResult> fun)
        {
            var task = new Task<TResult>(state =>
            {
                try
                {
                    return ((Func<TResult>)state)();
                }
                finally
                {
                    FinishConcurrentReader();
                }
            }, fun, this._factory.CancellationToken, this._factory.CreationOptions);

            lock (_lock)
            {
                if (_currentlyExclusive || this._waitingExclusive.Count > 0)
                {
                    this._waitingCocurrent.Enqueue(task);
                }
                else
                {
                    RunConcurrent_RequiresLock(task);
                }
            }
            return task;
        }

        #region 私有方法

        /// <summary>
        /// 開始指定的獨占任務
        /// </summary>
        /// <param name="exclusive">即將開始的獨占任務</param>
        private void RunExclusive_RequiresLock(Task exclusive)
        {
            this._currentlyExclusive = true;
            exclusive.Start(this._factory.Scheduler ?? TaskScheduler.Current);
        }

        /// <summary>
        /// 開始指定的並發任務
        /// </summary>
        /// <param name="concurrent">即將開始的並發任務</param>
        private void RunConcurrent_RequiresLock(Task concurrent)
        {
            this._currentConcurrent++;
            concurrent.Start(this._factory.Scheduler ?? TaskScheduler.Current);
        }

        /// <summary>
        /// 開始並發隊列中的所有任務
        /// </summary>
        private void RunConcurrent_RequiresLock()
        {
            while (this._waitingCocurrent.Count > 0)
            {
                RunConcurrent_RequiresLock(this._waitingCocurrent.Dequeue());
            }
        }

        /// <summary>
        /// 完成並發讀任務
        /// </summary>
        private void FinishConcurrentReader()
        {
            lock (_lock)
            {
                //運行到此處,表示一個並發任務已結束
                this._currentConcurrent--;

                //如果現在正在運行的並發任務數為0, 並且還有正在等待的獨占任務, 執行一個
                if (this._currentConcurrent == 0 && this._waitingExclusive.Count > 0)
                {
                    RunExclusive_RequiresLock(this._waitingExclusive.Dequeue());
                }
                //否則, 如果現在沒有等待的獨占任務,而有一些因為某些原因等待的並發任務(它們本應該在添加到隊列的時候就開始了), 運行所有正在等待的並發任務
                else if (this._waitingExclusive.Count == 0 && this._waitingCocurrent.Count > 0)
                {
                    RunConcurrent_RequiresLock();
                }
            }
        }

        /// <summary>
        /// 完成獨占寫任務
        /// </summary>
        private void FinishExclusiveWriter()
        {
            lock (_lock)
            {
                //運行到此處,表示一個獨占任務已結束
                this._currentlyExclusive = false;

                //如果當前仍有正在等待的獨占任務, 以內聯方式運行下一個
                if (this._waitingExclusive.Count > 0)
                {
                    RunExclusive_RequiresLock(this._waitingExclusive.Dequeue());
                }
                //否則, 如果當前仍有正在等待的並發任務, 運行所有
                else if (this._waitingCocurrent.Count > 0)
                {
                    RunConcurrent_RequiresLock();
                }
            }
        }
        #endregion
    }

使用方式:

var read = new Action(() =>
{
    Debug.WriteLine($"讀取:{File.ReadLines(fileName).EmptyToNull()?.Last()}");
});

var write = new Action(() =>
{
    var text = $"{DateTime.Now.Ticks.ToString()}\r\n";
    File.AppendAllText(fileName, text);
    Debug.Write($"\t寫入:{text}");
});

var rw = new AsyncReaderWriter();
for(var  i = 0; i < 10; i++)
{
    rw.QueueExclusiveWriter(write);
    rw.QueueConcurrentReader(read);
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM