在沒Hadoop 、GP 前提下怎么進行實時數據統計。


    最近着手個項目,整體數據量有5億多,每個月增量9000w。應用場景是Oltp 根據用戶id直接計算各種維度值。

因為是Oltp 場景,直接根據用戶id %2000分(方便后續橫向擴展),有些喜歡扯分區表的或者順序分表的請復習下數據庫原理以及硬件原理。

分完表oltp 訪問速度上了幾個level。但是牽涉到一個實時統計的問題,需要對2000張表進行實時統計。因暫時沒gp、hadoop 這種分布式數據庫環境,以及怎么解決Oltp 到分布式數據庫之間實時同步的問題。

想了個惡心的辦法。對2000張表開啟cdc 變更,記錄時間段發生的變更userid,寫了個多線程腳本實時根據這些userid 去更新數據。基本做到了實時統計,數據時間間隔差10分鍾左右。

明年計划結構化數據先通過Gp計算,需要寫個小程序來滿足Cdc 變更到Gp的實時同步。 

   

 

 

 順便附帶 多線程統計腳本,還是powershell 寫的。

#region hostinfo
$hostinfos=[System.Collections.ArrayList]@()
[void] $hostinfos.add('192.168.1.1')
[void] $hostinfos.add('1433')
[void] $hostinfos.add( $ClientSqlAccount)
[void] $hostinfos.add($ClientSqlPassWord)
[void] $hostinfos.add('db')
#endregion

#region 生成2000張表
$tables=[System.Collections.ArrayList]@()
<#
foreach($s in 0..1999)
{
    switch([void] $s)
    {
      {$s -lt 10 }{ [void] $tables.add('Tab'+'000'+ $s.ToString());}
      {$s -ge 10 -and $s -lt 100 }{ [void] $tables.add(('Tab'+'00'+ $s.ToString()));  }
      {$s -ge 100 -and $s-lt 1000 }{ [void] $tables.add(('Tab'+'0'+ $s.ToString()));  }
      {$s -ge 1000  }{  [void] ($tables.add(('Tab'+ $s.ToString()))); }
    }

}
#>
#endregion

    $ClientSqlAccount=$hostinfos[2];
    $ClientSqlPassWord=$hostinfos[3];
    $ClientDB=$hostinfos[4];
    $log='d:'
    $SqlServer=$hostinfos[0]  ;
    $Port=$hostinfos[1] ;
    $SqlString="Data Source="+$SqlServer+","+$Port+";uid="+$ClientSqlAccount+";Password="+$ClientSqlPassWord;
    $SqlConn = [System.Data.SqlClient.SqlConnection] $SqlString;
    $SqlConn.Open() ;
    $SqlConn.ChangeDatabase($ClientDB);
    $CC = $SqlConn.CreateCommand();
    $CC.CommandTimeout = 0;
    $CC.CommandText='select tabname  from Cdc_Change_userid where isdelete=0 group by tabname '
    $Reader = $CC.ExecuteReader();
    while ($Reader.read())
     { 
        [void] $tables.add($Reader.GetString(0)); 
     }
    $SqlConn.Close();


#region Get SqlserverObjectScriptBlock
  $SBbillcellphone={
    param($hostinfos,$sqlcmd) 
    Function Sqler_BillCellPhones
    {param(
         [array] $hostinfos
        ,[string] $sqlcmd
       )
        try
        {
               $ClientSqlAccount=$hostinfos[2];
               $ClientSqlPassWord=$hostinfos[3];
               $ClientDB=$hostinfos[4];
               $log='d:'
               $SqlServer=$hostinfos[0]  ;
               $Port=$hostinfos[1] ;
               $SqlString="Data Source="+$SqlServer+","+$Port+";uid="+$ClientSqlAccount+";Password="+$ClientSqlPassWord;
               $SqlConn = [System.Data.SqlClient.SqlConnection] $SqlString;
               $SqlConn.Open() ;
               $SqlConn.ChangeDatabase($ClientDB);
               $CC = $SqlConn.CreateCommand();
               $CC.CommandTimeout = 0;
               $CC.CommandText=$sqlcmd
               $CC.ExecuteScalar();
               $SqlConn.Close();
        }
        catch
        {

           $day=(Get-Date -Format "yyyyMMdd").tostring();
           $return='Error';
           ( 'Sqler_BillCellPhones  : '+((Get-Date).tostring())+' '+ $SqlServer+','+$Port +' '+$_.Exception.Message )|Out-File  -FilePath  "$log\tab_$day.log"   -Append -Force

        }
    }
    Sqler_BillCellPhones $hostinfos $sqlcmd
 }

    $throttleLimit=5
    $sqlcmd='exec csp_billcellphone_Score ''@tabname''';
    $SessionState = [system.management.automation.runspaces.initialsessionstate]::CreateDefault()
    $Pool = [runspacefactory]::CreateRunspacePool(1, $throttleLimit, $SessionState, $Host)
    $Pool.Open()
    $threads = @() 
    $handles = foreach($table in $tables) { 
        $sqlcmd='exec csp_billcellphone_Score ''@tabname''';
        $sqlcmd=$sqlcmd-replace '@tabname',$table
        $powershell = [powershell]::Create().AddScript($SBbillcellphone).AddArgument($hostinfos).AddArgument($sqlcmd)
        $powershell.RunspacePool = $Pool
        $powershell.BeginInvoke()
      $threads += $powershell
    }
    do {
      $i = 0
      $done = $true
      foreach ($handle in $handles) {
        if ($handle -ne $null) {
          if ($handle.IsCompleted) {
            $threads[$i].EndInvoke($handle)
            $threads[$i].Dispose()
            $handles[$i] = $null
          } else {
            $done = $false
          }
        }
        $i++
      }
      if (-not $done) { Start-Sleep -second 1 }
    } until ($done)
 Remove-Variable -Name handles, threads,powershell;
 [System.GC]::Collect(); [System.GC]::WaitForPendingFinalizers()

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM