簡化ETL工作,編寫一個Canal膠水層


前提

這是一篇憋了很久的文章,一直想寫,卻又一直忘記了寫。整篇文章可能會有點流水賬,相對詳細地介紹怎么寫一個小型的"框架"。這個精悍的膠水層已經在生產環境服役超過半年,這里嘗試把耦合業務的代碼去掉,提煉出一個相對簡潔的版本。

之前寫的幾篇文章里面其中一篇曾經提到過Canal解析MySQLbinlog事件后的對象如下(來源於Canal源碼com.alibaba.otter.canal.protocol.FlatMessage):

如果直接對此原始對象進行解析,那么會出現很多解析模板代碼,一旦有改動就會牽一發動全身,這是我們不希望發生的一件事。於是花了一點點時間寫了一個Canal膠水層,讓接收到的FlatMessage根據表名稱直接轉換為對應的DTO實例,這樣能在一定程度上提升開發效率並且減少模板化代碼,這個膠水層的數據流示意圖如下:

要編寫這樣的膠水層主要用到:

  • 反射。
  • 注解。
  • 策略模式。
  • IOC容器(可選)。

項目的模塊如下:

  • canal-glue-core:核心功能。
  • spring-boot-starter-canal-glue:適配SpringIOC容器,添加自動配置。
  • canal-glue-example:使用例子和基准測試。

下文會詳細分析此膠水層如何實現。

引入依賴

為了不污染引用此模塊的外部服務依賴,除了JSON轉換的依賴之外,其他依賴的scope定義為provide或者test類型,依賴版本和BOM如下:

<properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <spring.boot.version>2.3.0.RELEASE</spring.boot.version>
        <maven.compiler.plugin.version>3.8.1</maven.compiler.plugin.version>
        <lombok.version>1.18.12</lombok.version>
        <fastjson.version>1.2.73</fastjson.version>
</properties>
<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-dependencies</artifactId>
            <version>${spring.boot.version}</version>
            <scope>import</scope>
            <type>pom</type>
        </dependency>
    </dependencies>
</dependencyManagement>
<dependencies>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>${lombok.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>${fastjson.version}</version>
    </dependency>
</dependencies>

其中,canal-glue-core模塊本質上只依賴於fastjson,可以完全脫離spring體系使用。

基本架構

這里提供一個"后知后覺"的架構圖,因為之前為了快速懟到線上,初版沒有考慮這么多,甚至還耦合了業務代碼,組件是后來抽離出來的:

設計配置模塊(已經移除)

設計配置模塊在設計的時候考慮使用了外置配置文件和純注解兩種方式,前期使用了JSON外置配置文件的方式,純注解是后來增加的,二選一。這一節簡單介紹一下JSON外置配置文件的配置加載,純注解留到后面處理器模塊時候分析。

當初是想快速進行膠水層的開發,所以配置文件使用了可讀性比較高的JSON格式:

{
  "version": 1,
  "module": "canal-glue",
  "databases": [
    {
      "database": "db_payment_service",
      "processors": [
        {
          "table": "payment_order",
          "processor": "x.y.z.PaymentOrderProcessor",
          "exceptionHandler": "x.y.z.PaymentOrderExceptionHandler"
        }
      ]
    },
    {
      ......
    }
  ]
}

JSON配置在設計的時候盡可能不要使用JSON Array作為頂層配置,因為這樣做設計的對象會比較怪

因為使用該模塊的應用有可能需要處理Canal解析多個上游數據庫的binlog事件,所以配置模塊設計的時候需要以databaseKEY,掛載多個table以及對應的表binlog事件處理器以及異常處理器。然后對着JSON文件的格式擼一遍對應的實體類出來:

@Data
public class CanalGlueProcessorConf {

    private String table;

    private String processor;

    private String exceptionHandler;
}

@Data
public class CanalGlueDatabaseConf {

    private String database;

    private List<CanalGlueProcessorConf> processors;
}

@Data
public class CanalGlueConf {

    private Long version;

    private String module;

    private List<CanalGlueDatabaseConf> database;
}

實體編寫完,接着可以編寫一個配置加載器,簡單起見,配置文件直接放ClassPath之下,加載器如下:

public interface CanalGlueConfLoader {

    CanalGlueConf load(String location);
}

// 實現
public class ClassPathCanalGlueConfLoader implements CanalGlueConfLoader {

    @Override
    public CanalGlueConf load(String location) {
        ClassPathResource resource = new ClassPathResource(location);
        Assert.isTrue(resource.exists(), String.format("類路徑下不存在文件%s", location));
        try {
            String content = StreamUtils.copyToString(resource.getInputStream(), StandardCharsets.UTF_8);
            return JSON.parseObject(content, CanalGlueConf.class);
        } catch (IOException e) {
            // should not reach
            throw new IllegalStateException(e);
        }
    }
}

讀取ClassPath下的某個location為絕對路徑的文件內容字符串,然后使用Fasfjson轉成CanalGlueConf對象。這個是默認的實現,使用canal-glue模塊可以覆蓋此實現,通過自定義的實現加載配置。

JSON配置模塊在后來從業務系統抽離此膠水層的時候已經完全廢棄,使用純注解驅動和核心抽象組件繼承的方式實現。

核心模塊開發

主要包括幾個模塊:

  • 基本模型定義。
  • 適配器層開發。
  • 轉換器和解析器層開發。
  • 處理器層開發。
  • 全局組件自動配置模塊開發(僅限於Spring體系,已經抽取到spring-boot-starter-canal-glue模塊)。
  • CanalGlue開發。

基本模型定義

定義頂層的KEY,也就是對於某個數據庫的某一個確定的表,需要一個唯一標識:

// 模型表對象
public interface ModelTable {

    String database();

    String table();

    static ModelTable of(String database, String table) {
        return DefaultModelTable.of(database, table);
    }
}

@RequiredArgsConstructor(access = AccessLevel.PACKAGE, staticName = "of")
public class DefaultModelTable implements ModelTable {

    private final String database;
    private final String table;

    @Override
    public String database() {
        return database;
    }

    @Override
    public String table() {
        return table;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        DefaultModelTable that = (DefaultModelTable) o;
        return Objects.equals(database, that.database) &&
                Objects.equals(table, that.table);
    }

    @Override
    public int hashCode() {
        return Objects.hash(database, table);
    }
}

這里實現類DefaultModelTable重寫了equals()hashCode()方法便於把ModelTable實例應用為HashMap容器的KEY,這樣后面就可以設計ModelTable -> Processor的緩存結構。

由於Canal投放到Kafka的事件內容是一個原始字符串,所以要定義一個和前文提到的FlatMessage基本一致的事件類CanalBinLogEvent

@Data
public class CanalBinLogEvent {

    /**
     * 事件ID,沒有實際意義
     */
    private Long id;

    /**
     * 當前更變后節點數據
     */
    private List<Map<String, String>> data;

    /**
     * 主鍵列名稱列表
     */
    private List<String> pkNames;

    /**
     * 當前更變前節點數據
     */
    private List<Map<String, String>> old;

    /**
     * 類型 UPDATE\INSERT\DELETE\QUERY
     */
    private String type;

    /**
     * binlog execute time
     */
    private Long es;

    /**
     * dml build timestamp
     */
    private Long ts;

    /**
     * 執行的sql,不一定存在
     */
    private String sql;

    /**
     * 數據庫名稱
     */
    private String database;

    /**
     * 表名稱
     */
    private String table;

    /**
     * SQL類型映射
     */
    private Map<String, Integer> sqlType;

    /**
     * MySQL字段類型映射
     */
    private Map<String, String> mysqlType;

    /**
     * 是否DDL
     */
    private Boolean isDdl;
}

根據此事件對象,再定義解析完畢后的結果對象CanalBinLogResult

// 常量
@RequiredArgsConstructor
@Getter
public enum BinLogEventType {
    
    QUERY("QUERY", "查詢"),

    INSERT("INSERT", "新增"),

    UPDATE("UPDATE", "更新"),

    DELETE("DELETE", "刪除"),

    ALTER("ALTER", "列修改操作"),

    UNKNOWN("UNKNOWN", "未知"),

    ;

    private final String type;
    private final String description;

    public static BinLogEventType fromType(String type) {
        for (BinLogEventType binLogType : BinLogEventType.values()) {
            if (binLogType.getType().equals(type)) {
                return binLogType;
            }
        }
        return BinLogEventType.UNKNOWN;
    }
}

// 常量
@RequiredArgsConstructor
@Getter
public enum OperationType {

    /**
     * DML
     */
    DML("dml", "DML語句"),

    /**
     * DDL
     */
    DDL("ddl", "DDL語句"),
    ;

    private final String type;
    private final String description;
}

@Data
public class CanalBinLogResult<T> {

    /**
     * 提取的長整型主鍵
     */
    private Long primaryKey;


    /**
     * binlog事件類型
     */
    private BinLogEventType binLogEventType;

    /**
     * 更變前的數據
     */
    private T beforeData;

    /**
     * 更變后的數據
     */
    private T afterData;

    /**
     * 數據庫名稱
     */
    private String databaseName;

    /**
     * 表名稱
     */
    private String tableName;

    /**
     * sql語句 - 一般是DDL的時候有用
     */
    private String sql;

    /**
     * MySQL操作類型
     */
    private OperationType operationType;
}

開發適配器層

定義頂層的適配器SPI接口:

public interface SourceAdapter<SOURCE, SINK> {

    SINK adapt(SOURCE source);
}

接着開發適配器實現類:

// 原始字符串直接返回
@RequiredArgsConstructor(access = AccessLevel.PACKAGE, staticName = "of")
class RawStringSourceAdapter implements SourceAdapter<String, String> {

    @Override
    public String adapt(String source) {
        return source;
    }
}

// Fastjson轉換
@RequiredArgsConstructor(access = AccessLevel.PACKAGE, staticName = "of")
class FastJsonSourceAdapter<T> implements SourceAdapter<String, T> {

    private final Class<T> klass;

    @Override
    public T adapt(String source) {
        if (StringUtils.isEmpty(source)) {
            return null;
        }
        return JSON.parseObject(source, klass);
    }
}

// Facade
public enum SourceAdapterFacade {

    /**
     * 單例
     */
    X;

    private static final SourceAdapter<String, String> I_S_A = RawStringSourceAdapter.of();

    @SuppressWarnings("unchecked")
    public <T> T adapt(Class<T> klass, String source) {
        if (klass.isAssignableFrom(String.class)) {
            return (T) I_S_A.adapt(source);
        }
        return FastJsonSourceAdapter.of(klass).adapt(source);
    }
}

最終直接使用SourceAdapterFacade#adapt()方法即可,因為實際上絕大多數情況下只會使用原始字符串和String -> Class實例,適配器層設計可以簡單點。

開發轉換器和解析器層

對於Canal解析完成的binlog事件,dataold屬性是K-V結構,並且KEY都是String類型,需要遍歷解析才能推導出完整的目標實例。

轉換后的實例的屬性類型目前只支持包裝類,int等原始類型不支持

為了更好地通過目標實體和實際的數據庫、表和列名稱、列類型進行映射,引入了兩個自定義注解CanalModel@CanalField,它們的定義如下:

// @CanalModel
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface CanalModel {

    /**
     * 目標數據庫
     */
    String database();

    /**
     * 目標表
     */
    String table();

    /**
     * 屬性名 -> 列名命名轉換策略,可選值有:DEFAULT(原始)、UPPER_UNDERSCORE(駝峰轉下划線大寫)和LOWER_UNDERSCORE(駝峰轉下划線小寫)
     */
    FieldNamingPolicy fieldNamingPolicy() default FieldNamingPolicy.DEFAULT;
}

// @CanalField
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
public @interface CanalField {

    /**
     * 行名稱
     *
     * @return columnName
     */
    String columnName() default "";

    /**
     * sql字段類型
     *
     * @return JDBCType
     */
    JDBCType sqlType() default JDBCType.NULL;

    /**
     * 轉換器類型
     *
     * @return klass
     */
    Class<? extends BaseCanalFieldConverter<?>> converterKlass() default NullCanalFieldConverter.class;
}

定義頂層轉換器接口BinLogFieldConverter

public interface BinLogFieldConverter<SOURCE, TARGET> {

    TARGET convert(SOURCE source);
}

目前暫定可以通過目標屬性的Class和通過注解指定的SQLType類型進行匹配,所以再定義一個抽象轉換器BaseCanalFieldConverter

public abstract class BaseCanalFieldConverter<T> implements BinLogFieldConverter<String, T> {

    private final SQLType sqlType;
    private final Class<?> klass;

    protected BaseCanalFieldConverter(SQLType sqlType, Class<?> klass) {
        this.sqlType = sqlType;
        this.klass = klass;
    }

    @Override
    public T convert(String source) {
        if (StringUtils.isEmpty(source)) {
            return null;
        }
        return convertInternal(source);
    }

    /**
     * 內部轉換方法
     *
     * @param source 源字符串
     * @return T
     */
    protected abstract T convertInternal(String source);

    /**
     * 返回SQL類型
     *
     * @return SQLType
     */
    public SQLType sqlType() {
        return sqlType;
    }

    /**
     * 返回類型
     *
     * @return Class<?>
     */
    public Class<?> typeKlass() {
        return klass;
    }
}

BaseCanalFieldConverter是面向目標實例中的單個屬性的,例如對於實例中的Long類型的屬性,可以實現一個BigIntCanalFieldConverter

public class BigIntCanalFieldConverter extends BaseCanalFieldConverter<Long> {

    /**
     * 單例
     */
    public static final BaseCanalFieldConverter<Long> X = new BigIntCanalFieldConverter();

    private BigIntCanalFieldConverter() {
        super(JDBCType.BIGINT, Long.class);
    }

    @Override
    protected Long convertInternal(String source) {
        if (null == source) {
            return null;
        }
        return Long.valueOf(source);
    }
}

其他類型以此類推,目前已經開發好的最常用的內建轉換器如下:

JDBCType JAVAType 轉換器
NULL Void NullCanalFieldConverter
BIGINT Long BigIntCanalFieldConverter
VARCHAR String VarcharCanalFieldConverter
DECIMAL BigDecimal DecimalCanalFieldConverter
INTEGER Integer IntCanalFieldConverter
TINYINT Integer TinyIntCanalFieldConverter
DATE java.time.LocalDate SqlDateCanalFieldConverter0
DATE java.sql.Date SqlDateCanalFieldConverter1
TIMESTAMP java.time.LocalDateTime TimestampCanalFieldConverter0
TIMESTAMP java.util.Date TimestampCanalFieldConverter1
TIMESTAMP java.time.OffsetDateTime TimestampCanalFieldConverter2

所有轉換器實現都設計為無狀態的單例,方便做動態注冊和覆蓋。接着定義一個轉換器工廠CanalFieldConverterFactory,提供API通過指定參數加載目標轉換器實例:

// 入參
@SuppressWarnings("rawtypes")
@Builder
@Data
public class CanalFieldConvertInput {

    private Class<?> fieldKlass;
    private Class<? extends BaseCanalFieldConverter> converterKlass;
    private SQLType sqlType;

    @Tolerate
    public CanalFieldConvertInput() {

    }
}

// 結果
@Builder
@Getter
public class CanalFieldConvertResult {

    private final BaseCanalFieldConverter<?> converter;
}

// 接口
public interface CanalFieldConverterFactory {

    default void registerConverter(BaseCanalFieldConverter<?> converter) {
        registerConverter(converter, true);
    }

    void registerConverter(BaseCanalFieldConverter<?> converter, boolean replace);

    CanalFieldConvertResult load(CanalFieldConvertInput input);
}

CanalFieldConverterFactory提供了可以注冊自定義轉化器的registerConverter()方法,這樣就可以讓使用者注冊自定義的轉換器和覆蓋默認的轉換器。

至此,可以通過指定的參數,加載實例屬性的轉換器,拿到轉換器實例,就可以針對目標實例,從原始事件中解析對應的K-V結構。接着需要編寫最核心的解析器模塊,此模塊主要包含三個方面:

  • 唯一BIGINT類型主鍵的解析(這一點是公司技術規范的一條鐵規則,MySQL每個表只能定義唯一的BIGINT UNSIGNED自增趨勢主鍵)。
  • 更變前的數據,對應於原始事件中的old屬性節點(不一定存在,例如INSERT語句中不存在此屬性節點)。
  • 更變后的數據,對應於原始事件中的data屬性節點。

定義解析器接口CanalBinLogEventParser如下:

public interface CanalBinLogEventParser {

    /**
     * 解析binlog事件
     *
     * @param event               事件
     * @param klass               目標類型
     * @param primaryKeyFunction  主鍵映射方法
     * @param commonEntryFunction 其他屬性映射方法
     * @return CanalBinLogResult
     */
    <T> List<CanalBinLogResult<T>> parse(CanalBinLogEvent event,
                                         Class<T> klass,
                                         BasePrimaryKeyTupleFunction primaryKeyFunction,
                                         BaseCommonEntryFunction<T> commonEntryFunction);
}

解析器的解析方法依賴於:

  • binlog事件實例,這個是上游的適配器組件的結果。
  • 轉換的目標類型。
  • BasePrimaryKeyTupleFunction主鍵映射方法實例,默認使用內建的BigIntPrimaryKeyTupleFunction
  • BaseCommonEntryFunction非主鍵通用列-屬性映射方法實例,默認使用內建的ReflectionBinLogEntryFunction這個是非主鍵列的轉換核心,里面使用到了反射)。

解析返回結果是一個List,原因是FlatMessage在批量寫入的時候的數據結構本來就是一個List<Map<String,String>>,這里只是"順水推舟"。

開發處理器層

處理器是開發者處理最終解析出來的實體的入口,只需要面向不同類型的事件選擇對應的處理方法即可,看起來如下:

public abstract class BaseCanalBinlogEventProcessor<T> extends BaseParameterizedTypeReferenceSupport<T> {

    protected void processInsertInternal(CanalBinLogResult<T> result) {
    }

    protected void processUpdateInternal(CanalBinLogResult<T> result) {
    }

    protected void processDeleteInternal(CanalBinLogResult<T> result) {
    }

    protected void processDDLInternal(CanalBinLogResult<T> result) {
    }
}

例如需要處理Insert事件,則子類繼承BaseCanalBinlogEventProcessor,對應的實體類(泛型的替換)使用@CanalModel注解聲明,然后覆蓋processInsertInternal()方法即可。期間子處理器可以覆蓋自定義異常處理器實例,如:

@Override
protected ExceptionHandler exceptionHandler() {
    return EXCEPTION_HANDLER;
}

/**
    * 覆蓋默認的ExceptionHandler.NO_OP
    */
private static final ExceptionHandler EXCEPTION_HANDLER = (event, throwable)
        -> log.error("解析binlog事件出現異常,事件內容:{}", JSON.toJSONString(event), throwable);

另外,有些場景需要對回調前或者回調后的結果做特化處理,因此引入了解析結果攔截器(鏈)的實現,對應的類是BaseParseResultInterceptor

public abstract class BaseParseResultInterceptor<T> extends BaseParameterizedTypeReferenceSupport<T> {

    public BaseParseResultInterceptor() {
        super();
    }

    public void onParse(ModelTable modelTable) {

    }

    public void onBeforeInsertProcess(ModelTable modelTable, T beforeData, T afterData) {

    }

    public void onAfterInsertProcess(ModelTable modelTable, T beforeData, T afterData) {

    }

    public void onBeforeUpdateProcess(ModelTable modelTable, T beforeData, T afterData) {

    }

    public void onAfterUpdateProcess(ModelTable modelTable, T beforeData, T afterData) {

    }

    public void onBeforeDeleteProcess(ModelTable modelTable, T beforeData, T afterData) {

    }

    public void onAfterDeleteProcess(ModelTable modelTable, T beforeData, T afterData) {

    }

    public void onBeforeDDLProcess(ModelTable modelTable, T beforeData, T afterData, String sql) {

    }

    public void onAfterDDLProcess(ModelTable modelTable, T beforeData, T afterData, String sql) {

    }

    public void onParseFinish(ModelTable modelTable) {

    }

    public void onParseCompletion(ModelTable modelTable) {

    }
}

解析結果攔截器的回調時機可以參看上面的架構圖或者BaseCanalBinlogEventProcessor的源代碼。

開發全局組件自動配置模塊

如果使用了Spring容器,需要添加一個配置類來加載所有既有的組件,添加一個全局配置類CanalGlueAutoConfiguration(這個類可以在項目的spring-boot-starter-canal-glue模塊中看到,這個模塊就只有一個類):

@Configuration
public class CanalGlueAutoConfiguration implements SmartInitializingSingleton, BeanFactoryAware {

    private ConfigurableListableBeanFactory configurableListableBeanFactory;

    @Bean
    @ConditionalOnMissingBean
    public CanalBinlogEventProcessorFactory canalBinlogEventProcessorFactory() {
        return InMemoryCanalBinlogEventProcessorFactory.of();
    }

    @Bean
    @ConditionalOnMissingBean
    public ModelTableMetadataManager modelTableMetadataManager(CanalFieldConverterFactory canalFieldConverterFactory) {
        return InMemoryModelTableMetadataManager.of(canalFieldConverterFactory);
    }

    @Bean
    @ConditionalOnMissingBean
    public CanalFieldConverterFactory canalFieldConverterFactory() {
        return InMemoryCanalFieldConverterFactory.of();
    }

    @Bean
    @ConditionalOnMissingBean
    public CanalBinLogEventParser canalBinLogEventParser() {
        return DefaultCanalBinLogEventParser.of();
    }

    @Bean
    @ConditionalOnMissingBean
    public ParseResultInterceptorManager parseResultInterceptorManager(ModelTableMetadataManager modelTableMetadataManager) {
        return InMemoryParseResultInterceptorManager.of(modelTableMetadataManager);
    }

    @Bean
    @Primary
    public CanalGlue canalGlue(CanalBinlogEventProcessorFactory canalBinlogEventProcessorFactory) {
        return DefaultCanalGlue.of(canalBinlogEventProcessorFactory);
    }

    @Override
    public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
        this.configurableListableBeanFactory = (ConfigurableListableBeanFactory) beanFactory;
    }

    @SuppressWarnings({"rawtypes", "unchecked"})
    @Override
    public void afterSingletonsInstantiated() {
        ParseResultInterceptorManager parseResultInterceptorManager
                = configurableListableBeanFactory.getBean(ParseResultInterceptorManager.class);
        ModelTableMetadataManager modelTableMetadataManager
                = configurableListableBeanFactory.getBean(ModelTableMetadataManager.class);
        CanalBinlogEventProcessorFactory canalBinlogEventProcessorFactory
                = configurableListableBeanFactory.getBean(CanalBinlogEventProcessorFactory.class);
        CanalBinLogEventParser canalBinLogEventParser
                = configurableListableBeanFactory.getBean(CanalBinLogEventParser.class);
        Map<String, BaseParseResultInterceptor> interceptors
                = configurableListableBeanFactory.getBeansOfType(BaseParseResultInterceptor.class);
        interceptors.forEach((k, interceptor) -> parseResultInterceptorManager.registerParseResultInterceptor(interceptor));
        Map<String, BaseCanalBinlogEventProcessor> processors
                = configurableListableBeanFactory.getBeansOfType(BaseCanalBinlogEventProcessor.class);
        processors.forEach((k, processor) -> processor.init(canalBinLogEventParser, modelTableMetadataManager,
                canalBinlogEventProcessorFactory, parseResultInterceptorManager));
    }
}

為了更好地讓其他服務引入此配置類,可以使用spring.factories的特性。新建resources/META-INF/spring.factories文件,內容如下:

org.springframework.boot.autoconfigure.EnableAutoConfiguration=cn.throwx.canal.gule.config.CanalGlueAutoConfiguration

這樣子通過引入spring-boot-starter-canal-glue就可以激活所有用到的組件並且初始化所有已經添加到Spring容器中的處理器。

CanalGlue開發

CanalGlue其實就是提供binlog事件字符串的處理入口,目前定義為一個接口:

public interface CanalGlue {

    void process(String content);
}

此接口的實現DefaultCanalGlue也十分簡單:

@RequiredArgsConstructor(access = AccessLevel.PUBLIC, staticName = "of")
public class DefaultCanalGlue implements CanalGlue {

    private final CanalBinlogEventProcessorFactory canalBinlogEventProcessorFactory;

    @Override
    public void process(String content) {
        CanalBinLogEvent event = SourceAdapterFacade.X.adapt(CanalBinLogEvent.class, content);
        ModelTable modelTable = ModelTable.of(event.getDatabase(), event.getTable());
        canalBinlogEventProcessorFactory.get(modelTable).forEach(processor -> processor.process(event));
    }
}

使用源適配器把字符串轉換為CanalBinLogEvent實例,再委托處理器工廠尋找對應的BaseCanalBinlogEventProcessor列表去處理輸入的事件實例。

使用canal-glue

主要包括下面幾個維度,都在canal-glue-exampletest包下:

  • [x] 一般情況下使用處理器處理INSERT事件。
  • [x] 自定義針對DDL變更的預警父處理器,實現DDL變更預警。
  • [x] 單表對應多個處理器。
  • [x] 使用解析結果處理器針對特定字段進行AES加解密處理。
  • [x] 非Spring容器下,一般編程式使用。
  • [ ] 使用openjdk-jmh進行Benchmark基准性能測試。

這里簡單提一下在Spring體系下的使用方式,引入依賴spring-boot-starter-canal-glue

<dependency>
    <groupId>cn.throwx</groupId>
    <artifactId>spring-boot-starter-canal-glue</artifactId>
    <version>版本號</version>
</dependency>

編寫一個實體或者DTOOrderModel

@Data
@CanalModel(database = "db_order_service", table = "t_order", fieldNamingPolicy = FieldNamingPolicy.LOWER_UNDERSCORE)
public static class OrderModel {

    private Long id;

    private String orderId;

    private OffsetDateTime createTime;

    private BigDecimal amount;
}

這里使用了@CanalModel注解綁定了數據庫db_order_service和表t_order,屬性名-列名映射策略為駝峰轉小寫下划線。接着定義一個處理器OrderProcessor和自定義異常處理器(可選,這里是為了模擬在處理事件的時候拋出自定義異常):

@Component
public class OrderProcessor extends BaseCanalBinlogEventProcessor<OrderModel> {

    @Override
    protected void processInsertInternal(CanalBinLogResult<OrderModel> result) {
        OrderModel orderModel = result.getAfterData();
        logger.info("接收到訂單保存binlog,主鍵:{},模擬拋出異常...", orderModel.getId());
        throw new RuntimeException(String.format("[id:%d]", orderModel.getId()));
    }

    @Override
    protected ExceptionHandler exceptionHandler() {
        return EXCEPTION_HANDLER;
    }

    /**
        * 覆蓋默認的ExceptionHandler.NO_OP
        */
    private static final ExceptionHandler EXCEPTION_HANDLER = (event, throwable)
            -> log.error("解析binlog事件出現異常,事件內容:{}", JSON.toJSONString(event), throwable);
}

假設一個寫入訂單數據的binlog事件如下:

{
  "data": [
    {
      "id": "1",
      "order_id": "10086",
      "amount": "999.0",
      "create_time": "2020-03-02 05:12:49"
    }
  ],
  "database": "db_order_service",
  "es": 1583143969000,
  "id": 3,
  "isDdl": false,
  "mysqlType": {
    "id": "BIGINT",
    "order_id": "VARCHAR(64)",
    "amount": "DECIMAL(10,2)",
    "create_time": "DATETIME"
  },
  "old": null,
  "pkNames": [
    "id"
  ],
  "sql": "",
  "sqlType": {
    "id": -5,
    "order_id": 12,
    "amount": 3,
    "create_time": 93
  },
  "table": "t_order",
  "ts": 1583143969460,
  "type": "INSERT"
}

執行結果如下:

如果直接對接Canal投放到KafkaTopic也很簡單,配合Kafka的消費者使用的示例如下:

@Slf4j
@Component
@RequiredArgsConstructor
public class CanalEventListeners {

    private final CanalGlue canalGlue;

    @KafkaListener(
            id = "${canal.event.order.listener.id:db-order-service-listener}",
            topics = "db_order_service", 
            containerFactory = "kafkaListenerContainerFactory"
    )
    public void onCrmMessage(String content) {
        canalGlue.process(content);
    }    
}

小結

筆者開發這個canal-glue的初衷是需要做一個極大提升效率的大型字符串轉換器,因為剛剛接觸到"小數據"領域,而且人手不足,而且需要處理下游大量的報表,因為不可能花大量人力在處理這些不停重復的模板化代碼上。雖然整體設計還不是十分優雅,至少在提升開發效率這個點上canal-glue做到了。

項目倉庫:

  • Giteehttps://gitee.com/throwableDoge/canal-glue

倉庫最新代碼暫時放在develop分支

(本文完 c-15-d e-a-20201005 鴿了快一個月)


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM