使用Ring Buffer構建高性能的文件寫入程序



最近常收到SOD框架的朋友報告的SOD的SQL日志功能報錯:文件句柄丟失。經過分析得知,這些朋友使用SOD框架開發了訪問量比較大的系統,由於忘記關閉SQL日志功能所以出現了很高頻率的日志寫入操作,從而偶然引起錯誤。后來我建議只記錄出錯的或者執行時間較長的SQL信息,暫時解決了此問題。但是作為一個熱心造輪子的人,一定要看看能不能造一個更好的輪子出來。

前面說的錯誤原因已經很直白了,就是頻繁的日志寫入導致的,那么解決方案就是將多次寫入操作合並成一次寫入操作,並且采用異步寫入方式。要保存多次操作的內容就要有一個類似“隊列”的東西來保存,而一般的線程安全的隊列,都是“有鎖隊列”,在性能要求很高的系統中,不希望在日志記錄這個地方耗費多一點計算資源,所以最好有一個“無鎖隊列”,因此最佳方案就是Ring Buffer(環形緩沖區)了。

 什么是Ring Buffer?顧名思義,就是一個內存環,每一次讀寫操作都循環利用這個內存環,從而避免頻繁分配和回收內存,減輕GC壓力,同時由於Ring Buffer可以實現為無鎖的隊列,從而整體上大幅提高系統性能。Ring Buffer的示意圖如下,有關具體原理,請參考此文《Ring Buffer 有什么特別? 》。

 Ring buffer

上文並沒有詳細說明如何具體讀寫Ring Buffer,但是原理介紹已經足夠我們怎么寫一個Ring Buffer程序了,接下來看看我在 .NET上的實現。

首先,定一個存放數據的數組,記住一定要用數組,它是實現Ring Buffer的關鍵並且CPU友好。

const int C_BUFFER_SIZE = 10;//寫入次數緩沖區大小,每次的實際內容大小不固定
string[] RingBuffer = new string[C_BUFFER_SIZE];
int writedTimes = 0;


變量writedTimes 記錄寫入次數,它會一直遞增,不過為了線程安全的遞增且不使用托管鎖,需要使用原子鎖Interlocked。之后,根據每次 writedTimes 跟環形緩沖區的大小求余數,得到當前要寫入的數組位置:

 void SaveFile(string fileName, string text)
 {
            int currP= Interlocked.Increment(ref writedTimes);
            int writeP= currP % C_BUFFER_SIZE ;
            int index = writeP == 0 ? C_BUFFER_SIZE - 1 : writeP - 1;
            RingBuffer[index] = " Arr[" + index + "]:" + text;
  }


Ring Buffer的核心代碼就這么點,調用此方法,會一直往緩沖區寫入數據而不會“溢出”,所以寫入Ring Buffer效率很高。


一個隊列如果只生產不消費肯定不行的,那么如何及時消費Ring Buffer的數據呢?簡單的方案就是當Ring Buffer“寫滿”的時候一次性將數據“消費”掉。注意這里的“寫滿”僅僅是指寫入位置 index達到了數組最大索引位置,而“消費”也不同於常見的堆棧,隊列等數據結構,只是讀取緩沖區的數據而不會移除它。

所以前面的代碼只需要稍加改造:

 void SaveFile(string fileName, string text)
 {
            int currP= Interlocked.Increment(ref writedTimes);
            int writeP= currP % C_BUFFER_SIZE ;
            int index = writeP == 0 ? C_BUFFER_SIZE - 1 : writeP - 1;
            RingBuffer[index] = " Arr[" + index + "]:" + text;
            if (writeP == 0 )
            {
                 string result = string.Concat( RingBuffer);
                 FlushFile(fileName, result);
            }
  }


writeP == 0 表示當前一輪的緩沖區已經寫滿,然后調用函數 FlushFile 將Ring Buffer的數據連接起來,整體寫入文件。

        void FlushFile(string fileName, string text)
        {
            using (FileStream fs = new FileStream(fileName, FileMode.Append, FileAccess.Write, FileShare.Write, 2048, FileOptions.Asynchronous))
            {
                byte[] buffer = System.Text.Encoding.UTF8.GetBytes(text);
                IAsyncResult writeResult = fs.BeginWrite(buffer, 0, buffer.Length,
                    (asyncResult) =>
                    {
                        fs.EndWrite(asyncResult);
                       
                    },
                    fs);
                //fs.EndWrite(writeResult);//這種方法異步起不到效果
                fs.Flush();
                
            }
        }

在函數 FlushFile 中我們使用了異步寫入文件的技術,注意 FileOptions.Asynchronous ,使用它才可以真正利用Windows的完成端口IOCP,將文件異步寫入。

當然這段代碼也可以使用.NET最新版本支持的 async/await ,不過我要讓SOD框架繼續支持.NET 2.0,所以只好這樣寫了。

現在,我們可以開多線程來測試這個循環隊列效果怎么樣:

            Task[] arrTask = new Task[20];
            for (int i = 0; i < arrTask.Length; i++)
            {
                arrTask[i] = new Task(obj => SaveFile( (int)obj) ,i);
            }
            for (int i = 0; i < arrTask.Length; i++)
            {
                arrTask[i].Start();
            }
            
            Task.WaitAll(arrTask);
            MessageBox.Show(arrTask.Length +" Task All OK.");

這里開啟20個Task任務線程來寫入文件,運行此程序,發現20個線程才寫入了10條數據,分析很久才發現,文件異步IO太快的話,會有緩沖區丟失,第一次寫入的10條數據無法寫入文件,多運行幾次就沒有問題了。所以還是得想法解決此問題。

通常情況下我們都是使用托管鎖來解決這種並發問題,但本文的目的就是要實現一個“無鎖環形緩沖區”,不能在此“功虧一簣”,所以此時“信號量”上場了。

同步可以分為鎖定和信號同步,信號同步機制中涉及的類型都繼承自抽象類WaitHandle,這些類型有EventWaitHandle(類型化為AutoResetEvent、ManualResetEvent)、Semaphore以及Mutex。見下圖:

首先聲明一個 ManualResetEvent對象:

ManualResetEvent ChangeEvent = new ManualResetEvent(true);

這里我們將 ManualResetEvent 對象設置成 “終止狀態”,意味着程序一開始是允許所有線程不等待的,當我們需要消費Ring Buffer的時候再將  ManualResetEvent 設置成“非終止狀態”,阻塞其它線程。簡單說就是當要寫文件的時候將環形緩沖區阻塞,直到文件寫完才允許繼續寫入環形緩沖區。

對應的新的代碼調整如下:

 void SaveFile(string fileName, string text)
 {
            ChangeEvent.WaitOne();
            int currP= Interlocked.Increment(ref writedTimes);
            int writeP= currP % C_BUFFER_SIZE ;
            int index = writeP == 0 ? C_BUFFER_SIZE - 1 : writeP - 1;
            RingBuffer[index] = " Arr[" + index + "]:" + text;
            if (writeP == 0 )
            {
                 ChangeEvent.Reset();
                 string result = string.Concat( RingBuffer);
                 FlushFile(fileName, result);
            }
  }

然后,再FlushFile 方法的 回掉方法中,加入設置終止狀態的代碼,部分代碼如下:

(asyncResult) =>
                    {
                        fs.EndWrite(asyncResult);
                        ChangeEvent.Set();
                     }

OK,現在我們的程序具備高性能的安全的寫入日志文件的功能了,我們來看看演示程序測試的日志結果實例:

 Arr[0]:Thread index:0--FFFFFFF
 Arr[1]:Thread index:1--FFFFFFF
 Arr[2]:Thread index:8--FFFFFFF
 Arr[3]:Thread index:9--FFFFFFF
 Arr[4]:Thread index:3--FFFFFFF
 Arr[5]:Thread index:2--FFFFFFF
 Arr[6]:Thread index:4--FFFFFFF
 Arr[7]:Thread index:10--FFFFFFF
 Arr[8]:Thread index:5--FFFFFFF
 Arr[9]:Thread index:6--FFFFFFF
 Arr[0]:Thread index:7--FFFFFFF
 Arr[1]:Thread index:11--FFFFFFF
 Arr[2]:Thread index:12--FFFFFFF
 Arr[3]:Thread index:13--FFFFFFF
 Arr[4]:Thread index:14--FFFFFFF
 Arr[5]:Thread index:15--FFFFFFF
 Arr[6]:Thread index:16--FFFFFFF
 Arr[7]:Thread index:17--FFFFFFF
 Arr[8]:Thread index:18--FFFFFFF
 Arr[9]:Thread index:19--FFFFFFF

測試結果符合預期!
到此,我們今天的主題就全部介紹完成了,不過要讓本文的代碼能夠符合實際的運行,還要解決每次只寫入少量數據並且將它定期寫入日志文件的問題,這里貼出真正的局部代碼:

 

PS:有朋友說采用信號量並不能完全保證程序安全,查閱了MSDN也說如果信號量狀態改變還沒有來得及應用,那么是起不到作用的,所以還需要檢查業務狀態標記,也就是在設置非終止狀態后,馬上設置一個操作標記,在其它線程中,需要檢查此標記,以避免“漏網之魚”引起不期望的結果。

再具體實現上,我們可以實現一個“自旋鎖”,循環檢查此狀態標記,為了防止發生死鎖,還需要有鎖超時機制,代碼如下:

 void SaveFile(string fileName, string text)
        {
            ChangeEvent.WaitOne(10000);
            int currP= Interlocked.Increment(ref WritedTimes);
            int writeP= currP % C_BUFFER_SIZE ;
            int index = writeP == 0 ? C_BUFFER_SIZE - 1 : writeP - 1;
           
            if (writeP == 0 )
            {
                ChangeEvent.Reset();
                IsReading = true;
                RingBuffer[index] = " Arr[" + index + "]:" + text;

                LastWriteTime = DateTime.Now;
                WritingIndex = 0;
                SaveFile(fileName,RingBuffer);
            }
            else if (DateTime.Now.Subtract(LastWriteTime).TotalSeconds > C_WRITE_TIMESPAN)
            {
                ChangeEvent.Reset();
                IsReading = true;
                RingBuffer[index] = " Arr[" + index + "]:" + text;

                int length = index - WritingIndex + 1;
                if (length <= 0)
                    length = 1;
                string[] newArr = new string[length];
                Array.Copy(RingBuffer, WritingIndex, newArr, 0, length);

                LastWriteTime = DateTime.Now;
                WritingIndex = index + 1;
                SaveFile(fileName, newArr);
            }
            else
            {
                //防止漏網之魚的線程在信號量產生作用之前修改數據
                //采用“自旋鎖”等待
                int count = 0;
                while (IsReading)
                {
                    if (count++ > 10000000)
                    {
                        Thread.Sleep(50);
                        break;
                    }
                }
                RingBuffer[index] = " Arr[" + index + "]:" + text;
            }
        }

 


完整的Ring Buffer代碼會在最新版本的SOD框架源碼中,有關本篇文章測試程序的完整源碼,請加QQ群討論獲取,

群號碼:SOD框架高級群 18215717 ,加群請注明 PDF.NET技術交流 ,否則可能被拒絕。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM