Spring AOP 實現事務管理和主從讀寫分離


1 切面(Aspect):是一個類

2 切入點(Pointcut):用表達式描述哪些方法需要被攔截

3 連接點(Join Point):被切入點匹配到、實際被攔截的方法

4 通知(Advice):是切面類中的一個方法

5 配置文件(Spring XML)

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
    xmlns:mvc="http://www.springframework.org/schema/mvc" xmlns:aop="http://www.springframework.org/schema/aop"
    xmlns:tx="http://www.springframework.org/schema/tx"
    xsi:schemaLocation="http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc-4.0.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd
http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.0.xsd 
http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.0.xsd">
    
    <!-- Load the property files matching classpath:*.properties; they supply the ${...} placeholders used by the beans below. -->
    <context:property-placeholder location="classpath:*.properties" />
    
    <!-- Component scanning for everything under com.hengxin.qianee except the wechat controllers.
         A "regex" filter is matched against the whole fully-qualified class name, so the
         expression must cover the class-name part as well (hence the trailing \..*);
         the bare package name would not match any class and therefore exclude nothing. -->
    <context:component-scan base-package="com.hengxin.qianee">
        <context:exclude-filter type="regex" expression="com\.hengxin\.qianee\.wechat\.controller\..*"/>
    </context:component-scan>
    
    <!-- Druid connection pool for the master database; it is wired into readWriteDataSource as the write data source. -->
    <bean id="masterDataSource" class="com.alibaba.druid.pool.DruidDataSource"
        init-method="init" destroy-method="close">
        <property name="url" value="${jdbc_url_master}" />
        <property name="username" value="${jdbc_username_master}" />
        <property name="password" value="${jdbc_password_master}" />
        <!-- Number of connections created at start-up -->
        <property name="initialSize" value="0" />
        <!-- Maximum number of connections in use at the same time -->
        <property name="maxActive" value="20" />
        <!-- Maximum number of idle connections (setting disabled) -->
        <!-- <property name="maxIdle" value="20" /> -->
        <!-- Minimum number of idle connections -->
        <property name="minIdle" value="0" />
        <!-- Maximum time to wait when obtaining a connection -->
        <property name="maxWait" value="60000" />
        <property name="validationQuery" value="${validationQuery}" />
        <property name="testOnBorrow" value="false" />
        <property name="testOnReturn" value="false" />
        <property name="testWhileIdle" value="true" />
        <!-- Interval between checks for idle connections that should be closed, in milliseconds -->
        <property name="timeBetweenEvictionRunsMillis" value="60000" />
        <!-- Minimum time a connection stays in the pool, in milliseconds (25200000 ms = 7 hours) -->
        <property name="minEvictableIdleTimeMillis" value="25200000" />
        <!-- Enable the removeAbandoned feature -->
        <property name="removeAbandoned" value="true" />
        <!-- 1800 seconds, i.e. 30 minutes -->
        <property name="removeAbandonedTimeout" value="1800" />
        <!-- Log an error when an abandoned connection is closed -->
        <property name="logAbandoned" value="true" />
        <!-- Database monitoring filter -->
        <!-- <property name="filters" value="stat" /> -->
        <property name="filters" value="mergeStat" />
    </bean>
    
    <!-- Druid connection pool for the slave database; it is wired into readWriteDataSource as a read data source. -->
    <bean id="slaveDataSource" class="com.alibaba.druid.pool.DruidDataSource"
        init-method="init" destroy-method="close">
        <property name="url" value="${jdbc_url_slave}" />
        <property name="username" value="${jdbc_username_slave}" />
        <property name="password" value="${jdbc_password_slave}" />
        <!-- Number of connections created at start-up -->
        <property name="initialSize" value="0" />
        <!-- Maximum number of connections in use at the same time -->
        <property name="maxActive" value="20" />
        <!-- Maximum number of idle connections (setting disabled) -->
        <!-- <property name="maxIdle" value="20" /> -->
        <!-- Minimum number of idle connections -->
        <property name="minIdle" value="0" />
        <!-- Maximum time to wait when obtaining a connection -->
        <property name="maxWait" value="60000" />
        <property name="validationQuery" value="${validationQuery}" />
        <property name="testOnBorrow" value="false" />
        <property name="testOnReturn" value="false" />
        <property name="testWhileIdle" value="true" />
        <!-- Interval between checks for idle connections that should be closed, in milliseconds -->
        <property name="timeBetweenEvictionRunsMillis" value="60000" />
        <!-- Minimum time a connection stays in the pool, in milliseconds (25200000 ms = 7 hours) -->
        <property name="minEvictableIdleTimeMillis" value="25200000" />
        <!-- Enable the removeAbandoned feature -->
        <property name="removeAbandoned" value="true" />
        <!-- 1800 seconds, i.e. 30 minutes -->
        <property name="removeAbandonedTimeout" value="1800" />
        <!-- Log an error when an abandoned connection is closed -->
        <property name="logAbandoned" value="true" />
        <!-- Database monitoring filter -->
        <!-- <property name="filters" value="stat" /> -->
        <property name="filters" value="mergeStat" />
    </bean>
    
        <!-- Routing data source: one write data source plus a named map of read data sources. -->
        <bean id="readWriteDataSource" class="com.hengxin.qianee.util.ReadWriteDataSource">
        <property name="writeDataSource" ref="masterDataSource"/>
        <property name="readDataSourceMap">
           <map>
              <!-- All four entries reference the same slaveDataSource bean, so every read ends up in the same pool. -->
              <entry key="readDataSource1" value-ref="slaveDataSource"/>
              <entry key="readDataSource2" value-ref="slaveDataSource"/>
              <entry key="readDataSource3" value-ref="slaveDataSource"/>
              <entry key="readDataSource4" value-ref="slaveDataSource"/>
           </map>
        </property>
    </bean>

    <!-- Bean post-processor and aspect that decides per call whether the read or the write database is used.
         forceChoiceReadWhenWrite=false: a read that follows a write stays on the write database. -->
    <bean id="readWriteDataSourceTransactionProcessor" class="com.hengxin.qianee.util.ReadWriteDataSourceProcessor">
       <property name="forceChoiceReadWhenWrite" value="false"/>
    </bean>
    <!-- MyBatis session factory, backed by the routing data source -->
    <bean id="sqlSessionFactory" class="org.mybatis.spring.SqlSessionFactoryBean">
        <property name="dataSource" ref="readWriteDataSource" />
        <!-- Pick up every mapper XML file in this directory automatically, which saves listing them by hand in Configuration.xml -->
        <property name="mapperLocations" value="classpath:com/hengxin/qianee/mapper/xml/*.xml" />
    </bean>
    
    <!-- MyBatis mapper scanning: scans the com.hengxin.qianee.mapper package and binds it to sqlSessionFactory -->
    <bean class="org.mybatis.spring.mapper.MapperScannerConfigurer">
        <property name="basePackage" value="com.hengxin.qianee.mapper" />
        <property name="sqlSessionFactoryBeanName" value="sqlSessionFactory" />
    </bean>

    <!-- Transaction manager; it works on the routing data source, so connections are taken from whichever database is currently selected -->
    <bean id="transactionManager"
        class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
        <property name="dataSource" ref="readWriteDataSource" />
    </bean>

    <!-- Interceptor-style (declarative) transaction configuration; rules are selected by method-name pattern -->
    <tx:advice id="transactionAdvice" transaction-manager="transactionManager">
        <tx:attributes>
            <!-- Methods that run inside a REQUIRED transaction -->
            <tx:method name="add*" propagation="REQUIRED" />
            <tx:method name="append*" propagation="REQUIRED" />
            <tx:method name="insert*" propagation="REQUIRED" />
            <tx:method name="save*" propagation="REQUIRED" />
            <tx:method name="update*" propagation="REQUIRED" />
            <tx:method name="modify*" propagation="REQUIRED" />
            <tx:method name="edit*" propagation="REQUIRED" />
            <tx:method name="delete*" propagation="REQUIRED" />
            <tx:method name="remove*" propagation="REQUIRED" />
            <tx:method name="repair" propagation="REQUIRED" />
            <tx:method name="delAndRepair" propagation="REQUIRED" />
            <tx:method name="load*" propagation="REQUIRED" />    
            <tx:method name="do*" propagation="REQUIRED" />
            <tx:method name="send*" propagation="REQUIRED" />          
            <!-- Read-only patterns. ReadWriteDataSourceProcessor collects exactly these patterns and uses them to route calls to the read database. -->
            <!-- NOTE(review): put* and use* are declared read-only, so matching methods are routed to the read database; confirm that they never write. -->
            <tx:method name="put*" read-only="true"/>
            <tx:method name="query*" read-only="true"/>
            <tx:method name="use*" read-only="true"/>
            <tx:method name="get*" read-only="true" />
            <tx:method name="count*" read-only="true" />
            <tx:method name="find*" read-only="true" />
            <tx:method name="list*" read-only="true" />
            <tx:method name="select*" read-only="true" />
            <tx:method name="is*" read-only="true" />
            <!-- Fallback for every other method name -->
            <tx:method name="*" propagation="REQUIRED" />
        </tx:attributes>
    </tx:advice>
    <!-- AOP wiring: the transaction advice and the read/write routing aspect share one pointcut. -->
    <aop:config>
            <!-- Pointcut: every method of the classes in the two service.impl packages -->
        <aop:pointcut id="transactionPointcut"
            expression="(execution(* com.hengxin.qianee.service.impl.*.*(..))) 
                        or (execution(* com.hengxin.qianee.wechat.service.impl.*.*(..)))" />
        <!-- Advisor: applies the transaction advice at the pointcut -->
        <aop:advisor pointcut-ref="transactionPointcut"
            advice-ref="transactionAdvice" />
            <!-- Aspect: order -2147483648 is Integer.MIN_VALUE, i.e. the highest precedence -->
            <aop:aspect order="-2147483648" ref="readWriteDataSourceTransactionProcessor">
          <!-- Around advice that selects the read or the write database -->
           <aop:around pointcut-ref="transactionPointcut" method="determineReadOrWriteDB"/>
        </aop:aspect>
    </aop:config>
</beans>                                

6 ServiceImpl 層:每個 service 方法都是被切入點匹配到的連接點,dao 方法不在切入點範圍內

package com.hengxin.qianee.service.impl;


/**
 * 前台首頁服務
 * @author user
 *
 */
@Service
public class FrontMainServiceImpl implements FrontMainService {

    @Autowired
    private ContentAdvertisementsDao ContentAdvertisementsDao;//大廣告Dao
    
    
    /**
     * Ajax check of whether a user name already exists.
     *
     * @param name      the user name to check
     * @param errorInfo result holder that is filled in and returned
     * @return the given {@code errorInfo}: code -1 plus a message when the name is blank
     *         or already exists, code 1 when the name is still free
     */
    @Override
    public ErrorInfo hasNameExist(String name, ErrorInfo errorInfo) {
        // A blank name is rejected without querying.
        if (StringUtils.isBlank(name)) {
            errorInfo.code = -1;
            errorInfo.msg = "用戶名不能為空";
            return errorInfo;
        }

        // No matching row: the name can be used.
        if (UserService.isNameExist(name) <= 0) {
            errorInfo.code = 1;
            return errorInfo;
        }

        errorInfo.code = -1;
        errorInfo.msg = "該用戶名已存在";
        return errorInfo;
    }

    /**
     * Checks whether the recommender behind an invitation code exists.
     *
     * @param recommend the encrypted invitation code; may be blank
     * @param errorInfo result holder that is filled in and returned
     * @return the given {@code errorInfo}: code 5 when no code was supplied, code 4 when
     *         the decrypted user name exists, code -1 when it does not
     */
    @Override
    public ErrorInfo isRecommendExist(String recommend, ErrorInfo errorInfo) {
        if (StringUtils.isNotBlank(recommend)) {
            // The invitation code is the 3DES-encrypted user name of the recommender.
            final String referrerName = Encrypt.decrypt3DES(recommend, Constants.ENCRYPTION_KEY);
            final boolean referrerFound = UserService.isNameExist(referrerName) > 0;

            errorInfo.code = referrerFound ? 4 : -1;
            errorInfo.msg = referrerFound ? "該推薦人存在" : "該推薦人不存在,請選填";
            return errorInfo;
        }

        // No recommender supplied: signalled to the caller with code 5.
        errorInfo.code = 5;
        return errorInfo;
    }

    /**
     * Sends the SMS verification code for the registration page.
     *
     * <p>The number must be non-blank, well-formed and not yet registered; only then is
     * the code sent.
     *
     * <p>The previous version built a throwaway {@code new Users()} and compared its
     * mobile with the argument. That object is never null and carries no caller data, so
     * the comparison could not influence the result (assuming a new {@code Users} has no
     * preset mobile); the existence check is now called directly.
     *
     * @param mobile    the mobile number entered by the user
     * @param errorInfo result holder that is filled in and returned
     * @return the given {@code errorInfo}: code -1 plus a message when validation fails
     *         or the number already exists, otherwise whatever the SMS service reported
     */
    @Override
    public ErrorInfo verifyMobileRe(String mobile,ErrorInfo errorInfo) {

        // Reject blank input.
        if (StringUtils.isBlank(mobile)) {
            errorInfo.code = -1;
            errorInfo.msg = "請輸入手機號碼";

            return errorInfo;
        }

        // Reject malformed numbers.
        if (!RegexUtils.isMobileNum(mobile)) {
            errorInfo.code = -1;
            errorInfo.msg = "請輸入正確的手機號碼";

            return errorInfo;
        }

        // A number that is already registered must not receive a registration code.
        if (UserService.isMobileExistFlag(mobile)) {
            errorInfo.code = -1;
            errorInfo.msg = "該手機號碼已存在";

            return errorInfo;
        }

        // Send the SMS; the service records its outcome in errorInfo.
        smsService.sendCode(mobile, errorInfo);

        return errorInfo;
    }

    /**
     * Registers a new front-office user.
     *
     * <p>Steps: apply the initial credit line from the back-office settings, reject names
     * containing a forbidden keyword, resolve the recommender, generate and upload the
     * invitation QR code (best effort), insert the user, store the two signatures, record
     * the register event, queue the welcome message and create the audit items.
     *
     * <p>NOTE(review): the failure paths after {@code insertUser} return an error code
     * instead of throwing, so a surrounding transaction is not rolled back by them and an
     * already inserted user row stays in place. Confirm whether that is intended.
     *
     * @param user        the user to register; updated in place
     * @param path        local directory used for the temporary QR code image
     * @param contextPath base URL placed in front of the registration link inside the QR code
     * @param errorInfo   result holder; cleared first, then filled in and returned
     * @param recoName    user name of the recommender; empty or {@code null} when there is none
     * @return the given {@code errorInfo}: code 0 on success, a negative code on failure
     */
    @Override
    public ErrorInfo addregisterUser(Users user,String path,String contextPath,ErrorInfo errorInfo,String recoName) {
        errorInfo.clear();

        BackstageSet backstageSet = (BackstageSet) cache.getObject("backstageSet");
        user.setCreditLine(backstageSet.getInitialAmount());
        user.setLastCreditLine(backstageSet.getInitialAmount());

        // Forbidden registration keywords, stored as a comma-separated list.
        if (containsForbiddenKeyword(user.getName(), backstageSet.getKeywords())) {
            errorInfo.code = -1;
            errorInfo.msg = "對不起,注冊的用戶名包含敏感詞匯,請重新輸入用戶名";

            return errorInfo;
        }

        applyRecommender(user, backstageSet, recoName);
        attachQrCode(user, path, contextPath, errorInfo);

        // false means login is allowed. Set outside the QR code handling so that a
        // failed QR code generation cannot leave the flag unset.
        user.setIsAllowLogin(false);

        // Insert the user.
        int rows = userDao.insertUser(user);

        if(rows<=0){
            errorInfo.code = -1;
            errorInfo.msg = "此次注冊失敗!";
            return errorInfo;
        }

        // The expressions are kept exactly as before: string concatenation appends each
        // 0.00 as the text "0.0", and changing that would change the stored signatures.
        MD5 md5 = new MD5();
        String sign1 = md5.getMD5ofStr(""+user.getId()+0.00+0.00+Constants.ENCRYPTION_KEY);
        String sign2 = md5.getMD5ofStr(""+user.getId()+0.00+0.00+0.00+0.00+Constants.ENCRYPTION_KEY);

        int updateSign = userDao.updateSign(sign1, sign2, user.getId());

        if(updateSign<=0){
            errorInfo.code = -1;
            errorInfo.msg = "此次注冊失敗!";
            return errorInfo;
        }

        userEventsDao.inserUserEvent(user.getId(), UserEvent.REGISTER, "注冊成功", errorInfo);

        if(errorInfo.code < 0){
            return errorInfo;
        }

        // Queue the in-site registration message.
        addSendLetter(user, Constants.M_REGISTER,errorInfo);

        // Create the audit items for the new user.
        statisticUserAuditItemsDao.createAuditItem(user.getId());

        errorInfo.code = 0;
        errorInfo.msg = "恭喜你,注冊成功!";

        return errorInfo;
    }

    /**
     * Tells whether the user name contains one of the comma-separated forbidden keywords.
     * Empty tokens (as produced by "a,,b") are skipped: every string contains "", so
     * they would otherwise reject every user name.
     */
    private boolean containsForbiddenKeyword(String userName, String keywordList) {
        if (!StringUtils.isNotBlank(keywordList)) {
            return false;
        }
        for (String word : keywordList.split(",")) {
            if (word.length() > 0 && userName.contains(word)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Stores the recommender data on the user. A {@code null} or empty recommender name
     * means "no recommender" (id 0); an unknown name additionally clears the reward type
     * and the recommend time.
     */
    private void applyRecommender(Users user, BackstageSet backstageSet, String recoName) {
        if (recoName == null || recoName.equals("")) {
            // No recommender: the id is 0 (the column is non-null).
            user.setRecommendUserId(0L);
            return;
        }

        // Look up the recommender's id by user name.
        long recommendedId = userDao.queryIdByUserName(recoName);
        if (recommendedId > 0) {
            user.setRecommendUserId(recommendedId);
            user.setRecommendRewardType(backstageSet.getCpsRewardType());
            user.setRecommendTime(new Date());
        } else {
            user.setRecommendUserId(0L);
            user.setRecommendRewardType(-1);
            user.setRecommendTime(null);
        }
    }

    /**
     * Generates the invitation QR code, uploads it and stores its key on the user.
     * This is best effort: a generation or I/O failure is reported on stderr and the
     * registration continues without a QR code. The temporary image file is removed in
     * every case.
     */
    private void attachQrCode(Users user, String path, String contextPath, ErrorInfo errorInfo) {
        File qrFile = new File(path, UUID.randomUUID().toString() + ".png");

        try {
            Qrcode.create(contextPath + "/loginAndRegister/register?un=" + Encrypt.encrypt3DES(user.getName(), Constants.ENCRYPTION_KEY),
                    BarcodeFormat.QR_CODE,
                    100, 100,
                    qrFile.getAbsolutePath(), "png");

            // Total number of users, passed on to the upload service.
            int userCount = UserService.selectUserCount();
            Map<String, Object> map = fileUploadService.registeredUploadFiles(qrFile, Constants.FileFormat.IMG, userCount, errorInfo );

            // Keep the part after the '%' separator, without the file extension.
            user.setQrCode(((String) map.get("fileName")).split("%")[1].split("\\.")[0]);
        } catch (WriterException e) {
            e.printStackTrace();
            System.err.println("生成二維碼圖像失敗!");
        } catch (IOException e) {
            e.printStackTrace();
            System.err.println("生成二維碼圖像失敗!");
        } finally {
            // Remove the local copy whether or not the upload succeeded.
            qrFile.delete();
        }
    }
    
    /**
     * Queues an in-site message for a user, based on a message template.
     *
     * @param user  the recipient
     * @param id    id of the message template to use
     * @param error result holder; set to code -1 when the message task could not be added
     * @return the given {@code error}
     */
    @Override
    public ErrorInfo addSendLetter(Users user,long id,ErrorInfo error) {
        // The template carries the title, the content and an on/off status.
        MessageStationTemplates template = messageStationTemplatesDao.fandMessageStationTemplates(id);

        // A switched-off template sends nothing.
        if (!template.getStatus()) {
            return error;
        }

        // Add the message task (it is sent later by a scheduled job).
        int inserted = messageSendingDao.addMessageTask(user.getId(), template.getTitle(), template.getContent());
        if (inserted <= 0) {
            error.code = -1;
            error.msg = "添加失敗";
        }
        return error;
    }

    /**
     * Sends the SMS verification code for the forgot-password page.
     *
     * @param mobile    the mobile number entered by the user
     * @param errorInfo result holder that is filled in and returned
     * @return the given {@code errorInfo}: code -1 plus a message when the number is blank
     *         or malformed, otherwise whatever the SMS service reported
     */
    @Override
    public ErrorInfo sendMobileMessage(String mobile, ErrorInfo errorInfo) {
        // Work out why the number is unusable, if it is.
        String rejection = null;
        if (StringUtils.isBlank(mobile)) {
            rejection = "請輸入手機號碼";
        } else if (!RegexUtils.isMobileNum(mobile)) {
            rejection = "請輸入正確的手機號碼";
        }

        if (rejection != null) {
            errorInfo.code = -1;
            errorInfo.msg = rejection;
            return errorInfo;
        }

        // Send the SMS; the service records its outcome in errorInfo.
        smsService.sendCode(mobile, errorInfo);
        return errorInfo;
    }


    /**
     * Looks up the user registration agreement.
     *
     * @param id id passed on to the content DAO
     * @return the content returned by the DAO
     */
    @Override
    public String queryContent(long id) {
        final String agreement = ContentNewsDao.queryContent(id);
        return agreement;
    }

}

7 切面類(ReadWriteDataSourceProcessor),以及它依賴的 ReadWriteDataSourceDecision 和 ReadWriteDataSource

package com.hengxin.qianee.util;

import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;

import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
//import org.slf4j.Logger;
//import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.core.NestedRuntimeException;
import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.interceptor.NameMatchTransactionAttributeSource;
import org.springframework.transaction.interceptor.RuleBasedTransactionAttribute;
import org.springframework.transaction.interceptor.TransactionAttribute;
import org.springframework.transaction.interceptor.TransactionInterceptor;
import org.springframework.util.PatternMatchUtils;
import org.springframework.util.ReflectionUtils;

/**
 * 
 * 
 * <pre>
 * 
 * 此類實現了兩個職責(為了減少類的數量將兩個功能合並到一起了):
 *   讀/寫動態數據庫選擇處理器
 *   通過AOP切面實現讀/寫選擇
 *   
 *   
 * ★★讀/寫動態數據庫選擇處理器★★
 * 1、首先讀取<tx:advice>事務屬性配置
 * 
 * 2、對於所有讀方法設置 read-only="true" 表示讀取操作(以此來判斷是選擇讀還是寫庫),其他操作都是走寫庫
 *    如<tx:method name="×××" read-only="true"/>
 *    
 * 3、 forceChoiceReadOnWrite用於確定在如果目前是寫(即開啟了事務),下一步如果是讀,
 *    是直接參與到寫庫進行讀,還是強制從讀庫讀<br/>
 *      forceChoiceReadOnWrite:false 表示目前是寫,下一步如果是讀,強制參與到寫事務(即從寫庫讀)
 *                                  這樣可以避免寫的時候從讀庫讀不到數據
 *                                  
 *                                  通過設置事務傳播行為:SUPPORTS實現
 *                                  
 *      forceChoiceReadOnWrite:true 表示不管當前事務是寫/讀,都強制從讀庫獲取數據
 *                                  通過設置事務傳播行為:NOT_SUPPORTS實現(連接是盡快釋放)                
 *                                  『此處借助了 NOT_SUPPORTS會掛起之前的事務進行操作 然后再恢復之前事務完成的』
 * 4、配置方式
 *  <bean id="readWriteDataSourceTransactionProcessor" class="cn.javass.common.datasource.ReadWriteDataSourceProcessor">
 *      <property name="forceChoiceReadWhenWrite" value="false"/>
 *  </bean>
 *
 * 5、目前只適用於<tx:advice>情況 TODO 支持@Transactional注解事務
 *  
 *  
 *  
 * ★★通過AOP切面實現讀/寫庫選擇★★
 * 
 * 1、首先將當前方法 與 根據之前【讀/寫動態數據庫選擇處理器】  提取的讀庫方法 進行匹配
 * 
 * 2、如果匹配,說明是讀取數據:
 *  2.1、如果forceChoiceReadOnWrite:true,即強制走讀庫
 *  2.2、如果之前是寫操作且forceChoiceReadOnWrite:false,將從寫庫進行讀取
 *  2.3、否則,到讀庫進行讀取數據
 * 
 * 3、如果不匹配,說明默認將使用寫庫進行操作
 * 
 * 4、配置方式
 *      <aop:aspect order="-2147483648" ref="readWriteDataSourceTransactionProcessor">
 *          <aop:around pointcut-ref="txPointcut" method="determineReadOrWriteDB"/>
 *      </aop:aspect>
 *  4.1、此處order = Integer.MIN_VALUE 即最高的優先級(請參考http://jinnianshilongnian.iteye.com/blog/1423489)
 *  4.2、切入點:txPointcut 和 實施事務的切入點一樣
 *  4.3、determineReadOrWriteDB方法用於決策是走讀/寫庫的,請參考
 *       @see cn.javass.common.datasource.ReadWriteDataSourceDecision
 *       @see cn.javass.common.datasource.ReadWriteDataSource
 * 
 * </pre>
 * @author Zhang Kaitao
 *
 */
public class ReadWriteDataSourceProcessor implements BeanPostProcessor {
//    private static final Logger log = LoggerFactory.getLogger(ReadWriteDataSourceProcessor.class);
    
    private boolean forceChoiceReadWhenWrite = false;
    
    private Map<String, Boolean> readMethodMap = new HashMap<String, Boolean>();

    /**
     * 當之前操作是寫的時候,是否強制從從庫讀
     * 默認(false) 當之前操作是寫,默認強制從寫庫讀
     * @param forceReadOnWrite
     */
    
    public void setForceChoiceReadWhenWrite(boolean forceChoiceReadWhenWrite) {
        
        this.forceChoiceReadWhenWrite = forceChoiceReadWhenWrite;
    }
    

    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {

        if(!(bean instanceof NameMatchTransactionAttributeSource)) {
            return bean;
        }
        
        try {
            NameMatchTransactionAttributeSource transactionAttributeSource = (NameMatchTransactionAttributeSource)bean;
            Field nameMapField = ReflectionUtils.findField(NameMatchTransactionAttributeSource.class, "nameMap");
            nameMapField.setAccessible(true);
            Map<String, TransactionAttribute> nameMap = (Map<String, TransactionAttribute>) nameMapField.get(transactionAttributeSource);
            
            for(Entry<String, TransactionAttribute> entry : nameMap.entrySet()) {
                RuleBasedTransactionAttribute attr = (RuleBasedTransactionAttribute)entry.getValue();

                //僅對read-only的處理
                if(!attr.isReadOnly()) {
                    continue;
                }
                
                String methodName = entry.getKey();
                Boolean isForceChoiceRead = Boolean.FALSE;
                if(forceChoiceReadWhenWrite) {
                    //不管之前操作是寫,默認強制從讀庫讀 (設置為NOT_SUPPORTED即可)
                    //NOT_SUPPORTED會掛起之前的事務
                    attr.setPropagationBehavior(Propagation.NOT_SUPPORTED.value());
                    isForceChoiceRead = Boolean.TRUE;
                } else {
                    //否則 設置為SUPPORTS(這樣可以參與到寫事務)
                    attr.setPropagationBehavior(Propagation.SUPPORTS.value());
                }
                System.out.println("read/write transaction process  method:{} force read:{}"+" "+ methodName+" "+ isForceChoiceRead);
                readMethodMap.put(methodName, isForceChoiceRead);
            }
            
        } catch (Exception e) {
            throw new ReadWriteDataSourceTransactionException("process read/write transaction error", e);
        }
        
        return bean;
    }
    
    
    @Override
    public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
        return bean;
    }

    private class ReadWriteDataSourceTransactionException extends NestedRuntimeException {
        public ReadWriteDataSourceTransactionException(String message, Throwable cause) {
            super(message, cause);
        }
    }
    
//
ProceedingJoinPoint 連接點

public Object determineReadOrWriteDB(ProceedingJoinPoint pjp) throws Throwable { if (isChoiceReadDB(pjp.getSignature().getName())) { ReadWriteDataSourceDecision.markRead(); System.out.println("方法:" + pjp.getSignature().getName() +"進入讀庫!"); } else { ReadWriteDataSourceDecision.markWrite(); System.out.println("方法:" + pjp.getSignature().getName() +"進入寫庫!"); } try { return pjp.proceed(); } finally { System.out.println(pjp.getSignature().getName()+" "+"reset方法"); ReadWriteDataSourceDecision.reset(); } } private boolean isChoiceReadDB(String methodName) { String bestNameMatch = null; for (String mappedName : this.readMethodMap.keySet()) { if (isMatch(methodName, mappedName)) { bestNameMatch = mappedName; break; } } Boolean isForceChoiceRead = readMethodMap.get(bestNameMatch); //表示強制選擇 讀 庫 if(isForceChoiceRead == Boolean.TRUE) { System.out.println("表示強制選擇 讀 庫"); return true; } //如果之前選擇了寫庫 現在還選擇 寫庫 if(ReadWriteDataSourceDecision.isChoiceWrite()) { System.out.println("如果之前選擇了寫庫 現在還選擇 寫庫"); return false; } //表示應該選擇讀庫 if(isForceChoiceRead != null) { System.out.println("表示應該選擇讀庫"); return true; } //默認選擇 寫庫 return false; } protected boolean isMatch(String methodName, String mappedName) { return PatternMatchUtils.simpleMatch(mappedName, methodName); } }

 

package com.hengxin.qianee.util;


/**
 * Decision holder for the read/write database routing.
 *
 * <p>The choice ({@link DataSourceType#write} or {@link DataSourceType#read}) is bound to
 * the current thread through a {@link ThreadLocal}; a thread that has made no choice
 * reports {@link #isChoiceNone()}.
 *
 * @author Zhang Kaitao
 */
public class ReadWriteDataSourceDecision {

    /** The kinds of data source a thread can be bound to. */
    public enum DataSourceType {
        write, read;
    }

    private static final ThreadLocal<DataSourceType> holder = new ThreadLocal<DataSourceType>();

    /** Binds the current thread to the write database. */
    public static void markWrite() {
        holder.set(DataSourceType.write);
    }

    /** Binds the current thread to the read database. */
    public static void markRead() {
        holder.set(DataSourceType.read);
    }

    /**
     * Clears the current thread's choice.
     *
     * <p>{@code remove()} is used instead of {@code set(null)} so that the thread-local
     * entry itself is dropped; on pooled threads a null-valued entry would otherwise
     * stay attached to the thread.
     */
    public static void reset() {
        holder.remove();
    }

    /** @return {@code true} if the current thread has made no choice */
    public static boolean isChoiceNone() {
        return null == holder.get();
    }

    /** @return {@code true} if the current thread is bound to the write database */
    public static boolean isChoiceWrite() {
        return DataSourceType.write == holder.get();
    }

    /** @return {@code true} if the current thread is bound to the read database */
    public static boolean isChoiceRead() {
        return DataSourceType.read == holder.get();
    }

}
package com.hengxin.qianee.util;

import java.sql.Connection;
import java.sql.SQLException;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicInteger;

import javax.sql.DataSource;

//import org.slf4j.Logger;
//import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.jdbc.datasource.AbstractDataSource;
import org.springframework.util.CollectionUtils;

/**
 * Data source that picks the read or the write database dynamically.
 *
 * <pre>
 * Implemented so far:
 *   one write database with several read databases; the choice is taken from
 *   ReadWriteDataSourceDecision (see ReadWriteDataSourceDecision.DataSourceType)
 *
 *   read databases are used in turn, in order
 *   the write database is the default
 *
 *   Done: one writer / many readers, reads going to the write database while writing,
 *         reads forced onto the read database while writing
 *   TODO  read database load balancing, read database fail-over
 * </pre>
 *
 * @author Zhang Kaitao
 * @see ReadWriteDataSourceDecision
 */
public class ReadWriteDataSource extends AbstractDataSource implements InitializingBean {

    private DataSource writeDataSource;
    private Map<String, DataSource> readDataSourceMap;

    // Array views of readDataSourceMap, filled in afterPropertiesSet().
    private String[] readDataSourceNames;
    private DataSource[] readDataSources;
    private int readDataSourceCount;

    // Running counter used to walk through the read data sources.
    private AtomicInteger counter = new AtomicInteger(1);

    /**
     * Sets the read databases.
     *
     * @param readDataSourceMap read data sources keyed by name
     */
    public void setReadDataSourceMap(Map<String, DataSource> readDataSourceMap) {
        this.readDataSourceMap = readDataSourceMap;
    }

    /**
     * Sets the write database.
     *
     * @param writeDataSource the data source used for writes
     */
    public void setWriteDataSource(DataSource writeDataSource) {
        this.writeDataSource = writeDataSource;
    }

    /**
     * Validates the configuration and copies the read data sources into arrays.
     *
     * @throws IllegalArgumentException if the write data source or the read map is missing
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        if (writeDataSource == null) {
            throw new IllegalArgumentException("property 'writeDataSource' is required");
        }
        if (CollectionUtils.isEmpty(readDataSourceMap)) {
            throw new IllegalArgumentException("property 'readDataSourceMap' is required");
        }

        readDataSourceCount = readDataSourceMap.size();
        readDataSourceNames = new String[readDataSourceCount];
        readDataSources = new DataSource[readDataSourceCount];

        int slot = 0;
        for (Entry<String, DataSource> readEntry : readDataSourceMap.entrySet()) {
            readDataSourceNames[slot] = readEntry.getKey();
            readDataSources[slot] = readEntry.getValue();
            slot++;
        }
    }

    /** Picks the data source for the current thread; the write database is the default. */
    private DataSource determineDataSource() {
        if (ReadWriteDataSourceDecision.isChoiceWrite()) {
            System.out.println("current determine write datasource");
            return writeDataSource;
        } else if (ReadWriteDataSourceDecision.isChoiceNone()) {
            System.out.println("no choice read/write, default determine write datasource");
            return writeDataSource;
        } else {
            return determineReadDataSource();
        }
    }

    /** Picks the next read data source in order. TODO improve the algorithm. */
    private DataSource determineReadDataSource() {
        // The remainder turns negative once the counter has wrapped around, hence abs().
        final int index = Math.abs(counter.incrementAndGet() % readDataSourceCount);

        System.out.println("current determine read datasource : {}"+"  "+readDataSourceNames[index]);

        return readDataSources[index];
    }

    @Override
    public Connection getConnection() throws SQLException {
        final DataSource target = determineDataSource();
        return target.getConnection();
    }

    @Override
    public Connection getConnection(String username, String password) throws SQLException {
        final DataSource target = determineDataSource();
        return target.getConnection(username, password);
    }

}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM