ETL之Tungsten Replicator


1 概述

1.1 介紹

Tungsten Replicator是數據庫集群和復制供應商Continuent推出的高性能、開源的數據復制引擎,是Continuent最先進的集群解決方案的核心組件之一,特別適合作為異構數據庫之間數據遷移的解決方案。

Tungsten Replicator最新的穩定版本是tungsten-replicator-oss-4.0.0-18,官網下載地址為:https://code.google.com/p/tungsten-replicator/wiki/Downloads

1.2 特點

Tungsten Replicator所具有的特點主要有以下幾點:

1)支持高版本MySQL向低版本復制,如5.1->5.0;

2)支持跨數據庫系統的復制,如MySQL->Oracle,並且所支持的數據庫不僅包括MySQL、PostgreSQL和Amazon RDS等傳統關系型數據庫,還包括MongoDB等NoSQL數據庫以及Vertica、InfiniDB、Hadoop和Amazon RedShift等數據倉庫;

3)支持多種復制拓撲結構,如Master-Slave、Multi-Master、Direct、Fan-In和Star等。

1.3 其他功能

除了核心的數據復制外,Tungsten Replicator還提供了數據備份與恢復以及數據庫基准測試工具等其他輔助功能點。

1.4 說明

    本文檔旨在闡述Tungsten Replicator的流程和實現細節,為方便起見,文中所有的解釋與案例都基於源端數據庫和目的端數據庫均為MySQL的場景,特此說明。

 

2 部署與安裝

2.1 准備

參與復制的所有主機都需要在部署安裝前完成必要的准備工作,如增加特定用戶、用戶組和所需目錄、修改相應權限、環境變量設置、網絡配置、安裝相關依賴、SSH配置和MySQL配置等。詳見官方文檔相關章節:http://docs.continuent.com/tungsten-replicator-4.0/prerequisite-staging.html

http://docs.continuent.com/tungsten-replicator-4.0/prerequisite-host.html

http://docs.continuent.com/tungsten-replicator-4.0/prerequisite-mysql.html

2.2 配置與部署

下載完Tungsten Replicator二進制包並解壓后,可以利用其自帶的tpm工具實現復制拓撲結構的配置和環境搭建,提供了Staging和INI兩種部署方式:

1)Staging方式

通過一台Staging host(可以是集群中的某台主機,也可以是單獨的)配置好一份集群的部署文件,隨后使用tpm configure命令根據配置統一向所有參與復制的主機安裝和配置相應服務,其部署示意圖如圖2.1所示。

圖 2.1

2)INI方式

每台參與復制的主機各自建立並維護一個INI配置文件,分別安裝相應的服務,其部署示意圖如圖2.2所示。

 

圖 2.2

在此,本文以Staging的方式完成后續的部署與安裝過程。以最基本的Master-Slave拓撲結構為例,Staging host上的配置命令類似如圖2.3:

 

 

圖 2.3

在上述配置中,很多選項和參數都使用了默認值(如端口號、文件路徑和命名方式等),因此無需顯式指定。在此,指定了host1作為Master,host2作為Slave,serviceName表明了本服務的名字。成功配置后,會在Staging host上生成一個deploy.cfg文件,記錄了剛才的配置,可用tpm reverse命令查看。

2.3 安裝

在Staging host上使用tpm install命令實現所有主機的安裝與配置,此過程由Staging host負責統一完成,若途中安裝失敗,則回滾此次操作,命令如下:

$ tpm install serviceName–v

此過程,Staging host利用之前配置的SSH無密碼連通向每台主機完成所需文件的拷貝和服務配置,若SSH的端口不是默認的22,則需加入選項--net-ssh-option=port=XXX指定。成功安裝后,每台主機相應的目錄中會生成若干目錄和文件。

2.4 啟動與監控

安裝完成后,需要使用replicator start命令啟動Tungsten Replicator服務,同時所配置的所有服務也同時會隨之上線,啟動命令為:

$ replicator start

如果成功啟動,則所配置的服務進入可用狀態,復制工作可正常進行。

可使用replicator status命令查看當前運行狀態:

$ replicator status

2.5 服務上下線與停止

若希望將某服務下線,則使用trepctl offline命令實現:

$ trepctl -service serviceName offline

若希望將某服務重新上線,則使用trepctl online命令實現:

$ trepctl -service serviceName online

若想停止Tungsten Replicator服務,則需使用replicator stop命令,此時所有運行着的服務均會自動下線:

$ replicator stop

 

3 Tungsten Replicator復制引擎

3.1組件

3.1.1 角色

角色主要包括Master和Slave等,即運行該TungstenReplicator實例的主機在復制集群中扮演的角色。

3.1.2 組件

1)ReplicatorPlugin

TungstenReplicator采用了插件式編程的方式,幾乎所有核心部件都是以插件的形式存在的,因此具備高度的可插拔性,用戶只需將所需插件設置在配置文件中,並且用戶自定義的插件也可輕松集成進系統。所有插件都實現了該接口,包括configure()和prepare()階段,分別完成初始化和預處理過程。

2)ReplicatorRuntime

整個系統運行的上下文,包括了所有加載的插件和配置文件信息。
3)Pipeline

系統運行的工作流,每個Pipeline根據其角色(Master/Slave)確定不同的業務處理階段(Stage),每個Stage之間存在先后依賴關系,還可以設置各類PipelineService和Store。
4)PipelineService

Pipeline中設置的服務,包括ChannelAssignmentService和DataSourceService,其中ChannelAssignmentService用於並行復制,而DataSourceService指明了該Pipeline獲取數據的來源,如MySQL等。
5)Stage

TungstenReplicator中的核心概念,業務流程被拆分為若干Stage先后執行,每個Stage由一個StageTaskGroup來執行,而StageTaskGroup由多個SingleThreadStageTask組成(默認1個,即不開啟並行復制),每個SingleThreadStageTask設置一個Extractor、多個Filter和一個Applier。
6)Store

數據存儲的組件,包括THL和QueueStore,分別代表Tungsten統一數據格式化日志和內存隊列,前后Stage之間的數據交換就是依靠Store完成的。
7)THL

其中一種Store,以一系列只可追加寫的THL文件形式存在,從不同數據源抽取來的數據經過格式化后以一種統一的數據存儲格式存放於THL中,是TungstenReplicator實現異構數據庫復制的核心部件,其特點包括:

A. THL的記錄單元稱為一個事件,是順序記錄的;

B. THL的事件是不可變的,不能修改或刪除,除非整個THL文件刪除;

C. 每個THL事件對應一個數據庫事務,被分配一個全局唯一的ID作為標識,稱為seqno,從0開始記錄

8)Extractor

數據抽取器,用於從數據源或Store中獲取所需數據,解析處理並生成相應的ReplDBMSEvent實例,如MySQLExtractor等,核心方法為extract()。
9)Filter

數據過濾器,可以根據需要設置零到多個,Extractor抽取后生成的ReplDBMSEvent會依次經過所有Filter的過濾處理,如ReplicateFilter等,核心方法為filter()。
10)Applier

過濾后的ReplDBMSEvent會交給Applier最終處理,將結果作用於目的端,如MySQLApplier等,核心方法為apply()。

3.2 元數據庫表

默認每個服務會在源端和目的端數據庫中於啟動時自動創建一個用於存放關鍵服務信息的元數據庫(Catalog schema),其命名默認為tungsten_serviceName,其中會創建五張元數據表(Catalog tables),分別為consistency、heartbeat、trep_commit_seqno、trep_shard和trep_shard_channel,其中最重要的就是trep_commit_seqno表,里面維護了最近一條復制信息(如啟動了並行復制,則每個channel會維護各自的一條記錄),當服務重啟時需要讀取該信息用於確定數據同步的起始點。

3.3 服務配置信息

除了Tungsten Replicator全局的配置信息外,每個服務都有其各自獨立的配置信息,並且根據主機擔任的角色不同,配置文件的內容也有所差異,包括三類配置文件,均在安裝目錄的conf目錄下存放:

1)靜態配置文件(核心):static-serviceName.properties

2)動態配置文件(非必須):dynamic-serviceName.properties

3)動態角色文件(非必須):dynamic-serviceName.role

與配置信息相關的類包括:

1)ReplicatorRuntimeConf:整個系統的重要目錄和配置文件獲取,包括程序主目錄、日志目錄、配置目錄等;

2)TungstenProperties:封裝了從配置文件中讀取到的屬性鍵值對,系統運行時每個組件所需的配置信息都是從中獲得的;

3)ReplicatorConf:定義了讀取配置文件時的一些常量前綴;

4)ReplicatorRole:枚舉類型,只有MASTER/SLAVE/OTHER三種,代表角色。

3.4 總體架構

TungstenReplicator作為一款數據復制工具,其核心作用就是將數據從源端數據庫中抽取,隨后依次完成解析、轉換、過濾、封裝等操作,最終在目的端數據庫中應用所有數據,以完成兩者的狀態同步。

TungstenReplicator所支持的源端數據庫類型目前只限於MySQL和Oracle等幾款流行的關系型數據庫,而這些數據庫對於數據修改的記錄都存放在相應的日志文件中,如MySQL的二進制日志binlog和Oracle的CDC(Change Data Capture),因此TungstenReplicator會通過相應的Extractor向這些日志文件中抽取數據記錄,經過處理后應用於目的端數據庫。此外對於MySQL而言,binlog的日志記錄方式無論是基於語句的還是基於行的都可以有效解析與處理。

同時,每種類型的數據庫由於其實現機制和存儲格式的不同,無法直接完成數據復制。而Tungsten Replicator之所以可以實現異構數據庫的復制,就在於它將從不同數據庫中抽取的數據以一種統一的數據存儲格式存放於一系列THL文件中(以thl.data.000000XXX方式命名),並且每個記錄以事務為單位,擁有全局唯一的序列號,除了數據本身外還包括必要的元數據信息,有利於后續的狀態監控和故障恢復。因此如果希望數據復制到不同類型的目的端數據庫時,只需編寫對應的Applier從THL文件中讀取數據進行相應的轉換處理即可應用於目的端數據庫。而Tungsten Replicator目前支持的目的端數據庫種類非常豐富,同時涵蓋多款關系型數據庫和新型數據庫,應用場景極為廣泛。總體的架構圖如圖3.1所示。

圖 3.1

3.5集群架構

由於TungstenReplicator所支持的復制拓撲結構包括了Master-Slave、Multi-Master、Direct、Fan-In和Star等,每種結構構成的集群方式不盡相同,因此本文選取Direct和Master-Slave這兩種結構進行介紹。

3.5.1 Direct架構

參與復制的主機本質上都屬於Slave,其角色為Direct,而源端數據庫則充當Master。這是最簡單的復制拓撲結構,總體示意圖如圖3.2所示。

 

圖 3.2

Direct架構默認包含四個Stage:

1)binlog-> queue

默認Extractor是MySQLExtractor,Applier是InMemoryQueueAdapter。

在此階段,MySQLExtractor從MySQL數據源獲取binlog數據(默認先將binlog數據dump到本地relay log中再進行處理),解析binlog中的各類LogEvent,最終以事務為單位創建一個DBMSEvent,添加必要屬性(如seqno和eventId等)后封裝成ReplDBMSEvent,經過Filters處理后,InMemoryQueueAdapter直接將ReplDBMSEvent放入InMemoryQueueStore中;

2)queue->THL

默認Extractor是InMemoryQueueAdapter,Applier是THLStoreApplier。

在此階段,InMemoryQueueAdapter從InMemoryQueueStore中取出ReplDBMSEvent數據,經過Filters處理后,THLStoreApplier負責將數據封裝成THLEvent后存入本地的THL文件中;

3)THL-> queue

默認Extractor是THLStoreExtractor,Applier是InMemoryQueueAdapter。

在此階段,THLStoreExtractor從本地THL文件中獲取ReplDBMSEvent數據,經過Filter處理后,InMemoryQueueAdapter將其放入InMemoryQueueStore中;

4)queue->MySQL

默認Extractor是InMemoryQueueAdapter,Applier是MySQLDrizzleApplier,Filter是MySQLSessionSupportFilter和PrimaryKeyFilter。

在此階段,InMemoryQueueAdapter從InMemoryQueueStore中取出ReplDBMSEvent數據,經過Filters處理后,MySQLDrizzleApplier將ReplDBMSEvent中的數據轉換為相應的SQL語句,通過JDBC向目的端的MySQL執行命令,完成主從數據同步。

3.5.2 Master-Slave架構

參與復制的主機有N台,其中一台的角色為Master,另外N-1台為Slave。總體示意圖如圖3.3所示。

圖 3.3

3.5.2.1Master端架構

Master端共有兩個Stage,架構圖如圖3.4所示:

1) binlog->queue

同Direct第(1)階段;
2) queue->THL

同Direct第(2)階段。

圖3.4

3.5.2.2Slave端架構

Slave端共有三個Stage,架構圖如圖3.5所示:

1) MasterTHL->THL

默認Extractor是RemoteTHLExtractor,Applier是THLStoreApplier。

在此階段,RemoteTHLExtractor與Master建立連接並獲取其THL文件中的ReplDBMSEvent數據,經過Filter處理后,THLStoreApplier將ReplDBMSEvent存入本地THL;

2) THL->queue

同Direct第(3)階段;

3) queue->MySQL

同Direct第(4)階段。

圖3.5

3.6 關鍵技術點

    在TungstenReplicator的實現過程中涉及到一些較為重要的技術細節,在此逐一闡述。

3.6.1 重啟點

在Tungsten Replicator運行的過程中,一旦將數據同步在目的端數據庫時,同時還會更新元數據庫trep_commit_seqno表中的記錄,用於標記最近一條同步記錄的相關元數據信息(如seqno、eventId等),這是為了保證服務重啟后可以准確初始化重啟點,接着上次的進度恢復服務。以Slave為例具體說明其實現過程:

假設Slave端的THL文件中記錄的最近一條記錄所持有的seqno為N,而目的端數據庫中元數據庫trep_commit_seqno表最近一條記錄所持有的seqno為M,N>M,說明目的端數據庫的同步進度尚未追上THL寫入的進度。此時,假設服務由於某些原因停止,重新啟動后的流程如下:

1)由3.5.2.2可知,Slave端默認存在三個Stage,MySQLDrizzleApplier作為最后一個Stage的Applier會訪問目的端數據庫,將trep_commit_seqno表最近一條記錄(seqno=M)讀出封裝成一個ReplDBMSHeader實例,依次往前向本Stage的Extractor(InMemoryQueueAdapter)傳遞,並且會持續傳遞到第二個Stage的Extractor(THLStoreExtractor),這樣該Extractor就知道應該向THL請求第M+1條記錄相關的信息。

2)與此同時,第一個Stage的Applier(THLStoreApplier)會從本地THL文件中獲取最近一條記錄(seqno=N)讀出封裝成一個ReplDBMSHeader實例,往前向本Stage的Extractor(RemoteTHLExtractor)傳遞,這樣該Extractor就知道應該向Master請求第N+1條記錄相關的信息。整個服務的重啟點就此恢復到上次運行的最后進度,保證不會丟失同步信息。

3)而如果從trep_commit_seqno表取出的最近一條記錄為空,則說明是首次啟動該記錄尚不存在,Slave此時會向源端數據庫發送相應的SQL命令獲得其當前binlog位置信息作為數據同步的起始點。

Master端的重啟點獲取類似Slave的2)階段。

3.6.2 事務分片

為了特殊處理大事務,TungstenReplicator中可由配置文件里的transaction_frag_size指定事務分片的閾值,即當一個事務所占的字節數大於此值時需要進行相應的分片。此分片行為在解析抽取binlog的過程中完成,即由MySQLExtractor負責。

完成分片后,一個事務可能對應N個分片,每個分片都是一個ReplDBMSEvent實例,而每個分片的seqno都相同(因為seqno是按照事務為單位分配的全局唯一ID),而它們的fragno(分片號)從0~N-1連續分配,且最后一個分片的lastFragment標志位為true(代表其是該事務最后一個分片)。盡管一個事務可能被分片,但在最終同步到目的端數據庫時還是通過相應的機制保證每個分片重新合並成完整的事務並提交,滿足事務原子性的特點。

3.6.3 塊提交

    TungstenReplicator為了提升數據復制的性能,降低連續提交單事務造成的開銷,提供了塊提交功能(Block Commit),即可以多個事務可作為一個單元一同提交。觸發一次提交的滿足條件由blockCommitPolicy、blockCommitRowCount和blockCommitInterval三個屬性值共同確定,同時涉及較為復雜的邏輯判斷過程,在此不具體詳述。

3.6.4 並行復制

TungstenReplicator默認是不開啟並行復制的,但是可以通過相應的配置實現。由於數據最終同步到目的端的過程相對於數據抽取的過程通常會滯后,因此在數據同步階段采用多線程並行可以加快復制的速度,但是需要相應的處理邏輯來解決可能產生的數據不一致和數據丟失等問題,因為每個並行復制線程的處理進度無法保持一致。

3.6.4.1 實現基礎

使用並行復制時,TungstenReplicator將每個並行復制單元稱為一個channel或partition,而實現並行復制的基礎就在於原本的單隊列(InMemoryQueueStore)被替換為多隊列(ParallelQueueStore),每個隊列對應一個channel。下面具體介紹並行復制相關的插件:

1)ParallelQueueStore

   並行復制的基礎數據結構,由N個隊列組成。同時設定了一個Partitioner,即分區策略,用來確定一個事件應該被分配給哪個channel進行處理;

2)ParallelQueueApplier

   ParallelQueueStore之前的Applier,以單線程的方式將某個事件放入ParallelQueueStore中對應的channel中,以這種串行化的方式是為了保證在數據寫入階段的順序性;

3)ParallelQueueExtractor

   ParallelQueueStore之后的Extractor,開啟N個線程,每個線程指定對應一個ParallelQueueStore中的channel,用於將事件從並行隊列中取出並進行后續處理。

4)Partitioner

   事件分區的策略,包括RoundRobinPartitioner、HashPartitioner和ShardListPartitioner等。其中RoundRobinPartitioner根據事件的seqno通過對channel個數取模進行輪詢分配到各個channel中,HashPartitioner根據事件涉及的schema的哈希值對channel個數取模分配,而ShardListPartitioner通過事先配置shard.list文件指定每個schema與channel的映射關系進行分配。值得注意的是,無論采用哪種分區策略,同屬一個事務的分片一定會連續地進入同一個channel中進行處理,因為這些分片的seqno都是相同的,保證了事務的完整性。

N個channel組成的並行復制,就會在目的端數據庫的元數據表trep_commit_seqno中維護N條記錄,每個channel對應一條,即記錄了每個channel最近一次同步的數據信息。

與此同時,並行復制中很關鍵的一環就是判斷某事務是否可以與其他事務並行執行,因為事務之間可能存在依賴關系或事務為跨數據庫事務,此時相關操作必須臨時調整為串行過程來保證數據一致性。為了應對這種場景,必須使用ShardListPartitioner作為分區的策略(因為其他分區策略並未采取相關保護機制,因此可能出現數據不一致現象),具體的情況如下:

1)由於該分區策略保證同屬一個數據庫的事件會划分到同一個channel,因此同一數據庫的所有事務都是串行執行的,這就表明Tungsten Replicator的並行復制只適用於多數據庫的業務場景,否則並不能發揮其效率優勢;

2)每個事件在被分區的時候,還會設置其critical標志位,如果該值為真則說明該事件不允許與其他數據庫相關事件並行,如跨數據庫事務或shard.list中配置的數據庫相關事務。

3)當將某個critical為真的事件放入並行隊列時會判斷當前所有的並行隊列是否為空,如果是則允許放入;反之就阻塞等待。這種機制保證了某個必須串行的事件在執行的時候不會與其他任何事件並行執行,從而確保復制結果的正確性。

3.6.4.2 實現原理

運行時,在預處理階段每個channel對應的終端Applier(MySQLDrizzleApplier)會從元數據表trep_commit_seqno中取出各自channel對應的最近同步數據的seqno,以其中最小的seqno對應的事件作為重啟點向上請求。這種請求策略是保守但卻必要的,因為可能有些請求來的事件是已經處理過的,本無需再次請求,但為了保證最小seqno和最大seqno之間連續的所有事件都不遺漏,因此采取這種方式可以確保主從數據一致性。

隨后在正式處理階段,每個channel對應的Extractor從所屬隊列中取出事件,判斷該事件與其保留的最近事件seqno之間的大小關系:

1)如果是小於或等於,說明之前已經同步過了,無需再次同步,直接忽略;

2)如果是大於,說明是新的事件,需要處理。

因此,采用這種處理流程,雖然由於每個channel的執行進度不統一可能導致某一刻Slave的狀態跟Master不同,但服務重啟后之前沒有處理的事件一定會被重新處理,而已經處理過的事件絕不會重復處理。

3.7狀態監控

Tungsten Replicator運行時的狀態監控主要包括服務監控與THL日志查看。

3.7.1 服務監控

所有的服務監控都是通過trepctl命令實現的。

3.7.1.2 總體狀態監控

通過trepctl status命令查看服務的總體狀態,執行結果如圖3.6所示。

 

圖 3.6

其中最重要的輸出信息包括:

1)appliedLastEventId:最后一條event對應的MySQL binlog和位置偏移。

2)appliedLastSeqno:最后一條event對應的seqno。

3)appliedLatency:對於Master而言,該值代表最后一條event從事務在源端數據庫提交到存入本地THL中的時間間隔;對於Slave而言,該值代表最后一條event從事務在源端數據庫提交到目的端數據庫提交的時間間隔。因此,該值可用於判斷Slave數據復制相對於Master的進度。

4)state:當前服務的運行狀態,如ONLINE、OFFLINE:NORMAL和OFFLINE:ERROR等。

每個輸出字段詳細的解釋見官方文檔相關章節:

http://docs.continuent.com/tungsten-replicator-4.0/terminology-output-fields.html

3.7.1.3 Stage狀態監控

通過trepctl status-name stages命令查看服務的每個Stage狀態,執行結果如圖3.7所示。

 

 

圖 3.7

3.7.1.4Store狀態監控

通過trepctl status -name stores命令查看服務的每個Store狀態,執行結果如圖3.8所示。

 

圖 3.8

3.7.1.5 Shard狀態監控

通過trepctl status -name shards命令查看服務的每個Shard狀態,主要用於並行復制的場景,執行結果如圖3.9所示。

 

圖 3.9

3.7.2 THL日志查看

THL日志相關查看通過thl命令實現的。

3.7.2.1 THL總體信息

通過thl info命令查看服務的THL總體信息,包括THL文件數量、存放路徑、文件大小、事件個數、事件ID等信息,執行結果如圖3.10所示。

 

圖 3.10

3.7.2.2 THL索引信息

通過thl index命令查看服務所有的THL文件以及其記錄的事件范圍,執行結果如圖3.11所示。

 

圖 3.11

3.7.2.3 THL事件查看

通過thl list指令查看THL中某條或某幾條事件記錄的詳細信息,執行結果如圖3.12和圖3.13所示,分別顯示了MySQL中的一條SQL數據和RowChange數據。

 

圖3.12

 

圖3.13


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM