clickhouse數據復制表replication以及數據轉換和恢復


一、概述

  分布式存儲要保證高可用,就必須有數據冗余——即副本(replica)。ClickHouse依靠ReplicatedMergeTree引擎族與ZooKeeper實現了復制表機制,成為其高可用的基礎。

      在實際操作中,為了最大化性能與穩定性,分片和副本幾乎總是一同使用。

  僅 MergeTree 系列中的表支持復制,以下為正交圖:

 

  復制工作在單個表的級別,而不是整個服務器。服務器可以同時存儲復制表和非復制表。

  復制不依賴於分片。每個分片都有自己獨立的復制。

  INSERT復制和查詢的壓縮數據ALTER

  CREATEDROPATTACH,DETACHRENAME查詢在單個服務器上執行並且不會被復制:

  • CREATE TABLE查詢在運行查詢的服務器上創建一個新的可復制表。如果此表已存在於其他服務器上,則會添加一個新副本。
  • DROP TABLE查詢將刪除位於運行查詢的服務器上的副本。
  • RENAME查詢重命名副本之一上的表。換句話說,復制的表在不同的副本上可以有不同的名稱。

  ClickHouse 使用Apache ZooKeeper存儲副本元信息。使用 ZooKeeper 版本 3.4.5 或更高版本。

  要使用復制,請在zookeeper服務器配置部分設置參數。

  注意:不要忽視安全設置。ClickHouse 支持 ZooKeeper 安全子系統的digest ACL 方案。

  ZooKeeper集群地址設置示例:

<zookeeper>
    <node>
        <host>example1</host>
        <port>2181</port>
    </node>
    <node>
        <host>example2</host>
        <port>2181</port>
    </node>
    <node>
        <host>example3</host>
        <port>2181</port>
    </node>
</zookeeper>

  ClickHouse 還支持通過提供 ZooKeeper 集群名稱和路徑作為引擎參數將副本元信息存儲在輔助 ZooKeeper 集群中。也就是說,它支持將不同表的元數據存儲在不同的 ZooKeeper 集群中。

  設置輔助 ZooKeeper 集群地址的示例:

<auxiliary_zookeepers>
    <zookeeper2>
        <node>
            <host>example_2_1</host>
            <port>2181</port>
        </node>
        <node>
            <host>example_2_2</host>
            <port>2181</port>
        </node>
        <node>
            <host>example_2_3</host>
            <port>2181</port>
        </node>
    </zookeeper2>
    <zookeeper3>
        <node>
            <host>example_3_1</host>
            <port>2181</port>
        </node>
    </zookeeper3>
</auxiliary_zookeepers>

  要將表數據元存儲在輔助 ZooKeeper 集群而不是默認 ZooKeeper 集群中,我們可以使用 SQL 使用 ReplicatedMergeTree 引擎創建表,如下所示:

CREATE TABLE table_name ( ... ) ENGINE = ReplicatedMergeTree('zookeeper_name_configured_in_auxiliary_zookeepers:path', 'replica_name') ...
可以指定任何現有的 ZooKeeper 集群,系統將使用其上的目錄存儲自己的數據(該目錄在創建可復制表時指定)。

  如果配置文件中沒有設置 ZooKeeper,則無法創建復制表,並且任何現有的復制表都將是只讀的。

  ZooKeeper 不用於SELECT查詢,因為復制不會影響性能,SELECT並且查詢的運行速度與非復制表一樣快。查詢分布式復制表時,ClickHouse 行為由設置max_replica_delay_for_distributed_queriesfallback_to_stale_replicas_for_distributed_queries控制。

  對於每個INSERT查詢,通過幾個事務將大約十個條目添加到 ZooKeeper。(更准確地說,這是針對每個插入的數據塊;INSERT 查詢包含一個塊或每行一個塊。)與非復制表相比,max_insert_block_size = 1048576這會導致稍長的延遲。INSERT但是,如果按照建議以不超過INSERT每秒一個的批量插入數據,則不會產生任何問題。INSERTs用於協調一個 ZooKeeper 集群的整個 ClickHouse 集群每秒總共有幾百個。數據插入的吞吐量(每秒的行數)與非復制數據的吞吐量一樣高。

  對於非常大的集群,可以為不同的分片使用不同的 ZooKeeper 集群。但是,根據我們的經驗,基於大約 300 台服務器的生產集群,這並沒有被證明是必要的。

  復制是異步的和多主的。INSERT查詢(以及ALTER)可以發送到任何可用的服務器。數據插入到運行查詢的服務器上,然后復制到其他服務器。因為它是異步的,所以最近插入的數據會以一定的延遲出現在其他副本上。如果部分副本不可用,則在它們可用時寫入數據。如果副本可用,則延遲是通過網絡傳輸壓縮數據塊所需的時間。為復制表執行后台任務的線程數可以通過background_schedule_pool_size設置來設置。

  ReplicatedMergeTree引擎使用單獨的線程池進行復制提取。池的大小受background_fetches_pool_size設置的限制,可以通過服務器重新啟動進行調整。

  默認情況下,INSERT 查詢等待確認僅從一個副本寫入數據。如果數據僅成功寫入一個副本,並且具有該副本的服務器不復存在,則存儲的數據將丟失。要啟用對來自多個副本的數據寫入的確認,請使用該insert_quorum選項。

  每個數據塊都是原子寫入的。INSERT 查詢被分成最多max_insert_block_size = 1048576行的塊。換句話說,如果INSERT查詢的行數少於 1048576,則自動生成。

  數據塊被重復數據刪除。對於同一個數據塊的多次寫入(相同大小的數據塊包含相同順序的相同行),該塊只被寫入一次。這樣做的原因是當客戶端應用程序不知道數據是否已寫入數據庫時​​發生網絡故障,因此INSERT可以簡單地重復查詢。將相同數據發送到哪個副本 INSERT 並不重要。INSERTs是冪等的。重復數據刪除參數由merge_tree服務器設置控制。

  在復制期間,只有要插入的源數據通過網絡傳輸。進一步的數據轉換(合並)以相同的方式在所有副本上進行協調和執行。這最大限度地減少了網絡使用,這意味着當副本駐留在不同的數據中心時,復制工作得很好。(請注意,在不同的數據中心復制數據是復制的主要目標。)

  可以擁有相同數據的任意數量的副本。根據經驗,一個相對可靠和方便的解決方案可以在生產中使用雙重復制,每台服務器使用 RAID-5 或 RAID-6(在某些情況下使用 RAID-10)。

  系統監控副本上的數據同步性,並能夠在發生故障后恢復。故障轉移是自動的(對於數據的微小差異)或半自動的(當數據差異太大時,這可能表示配置錯誤)。

  數據插入流程圖如下:

二、創建復制表

  Replicated前綴被添加到表引擎名稱中例如:ReplicatedMergeTree

  Replicated*MergeTree 參數

  • zoo_path— ZooKeeper 中表的路徑。
  • replica_name— ZooKeeper 中的副本名稱。
  • other_parameters— 用於創建復制版本的引擎的參數,例如ReplacingMergeTree.

  例子:

CREATE TABLE table_name
(
    EventDate DateTime,
    CounterID UInt32,
    UserID UInt32,
    ver UInt16
) ENGINE = ReplicatedReplacingMergeTree('/clickhouse/tables/{layer}-{shard}/table_name', '{replica}', ver)
PARTITION BY toYYYYMM(EventDate)
ORDER BY (CounterID, EventDate, intHash32(UserID))
SAMPLE BY intHash32(UserID);

  不推薦使用的語法示例

CREATE TABLE table_name
(
    EventDate DateTime,
    CounterID UInt32,
    UserID UInt32
) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{layer}-{shard}/table_name', '{replica}', EventDate, intHash32(UserID), (CounterID, EventDate, intHash32(UserID), EventTime), 8192);

  如示例所示,這些參數可以包含大括號中的替換。替換值取自配置文件的部分。

  例子:

<macros>
    <layer>05</layer>
    <shard>02</shard>
    <replica>example05-02-1</replica>
</macros>

  ZooKeeper 中表的路徑對於每個復制的表應該是唯一的。不同分片上的表應該有不同的路徑。在這種情況下,路徑由以下部分組成:

  /clickhouse/tables/是通用前綴。我們建議完全使用這個。

  {layer}-{shard}是分片標識符。在此示例中,它由兩部分組成,因為示例集群使用雙層分片。對於大多數任務,可以只保留 {shard} 替換,它將擴展為分片標識符。

  table_name是 ZooKeeper 中表的節點名稱。使其與表名相同是個好主意。它是顯式定義的,因為與表名相比,它在 RENAME 查詢后不會更改。 提示:也可以在前面添加數據庫名稱table_name例如db_name.table_name

  這兩個內置替換{database}{table}可以使用,它們分別擴展為表名和數據庫名(除非這些宏在macros節中定義)。所以zookeeper路徑可以指定為'/clickhouse/tables/{layer}-{shard}/{database}/{table}'使用這些內置替換時要小心表重命名。Zookeeper 中的路徑無法更改,重命名表時,宏會擴展為不同的路徑,表將引用 Zookeeper 中不存在的路徑,並進入只讀模式。

  副本名稱標識同一張表的不同副本。可以為此使用服務器名稱,如示例中所示。該名稱只需要在每個分片中是唯一的。

  可以顯式定義參數,而不是使用替換。這對於測試和配置小型集群可能很方便。ON CLUSTER但是,在這種情況下,不能使用分布式 DDL 查詢 ( )。

  在處理大型集群時,我們建議使用替換,因為它們可以降低出錯的可能性。

  Replicated可以在服務器配置文件中為表引擎指定默認參數。例如:

<default_replica_path>/clickhouse/tables/{shard}/{database}/{table}</default_replica_path>
<default_replica_name>{replica}</default_replica_name>

  在這種情況下,可以在創建表時省略參數:

CREATE TABLE table_name (
    x UInt32
) ENGINE = ReplicatedMergeTree
ORDER BY x;

  它相當於:

CREATE TABLE table_name (
    x UInt32
) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/{database}/table_name', '{replica}')
ORDER BY x;

  CREATE TABLE在每個副本上運行查詢。此查詢創建一個新的復制表,或向現有表添加一個新副本。

  如果在表已經包含其他副本上的一些數據之后添加新副本,則在運行查詢后數據將從其他副本復制到新副本。換句話說,新副本將自己與其他副本同步。

  要刪除副本,請運行DROP TABLE但是,只有一個副本被刪除 - 駐留在運行查詢的服務器上的副本。

# 復制表的創建,其中創建的幾個重要監控線程
StorageReplicatedMergeTree::StorageReplicatedMergeTree
    queue // 表的隊列,保存了需要處理的操作
    restarting_thread // 重連zk的線程,保證能夠一直連接zk ReplicatedMergeTreeRestartingThread::run
    queue_updating_task // 隊列更新任務,監控zk上的log,將它們加載到queue中
    mutations_updating_task // mutation更新任務,監控zk上的mutations節點
    merge_selecting_task // 選擇part進行merge或者mutate的任務
    background_executor // 后台處理隊列的線程
    
# 數據的寫入
InterpreterFactory::get
InterpreterInsertQuery::execute
    StorageReplicatedMergeTree::write
        ReplicatedMergeTreeBlockOutputStream
    ReplicatedMergeTreeBlockOutputStream::write
        writeTempPart // 數據寫入本地磁盤
        commitPart // 向zk提交插入的part信息
            renameTempPartAndAdd // 臨時目錄變為正式目錄
            makeCreateRequest // 提交log_entry,zk上會創建log-xxx
            mergeSelectingTask // 主動觸發merge任務

# 隊列更新線程 queueUpdatingTask,也就是log entry的監控線程
ReplicatedMergeTreeQueue::pullLogsToQueue // 監控zk上是否有新的log產生
    makeCreateRequest // 將log的內容放到zk上的/queue節點下
    insertUnlocked // 在queue中新增當前entry
    
# 隊列任務處理進程 scheduling_task
StorageReplicatedMergeTree::getDataProcessingJob
    selectQueueEntry
        queue.selectEntryToProcess // 從queue中選擇可以執行的log entry
    processQueueEntry // 處理隊列中的任務
        processEntry
            executeLogEntry // 如果當前entry指定的part在本節點上,則return,否則從副本fetch
                executeFetch // 找到一個有該part的副本就可以
                    fetchPart
                        fetcher.fetchPart // 通過http獲取具體數據
            removeProcessedEntry // 刪除zk上的隊列

 

三、故障后恢復

  如果服務器啟動時 ZooKeeper 不可用,復制的表將切換到只讀模式。系統會定期嘗試連接到 ZooKeeper。

  如果 ZooKeeper 在 期間不可用INSERT,或者與 ZooKeeper 交互時發生錯誤,則會引發異常。

  連接到 ZooKeeper 后,系統會檢查本地文件系統中的數據集是否與預期的數據集匹配(ZooKeeper 存儲此信息)。如果存在輕微的不一致,系統會通過將數據與副本同步來解決它們。

  如果系統檢測到損壞的數據部分(文件大小錯誤)或無法識別的部分(寫入文件系統但未記錄在 ZooKeeper 中的部分),則會將它們移動到detached子目錄(它們不會被刪除)。任何缺失的部分都會從副本中復制。

  請注意,ClickHouse 不會執行任何破壞性操作,例如自動刪除大量數據。

  當服務器啟動(或與 ZooKeeper 建立新會話)時,它只檢查所有文件的數量和大小。如果文件大小匹配但字節在中間某處發生了更改,則不會立即檢測到,但僅在嘗試讀取SELECT查詢數據時才檢測到。該查詢引發關於不匹配校驗和或壓縮塊大小的異常。在這種情況下,數據部分被添加到驗證隊列中,並在必要時從副本中復制。

  如果本地數據集與預期的數據差異太大,則會觸發安全機制。服務器在日志中輸入此內容並拒絕啟動。這樣做的原因是這種情況可能表明配置錯誤,例如,如果一個分片上的副本被意外配置為不同分片上的副本。但是,此機制的閾值設置得相當低,並且在正常故障恢復期間可能會發生這種情況。在這種情況下,通過“按下按鈕”半自動恢復數據。

  要開始恢復,請在 ZooKeeper 中使用任何內容創建節點/path_to_table/replica_name/flags/force_restore_data,或者運行命令來恢復所有復制的表:

sudo -u clickhouse touch /var/lib/clickhouse/flags/force_restore_data

  然后重新啟動服務器。啟動時,服務器會刪除這些標志並開始恢復。

四、數據完全丟失后恢復

  如果其中一台服務器上的所有數據和元數據都消失了,請按照以下步驟進行恢復:

  1. 在服務器上安裝 ClickHouse。如果使用它們,請在包含分片標識符和副本的配置文件中正確定義替換。
  2. 如果有必須在服務器上手動復制的未復制表,請從副本(在目錄中/var/lib/clickhouse/data/db_name/table_name/)復制它們的數據。
  3. 從副本復制位於的表定義/var/lib/clickhouse/metadata/如果在表定義中明確定義了分片或副本標識符,請更正它以使其對應於該副本。(或者,啟動服務器並進行所有ATTACH TABLE應該在 .sql 文件中的查詢/var/lib/clickhouse/metadata/。)
  4. 要開始恢復,請創建包含任何內容的 ZooKeeper 節點/path_to_table/replica_name/flags/force_restore_data,或運行命令以恢復所有復制的表:sudo -u clickhouse touch /var/lib/clickhouse/flags/force_restore_data

  然后啟動服務器(重新啟動,如果它已經在運行)。數據將從副本下載。

  另一種恢復選項是從 ZooKeeper ( ) 中刪除有關丟失副本的信息,然后按照“創建復制表/path_to_table/replica_name”中所述再次創建副本

  恢復期間對網絡帶寬沒有限制。如果要同時恢復多個副本,請記住這一點。

五、從 MergeTree 轉換為 ReplicatedMergeTree 

  我們使用該術語MergeTree來指代 中的所有表引擎MergeTree family,與 for 相同ReplicatedMergeTree

  如果有一個MergeTree手動復制的表,則可以將其轉換為復制表。如果已經在表中收集了大量數據MergeTree並且現在想要啟用復制,則可能需要執行此操作。

  如果各個副本上的數據不同,請先同步它,或者刪除除一個之外的所有副本上的此數據。

  重命名現有的 MergeTree 表,然后ReplicatedMergeTree使用舊名稱創建一個表。將舊表中的數據移動到detached包含新表數據的目錄內的子目錄中 ( /var/lib/clickhouse/data/db_name/table_name/)。然后ALTER TABLE ATTACH PARTITION在其中一個副本上運行以將這些數據部分添加到工作集中。

六、從 ReplicatedMergeTree 轉換為 MergeTree 

  創建一個具有不同名稱的 MergeTree 表。將包含ReplicatedMergeTree表數據的目錄中的所有數據移動到新表的數據目錄中。然后刪除ReplicatedMergeTree表並重新啟動服務器。

  如果想在ReplicatedMergeTree不啟動服務器的情況下刪除表:

  • .sql刪除元數據目錄 ( /var/lib/clickhouse/metadata/)中的相應文件。
  • 刪除 ZooKeeper ( /path_to_table/replica_name) 中的對應路徑。

  之后,可以啟動服務器,創建MergeTree表,將數據移動到其目錄,然后重新啟動服務器。

七、Zookeeper 集群中元數據丟失或損壞時的恢復 

  如果 ZooKeeper 中的數據丟失或損壞,可以通過將數據移動到如上所述的未復制表來保存數據。

八、ReplicatedMergeTree表刪除更新數據等操作

1)drop 表相關操作

  在我們刪除本地表和分布式表后,立即重建是沒有問題的。唯一有問題的就是復制表,因為復制表需要在zookeeper上建立一個路徑,存放相關數據。clickhouse默認的庫引擎是原子數據庫引擎,刪除Atomic數據庫中的表后,它不會立即刪除,而是會在480秒后刪除。由下面這個參數控制:

config.xml

<database_atomic_delay_before_drop_table_sec>480</database_atomic_delay_before_drop_table_sec>

  ①使用普通數據庫而不是原子數據庫。 create database … Engine=Ordinary。
  ②使用uniq ZK路徑。{uuid}/clickhouse/tables/{layer}-{shard}-{uuid}/。
  ③減少database_atomic_delay_before_drop_table_sec = 0 & drop table … sync

2)alter update  delete 等操作

  對於非副本表,所有ALTER查詢都是同步執行的。對於副本表,查詢只是向ZooKeeper添加適當操作的指令,操作本身會盡快執行,可以通過replication_alter_partitions_sync設置控制執行等待,如果為0表示不等待,如果為1表示只等待自己執行(默認,即一個),如果為2表示需要等待所有節點完成。另外還可以通過replication_wait_for_inactive_replica_timeout參數設置等待時間,0表示不等待,負整數表示無限制等待,正整數表示等待秒數。如果 replication_alter_partitions_sync = 2,某些副本ALTER操作超過 replication_wait_for_inactive_replica_timeout 時間,則會拋出 UNFINISHED 異常。

3)truncate table操作

  會等待所有的副本處理結束,即使某些副本出現故障了,也還會繼續等待,沒有設置超時時間。truncate 操作沒有回滾機制,在它分別遍歷分區和副本時,如果存在多個分區和多個副本,且某個副本存在故障時,那么遍歷到故障副本時,流程會一直卡住(無法滿足stop_waiting條件,該副本的log一直無法處理)。但是,此時,之前遍歷過的副本已經處理完了truncate操作。這就導致兩點不一致:

  ①副本間的數據已經不一致了。
  ②已經遍歷過的副本只刪除了部分數據,一般是只有第一個分區的數據被刪除了

整體流程如下:

StorageReplicatedMergeTree::truncate // 入口
    // 如下遍歷所有分區
    dropAllPartsInPartition // 停止merge並在zk上創建刪除分區的log, 類型為LogEntry::DROP_RANGE
    waitForAllReplicasToProcessLogEntry // 等待所有副本執行完刪除分區操作
        // 如下遍歷所有副本,包括自己
        waitForTableReplicaToProcessLogEntry
            // 根據log_pointer和log編號判斷是否已經處理完當前log-entry,沒有處理完或者沒有滿足stop_waiting的條件,則一直等待
            // 等待queue節點下文件消失

// 將log節點下的內容拉取到queue節點下,並將entry放到變量queue中
ReplicatedMergeTreeQueue::pullLogsToQueue
    ...
    
// 處理隊列的后台線程
StorageReplicatedMergeTree::getDataProcessingJob
    selectQueueEntry
    processQueueEntry // 處理隊列中的任務
        processEntry
            executeLogEntry
                executeDropRange // 真正刪除part
            removeProcessedEntry /

 

如執行中未立即得到結果,建議等待,如果等待太久,只能進行節點重啟,之后再檢查數據是否刪除成功。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM