Automatically syncing MyBatis insert, update, and delete operations to Elasticsearch


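A while ago I wrote a project that intercepts MyBatis insert, update, and delete operations and automatically syncs the changes to Elasticsearch. The update methods in the project's mappers annotate their parameters with @Param, which is used to locate the record being updated, and the first parameter is always the unique key. A previous article covered the basic principles of MyBatis plugins; this project builds on them to sync database operations.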

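The code is open source on GitHub; see the repository for the full source.

The project runs against Elasticsearch 7.9.0, and the Java client is spring-data-elasticsearch 4.0.0, in which many of the earlier API methods are deprecated.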

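To make the synchronizer easier to follow, the project simulates a simple e-commerce scenario with order placement, payment, shipping, receipt, and deletion. The database has two tables: the order header table "test_trade_order" and the line-item table "test_trade_order_line"; see the project source for the exact columns. The Elasticsearch entities are given below, with fields that match the database columns.

Part of the code is listed below.

1. OrderEntity and OrderLineEntity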

/**
 * Order ES entity
 *
 * @author Doflamingo
 */
@Data
// The index name is generated from a SpEL expression
@Document(indexName = "#{esConfig.name(T (com.mingo.es.sync.constant.type.EsIndexType).ORDER)}")
public class OrderEntity implements Serializable {

    private static final long serialVersionUID = 1L;

    @Id
    @Field(type = FieldType.Keyword)
    private String id;

    @Field(type = FieldType.Keyword)
    private String tradeNo;

    @Field(type = FieldType.Long)
    private Long buyerId;

    @Field(type = FieldType.Long)
    private Long sellerId;

    @Field(type = FieldType.Integer)
    private Integer type;

    @Field(type = FieldType.Integer)
    private Integer status;

    @Field(type = FieldType.Double)
    private BigDecimal amount;

    @Field(type = FieldType.Double)
    private BigDecimal discountAmount;

    @Field(type = FieldType.Double)
    private BigDecimal originAmount;

    @Field(type = FieldType.Double)
    private BigDecimal payAmount;

    @Field(type = FieldType.Date)
    private Date payTime;

    @Field(type = FieldType.Date)
    private Date deliveryTime;

    @Field(type = FieldType.Date)
    private Date receivingTime;

    @Field(type = FieldType.Date)
    private Date createTime;

    @Field(type = FieldType.Date)
    private Date updateTime;

    @Field(type = FieldType.Integer)
    private Integer version;

    @Field(type = FieldType.Text)
    private String extData;

    @Field(type = FieldType.Integer)
    private Integer deleted;

    @Field(type = FieldType.Nested)
    private List<OrderLineEntity> lines;
}
/**
 * Order line ES entity
 */
@Data
public class OrderLineEntity implements Serializable {

    private static final long serialVersionUID = 1L;

    @Field(type = FieldType.Long)
    private Long id;

    @Field(type = FieldType.Keyword)
    private String tradeNo;

    @Field(type = FieldType.Text)
    private String lineNo;

    @Field(type = FieldType.Text)
    private String itemCode;

    @Field(type = FieldType.Text)
    private String itemName;

    @Field(type = FieldType.Text)
    private String unitCode;

    @Field(type = FieldType.Text)
    private String unitName;

    @Field(type = FieldType.Integer)
    private Integer type;

    @Field(type = FieldType.Double)
    private BigDecimal itemPrice;

    @Field(type = FieldType.Double)
    private BigDecimal price;

    @Field(type = FieldType.Double)
    private BigDecimal discountPrice;

    @Field(type = FieldType.Double)
    private BigDecimal itemQty;

    @Field(type = FieldType.Double)
    private BigDecimal totalPrice;

    @Field(type = FieldType.Double)
    private BigDecimal paidPrice;

    @Field(type = FieldType.Date)
    private Date createTime;

    @Field(type = FieldType.Date)
    private Date updateTime;

    @Field(type = FieldType.Integer)
    private Integer version;

    @Field(type = FieldType.Text)
    private String extData;

    @Field(type = FieldType.Integer)
    private Integer deleted;
}

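2. The ElasticsearchInterceptor

This scenario does not need to intercept at the granularity of parameters, result sets, or Statements. To sync table inserts, updates, and deletes to Elasticsearch, it is enough for a MyBatis plugin to intercept the Executor method int update(MappedStatement ms, Object parameter) throws SQLException. Every insert, update, or delete issued by the business code passes through this method, so the plugin can run logic both before and after the statement executes. The interceptor lets the operation proceed, checks that the result indicates success, and then performs the corresponding Elasticsearch operation.

The code is as follows: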

/**
 * MyBatis interceptor that syncs changes to Elasticsearch.
 * Intercepts Executor.update(MappedStatement ms, Object parameter)
 *
 * @author Doflamingo
 */
@Component
@Slf4j
@Intercepts({
        @Signature(type = Executor.class, method = "update", args = {MappedStatement.class, Object.class}),
})
public class ElasticsearchInterceptor implements Interceptor {

    /**
     * ES sync handlers, keyed by mapper statement id
     */
    private final Map<String, AbstractEsSyncHandler> handlerMap = new HashMap<>();

    /**
     * Intercepts the call
     *
     * @param invocation
     * @return
     * @throws Throwable
     */
    @Override
    public Object intercept(Invocation invocation) throws Throwable {
        Object res = invocation.proceed();

        Object[] args = invocation.getArgs();
        if (2 == args.length) {
            MappedStatement statement = (MappedStatement) args[0];

            // Id of the mapper method,
            // e.g. com.mingo.es.sync.mybaties.mapper.OrderMapper.insert
            String key = statement.getId();
            // Look up the handler registered for this method
            AbstractEsSyncHandler esSynHandler = handlerMap.get(key);
            if (null != esSynHandler) {
                try {
                    esSynHandler.handler(statement, args[1], res);
                } catch (Exception e) {
                    log.error("ES sync failed:", e);
                    // For simplicity this example rethrows, which affects the business call; inside a transaction the database changes are rolled back
                    // If the business must not be affected and strict real-time sync is not required, send an ordered message here and write to ES asynchronously
                    throw new EsSyncException("ES sync failed: " + e.getMessage());
                }
            }
        }

        return res;
    }

    /**
     * Creates the proxy object
     *
     * @param target
     * @return
     */
    @Override
    public Object plugin(Object target) {
        // Wrap the target in a proxy so that calls go through intercept()
        return Plugin.wrap(target, this);
    }

    @Override
    public void setProperties(Properties properties) {
    }

    /**
     * Registers an ES sync handler
     *
     * @param key
     * @param synHandler
     */
    public void regHandler(String key, AbstractEsSyncHandler synHandler) {
        handlerMap.put(key, synHandler);
    }
}
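
The comment in the catch block above mentions an alternative: if a sync failure must not affect the business call, hand the work to an ordered, asynchronous channel instead of rethrowing. The sketch below illustrates that idea with a single-threaded executor standing in for an ordered message queue. OrderedAsyncSync and its methods are hypothetical names, not part of the project, and a real deployment would more likely use a message broker with retries.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Hypothetical helper: runs sync tasks one at a time, in submission order. */
final class OrderedAsyncSync {

    // A single worker thread preserves the order in which tasks were submitted
    private final ExecutorService worker = Executors.newSingleThreadExecutor();
    private final Consumer<Exception> onError;

    OrderedAsyncSync(Consumer<Exception> onError) {
        this.onError = onError;
    }

    /** Returns immediately; a failing task is reported to onError, never to the caller. */
    void submit(Runnable syncTask) {
        worker.execute(() -> {
            try {
                syncTask.run();
            } catch (RuntimeException e) {
                onError.accept(e);
            }
        });
    }

    /** Stops accepting tasks and waits for the queued ones to finish. */
    boolean shutdownAndWait(long timeout, TimeUnit unit) {
        worker.shutdown();
        try {
            return worker.awaitTermination(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        OrderedAsyncSync sync =
                new OrderedAsyncSync(e -> System.err.println("sync failed: " + e.getMessage()));
        sync.submit(() -> System.out.println("index order T1"));
        sync.submit(() -> { throw new IllegalStateException("ES unavailable"); });
        sync.submit(() -> System.out.println("update order T1"));
        sync.shutdownAndWait(5, TimeUnit.SECONDS);
    }
}
```

With this approach the database transaction commits regardless of the Elasticsearch outcome, so the index is only eventually consistent and failed tasks need their own retry or compensation path.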

3. ES sync handlers

Only the insert and update handlers are shown here.

/**
 * ES sync handler for inserts
 *
 * @author Doflamingo
 */
@Slf4j
@Component("insertEsSyncHandler")
class InsertSyncHandlerImpl extends AbstractEsSyncHandler {

    public InsertSyncHandlerImpl(ElasticsearchInterceptor interceptor) {
        // Register this handler for the insert mapper method
        interceptor.regHandler("com.mingo.es.sync.mybaties.mapper.OrderMapper.insert", this);
    }

    /**
     * Creates the document
     *
     * @param statement
     * @param parameter
     */
    private void create(MappedStatement statement, Object parameter) {
        log.info("Saving document: {}", ((OrderDO) parameter).getTradeNo());
        // Convert to the ES entity
        OrderEntity document = convertor.doToEsEntity((OrderDO) parameter);
        // Save
        OrderEntity save = repository.save(document);
        log.info("Document saved: {}", save.getId());
    }

    /**
     * Sync handler entry point
     *
     * @param statement
     * @param parameter
     */
    @Override
    public void handler(MappedStatement statement, Object parameter, Object res) {
        // Nothing is saved if the SQL failed: checkResult accepts a result that is null or greater than 0
        if (this.checkResult(res)) {
            this.create(statement, parameter);
        }
    }
}
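
The handler relies on checkResult from AbstractEsSyncHandler, which is not listed in this article. Based only on the comment above (the result is accepted when it is null or greater than 0), a minimal sketch of such a check might look like the following. SyncResultCheck is a hypothetical stand-in, and rejecting non-numeric results is an assumption; the project's actual implementation may differ.

```java
/** Hypothetical stand-in for the result check used by the sync handlers. */
final class SyncResultCheck {

    /**
     * Accepts a missing result or a positive affected-row count.
     * Executor.update normally returns the number of affected rows.
     */
    static boolean checkResult(Object res) {
        if (res == null) {
            return true;
        }
        if (res instanceof Number) {
            return ((Number) res).intValue() > 0;
        }
        // Assumption: any other result type is treated as a failure
        return false;
    }

    public static void main(String[] args) {
        System.out.println(checkResult(1));    // one row affected
        System.out.println(checkResult(0));    // no rows affected
        System.out.println(checkResult(null)); // no result returned
    }
}
```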
/**
 * ES sync handler for updates
 *
 * @author Doflamingo
 */
@Slf4j
@Component("updateEsSyncHandler")
class UpdateSyncHandlerImpl extends AbstractEsSyncHandler {

    public UpdateSyncHandlerImpl(ElasticsearchInterceptor interceptor) {
        // Register this handler for every update mapper method
        interceptor.regHandler("com.mingo.es.sync.mybaties.mapper.OrderMapper.updatePayStatus", this);
        interceptor.regHandler("com.mingo.es.sync.mybaties.mapper.OrderMapper.updateDeliveryStatus", this);
        interceptor.regHandler("com.mingo.es.sync.mybaties.mapper.OrderMapper.updateReceivingStatus", this);
        interceptor.regHandler("com.mingo.es.sync.mybaties.mapper.OrderMapper.updateStatus", this);
    }

    /**
     * Updates the document
     *
     * @param statement
     * @param parameter
     */
    private void update(MappedStatement statement, Object parameter) {
        String tradeNo = null;
        Object obj = ((MapperMethod.ParamMap) parameter).get("param1");
        if (obj instanceof OrderDO) {
            tradeNo = ((OrderDO) obj).getTradeNo();
        } else {
            tradeNo = obj.toString();
        }
        Optional<OrderEntity> optional = repository.findById(tradeNo);
        log.info("Updating document: {}", tradeNo);
        optional.ifPresent(orderEntity -> {
            this.updateEntity(orderEntity, (MapperMethod.ParamMap) parameter);
            repository.save(orderEntity);
            log.info("Document updated: {}", orderEntity.getTradeNo());
        });
    }

    /**
     * Copies the updated values onto the entity's fields
     *
     * @param orderEntity
     * @param paramMap
     */
    private void updateEntity(OrderEntity orderEntity, MapperMethod.ParamMap paramMap) {
        Field[] fields = orderEntity.getClass().getDeclaredFields();
        Map<String, Field> fieldMap = Arrays.stream(fields).collect(Collectors.toMap(Field::getName, Function.identity()));
        paramMap.forEach((k, v) -> {
            if (!"tradeNo".equals(k) && !"id".equals(k) && !k.toString().startsWith("param")) {
                Field field = fieldMap.get(k);
                if (null != field) {
                    field.setAccessible(true);
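                    try {
                        field.set(orderEntity, v);
                    } catch (IllegalAccessException e) {
                        // Do not swallow silently: a skipped field leaves the index stale
                        log.warn("Failed to set field {} on the ES entity", k, e);
                    }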
                }
            }
        });
    }

    /**
     * Sync handler entry point
     *
     * @param statement
     * @param parameter
     */
    @Override
    public void handler(MappedStatement statement, Object parameter, Object res) {
        if (this.checkResult(res)) {
            this.update(statement, parameter);
        }
    }
}
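
The update handler works because of how MyBatis packs @Param arguments: each value is stored in the ParamMap under its @Param name and also under a positional alias (param1, param2, and so on). updateEntity skips the key fields and the positional aliases and copies the remaining values onto same-named entity fields. The standalone sketch below reproduces that copy step with a plain Map in place of MapperMethod.ParamMap. ParamMapCopyDemo and its nested Order class are hypothetical and trimmed down for illustration.

```java
import java.lang.reflect.Field;
import java.math.BigDecimal;
import java.util.LinkedHashMap;
import java.util.Map;

final class ParamMapCopyDemo {

    /** Hypothetical, trimmed-down stand-in for OrderEntity. */
    static final class Order {
        String tradeNo = "T1";
        Integer status = 1;
        BigDecimal payAmount;
    }

    /** Copies named parameters onto same-named fields, skipping keys and positional aliases. */
    static void copy(Object target, Map<String, Object> params) {
        for (Map.Entry<String, Object> entry : params.entrySet()) {
            String name = entry.getKey();
            if (name.equals("tradeNo") || name.equals("id") || name.startsWith("param")) {
                continue;
            }
            try {
                Field field = target.getClass().getDeclaredField(name);
                field.setAccessible(true);
                field.set(target, entry.getValue());
            } catch (NoSuchFieldException ignored) {
                // The parameter has no matching field on the entity
            } catch (IllegalAccessException | IllegalArgumentException e) {
                throw new IllegalStateException("Cannot set field " + name, e);
            }
        }
    }

    public static void main(String[] args) {
        // Simulates the map MyBatis builds for
        // updatePayStatus(@Param("tradeNo") ..., @Param("status") ..., @Param("payAmount") ...)
        Map<String, Object> params = new LinkedHashMap<>();
        params.put("tradeNo", "T1");
        params.put("status", 3);
        params.put("payAmount", BigDecimal.ONE);
        params.put("param1", "T1");
        params.put("param2", 3);
        params.put("param3", BigDecimal.ONE);

        Order order = new Order();
        copy(order, params);
        System.out.println(order.status + " " + order.payAmount);
    }
}
```

Two caveats apply to this style of copying. The startsWith("param") filter also skips any legitimate field whose name begins with "param". And Field.set throws an unchecked IllegalArgumentException when the value type does not match the field type, so the @Param types in the mapper need to line up with the entity's field types.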

4. Test class TestOrderServiceImplTest

package com.mingo.es.sync.service.impl;

import com.mingo.es.sync.mybaties.dataobjact.OrderDO;
import com.mingo.es.sync.mybaties.dataobjact.OrderLineDO;
import com.mingo.es.sync.repository.EsOrderDocRepository;
import com.mingo.es.sync.service.TestOrderService;
import lombok.extern.slf4j.Slf4j;
import org.assertj.core.util.Lists;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

import java.math.BigDecimal;
import java.util.Date;

// JUnit 5 test: @SpringBootTest already registers the Spring extension,
// so the JUnit 4 @RunWith(SpringRunner.class) is not needed
@Slf4j
@SpringBootTest
class TestOrderServiceImplTest {

    @Autowired
    private TestOrderService testOrderService;

    @Autowired
    protected EsOrderDocRepository repository;

    // Test order number
    private final String testTradeNo = "20200919018243198";

    // Insert one record
    @Test
    void create() {
        OrderDO orderDO = new OrderDO();
        orderDO.setTradeNo(testTradeNo);
        orderDO.setAmount(BigDecimal.ONE);
        orderDO.setBuyerId(9527L);
        orderDO.setCreateTime(new Date());
        orderDO.setDiscountAmount(BigDecimal.ZERO);
        orderDO.setOriginAmount(BigDecimal.ONE);
        orderDO.setType(1);
        orderDO.setSellerId(18899L);
        orderDO.setStatus(1);

        OrderLineDO lineDO = new OrderLineDO();
        lineDO.setCreateTime(orderDO.getCreateTime());
        lineDO.setDiscountPrice(BigDecimal.ZERO);
        lineDO.setItemCode("6352678819");
        lineDO.setItemName("泡椒鳳爪");
        lineDO.setUnitCode("DAI");
        lineDO.setUnitName("袋");
        lineDO.setItemPrice(BigDecimal.ONE);
        lineDO.setPrice(BigDecimal.ONE);
        lineDO.setPaidPrice(BigDecimal.ONE);
        lineDO.setDiscountPrice(BigDecimal.ZERO);
        lineDO.setTotalPrice(BigDecimal.ONE);
        lineDO.setItemQty(BigDecimal.ONE);
        lineDO.setLineNo("1");
        lineDO.setTradeNo(orderDO.getTradeNo());
        lineDO.setType(1);
        orderDO.setLines(Lists.newArrayList(lineDO));
        testOrderService.create(orderDO);
    }

    // Payment: updates the record
    @Test
    void updatePayStatus() {
        testOrderService.updatePayStatus(testTradeNo, 3, BigDecimal.ONE, new Date());
    }

    // Shipping callback: updates the record
    @Test
    void updateDeliveryStatus() {
        testOrderService.updateDeliveryStatus(testTradeNo, 5, new Date());
    }

    // Receipt callback: updates the record
    @Test
    void updateReceivingStatus() {
        testOrderService.updateReceivingStatus(testTradeNo, 7, new Date());
    }

    // Cancel the order: updates the record
    @Test
    void cancel() {
        testOrderService.cancel(testTradeNo);
    }

    // Physical delete: removes the record
    @Test
    void delete() {
        testOrderService.delete(testTradeNo);
    }
}

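4.1 Result of inserting data

Database table: [screenshot]

ES index: [screenshot]

4.2 Result of the payment update

Database table: [screenshot]

ES index: [screenshot]

The shipping, receipt, and logical-delete updates behave the same way, so their results are not listed here.

4.3 Physical deletion

Database table: [screenshot]

ES index: [screenshot]

The database and the index stay consistent.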

5. Concurrency test class ConcurrentTestOrderServiceImplTest

package com.mingo.es.sync.service.impl;

import com.mingo.es.sync.document.OrderEntity;
import com.mingo.es.sync.mybaties.dataobjact.OrderDO;
import com.mingo.es.sync.mybaties.dataobjact.OrderLineDO;
import com.mingo.es.sync.repository.EsOrderDocRepository;
import com.mingo.es.sync.service.TestOrderService;
import lombok.extern.slf4j.Slf4j;
import org.assertj.core.util.Lists;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

import java.math.BigDecimal;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Date;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// JUnit 5 test: @SpringBootTest already registers the Spring extension,
// so the JUnit 4 @RunWith(SpringRunner.class) is not needed
@Slf4j
@SpringBootTest
class ConcurrentTestOrderServiceImplTest {

    @Autowired
    private TestOrderService testOrderService;

    @Autowired
    protected EsOrderDocRepository repository;

    private final AtomicInteger atomicInteger = new AtomicInteger(1);
    private final String YYYYMMDDHHMMSS = "yyyyMMddHHmmssSSS";
    private final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern(YYYYMMDDHHMMSS);

    public String getTradeNo() {
        return LocalDateTime.now().format(FORMATTER) + atomicInteger.getAndAdd(1);
    }

    // Fixed pool of 12 threads
    private final ExecutorService executorService =
            new ThreadPoolExecutor(
                    12,
                    12,
                    0,
                    TimeUnit.MILLISECONDS,
                    new LinkedBlockingDeque<>()
            );

    /**
     * Cut-off used by the tests: only data created after this time is updated or deleted (2020-09-19 22:05:51 UTC+8)
     */
    private final Date start = new Date(1600524351243L);

    @Test
    void create() throws Exception {

        for (int i = 500; i > 0; i--) {
            // Submit the task to the pool
            executorService.execute(() -> {
                OrderDO orderDO = new OrderDO();
                orderDO.setTradeNo(this.getTradeNo());
                orderDO.setAmount(BigDecimal.ONE);
                orderDO.setBuyerId(9527L);
                orderDO.setCreateTime(new Date());
                orderDO.setDiscountAmount(BigDecimal.ZERO);
                orderDO.setOriginAmount(BigDecimal.ONE);
                orderDO.setType(1);
                orderDO.setSellerId(18899L);
                orderDO.setStatus(1);

                OrderLineDO lineDO = new OrderLineDO();
                lineDO.setCreateTime(orderDO.getCreateTime());
                lineDO.setDiscountPrice(BigDecimal.ZERO);
                lineDO.setItemCode("6352678819");
                lineDO.setItemName("泡椒鳳爪");
                lineDO.setUnitCode("DAI");
                lineDO.setUnitName("袋");
                lineDO.setItemPrice(BigDecimal.ONE);
                lineDO.setPrice(BigDecimal.ONE);
                lineDO.setPaidPrice(BigDecimal.ONE);
                lineDO.setDiscountPrice(BigDecimal.ZERO);
                lineDO.setTotalPrice(BigDecimal.ONE);
                lineDO.setItemQty(BigDecimal.ONE);
                lineDO.setLineNo("1");
                lineDO.setTradeNo(orderDO.getTradeNo());
                lineDO.setType(1);
                orderDO.setLines(Lists.newArrayList(lineDO));
                testOrderService.create(orderDO);
            });
        }
        executorService.shutdown();
        executorService.awaitTermination(20, TimeUnit.SECONDS);
    }

    @Test
    void updatePayStatus() {
        List<OrderEntity> entities = repository.findByCreateTimeGreaterThan(start);
        entities.parallelStream().forEach(entity ->
                testOrderService.updatePayStatus(entity.getTradeNo(), 3, BigDecimal.ONE, new Date())
        );
    }

    @Test
    void updateDeliveryStatus() {
        List<OrderEntity> entities = repository.findByCreateTimeGreaterThan(start);
        entities.parallelStream().forEach(entity ->
                testOrderService.updateDeliveryStatus(entity.getTradeNo(), 5, new Date())
        );
    }

    @Test
    void updateReceivingStatus() {
        List<OrderEntity> entities = repository.findByCreateTimeGreaterThan(start);
        entities.parallelStream().forEach(entity ->
                testOrderService.updateReceivingStatus(entity.getTradeNo(), 7, new Date())
        );
    }

    @Test
    void cancel() {
        List<OrderEntity> entities = repository.findByCreateTimeGreaterThan(start);
        entities.parallelStream().forEach(entity ->
                testOrderService.cancel(entity.getTradeNo())
        );
    }

    @Test
    void delete() {
        List<OrderEntity> entities = repository.findByCreateTimeGreaterThan(start);
        entities.parallelStream().forEach(entity ->
                testOrderService.delete(entity.getTradeNo())
        );
    }
}

spring-data-elasticsearch is powerful: queries can be derived from repository method names. The EsOrderDocRepository code:

package com.mingo.es.sync.repository;

import com.mingo.es.sync.document.OrderEntity;
import org.springframework.data.elasticsearch.repository.ElasticsearchRepository;

import java.util.Date;
import java.util.List;

/**
 * EsOrderDocRepository
 *
 * @author Doflamingo
 */
public interface EsOrderDocRepository extends ElasticsearchRepository<OrderEntity, String> {

    /**
     * Finds an order by tradeNo
     *
     * @param tradeNo
     * @return
     */
    OrderEntity findByTradeNo(String tradeNo);

    /**
     * Finds orders created after the given time
     *
     * @return
     */
    List<OrderEntity> findByCreateTimeGreaterThan(Date createTime);
}

Inserts and updates work correctly under concurrency.

 

