C# 多線程並發處理數據庫數據,發送信號等待處理完統一插入.(轉)


http://hi.baidu.com/jiang_yy_jiang/blog/item/23294013384be4c7f6039e2e.html

 

public class JPService
    {
        public JPService()
        {
            //設定最大的Net並發連接數
            System.Net.ServicePointManager.DefaultConnectionLimit = 500;
            ThreadPool.SetMinThreads(15 , 2);//設置最小的工作線程數和IO線程數
            Config.Load();
        }
        private int MaxThread = 10;//最大的糾偏線程數

        private static Random m_rand = new Random(Guid.NewGuid().GetHashCode());//創建隨機數
        private static object m_objLock = new object();//添加列表鎖定

        private AutoResetEvent[] InitAutoResetEvent()
        {
            //對每個線程提供一個完成信號,初始化為未終止狀態
           AutoResetEvent[] autoEvents = new AutoResetEvent[MaxThread];
            for (int i = 0; i < MaxThread; i++)
                autoEvents[i] = new AutoResetEvent(false);
            return autoEvents;
        }
        /// <summary>
        /// 將處理完的信息添加至List列表
        /// </summary>
        /// <param name="lst"></param>
        /// <param name="info"></param>
        public void AddItemToList(List<EntityJPGpsInfo> lst , EntityJPGpsInfo info)
        {
            lock (m_objLock)
            {
                lst.Add(info);
            }
        }

        public void OnStart()
        {          

              try
            {

                 //在線程池中引入可執行的方法,一個循環不斷判斷

                ThreadPool.QueueUserWorkItem(_ =>
                    {

                       //不斷循環,直到當前有數據可以糾偏,則將該方法加入線程池

                        while (true)
                        {

                           AutoResetEvent[] autoEvents = InitAutoResetEvent();
                           List<EntityJPGpsInfo> lst = new List<EntityJPGpsInfo>();
                            //得到處理數據  
                            DataSet ds = GetBeforeJP();
                            //驗證數據是否存在(null)
                            if (!DataHelper.VerifyDataSet(ds))
                            {
                                Thread.Sleep(100);
                                continue;  //如果檢測DataSet可行,則繼續
                            }

                            int iCount = ds.Tables[0].Rows.Count; //獲取要處理的數據行數
                            int iBlock = (int)Math.Ceiling((double)iCount / MaxThread);//根據行數,線程數設定每個線程要處理的數據量
                            for (int i = 0; i < MaxThread; i++) //像線程池加入執行隊列
                            {
                                ThreadPool.QueueUserWorkItem(data =>
                                    {
                                        int iIndex = (int)data;//線程序號0->MaxThread
                                        DataRowCollection rows = ds.Tables[0].Rows;
                                        for (int j = 0; j < iBlock; j++)
                                        {
                                            try
                                            {
                                                if (iIndex * iBlock + j >= iCount)//避免最后一個線程索引越界
                                                    break;
                                                int iSequence = DataHelper.ReadInt(rows[iIndex * iBlock + j] , "SEQUENCE");
                                                int iMCUID = DataHelper.ReadInt(rows[iIndex * iBlock + j] , "MCUID");
                                                int iLng = DataHelper.ReadInt(rows[iIndex * iBlock + j] , "LONGITUDE") / 36;
                                                int iLat = DataHelper.ReadInt(rows[iIndex * iBlock + j] , "LATITUDE") / 36;
                                                DateTime dtmPostionTime = DataHelper.ReadDateTime(rows[iIndex * iBlock + j] , "POSITIONTIME");
                                                DateTime dtmReceiveTime = DataHelper.ReadDateTime(rows[iIndex * iBlock + j] , "RECEIVETIME");
                                                try
                                                {
                                                    //請求指定網頁地址進行處理返回結果
                                                    WebRequest jpRequest = WebRequest.Create(Config.BMSSettings.JPUrl + iLng.ToString() + "," + iLat.ToString() + "&t=" + m_rand.Next());
                                                    jpRequest.Method = "get";
                                                    jpRequest.ContentType = "application/x-www-form-urlencoded";
                                                    jpRequest.Timeout = 8000;//設置超時
                                                    WebResponse jpResponse = jpRequest.GetResponse();

                                                    string strJP;
                                                    using (StreamReader sr = new StreamReader(jpResponse.GetResponseStream()))
                                                    {
                                                        strJP = sr.ReadToEnd();
                                                        if (!string.IsNullOrEmpty(strJP))
                                                        {
                                                            strJP = strJP.Remove(strJP.Length - 1 , 1);
                                                            string[] strs = strJP.Split(new char[] { ',' });
                                                            //處理完加入列表
                                                            AddItemToList(lst , new EntityJPGpsInfo(iSequence , iMCUID , Convert.ToInt32(strs[0]) * 36 , Convert.ToInt32(strs[1]) * 36 , dtmPostionTime , dtmReceiveTime));
                                                        }
                                                    }
                                                    jpResponse.Close();
                                                }
                                                catch (Exception ex)
                                                {
                                                    LogHelper.Writeln("Error1:" + ex.Message);
                                                }
                                            }
                                            catch (Exception e)
                                            {
                                                LogHelper.Writeln("Error2:" + e.StackTrace);
                                            }
                                        }
                                        autoEvents[iIndex].Set();
                                    } , i);
                            }
                            //等待收到所有的信號
                            ManualResetEvent.WaitAll(autoEvents);
                            InsertToDB(lst); //處理完集體插入到數據庫
                        }
                    });
            }
            catch (Exception exc)
            {
                LogHelper.Writeln("Error3:" + exc.StackTrace);
            }
        }
        /// <summary>
        /// 數據庫提取預處理數據
        /// </summary>
        /// <returns>預處理數據集</returns>
        public DataSet GetBeforeJP()
        {
            OracleParameter[] paras = new OracleParameter[]
            {
                OracleHelper.MakeOutParam("curCursor",OracleType.Cursor)
            };
            return OracleHelper.ExecuteDataSet(Config.BMSSettings.ConnectionString , "SP_GIS_GET_BEFOREJP" , paras);
        }
        /// <summary>
        /// 全部循環插入到數據庫
        /// </summary>
        /// <param name="lst">插入數據列表</param>
        public void InsertToDB(List<EntityJPGpsInfo> lst)
        {
            using (OracleConnection conn = OracleHelper.GetConnection(Config.BMSSettings.ConnectionString))
            {
                conn.Open();
                OracleCommand command = conn.CreateCommand();
                OracleTransaction trans = conn.BeginTransaction();
                try
                {
                    command.Transaction = trans;
                    command.Parameters.AddRange(new OracleParameter[]
                    {
                        new OracleParameter("iSequence",OracleType.Number),
                        new OracleParameter("iMCUID",OracleType.Number),
                        new OracleParameter("iLongitude",OracleType.Number),
                        new OracleParameter("iLatitude",OracleType.Number),
                        new OracleParameter("dtmPositionTime",OracleType.DateTime),
                        new OracleParameter("dtmReceiveTime",OracleType.DateTime)
                    });

                    foreach (EntityJPGpsInfo info in lst)
                    {
                        command.Parameters[0].Value = info.Sequence;
                        command.Parameters[1].Value = info.MCUID;
                        command.Parameters[2].Value = info.Longitude;
                        command.Parameters[3].Value = info.Latitude;
                        command.Parameters[4].Value = info.PositionTime;
                        command.Parameters[5].Value = info.ReceiveTime;
                        command.CommandType = CommandType.StoredProcedure;
                        command.CommandText = "SP_GIS_ADD_JP";
                        command.ExecuteNonQuery();
                    }
                    trans.Commit();
                }
                catch (Exception ex)
                {
                    trans.Rollback();
                    throw ex;
                }
            }
        }
}

 

該小程序,主要是實現糾偏,將Oracle數據庫中所有需要糾偏的數據全部提取出來存入DataSet,然后使用網絡上使用的糾偏地址(程序未給出,設置在配置文件中了)全部糾偏。

糾偏部分為了加快運行速度和效率,使用了多線程,根據總數據條數分配每個線程該糾偏多少條,每次解決一條數據加入到List列表,每個線程有一個完成信號,等所有的線程任務完成時,統一將List中的數據遍歷插入到數據庫。

文章注明來源: http://hi.baidu.com/jiang_yy_jiang


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM