大數據高效復制的處理案例分析總結


一個老客戶提出這樣的需求,希望將SQLServer中的某個表的數據快速復制到SQLite數據庫里面以便進行定期的備份處理,數據表的記錄大概有50多萬條記錄,表有100個字段左右,除了希望能夠快速做好外,效率是第一位的,他自己測試總是在一兩個小時的時間以上。客戶提出這樣的需求,我我覺得肯定是沒有很好的利用事務的特性,否則速度應該會快得多,但是具體能快到什么程度,心里也不太確定。於是按照這個要求,把這樣大的表數據復制作為一個案例來進行研究,最終大數據的復制處理,不到20分鍾就可以完成全部數據的復制更新處理。本文主要介紹這個需求如何結合實際開發的需要進行處理,達到快速高效的復制數據的目的,並提供相關的實現思路和代碼供參考學習。

1、復制數據的需求及開發思路

由於客戶是需要做定期的數據備份,因此這樣的復制是進行的,因此大數據的復制效率肯定是很重要的,應該盡可能的短時間完成。數據表的記錄大概有50多萬條記錄,表有100個字段左右的需要也是比常規的表數據會多一些,因此需要做好很好的測試,我們根據這樣的需求背景,使用一個測試案例來對性能進行測試。

這樣多字段的表,數據字段的一一對應,手工肯定是很累的,所以我們使用代碼生成工具Database2Sharp來進行快速開發,這樣底層的處理我們就可以不用太過關注,而且可以為不同的數據處理,生成不同的數據訪問層即可。

在底層我們主要是采用了微軟的Enterprise Library的數據庫訪問模塊,因此它能夠很好抽象各種數據庫的事務,以適應各種不同數據庫的事務處理。使用微軟的Enterprise Library模塊,可以很好支持SQLSever、Oracle、Mysql、Access、SQLite等數據庫。

開發框架,常見的分層模式,可以分為UI層、BLL層、DAL層、IDAL層、Entity層、公用類庫層等等

框架的基類我們封裝了大量的通用性處理函數,包括數據訪問層、業務邏輯層的基類,所有的基類函數基本上都帶有一個DbTransaction trans = null 的定義,就是我們可以采用事務,也可以默認不采用事務,是一個可選性的事務參數。

如數據訪問接口和基於SQLServer的數據訪問類的實現圖示如下所示。

在最高級的抽象基類AbstractBaseDAL的數據訪問層里面,都有大量關於數據操作和相關事務的接口可以使用,因此我們在底層繼承的子類,如果我們處理數據的增刪改查等操作,基本上就不需要做任何擴展性代碼了,這樣很符合我們快速開發的目的。

在框架的整個數據訪問層,我們都定義了很多公用的、帶有事務參數的接口,如果我們在常規的數據處理里面,使用事務的話,那么也是很方便的事情。使用事務的批量處理,對於SQLite的操作來說,效率是非常明顯的,具體可以在我之前的隨筆里《使用事務操作SQLite數據批量插入,提高數據批量寫入速度,源碼講解》可以了解到,他們之間的處理效率是很大差距的。

 

2、使用代碼生成工具生成所需的代碼

上面講到,開發這樣的數據復制處理程序,這樣多字段的表,數據字段的一一對應,手工肯定是很累的,所以我們使用代碼生成工具Database2Sharp來進行快速開發。

因此使用代碼生成工具來快速生成所需要的代碼,展開數據庫后,從數據庫節點上,右鍵選擇【代碼生成】【Enterprise Library代碼生成】就可以生成標准的界面層一下的代碼了,由於我們整個案例是非標准的數據復制處理,界面部分不需要利用代碼生成工具進行Winform界面的生成的。

生成代碼的一步步操作,最后確認一下就可以生成相關的底層代碼了

最后我們生成這樣的BLL、DAL、IDAL、Entity幾個層的項目代碼,整個項目的代碼各種繼承關系已經處理好了,也就具有了基類擁有的增刪改查等基礎操作了。

我們做兩個不同數據庫的復制處理操作,關鍵還是要生成兩個不同數據庫訪問類的代碼(也就是生成一個標准的SQLServer后,復制一份代碼,修改下繼承基類即可實現),如下代碼是兩個數據訪問類的代碼,不用增加任何接口即可滿足當前項目的需要的了。

最終我們的項目結構如下所示。

 

3、進行數據復制處理的Winform界面代碼邏輯

為了方便整個復制過程的進度展示(很重要),我們設計了進度條以及文字內容,展示處理過程的進度和耗時等信息,最終界面設計如下所示。

整個界面設計利用后台線程的方式對數據復制進行處理,方便及時在界面顯示進度而不阻塞界面線程。

具體的界面代碼如下所示。

    public partial class FrmMain : Form
    {
        private TimeSpan ExecuteTime;
        private int currentCount = 0;
        private BackgroundWorker work = new BackgroundWorker();//使用后台線程進行處理,不阻塞界面顯示
        public FrmMain()
        {
            InitializeComponent();

            //定義后台線程的處理
            work.DoWork += work_DoWork;
            work.WorkerReportsProgress = true;
            work.ProgressChanged += work_ProgressChanged;
            work.RunWorkerCompleted += work_RunWorkerCompleted;
        }

        //線程完成后通知結束
        void work_RunWorkerCompleted(object sender, RunWorkerCompletedEventArgs e)
        {
            this.toolStripProgressBar1.Value = 100;
            this.toolStripProgressBar1.Visible = false;
            MessageUtil.ShowTips("操作完成");

            ShowMessage(this.toolStripProgressBar1.Value);//完成
        }

        /// <summary>
        /// 在界面顯示文本信息
        /// </summary>
        /// <param name="percent">完成百分比</param>
        private void ShowMessage(int percent)
        {
            if (this.ExecuteTime != null)
            {
                this.lblTips.Text = string.Format("[當前完成數量:{0},完成百分比:{1}, 執行耗時:{2}毫秒 | {3}分鍾{4}秒]",
                    this.currentCount, percent, this.ExecuteTime.TotalMilliseconds, this.ExecuteTime.Minutes, this.ExecuteTime.Seconds);
            }
        }

        /// <summary>
        /// 報告進度的時候,顯示相關的數量、耗時等內容
        /// </summary>
        void work_ProgressChanged(object sender, ProgressChangedEventArgs e)
        {
            this.toolStripProgressBar1.Value = e.ProgressPercentage;
            this.statusStrip1.Refresh();

            ShowMessage(e.ProgressPercentage);
        }

        /// <summary>
        /// 后台線程執行的邏輯代碼
        /// </summary>
        void work_DoWork(object sender, DoWorkEventArgs e)
        {
            CopyDataUtil util = new CopyDataUtil(); //使用一個Action的Lamda表達式,執行通知界面處理
            util.Start((percent, ts, current) =>
            {
                work.ReportProgress(percent);
                this.ExecuteTime = ts;
                this.currentCount = current;
            });
        }

        private void btnCopy_Click(object sender, EventArgs e)
        {
            if(!work.IsBusy)
            {
                //如果每次要求使用空白數據庫測試,那么先刪除舊數據庫,再復制備份過去即可
                string dbfile = Path.Combine(Environment.CurrentDirectory, "localdb.db");
                string bakfile = Path.Combine(Environment.CurrentDirectory, "db.db");
                if (this.chkCopyEmptyDb.Checked && File.Exists(dbfile))
                {
                    File.Delete(dbfile);
                    File.Copy(bakfile, dbfile, true);
                }

                //顯示進度條,並異步執行線程
                this.toolStripProgressBar1.Visible = true;
                work.RunWorkerAsync();
            }
        }

        private void FrmMain_FormClosing(object sender, FormClosingEventArgs e)
        {
            //取消注冊的相關事件,防止退出的時候出現異常
            if(work != null && work.IsBusy)
            {
                work.ProgressChanged -= work_ProgressChanged; //取消通知事件
                work.RunWorkerCompleted -= work_RunWorkerCompleted;//取消完成事件
                work.Dispose();
            }
        }
    }

在上面的窗體界面代碼里面,最為關鍵的代碼就是具體后台進程的處理邏輯,如下代碼所示。

        /// <summary>
        /// 后台線程執行的邏輯代碼
        /// </summary>
        void work_DoWork(object sender, DoWorkEventArgs e)
        {
            CopyDataUtil util = new CopyDataUtil(); //使用一個Action的Lamda表達式,執行通知界面處理
            util.Start((percent, ts, current) =>
            {
                work.ReportProgress(percent);
                this.ExecuteTime = ts;
                this.currentCount = current;
            });
        }

上面的處理邏輯為了方便,把數據的復制內容放到了一個輔助類里面,並在輔助類的Start方法里面傳入了界面通知的Action處理函數,這樣我們在CopyDataUtil 處理的時候就可以隨時進行消息的通知了。

數據復制的Start方法定義如下所示。

        /// <summary>
        /// 開始執行賦值
        /// </summary>
        public void Start(Action<int, TimeSpan, int> doFunc)
        {
            StartTime = DateTime.Now;//計時開始

            InternalCopry(doFunc);//處理數據復制邏輯,並執行外部的函數

            EndTime = DateTime.Now;//計時結束
        }

整個輔助類CopyDataUtil 類里面定義了兩個不同數據庫類型的對象,方便數據庫的賦值處理操作,並且定義了開始時間,結束時間,這樣可以統計總共的耗時信息,如下代碼所示。

    /// <summary>
    /// 復制數據的處理類
    /// </summary>
    public class CopyDataUtil
    {
        //使用一個計時器,對操作記錄進行計時
        private DateTime StartTime, EndTime;
        //SQLServer數據庫表對象
        private ProductSqlServer sqlserver = null;
        //SQLite數據表對象
        private ProductSqlite sqlite = null;

        public CopyDataUtil()
        {
            //構建對象,並指定SQLServer的數據庫配置項
            sqlserver = new ProductSqlServer();
            sqlserver.DbConfigName = "sqlserver";

            //構建對象,並指定SQLite的數據庫配置項
            sqlite = new ProductSqlite();
            sqlite.DbConfigName = "sqlite";
        }

整個復制數據的邏輯,主要就是基於事務性的處理,按照分頁規則,每次按照一定的數量,批量從SQLServer里面取出數據,然后插入SQLite數據庫里面,使用事務可以是的SQLite的數據寫入非常高效快速,具體代碼如下所示。

        /// <summary>
        /// 大數據復制的處理邏輯
        /// </summary>
        /// <param name="doFunc">外部調用的函數</param>
        private void InternalCopry(Action<int, TimeSpan, int> doFunc)
        {
            //設置主鍵,並指定分頁數量大小,提高檢索效率
            string primaryKey = "h_id";
            int pageSize = 1000;
            PagerInfo info = new PagerInfo(){PageSize = pageSize, CurrenetPageIndex =1};

            //根據數據的總數,取得總頁數
            int totalPageCount = 1;
            int totalCount = sqlserver.GetRecordCount();
            if (totalCount % pageSize == 0)
            {
                totalPageCount = totalCount / pageSize;
            }
            else
            {
                totalPageCount = totalCount / pageSize + 1;
            }
            totalPageCount = (totalPageCount < 1) ? 1 : totalPageCount;

            //利用事務進行SQLite數據寫入,提高執行響應效率
            DbTransaction trans = sqlite.CreateTransaction();
            if (trans != null)
            {
                //根據每頁數量,依次從指定的頁數取數據
                for (int i = 1; i <= totalPageCount; i++)
                {
                    info.CurrenetPageIndex = i;//設定當前的頁面,並進行數據獲取

                    int j = 1;
                    List<ProductInfo> list = sqlserver.FindWithPager("1=1", info, primaryKey, false);
                    foreach (ProductInfo entity in list)
                    {
                        //取得當前數量和進度百分比
                        int current = (i - 1) * pageSize + j;
                        int percent = GetPercent(totalCount, current);

                        //計算程序耗時,執行外部函數進行界面通知
                        TimeSpan ts = DateTime.Now - StartTime;
                        doFunc(percent, ts, current);//執行通知處理

                        //如果不存在主鍵記錄,則寫入,否則更新
                        if (!sqlite.IsExistKey(primaryKey, entity.H_id, trans))
                        {
                            sqlite.Insert(entity, trans);
                        }
                        else
                        {
                            sqlite.Update(entity, entity.H_id, trans);
                        }
                        j++;
                    }                    
                }
                trans.Commit();
            }
        }

至此,整個項目的代碼就基本上介紹完畢了,測試整個復制過程,單表50多萬的數據,100個字段左右,在開發機器上20分鍾不到就復制完成,確實是很不錯的成績了,如果修改為服務器的環境專門做復制處理,肯定速度還會提高不少。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM