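Some time ago I wrote a project that intercepts MyBatis insert, update, and delete operations and automatically syncs each change to Elasticsearch. The update methods in the project are annotated with @Param so that the record being updated can be located, and the first parameter is always the unique key. The previous article covered the basic principles of MyBatis plugins; this project builds on them to sync database operations.
The code is open source on GitHub; see the repository for the full source.
The project runs against Elasticsearch 7.9.0 and uses spring-data-elasticsearch 4.0.0 as the Java client, in which many of the older interface methods are deprecated.
To make the synchronizer easier to follow, the project simulates a simple e-commerce scenario covering order placement, payment, shipping, receipt, and deletion. The database has two tables: the order table "test_trade_order" and the order line table "test_trade_order_line"; see the project source for the columns. The Elasticsearch entities are given below, with fields that mirror the database columns.
Part of the code is listed below.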
1. OrderEntity and OrderLineEntity
/**
* Order ES entity
*
* @author Doflamingo
*/
@Data
// The index name is generated from a SpEL expression
@Document(indexName = "#{esConfig.name(T (com.mingo.es.sync.constant.type.EsIndexType).ORDER)}")
public class OrderEntity implements Serializable {
private static final long serialVersionUID = 1L;
@Id
@Field(type = FieldType.Keyword)
private String id;
@Field(type = FieldType.Keyword)
private String tradeNo;
@Field(type = FieldType.Long)
private Long buyerId;
@Field(type = FieldType.Long)
private Long sellerId;
@Field(type = FieldType.Integer)
private Integer type;
@Field(type = FieldType.Integer)
private Integer status;
@Field(type = FieldType.Double)
private BigDecimal amount;
@Field(type = FieldType.Double)
private BigDecimal discountAmount;
@Field(type = FieldType.Double)
private BigDecimal originAmount;
@Field(type = FieldType.Double)
private BigDecimal payAmount;
@Field(type = FieldType.Date)
private Date payTime;
@Field(type = FieldType.Date)
private Date deliveryTime;
@Field(type = FieldType.Date)
private Date receivingTime;
@Field(type = FieldType.Date)
private Date createTime;
@Field(type = FieldType.Date)
private Date updateTime;
@Field(type = FieldType.Integer)
private Integer version;
@Field(type = FieldType.Text)
private String extData;
@Field(type = FieldType.Integer)
private Integer deleted;
@Field(type = FieldType.Nested)
private List<OrderLineEntity> lines;
}
/**
* Order line ES entity
*/
@Data
public class OrderLineEntity implements Serializable {
private static final long serialVersionUID = 1L;
@Field(type = FieldType.Long)
private Long id;
@Field(type = FieldType.Keyword)
private String tradeNo;
@Field(type = FieldType.Text)
private String lineNo;
@Field(type = FieldType.Text)
private String itemCode;
@Field(type = FieldType.Text)
private String itemName;
@Field(type = FieldType.Text)
private String unitCode;
@Field(type = FieldType.Text)
private String unitName;
@Field(type = FieldType.Integer)
private Integer type;
@Field(type = FieldType.Double)
private BigDecimal itemPrice;
@Field(type = FieldType.Double)
private BigDecimal price;
@Field(type = FieldType.Double)
private BigDecimal discountPrice;
@Field(type = FieldType.Double)
private BigDecimal itemQty;
@Field(type = FieldType.Double)
private BigDecimal totalPrice;
@Field(type = FieldType.Double)
private BigDecimal paidPrice;
@Field(type = FieldType.Date)
private Date createTime;
@Field(type = FieldType.Date)
private Date updateTime;
@Field(type = FieldType.Integer)
private Integer version;
@Field(type = FieldType.Text)
private String extData;
@Field(type = FieldType.Integer)
private Integer deleted;
}
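2. The ElasticsearchInterceptor
This scenario does not need interception at the granularity of parameters, result sets, or Statements. To sync inserts, updates, and deletes to Elasticsearch, it is enough for a MyBatis plugin to intercept the Executor method int update(MappedStatement ms, Object parameter) throws SQLException, since every insert, update, or delete the application executes passes through it. The interceptor lets the operation run, checks that the result indicates success, and then performs the corresponding Elasticsearch operation.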
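To make the mechanism concrete without MyBatis on the classpath, here is a minimal sketch built on a plain JDK dynamic proxy, which is the facility MyBatis' Plugin.wrap relies on. SketchExecutor, ProxySketch, and the synced list are hypothetical names used only for illustration, and the real Executor.update takes a MappedStatement rather than a String id.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for MyBatis' Executor; only the intercepted method is modelled.
interface SketchExecutor {
    int update(String statementId, Object parameter);
}

class ProxySketch {
    // Records which statements were "synced"; stands in for the Elasticsearch write.
    static final List<String> synced = new ArrayList<>();

    // Wraps the target in a JDK proxy whose handler runs the real method first
    // and the sync step afterwards, mirroring invocation.proceed() followed by the handler.
    static SketchExecutor wrap(SketchExecutor target) {
        InvocationHandler handler = (proxy, method, args) -> {
            Object result = method.invoke(target, args); // plays the role of invocation.proceed()
            boolean isUpdate = "update".equals(method.getName());
            if (isUpdate && result instanceof Integer && (Integer) result > 0) {
                synced.add((String) args[0]); // sync only when rows were affected
            }
            return result;
        };
        return (SketchExecutor) Proxy.newProxyInstance(
                SketchExecutor.class.getClassLoader(),
                new Class<?>[]{SketchExecutor.class},
                handler);
    }

    public static void main(String[] args) {
        // The fake executor reports one affected row for inserts and zero otherwise.
        SketchExecutor real = (id, param) -> id.endsWith("insert") ? 1 : 0;
        SketchExecutor proxied = wrap(real);
        proxied.update("OrderMapper.insert", "order-1");
        proxied.update("OrderMapper.noop", "order-2");
        System.out.println(synced); // prints [OrderMapper.insert]
    }
}
```

Only the call that reports an affected row reaches the sync step, which is the same "proceed first, then decide" order the project's interceptor follows.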
The project's interceptor is as follows:
/**
* MyBatis interceptor that syncs to Elasticsearch
* Executor.update(MappedStatement ms, Object parameter)
*
* @author Doflamingo
*/
@Component
@Slf4j
@Intercepts({
@Signature(type = Executor.class, method = "update", args = {MappedStatement.class, Object.class}),
})
public class ElasticsearchInterceptor implements Interceptor {
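/**
* ES sync handlers, keyed by mapper statement id
*/
private final Map<String, AbstractEsSyncHandler> handlerMap = new HashMap<>();
/**
* Intercepts the call
*
* @param invocation the intercepted Executor.update call
* @return the result of the original call
* @throws Throwable if the original call or the ES sync fails
*/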
@Override
public Object intercept(Invocation invocation) throws Throwable {
Object res = invocation.proceed();
Object[] args = invocation.getArgs();
if (2 == args.length) {
MappedStatement statement = (MappedStatement) args[0];
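// Fully qualified mapper method,
// e.g. com.mingo.es.sync.mybaties.mapper.OrderMapper.insert
String key = statement.getId();
// Look up the handler registered for this statement
AbstractEsSyncHandler esSynHandler = handlerMap.get(key);
if (null != esSynHandler) {
try {
esSynHandler.handler(statement, args[1], res);
} catch (Exception e) {
log.error("ES sync failed:", e);
// For simplicity this example rethrows, which affects the business call; inside a transaction the database change is rolled back
// If the business must not be affected and strict real-time sync is not required, publish an ordered message here and write to ES asynchronously
throw new EsSyncException("ES sync failed: " + e.getMessage());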
}
}
}
return res;
}
/**
* Creates the proxy object
*
* @param target the object to wrap
* @return the proxy
*/
@Override
public Object plugin(Object target) {
// The proxy is what triggers intercept()
return Plugin.wrap(target, this);
}
@Override
public void setProperties(Properties properties) {
}
/**
* Registers an ES handler
*
* @param key the mapper statement id
* @param synHandler the handler to run for that statement
*/
public void regHandler(String key, AbstractEsSyncHandler synHandler) {
handlerMap.put(key, synHandler);
}
}
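The comment in the catch block mentions going asynchronous when strict real-time sync is not required. As a rough sketch of that idea, assuming an in-process queue is acceptable (a real deployment would more likely use a durable, ordered message queue), a single worker thread is enough to preserve submission order. AsyncSyncSketch and its methods are hypothetical and not part of the project.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class AsyncSyncSketch {
    // One worker thread drains tasks in the order they were submitted.
    private final ExecutorService worker = Executors.newSingleThreadExecutor();

    // Stands in for the Elasticsearch index; holds trade numbers in arrival order.
    final List<String> indexed = new CopyOnWriteArrayList<>();

    // Called from the interceptor instead of writing to Elasticsearch inline.
    void enqueue(String tradeNo) {
        worker.execute(() -> {
            try {
                indexed.add(tradeNo); // placeholder for repository.save(...)
            } catch (RuntimeException e) {
                // A failure here no longer propagates to the business call.
                System.err.println("ES sync failed for " + tradeNo + ": " + e);
            }
        });
    }

    // Stops accepting work and waits for queued tasks; returns false on timeout or interrupt.
    boolean drain(long timeoutMillis) {
        worker.shutdown();
        try {
            return worker.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

The trade-off is the one the comment hints at: the database write no longer fails when Elasticsearch does, but the index can lag behind, and an in-memory queue loses pending work if the process stops.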
3. ES sync handlers
Only the insert and update handlers are shown here.
/**
* ES sync handler for inserts
*
* @author Doflamingo
*/
@Slf4j
@Component("insertEsSyncHandler")
class InsertSyncHandlerImpl extends AbstractEsSyncHandler {
public InsertSyncHandlerImpl(ElasticsearchInterceptor interceptor) {
// Register for the insert statement
interceptor.regHandler("com.mingo.es.sync.mybaties.mapper.OrderMapper.insert", this);
}
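/**
* Creates the document
*
* @param statement the mapped statement
* @param parameter the OrderDO passed to the mapper
*/
private void create(MappedStatement statement, Object parameter) {
log.info("Saving to index: {}", ((OrderDO) parameter).getTradeNo());
// Convert to the ES entity
OrderEntity document = convertor.doToEsEntity((OrderDO) parameter);
// Save
OrderEntity save = repository.save(document);
log.info("Saved to index: {}", save.getId());
}
/**
* Sync handler entry point
*
* @param statement the mapped statement
* @param parameter the mapper parameter
* @param res the result of the intercepted update
*/
@Override
public void handler(MappedStatement statement, Object parameter, Object res) {
// Nothing is saved if the SQL failed; checkResult tests whether the result is null or greater than 0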
if (this.checkResult(res)) {
this.create(statement, parameter);
}
}
}
/**
* ES sync handler for updates
*
* @author Doflamingo
*/
@Slf4j
@Component("updateEsSyncHandler")
class UpdateSyncHandlerImpl extends AbstractEsSyncHandler {
public UpdateSyncHandlerImpl(ElasticsearchInterceptor interceptor) {
// Register for each update statement
interceptor.regHandler("com.mingo.es.sync.mybaties.mapper.OrderMapper.updatePayStatus", this);
interceptor.regHandler("com.mingo.es.sync.mybaties.mapper.OrderMapper.updateDeliveryStatus", this);
interceptor.regHandler("com.mingo.es.sync.mybaties.mapper.OrderMapper.updateReceivingStatus", this);
interceptor.regHandler("com.mingo.es.sync.mybaties.mapper.OrderMapper.updateStatus", this);
}
/**
* Updates the document
*
* @param statement the mapped statement
* @param parameter the ParamMap built by MyBatis
*/
private void update(MappedStatement statement, Object parameter) {
String tradeNo = null;
// The first parameter is the unique key: either an OrderDO or the tradeNo itself
Object obj = ((MapperMethod.ParamMap) parameter).get("param1");
if (obj instanceof OrderDO) {
tradeNo = ((OrderDO) obj).getTradeNo();
} else {
tradeNo = obj.toString();
}
Optional<OrderEntity> optional = repository.findById(tradeNo);
log.info("Updating index: {}", tradeNo);
optional.ifPresent(orderEntity -> {
this.updateEntity(orderEntity, (MapperMethod.ParamMap) parameter);
repository.save(orderEntity);
log.info("Updated index: {}", orderEntity.getTradeNo());
});
}
/**
* Updates the indexed fields
*
* @param orderEntity the entity loaded from ES
* @param paramMap the mapper parameters
*/
private void updateEntity(OrderEntity orderEntity, MapperMethod.ParamMap paramMap) {
Field[] fields = orderEntity.getClass().getDeclaredFields();
Map<String, Field> fieldMap = Arrays.stream(fields).collect(Collectors.toMap(Field::getName, Function.identity()));
paramMap.forEach((k, v) -> {
if (!"tradeNo".equals(k) && !"id".equals(k) && !k.toString().startsWith("param")) {
Field field = fieldMap.get(k);
if (null != field) {
field.setAccessible(true);
try {
field.set(orderEntity, v);
} catch (IllegalAccessException e) {
// Do not swallow the failure silently, or the index could drift from the database unnoticed
log.warn("Failed to set field {} on the index document", k, e);
}
}
}
});
}
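/**
* Sync handler entry point
*
* @param statement the mapped statement
* @param parameter the mapper parameter
* @param res the result of the intercepted update
*/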
@Override
public void handler(MappedStatement statement, Object parameter, Object res) {
if (this.checkResult(res)) {
this.update(statement, parameter);
}
}
}
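The update handler relies on two things: MyBatis exposes every @Param argument under both its declared name and a positional "paramN" key, and the named entries are copied onto same-named entity fields by reflection. The following self-contained sketch imitates both steps. OrderDoc, ParamMapSketch, and the paramMap helper are hypothetical stand-ins for OrderEntity and MapperMethod.ParamMap, not the project's classes.

```java
import java.lang.reflect.Field;
import java.math.BigDecimal;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical, trimmed-down stand-in for OrderEntity.
class OrderDoc {
    String tradeNo;
    Integer status;
    BigDecimal payAmount;
}

class ParamMapSketch {
    // Imitates the map MyBatis builds for a mapper method with @Param arguments:
    // each value is reachable by its @Param name and by "param1", "param2", ...
    static Map<String, Object> paramMap(String[] names, Object[] values) {
        Map<String, Object> map = new LinkedHashMap<>();
        for (int i = 0; i < names.length; i++) {
            map.put(names[i], values[i]);
            map.put("param" + (i + 1), values[i]);
        }
        return map;
    }

    // Copies named parameters onto same-named fields, skipping the key fields
    // and the positional "paramN" aliases.
    static void apply(Object target, Map<String, Object> params) {
        params.forEach((key, value) -> {
            if ("tradeNo".equals(key) || "id".equals(key) || key.startsWith("param")) {
                return;
            }
            try {
                Field field = target.getClass().getDeclaredField(key);
                field.setAccessible(true);
                field.set(target, value);
            } catch (NoSuchFieldException e) {
                // No field with this name on the document: nothing to update.
            } catch (IllegalAccessException | IllegalArgumentException e) {
                throw new IllegalStateException("Cannot set field " + key, e);
            }
        });
    }
}
```

For example, paramMap(new String[]{"tradeNo", "status"}, new Object[]{"T1", 3}) yields the keys tradeNo, param1, status, and param2, and apply then changes only status. One caveat of the startsWith("param") filter is that a real field whose name begins with "param" would be skipped as well.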
4. Test class TestOrderServiceImplTest
package com.mingo.es.sync.service.impl;
import com.mingo.es.sync.mybaties.dataobjact.OrderDO;
import com.mingo.es.sync.mybaties.dataobjact.OrderLineDO;
import com.mingo.es.sync.repository.EsOrderDocRepository;
import com.mingo.es.sync.service.TestOrderService;
import lombok.extern.slf4j.Slf4j;
import org.assertj.core.util.Lists;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import java.math.BigDecimal;
import java.util.Date;
@Slf4j
// @SpringBootTest already registers the Spring extension for JUnit 5, so the JUnit 4 @RunWith runner is not needed
@SpringBootTest
class TestOrderServiceImplTest {
@Autowired
private TestOrderService testOrderService;
@Autowired
protected EsOrderDocRepository repository;
// Test trade number
private final String testTradeNo = "20200919018243198";
// Insert one record
@Test
void create() {
OrderDO orderDO = new OrderDO();
orderDO.setTradeNo(testTradeNo);
orderDO.setAmount(BigDecimal.ONE);
orderDO.setBuyerId(9527L);
orderDO.setCreateTime(new Date());
orderDO.setDiscountAmount(BigDecimal.ZERO);
orderDO.setOriginAmount(BigDecimal.ONE);
orderDO.setType(1);
orderDO.setSellerId(18899L);
orderDO.setStatus(1);
OrderLineDO lineDO = new OrderLineDO();
lineDO.setCreateTime(orderDO.getCreateTime());
lineDO.setDiscountPrice(BigDecimal.ZERO);
lineDO.setItemCode("6352678819");
lineDO.setItemName("泡椒鳳爪");
lineDO.setUnitCode("DAI");
lineDO.setUnitName("袋");
lineDO.setItemPrice(BigDecimal.ONE);
lineDO.setPrice(BigDecimal.ONE);
lineDO.setPaidPrice(BigDecimal.ONE);
lineDO.setDiscountPrice(BigDecimal.ZERO);
lineDO.setTotalPrice(BigDecimal.ONE);
lineDO.setItemQty(BigDecimal.ONE);
lineDO.setLineNo("1");
lineDO.setTradeNo(orderDO.getTradeNo());
lineDO.setType(1);
orderDO.setLines(Lists.newArrayList(lineDO));
testOrderService.create(orderDO);
}
// Payment: updates the record
@Test
void updatePayStatus() {
testOrderService.updatePayStatus(testTradeNo, 3, BigDecimal.ONE, new Date());
}
// Shipping callback: updates the record
@Test
void updateDeliveryStatus() {
testOrderService.updateDeliveryStatus(testTradeNo, 5, new Date());
}
// Receipt callback: updates the record
@Test
void updateReceivingStatus() {
testOrderService.updateReceivingStatus(testTradeNo, 7, new Date());
}
// Cancel the order: updates the record
@Test
void cancel() {
testOrderService.cancel(testTradeNo);
}
// Physical delete: removes the record
@Test
void delete() {
testOrderService.delete(testTradeNo);
}
}
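4.1 Result of the insert
Database table:

ES index:

4.2 Result of the payment update
Database table:

ES index:

The shipping, receipt, and logical-delete updates work the same way, so their results are not listed here.
4.3 Physical delete
Database table:

ES index:

The database and the index stay in sync.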
5. Concurrent test class ConcurrentTestOrderServiceImplTest
package com.mingo.es.sync.service.impl;
import com.mingo.es.sync.document.OrderEntity;
import com.mingo.es.sync.mybaties.dataobjact.OrderDO;
import com.mingo.es.sync.mybaties.dataobjact.OrderLineDO;
import com.mingo.es.sync.repository.EsOrderDocRepository;
import com.mingo.es.sync.service.TestOrderService;
import lombok.extern.slf4j.Slf4j;
import org.assertj.core.util.Lists;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import java.math.BigDecimal;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Date;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@Slf4j
// @SpringBootTest already registers the Spring extension for JUnit 5, so the JUnit 4 @RunWith runner is not needed
@SpringBootTest
class ConcurrentTestOrderServiceImplTest {
@Autowired
private TestOrderService testOrderService;
@Autowired
protected EsOrderDocRepository repository;
private final AtomicInteger atomicInteger = new AtomicInteger(1);
private final String YYYYMMDDHHMMSS = "yyyyMMddHHmmssSSS";
private final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern(YYYYMMDDHHMMSS);
public String getTradeNo() {
return LocalDateTime.now().format(FORMATTER) + atomicInteger.getAndAdd(1);
}
// 12 threads
private final ExecutorService executorService =
new ThreadPoolExecutor(
12,
12,
0,
TimeUnit.MILLISECONDS,
new LinkedBlockingDeque<>()
);
/**
* Records created after this instant are the ones the update and delete tests operate on: 2020-09-19 22:05:51 (UTC+8)
*/
private final Date start = new Date(1600524351243L);
@Test
void create() throws Exception {
for (int i = 500; i > 0; i--) {
// Submit the task
executorService.execute(() -> {
OrderDO orderDO = new OrderDO();
orderDO.setTradeNo(this.getTradeNo());
orderDO.setAmount(BigDecimal.ONE);
orderDO.setBuyerId(9527L);
orderDO.setCreateTime(new Date());
orderDO.setDiscountAmount(BigDecimal.ZERO);
orderDO.setOriginAmount(BigDecimal.ONE);
orderDO.setType(1);
orderDO.setSellerId(18899L);
orderDO.setStatus(1);
OrderLineDO lineDO = new OrderLineDO();
lineDO.setCreateTime(orderDO.getCreateTime());
lineDO.setDiscountPrice(BigDecimal.ZERO);
lineDO.setItemCode("6352678819");
lineDO.setItemName("泡椒鳳爪");
lineDO.setUnitCode("DAI");
lineDO.setUnitName("袋");
lineDO.setItemPrice(BigDecimal.ONE);
lineDO.setPrice(BigDecimal.ONE);
lineDO.setPaidPrice(BigDecimal.ONE);
lineDO.setDiscountPrice(BigDecimal.ZERO);
lineDO.setTotalPrice(BigDecimal.ONE);
lineDO.setItemQty(BigDecimal.ONE);
lineDO.setLineNo("1");
lineDO.setTradeNo(orderDO.getTradeNo());
lineDO.setType(1);
orderDO.setLines(Lists.newArrayList(lineDO));
testOrderService.create(orderDO);
});
}
executorService.shutdown();
executorService.awaitTermination(20, TimeUnit.SECONDS);
}
@Test
void updatePayStatus() {
List<OrderEntity> entities = repository.findByCreateTimeGreaterThan(start);
entities.parallelStream().forEach(entity ->
testOrderService.updatePayStatus(entity.getTradeNo(), 3, BigDecimal.ONE, new Date())
);
}
@Test
void updateDeliveryStatus() {
List<OrderEntity> entities = repository.findByCreateTimeGreaterThan(start);
entities.parallelStream().forEach(entity ->
testOrderService.updateDeliveryStatus(entity.getTradeNo(), 5, new Date())
);
}
@Test
void updateReceivingStatus() {
List<OrderEntity> entities = repository.findByCreateTimeGreaterThan(start);
entities.parallelStream().forEach(entity ->
testOrderService.updateReceivingStatus(entity.getTradeNo(), 7, new Date())
);
}
@Test
void cancel() {
List<OrderEntity> entities = repository.findByCreateTimeGreaterThan(start);
entities.parallelStream().forEach(entity ->
testOrderService.cancel(entity.getTradeNo())
);
}
@Test
void delete() {
List<OrderEntity> entities = repository.findByCreateTimeGreaterThan(start);
entities.parallelStream().forEach(entity ->
testOrderService.delete(entity.getTradeNo())
);
}
}
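The concurrent test depends on getTradeNo() producing distinct numbers across 12 threads. The following standalone sketch checks that property in isolation, with no Spring context or database. TradeNoSketch and generateConcurrently are hypothetical names, and the sketch, unlike the test above, also checks the return value of awaitTermination so that a timeout is not mistaken for success.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class TradeNoSketch {
    private static final DateTimeFormatter FORMATTER =
            DateTimeFormatter.ofPattern("yyyyMMddHHmmssSSS");
    private final AtomicInteger sequence = new AtomicInteger(1);

    // Fixed-width timestamp followed by an atomically incremented counter.
    String nextTradeNo() {
        return LocalDateTime.now().format(FORMATTER) + sequence.getAndIncrement();
    }

    // Generates `count` trade numbers on `threads` workers and returns the distinct values.
    static Set<String> generateConcurrently(int threads, int count) {
        TradeNoSketch generator = new TradeNoSketch();
        Set<String> seen = ConcurrentHashMap.newKeySet();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < count; i++) {
            pool.execute(() -> seen.add(generator.nextTradeNo()));
        }
        pool.shutdown();
        try {
            // Fail loudly if the tasks did not finish within the time limit.
            if (!pool.awaitTermination(20, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen;
    }
}
```

Because the timestamp part has a fixed width of 17 characters and the counter never repeats within one generator instance, two numbers from the same instance cannot collide even when they share a millisecond. Numbers from separate instances or separate JVMs have no such guarantee.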
spring-data-elasticsearch is powerful: queries can be derived from repository method names. The EsOrderDocRepository code:
package com.mingo.es.sync.repository;
import com.mingo.es.sync.document.OrderEntity;
import org.springframework.data.elasticsearch.repository.ElasticsearchRepository;
import java.util.Date;
import java.util.List;
/**
* EsOrderDocRepository
*
* @author Doflamingo
*/
public interface EsOrderDocRepository extends ElasticsearchRepository<OrderEntity, String> {
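/**
* Finds an order by tradeNo
*
* @param tradeNo the trade number
* @return the matching order
*/
OrderEntity findByTradeNo(String tradeNo);
/**
* Finds orders created after the given instant
*
* @param createTime the lower bound on the creation time
* @return the matching orders
*/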
List<OrderEntity> findByCreateTimeGreaterThan(Date createTime);
}
Inserts and updates behave correctly under concurrency.