最近着手一個項目,整體數據量有5億多,每個月增量9000萬。應用場景是OLTP:根據用戶id直接計算各種維度值。
因為是OLTP場景,直接按「用戶id % 2000」取模分表(方便後續橫向擴展)。有些喜歡扯分區表或者順序分表的,請復習下數據庫原理以及硬件原理。
分完表後,OLTP訪問速度上了幾個level。但是牽涉到一個實時統計的問題:需要對2000張表進行實時統計。由於暫時沒有GP、Hadoop這種分布式數據庫環境,也還沒有解決OLTP到分布式數據庫之間實時同步的問題,只能另想辦法。
想了個惡心的辦法:對2000張表開啟CDC變更捕獲,記錄時間段內發生變更的userid,再寫了個多線程腳本實時根據這些userid去更新數據。基本做到了實時統計,數據時間間隔差10分鐘左右。
明年計劃結構化數據先通過GP計算,需要寫個小程序來實現CDC變更到GP的實時同步。
順便附上多線程統計腳本,還是用PowerShell寫的。
# Multi-threaded statistics refresh.
# 1. Reads from Cdc_Change_userid the names of the tables that have pending
#    (isdelete=0) change records.
# 2. Runs "exec csp_billcellphone_Score '<table>'" once per table, in parallel,
#    through a runspace pool limited to $throttleLimit concurrent runspaces.

#region hostinfo
# Connection settings, addressed by position:
#   [0] server, [1] port, [2] SQL login, [3] SQL password, [4] database.
# $ClientSqlAccount / $ClientSqlPassWord are read here before this script
# assigns them, so they have to be supplied by the calling session.
$hostinfos = [System.Collections.ArrayList]@()
[void] $hostinfos.Add('192.168.1.1')
[void] $hostinfos.Add('1433')
[void] $hostinfos.Add($ClientSqlAccount)
[void] $hostinfos.Add($ClientSqlPassWord)
[void] $hostinfos.Add('db')
#endregion

#region table list
$tables = [System.Collections.ArrayList]@()
# Disabled alternative: process every shard (Tab0000..Tab1999) instead of only
# the tables that have recorded changes:
#   foreach ($s in 0..1999) { [void] $tables.Add('Tab{0:D4}' -f $s) }
#endregion

$ClientSqlAccount  = $hostinfos[2]
$ClientSqlPassWord = $hostinfos[3]
$ClientDB          = $hostinfos[4]
$log               = 'd:'
$SqlServer         = $hostinfos[0]
$Port              = $hostinfos[1]
$SqlString = "Data Source=" + $SqlServer + "," + $Port + ";uid=" + $ClientSqlAccount + ";Password=" + $ClientSqlPassWord

# Collect the tables that currently have change records to process.
$SqlConn = [System.Data.SqlClient.SqlConnection] $SqlString
try {
    $SqlConn.Open()
    $SqlConn.ChangeDatabase($ClientDB)
    $CC = $SqlConn.CreateCommand()
    $CC.CommandTimeout = 0   # 0 = no command timeout
    $CC.CommandText = 'select tabname from Cdc_Change_userid where isdelete=0 group by tabname '
    $Reader = $CC.ExecuteReader()
    try {
        while ($Reader.Read()) {
            [void] $tables.Add($Reader.GetString(0))
        }
    }
    finally {
        $Reader.Close()
    }
}
finally {
    # Close the connection even when the query fails.
    $SqlConn.Close()
}

#region Get SqlserverObjectScriptBlock
# Worker executed in each runspace: opens its own connection, runs $sqlcmd and
# emits the ExecuteScalar result. Failures are appended to
# "<log>\tab_<yyyyMMdd>.log" instead of being thrown.
$SBbillcellphone = {
    param($hostinfos, $sqlcmd)

    Function Sqler_BillCellPhones {
        param(
            [array]  $hostinfos,
            [string] $sqlcmd
        )

        # Assigned before the try so the catch block can always reference them.
        $ClientSqlAccount  = $hostinfos[2]
        $ClientSqlPassWord = $hostinfos[3]
        $ClientDB          = $hostinfos[4]
        $log               = 'd:'
        $SqlServer         = $hostinfos[0]
        $Port              = $hostinfos[1]
        $SqlConn           = $null

        try {
            $SqlString = "Data Source=" + $SqlServer + "," + $Port + ";uid=" + $ClientSqlAccount + ";Password=" + $ClientSqlPassWord
            $SqlConn = [System.Data.SqlClient.SqlConnection] $SqlString
            $SqlConn.Open()
            $SqlConn.ChangeDatabase($ClientDB)
            $CC = $SqlConn.CreateCommand()
            $CC.CommandTimeout = 0   # 0 = no command timeout
            $CC.CommandText = $sqlcmd
            $CC.ExecuteScalar()
        }
        catch {
            $day = (Get-Date -Format "yyyyMMdd").ToString()
            ('Sqler_BillCellPhones : ' + ((Get-Date).ToString()) + ' ' + $SqlServer + ',' + $Port + ' ' + $_.Exception.Message) |
                Out-File -FilePath "$log\tab_$day.log" -Append -Force
        }
        finally {
            # Release the connection on both the success and the failure path.
            if ($null -ne $SqlConn) {
                $SqlConn.Close()
            }
        }
    }

    Sqler_BillCellPhones $hostinfos $sqlcmd
}
#endregion

$throttleLimit = 5
$sqlTemplate   = 'exec csp_billcellphone_Score ''@tabname'''

$SessionState = [system.management.automation.runspaces.initialsessionstate]::CreateDefault()
$Pool = [runspacefactory]::CreateRunspacePool(1, $throttleLimit, $SessionState, $Host)
$Pool.Open()

try {
    # Parallel lists: $threads[$i] is the PowerShell instance whose pending
    # async result is $handles[$i]. Lists (rather than the output of a foreach
    # expression) keep indexing valid when there is only one table.
    $threads = [System.Collections.ArrayList]@()
    $handles = [System.Collections.ArrayList]@()

    foreach ($table in $tables) {
        # Literal (non-regex) substitution; single quotes in the name are
        # doubled so the value stays inside the SQL string literal.
        $sqlcmd = $sqlTemplate.Replace('@tabname', $table.Replace("'", "''"))

        $powershell = [powershell]::Create().AddScript($SBbillcellphone).AddArgument($hostinfos).AddArgument($sqlcmd)
        $powershell.RunspacePool = $Pool
        [void] $handles.Add($powershell.BeginInvoke())
        [void] $threads.Add($powershell)
    }

    # Poll once per second until every invocation has completed; a finished
    # slot is harvested, disposed and cleared so it is visited only once.
    do {
        $done = $true
        for ($i = 0; $i -lt $handles.Count; $i++) {
            $handle = $handles[$i]
            if ($null -eq $handle) {
                continue
            }
            if ($handle.IsCompleted) {
                try {
                    $threads[$i].EndInvoke($handle)
                }
                finally {
                    $threads[$i].Dispose()
                    $handles[$i] = $null
                }
            }
            else {
                $done = $false
            }
        }
        if (-not $done) {
            Start-Sleep -Seconds 1
        }
    } until ($done)
}
finally {
    # The pool owns the worker runspaces; close it even if dispatch failed.
    $Pool.Close()
    $Pool.Dispose()
}

# $powershell does not exist when no table was processed, hence SilentlyContinue.
Remove-Variable -Name handles, threads, powershell -ErrorAction SilentlyContinue
[System.GC]::Collect()
[System.GC]::WaitForPendingFinalizers()