Dynamically Switching MongoDB Data Sources


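The project needs to split its databases by business area: for example, data for task A is stored in MongoDB-A and data for task B is stored in MongoDB-B.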

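A quick search online shows that most solutions rely on AOP, and the code is all much the same and riddled with bugs. So I reworked it; the rough idea is as follows: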

  1. Put the aspect on MongoTemplate
@Aspect
@Component
@Slf4j
public class MongodbSpecialSourceAspect {

    private static final String POINT_CUT = "execution (* org.springframework.data.mongodb.core.MongoTemplate.*(..))";

    @Around(value = POINT_CUT)
    public Object around(ProceedingJoinPoint joinPoint) throws Throwable {
        Object o = joinPoint.getTarget();
        CustomMongoTemplate customMongoTemplate = null;
        // Check whether the target is our custom MongoTemplate
        if (o.getClass() == CustomMongoTemplate.class) {
            customMongoTemplate = (CustomMongoTemplate) o;
            // TODO decide whether to switch (xxxx below is a placeholder for the business condition)
            if(xxxx) {
                customMongoTemplate.switchMongoDbFactory(CustomMongoTemplate.EDU_DB_KEY);
            }
        }
        try {
            return joinPoint.proceed();
        } finally {
            // Restore the master data source even if the intercepted call throws
            if (customMongoTemplate != null) {
                customMongoTemplate.restoreMaster();
            }
        }
    }
}
  2. Declare a subclass of MongoTemplate and override the getMongoDbFactory method
@Slf4j
public class CustomMongoTemplate extends MongoTemplate {

    // Holds the dbFactory currently selected for this thread
    private final ThreadLocal<MongoDbFactory> mongoDbFactoryThreadLocal = new ThreadLocal<>();

    // Holds the dbFactory of every data source registered at initialization
    private final Map<Integer, MongoDbFactory> mongoDbFactoriesMap;

    public final static int MASTER_DB_KEY = 0;
    public final static int EDU_DB_KEY = 1;

    public CustomMongoTemplate(Map<Integer, MongoDbFactory> mongoDbFactoriesMap, MongoDbFactory mongoDbFactory){
        super(mongoDbFactory);
        if (!mongoDbFactoriesMap.containsKey(MASTER_DB_KEY)) {
            throw new BeanInitializationException("mongoDbFactoriesMap must contain an entry with key=0.");
        }
        this.mongoDbFactoriesMap = mongoDbFactoriesMap;
    }

    public void switchMongoDbFactory(Integer mongoDbFactoryKey){
        if (!mongoDbFactoriesMap.containsKey(mongoDbFactoryKey)) {
            throw new IllegalArgumentException("The entry of the current key cannot be found in mongoDbFactoriesMap.");
        }
        mongoDbFactoryThreadLocal.set(mongoDbFactoriesMap.get(mongoDbFactoryKey));
    }

    public void restoreMaster() {
        mongoDbFactoryThreadLocal.set(mongoDbFactoriesMap.get(MASTER_DB_KEY));
    }

    ///////////////////////
    @Override
    public MongoDbFactory getMongoDbFactory() {
        if (mongoDbFactoryThreadLocal.get() == null) {
            restoreMaster();
        }
        return mongoDbFactoryThreadLocal.get();
    }

    @Override
    protected MongoDatabase doGetDatabase() {
        return MongoDatabaseUtils.getDatabase(getMongoDbFactory(), SessionSynchronization.ON_ACTUAL_TRANSACTION);
    }

    @Override
    public SessionScoped withSession(ClientSessionOptions options) {

        Assert.notNull(options, "ClientSessionOptions must not be null!");

        return withSession(() -> getMongoDbFactory().getSession(options));
    }

    ////////////////////
}

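The key is overriding getMongoDbFactory so that the dbFactory changes in real time; reading the source shows that almost every place that needs a dbFactory obtains it through this method. There are exceptions, though, such as doGetDatabase, withSession, and findDistinct.

That is why we override doGetDatabase and withSession: instead of using the private field mongoDbFactory directly, they now call getMongoDbFactory to obtain the mongoDbFactory.

findDistinct uses many private fields, so it cannot be overridden in the same way; if you rely on it, you will have to reimplement the method yourself.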
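
To make the mechanism easier to see in isolation, here is a minimal plain-Java sketch of the same idea. It is not the Spring class itself: `ThreadLocalRouter` is a hypothetical name, and plain strings stand in for the MongoDbFactory instances. It only shows that the per-thread selection falls back to the master entry when nothing has been chosen, which is what the overridden getMongoDbFactory does.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java model of the routing idea; String values stand in for MongoDbFactory instances.
class ThreadLocalRouter {
    static final int MASTER_DB_KEY = 0;
    static final int EDU_DB_KEY = 1;

    private final Map<Integer, String> factories;
    // Each thread sees only its own current selection.
    private final ThreadLocal<String> current = new ThreadLocal<>();

    ThreadLocalRouter(Map<Integer, String> factories) {
        if (!factories.containsKey(MASTER_DB_KEY)) {
            throw new IllegalArgumentException("factories must contain the master key");
        }
        this.factories = new HashMap<>(factories);
    }

    void switchTo(int key) {
        String factory = factories.get(key);
        if (factory == null) {
            throw new IllegalArgumentException("unknown key: " + key);
        }
        current.set(factory);
    }

    void restoreMaster() {
        current.set(factories.get(MASTER_DB_KEY));
    }

    // Mirrors the overridden getMongoDbFactory(): fall back to master when nothing is selected.
    String currentFactory() {
        if (current.get() == null) {
            restoreMaster();
        }
        return current.get();
    }
}

class ThreadLocalRouterDemo {
    public static void main(String[] args) throws InterruptedException {
        Map<Integer, String> factories = new HashMap<>();
        factories.put(ThreadLocalRouter.MASTER_DB_KEY, "master-db");
        factories.put(ThreadLocalRouter.EDU_DB_KEY, "edu-db");
        ThreadLocalRouter router = new ThreadLocalRouter(factories);

        router.switchTo(ThreadLocalRouter.EDU_DB_KEY);

        // A second thread never switched, so it still resolves to master.
        String[] seenByOtherThread = new String[1];
        Thread other = new Thread(() -> seenByOtherThread[0] = router.currentFactory());
        other.start();
        other.join();

        System.out.println(router.currentFactory()); // edu-db
        System.out.println(seenByOtherThread[0]);    // master-db
    }
}
```

Because the selection lives in a ThreadLocal, a switch made by one request thread does not leak into queries running on other threads, as long as the selection is restored before the thread is reused.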

  3. Declare the MongoTemplate bean
@Configuration
@ConfigurationProperties(prefix = "spring.data.mongodb.file")
public class FileMongoConfig {
    
    private String url;
    private String eduUrl;

    @Bean(name = "fileMongoTemplate")
    public MongoTemplate getMongoTemplate() {
        // Put every data source that can be switched to into the map
        Map<Integer, MongoDbFactory> mongoDbFactoriesMap = new HashMap<>(1<<3);
        MongoDbFactory mongoDbFactory = this.fileMongoDbFactory();
        mongoDbFactoriesMap.put(CustomMongoTemplate.MASTER_DB_KEY, mongoDbFactory);
        mongoDbFactoriesMap.put(CustomMongoTemplate.EDU_DB_KEY, this.fileEduMongoDbFactory());
        // Return our custom MongoTemplate here
        return new CustomMongoTemplate(mongoDbFactoriesMap, mongoDbFactory);
    }

    public MongoDbFactory fileMongoDbFactory() {
        return new SimpleMongoDbFactory(new MongoClientURI(getUrl()));
    }

    public MongoDbFactory fileEduMongoDbFactory() {
        return new SimpleMongoDbFactory(new MongoClientURI(getEduUrl()));
    }

    public String getUrl() {
        return url;
    }

    public void setUrl(String url) {
        this.url = url;
    }
    
    public String getEduUrl() {
        return eduUrl;
    }

    public void setEduUrl(String eduUrl) {
        this.eduUrl = eduUrl;
    }
}
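  4. With the three steps above, the preparation is done. When a Mongo query runs, the aspect intercepts it and checks whether the target is the custom MongoTemplate; if it is, the business logic decides whether to switch data sources.

    After the Mongo statement has executed, the aspect switches back to the master data source, which completes the switch.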
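
The switch, execute, restore flow can be sketched without Spring as follows. This is only an illustration under assumptions: `SwitchScope` and `runOn` are hypothetical names, and strings stand in for the data sources. The point it shows is that the restore belongs in a finally block, so a failed query cannot leave the thread pointing at the wrong data source.

```java
import java.util.function.Supplier;

class SwitchScope {
    // Stand-in for the per-thread data-source selection; "master" is the default.
    private static final ThreadLocal<String> CURRENT = ThreadLocal.withInitial(() -> "master");

    static String current() {
        return CURRENT.get();
    }

    // Switch, run the operation, and always restore, even when the operation throws.
    static <T> T runOn(String target, Supplier<T> operation) {
        CURRENT.set(target);
        try {
            return operation.get();
        } finally {
            CURRENT.set("master");
        }
    }
}

class SwitchScopeDemo {
    public static void main(String[] args) {
        // Inside the scope the operation sees the switched data source.
        String used = SwitchScope.runOn("edu", SwitchScope::current);
        System.out.println(used); // edu

        try {
            SwitchScope.runOn("edu", () -> {
                throw new IllegalStateException("query failed");
            });
        } catch (IllegalStateException expected) {
            // The failure propagates, but the selection was already reset in finally.
        }
        System.out.println(SwitchScope.current()); // master
    }
}
```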

Drawbacks
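  1. The drawback of this approach is obvious: the aspect has no practical way to use the method arguments. (joinPoint.getArgs() does return them, but MongoTemplate's many overloads put collectionName in different positions or derive it from the entity class.) If the decision to switch depends on the arguments passed to the method, AOP cannot do the job; this approach only suits scenarios where everything is switched wholesale.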
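
A small sketch of why reading arguments inside the aspect is awkward. An around advice can obtain the raw arguments as an Object[] through joinPoint.getArgs(), but it then has to guess which one is the collection name. The example below is plain Java with no Spring involved: `ArgGuess` is a hypothetical helper, the argument arrays only mimic the shapes of two MongoTemplate signatures that appear later in this article, and "orders" and "status" are made-up values.

```java
class ArgGuess {
    // Naive rule: treat the first String argument as the collection name.
    static String firstString(Object[] args) {
        for (Object arg : args) {
            if (arg instanceof String) {
                return (String) arg;
            }
        }
        return null;
    }
}

class ArgGuessDemo {
    public static void main(String[] args) {
        // Shape of execute(String collectionName, CollectionCallback<T> callback)
        Object[] executeArgs = {"orders", new Object()};

        // Shape of findDistinct(Query query, String field, String collectionName,
        //                       Class<?> entityClass, Class<T> resultClass)
        Object[] findDistinctArgs = {new Object(), "status", "orders", Object.class, String.class};

        System.out.println(ArgGuess.firstString(executeArgs));      // orders
        System.out.println(ArgGuess.firstString(findDistinctArgs)); // status (the field, not the collection)
    }
}
```

Any positional rule of this kind would have to be maintained per overload, which is roughly the same amount of work as overriding the methods directly.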

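So let's change tack: if the switch has to depend on collectionName, or on some of the method arguments, how can that be implemented?

  • First, AOP cannot reliably get at the arguments, and passing them through a ThreadLocal is also hard to get right
  • So we override MongoTemplate's methods directly: before each method runs we decide whether to switch, and after it runs we restore the master data source
  1. Delete the aspect class
  2. Modify CustomMongoTemplate. Going carefully through every MongoTemplate method that takes a collectionName parameter shows that most of them end up in the same low-level calls, so we only need to override those low-level methods. The final result: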
@Slf4j
public class CustomMongoTemplate extends MongoTemplate {

    // Holds the dbFactory currently selected for this thread
    private final ThreadLocal<MongoDbFactory> mongoDbFactoryThreadLocal = new ThreadLocal<>();

    // Partition map, 0: master-db   1: edu-db
    private final Map<Integer, MongoDbFactory> mongoDbFactoriesMap;

    public final static int MASTER_DB_KEY = 0;
    public final static int EDU_DB_KEY = 1;

    public CustomMongoTemplate(Map<Integer, MongoDbFactory> mongoDbFactoriesMap, MongoDbFactory mongoDbFactory){
        super(mongoDbFactory);
        if (!mongoDbFactoriesMap.containsKey(MASTER_DB_KEY)) {
            throw new BeanInitializationException("mongoDbFactoriesMap must contain an entry with key=0.");
        }
        this.mongoDbFactoriesMap = mongoDbFactoriesMap;
    }

    public CustomMongoTemplate(Map<Integer, MongoDbFactory> mongoDbFactoriesMap, MongoDbFactory mongoDbFactory, @Nullable MongoConverter mongoConverter){
        super(mongoDbFactory, mongoConverter);
        if (!mongoDbFactoriesMap.containsKey(MASTER_DB_KEY)) {
            throw new BeanInitializationException("mongoDbFactoriesMap must contain an entry with key=0.");
        }
        this.mongoDbFactoriesMap = mongoDbFactoriesMap;
    }

    public void switchMongoDbFactory(Integer mongoDbFactoryKey){
        if (!mongoDbFactoriesMap.containsKey(mongoDbFactoryKey)) {
            throw new IllegalArgumentException("The entry of the current key cannot be found in mongoDbFactoriesMap.");
        }
        mongoDbFactoryThreadLocal.set(mongoDbFactoriesMap.get(mongoDbFactoryKey));
    }

    public void restoreMaster() {
        mongoDbFactoryThreadLocal.set(mongoDbFactoriesMap.get(MASTER_DB_KEY));
    }

    public void checkAndSwitchDbFactory(String collectionName) {
        // TODO apply the business rules here and switch to the matching DbFactory
        this.switchMongoDbFactory(EDU_DB_KEY);
    }

    ///////////////////////
    @Override
    public MongoDbFactory getMongoDbFactory() {
        if (mongoDbFactoryThreadLocal.get() == null) {
            restoreMaster();
        }
        return mongoDbFactoryThreadLocal.get();
    }

    @Override
    protected MongoDatabase doGetDatabase() {
        return MongoDatabaseUtils.getDatabase(getMongoDbFactory(), SessionSynchronization.ON_ACTUAL_TRANSACTION);
    }

    @Override
    public SessionScoped withSession(ClientSessionOptions options) {

        Assert.notNull(options, "ClientSessionOptions must not be null!");

        return withSession(() -> getMongoDbFactory().getSession(options));
    }

    ////////////////////
    // Every override switches first and restores in finally, so an exception
    // cannot leave the thread pointing at the wrong data source.
    @Override
    public <S, T> List<T> doFind(String collectionName, Document query, Document fields, Class<S> sourceClass, Class<T> targetClass, CursorPreparer preparer) {
        checkAndSwitchDbFactory(collectionName);
        try {
            return super.doFind(collectionName, query, fields, sourceClass, targetClass, preparer);
        } finally {
            restoreMaster();
        }
    }

    @Override
    protected void executeQuery(Query query, String collectionName, DocumentCallbackHandler documentCallbackHandler, CursorPreparer preparer) {
        checkAndSwitchDbFactory(collectionName);
        try {
            super.executeQuery(query, collectionName, documentCallbackHandler, preparer);
        } finally {
            restoreMaster();
        }
    }

    @Override
    public <T> T execute(String collectionName, CollectionCallback<T> callback) {
        checkAndSwitchDbFactory(collectionName);
        try {
            return super.execute(collectionName, callback);
        } finally {
            restoreMaster();
        }
    }

    @Override
    public MongoCollection<Document> getCollection(String collectionName) {
        checkAndSwitchDbFactory(collectionName);
        try {
            return super.getCollection(collectionName);
        } finally {
            restoreMaster();
        }
    }

    @Override
    public boolean collectionExists(String collectionName) {
        checkAndSwitchDbFactory(collectionName);
        try {
            return super.collectionExists(collectionName);
        } finally {
            restoreMaster();
        }
    }

    @Override
    public <T> List<T> findDistinct(Query query, String field, String collectionName, Class<?> entityClass, Class<T> resultClass) {
        checkAndSwitchDbFactory(collectionName);
        try {
            return super.findDistinct(query, field, collectionName, entityClass, resultClass);
        } finally {
            restoreMaster();
        }
    }

    @Override
    public <T> GeoResults<T> geoNear(NearQuery near, Class<?> domainType, String collectionName, Class<T> returnType) {
        checkAndSwitchDbFactory(collectionName);
        try {
            return super.geoNear(near, domainType, collectionName, returnType);
        } finally {
            restoreMaster();
        }
    }

    @Override
    protected <T> T doInsert(String collectionName, T objectToSave, MongoWriter<T> writer) {
        checkAndSwitchDbFactory(collectionName);
        try {
            return super.doInsert(collectionName, objectToSave, writer);
        } finally {
            restoreMaster();
        }
    }

    @Override
    protected <T> Collection<T> doInsertBatch(String collectionName, Collection<? extends T> batchToSave, MongoWriter<T> writer) {
        checkAndSwitchDbFactory(collectionName);
        try {
            return super.doInsertBatch(collectionName, batchToSave, writer);
        } finally {
            restoreMaster();
        }
    }

    @Override
    protected <T> T doSave(String collectionName, T objectToSave, MongoWriter<T> writer) {
        checkAndSwitchDbFactory(collectionName);
        try {
            return super.doSave(collectionName, objectToSave, writer);
        } finally {
            restoreMaster();
        }
    }

    @Override
    public <T> List<T> findAll(Class<T> entityClass, String collectionName) {
        checkAndSwitchDbFactory(collectionName);
        try {
            return super.findAll(entityClass, collectionName);
        } finally {
            restoreMaster();
        }
    }

    @Override
    public <T> MapReduceResults<T> mapReduce(Query query, String inputCollectionName, String mapFunction, String reduceFunction, MapReduceOptions mapReduceOptions, Class<T> entityClass) {
        checkAndSwitchDbFactory(inputCollectionName);
        try {
            return super.mapReduce(query, inputCollectionName, mapFunction, reduceFunction, mapReduceOptions, entityClass);
        } finally {
            restoreMaster();
        }
    }

    @Override
    public <T> List<T> mapReduce(Query query, Class<?> domainType, String inputCollectionName, String mapFunction, String reduceFunction, MapReduceOptions mapReduceOptions, Class<T> resultType) {
        checkAndSwitchDbFactory(inputCollectionName);
        try {
            return super.mapReduce(query, domainType, inputCollectionName, mapFunction, reduceFunction, mapReduceOptions, resultType);
        } finally {
            restoreMaster();
        }
    }

    @Override
    public <T> GroupByResults<T> group(Criteria criteria, String inputCollectionName, GroupBy groupBy, Class<T> entityClass) {
        checkAndSwitchDbFactory(inputCollectionName);
        try {
            return super.group(criteria, inputCollectionName, groupBy, entityClass);
        } finally {
            restoreMaster();
        }
    }

    @Override
    protected <O> AggregationResults<O> doAggregate(Aggregation aggregation, String collectionName, Class<O> outputType, AggregationOperationContext context) {
        checkAndSwitchDbFactory(collectionName);
        try {
            return super.doAggregate(aggregation, collectionName, outputType, context);
        } finally {
            restoreMaster();
        }
    }

    @Override
    protected MongoCollection<Document> doCreateCollection(String collectionName, Document collectionOptions) {
        checkAndSwitchDbFactory(collectionName);
        try {
            return super.doCreateCollection(collectionName, collectionOptions);
        } finally {
            restoreMaster();
        }
    }

    @Override
    protected <T> T doFindOne(String collectionName, Document query, Document fields, Class<T> entityClass) {
        checkAndSwitchDbFactory(collectionName);
        try {
            return super.doFindOne(collectionName, query, fields, entityClass);
        } finally {
            restoreMaster();
        }
    }

    @Override
    protected <S, T> List<T> doFind(String collectionName, Document query, Document fields, Class<S> entityClass, CursorPreparer preparer, DocumentCallback<T> objectCallback) {
        checkAndSwitchDbFactory(collectionName);
        try {
            return super.doFind(collectionName, query, fields, entityClass, preparer, objectCallback);
        } finally {
            restoreMaster();
        }
    }

    @Override
    protected <T> T doFindAndRemove(String collectionName, Document query, Document fields, Document sort, Collation collation, Class<T> entityClass) {
        checkAndSwitchDbFactory(collectionName);
        try {
            return super.doFindAndRemove(collectionName, query, fields, sort, collation, entityClass);
        } finally {
            restoreMaster();
        }
    }

    @Override
    protected <T> T doFindAndModify(String collectionName, Document query, Document fields, Document sort, Class<T> entityClass, Update update, FindAndModifyOptions options) {
        checkAndSwitchDbFactory(collectionName);
        try {
            return super.doFindAndModify(collectionName, query, fields, sort, entityClass, update, options);
        } finally {
            restoreMaster();
        }
    }

    @Override
    protected <T> T doFindAndReplace(String collectionName, Document mappedQuery, Document mappedFields, Document mappedSort, com.mongodb.client.model.Collation collation, Class<?> entityType, Document replacement, FindAndReplaceOptions options, Class<T> resultType) {
        checkAndSwitchDbFactory(collectionName);
        try {
            return super.doFindAndReplace(collectionName, mappedQuery, mappedFields, mappedSort, collation, entityType, replacement, options, resultType);
        } finally {
            restoreMaster();
        }
    }
}
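
The checkAndSwitchDbFactory method above leaves the actual business rule as a TODO. One possible shape for that rule, purely as an illustration, is a lookup from collection-name prefix to data source key. Everything in this sketch is an assumption: `CollectionRouting`, its `route` and `keyFor` methods, and the "edu_" prefix are hypothetical and not part of the original code.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class CollectionRouting {
    static final int MASTER_DB_KEY = 0;
    static final int EDU_DB_KEY = 1;

    // Hypothetical rule table: collection-name prefix -> data source key.
    private final Map<String, Integer> prefixToKey = new LinkedHashMap<>();

    CollectionRouting route(String prefix, int key) {
        prefixToKey.put(prefix, key);
        return this;
    }

    // The first matching prefix wins; anything unmatched stays on master.
    int keyFor(String collectionName) {
        for (Map.Entry<String, Integer> rule : prefixToKey.entrySet()) {
            if (collectionName.startsWith(rule.getKey())) {
                return rule.getValue();
            }
        }
        return MASTER_DB_KEY;
    }
}

class CollectionRoutingDemo {
    public static void main(String[] args) {
        CollectionRouting routing = new CollectionRouting()
                .route("edu_", CollectionRouting.EDU_DB_KEY);

        System.out.println(routing.keyFor("edu_courses")); // 1
        System.out.println(routing.keyFor("orders"));      // 0
    }
}
```

With a rule like this, checkAndSwitchDbFactory could pass the looked-up key to switchMongoDbFactory instead of always switching to EDU_DB_KEY.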

That's it, all done.

