spring在加Transactional的方法中使用redis取值為空的問題


https://blog.csdn.net/qq_34021712/article/details/79606551

http://www.kaysonlv.com/%E8%AF%B4%E8%AF%B4redis-data-redis%E4%BA%8B%E5%8A%A1%E7%9A%84%E4%BD%BF%E7%94%A8.html

https://my.oschina.net/u/1168037/blog/967685

事務的過程分為三個步驟:

一、切換事務模式

二、事務入隊列

三、執行事務。

redis客戶端執行multi后返回ok,表明redis進入事務狀態。進入事務狀態以后redis並不會立即執行命令,會將redis客戶端發送的命令存入隊列,暫不執行,此時返回queued。最后調用exec,將命令從隊列中取出來,然后一次性執行,這些,命令同時成功同時失敗,最后將命令執行結果一次性返回,並且將事務狀態標志復位。在執行這些命令的過程中,使用同一客戶端,並且不會被其它客戶端中斷。

 在Spring Data Redis提供了RedisTemplate對redis進行讀寫操作並且支持事務。先配置開啟事務

redisTemplate.setEnableTransactionSupport(true);

其它相關配置,參考文檔:https://docs.spring.io/spring-data/redis/docs/2.1.8.RELEASE/reference/html/#tx.spring

然后在要使用事務的方法上面添加@Transactional,這樣就使用了redis的事務了

 

如果在同一線程(比如Web環境的一次請求中)中存在下面操作將會造成讀操作無法直接讀取出數據

  1. 先在非事務環境下執行reids操作(調用沒有加@Transactional注解)
  2. 然后在事務環境下執行redis操作(調用添加了@Transactional注解的方法)

可以從RedisTemplate源碼中找到原因。

        RedisTemplate中對Redis的各種數據類型的操作都抽象出了相對於的操作類 如 ValueOperations,ListOperations,SetOperations等,而這些類在執行操作時最終還是會調用RedisTemplate#execute

    public <T> T execute(RedisCallback<T> action, boolean exposeConnection, boolean pipeline)

這個方法是RedisTemplate的操作Reids的核心方法

    public <T> T execute(RedisCallback<T> action, boolean exposeConnection, boolean pipeline) {

        Assert.isTrue(initialized, "template not initialized; call afterPropertiesSet() before using it");
        Assert.notNull(action, "Callback object must not be null");

        RedisConnectionFactory factory = getRequiredConnectionFactory();
        RedisConnection conn = null;
        try {
            //這里判斷redis的EnableTransactionSupport是否為true,如果為true將連接綁定到當前線程    
            if (enableTransactionSupport) {
                //如果設置了啟用事務,則調用bindConnection
                // only bind resources in case of potential transaction synchronization
                conn = RedisConnectionUtils.bindConnection(factory, enableTransactionSupport);
            } else {
                //如果沒有開啟事務,直接獲取一個連接
                conn = RedisConnectionUtils.getConnection(factory);
            }
            //獲取當前線程綁定的連接,如果開啟事務,也就是上面的bindConnection(factory, enableTransactionSupport)代碼執行時那個連接
            boolean existingConnection = TransactionSynchronizationManager.hasResource(factory);
            //預留鈎子函數可在執行具體操作前對connection做一些處理
            RedisConnection connToUse = preProcessConnection(conn, existingConnection);

            boolean pipelineStatus = connToUse.isPipelined();
            if (pipeline && !pipelineStatus) {
                connToUse.openPipeline();
            }

            RedisConnection connToExpose = (exposeConnection ? connToUse : createRedisConnectionProxy(connToUse));
            //從redis中獲取值
            T result = action.doInRedis(connToExpose);

            // close pipeline
            if (pipeline && !pipelineStatus) {
                connToUse.closePipeline();
            }

            //預留鈎子函數可在執行具體操作后對connection做一些處理
            // TODO: any other connection processing?
            return postProcessResult(result, connToUse, existingConnection);
        } finally {
            RedisConnectionUtils.releaseConnection(conn, factory);
        }
    }

RedisConnectionUtils是獲取連接的工具類,在配置RedisTemplate是如果設置了enableTransactionSupport=true時,則會通過bindConnection方法獲取連接

    public static RedisConnection bindConnection(RedisConnectionFactory factory, boolean enableTranactionSupport) {
        return doGetConnection(factory, true, true, enableTranactionSupport);
    }
    public static RedisConnection doGetConnection(RedisConnectionFactory factory, boolean allowCreate, boolean bind,
            boolean enableTransactionSupport) {

        Assert.notNull(factory, "No RedisConnectionFactory specified");
        //從當前線程中獲取連接
        RedisConnectionHolder connHolder = (RedisConnectionHolder) TransactionSynchronizationManager.getResource(factory);

        if (connHolder != null) {
            if (enableTransactionSupport) {
                //開啟reids事務
                potentiallyRegisterTransactionSynchronisation(connHolder, factory);
            }
            return connHolder.getConnection();
        }

        if (!allowCreate) {
            throw new IllegalArgumentException("No connection found and allowCreate = false");
        }

        if (log.isDebugEnabled()) {
            log.debug("Opening RedisConnection");
        }
        //如果當前線程中不存在連接則創建連接
        RedisConnection conn = factory.getConnection();

        if (bind) {

            RedisConnection connectionToBind = conn;
            //如果開啟的事務且調用添加了@Transactional的方法,這里會創建一個連接的代理對象
            if (enableTransactionSupport && isActualNonReadonlyTransactionActive()) {
                connectionToBind = createConnectionProxy(conn, factory);
            }

            connHolder = new RedisConnectionHolder(connectionToBind);

            //綁定連接到當前線程中
            TransactionSynchronizationManager.bindResource(factory, connHolder);
            if (enableTransactionSupport) {
                //開啟reids事務
                potentiallyRegisterTransactionSynchronisation(connHolder, factory);
            }

            return connHolder.getConnection();
        }

        return conn;
    }

將代碼定位到TransactionSynchronizationManager.bindResource(factory, connHolder);這一行,內部使用ThreadLocal實現,

查看potentiallyRegisterTransactionSynchronisation函數,如果加了@Transactional就會開啟事務

    private static void potentiallyRegisterTransactionSynchronisation(RedisConnectionHolder connHolder,
            final RedisConnectionFactory factory) {

        if (isActualNonReadonlyTransactionActive()) {

            if (!connHolder.isTransactionSyncronisationActive()) {
                connHolder.setTransactionSyncronisationActive(true);

                RedisConnection conn = connHolder.getConnection();
                conn.multi(); //注冊一個事務完成時的回調,用於提交或回滾redis事務
                TransactionSynchronizationManager
                        .registerSynchronization(new RedisTransactionSynchronizer(connHolder, conn, factory));
            }
        }
    }

上面代碼可以看出獲取連接的整個流程

  1. TransactionSynchronizationManager.getResource(factory)(從當前線程中獲取連接,TransactionSynchronizationManager使用ThreadLocal把連接綁定到當前線程上。
  2. 如果獲取到連接則開啟事務,返回連接,如果沒有獲取到則創建連接
  3. 創建完連接后會判斷當前操作是否在事務中isActualNonReadonlyTransactionActive (是否添加了@Transactional注解,並且事務不是ReadOnly的)
  4. 如果操作實在事務中,則會創建一個連接的代理對象
  5. TransactionSynchronizationManager.bindResource(factory, connHolder); 綁定事務到當前線程中
  6. potentiallyRegisterTransactionSynchronisation(connHolder, factory); 開啟redis事務
  7. 返回連接

從上面流程可以看出在事務中執行和不在事務中執行的關鍵區別在於,是否創建了一個連接的代理對象,下面看一下createConnectionProxy的代碼

    private static RedisConnection createConnectionProxy(RedisConnection connection, RedisConnectionFactory factory) {

        ProxyFactory proxyFactory = new ProxyFactory(connection);
        //創建了一個ConnectionSplittingInterceptor類用於攔截RedisConnection所有方法
        proxyFactory.addAdvice(new ConnectionSplittingInterceptor(factory));

        return RedisConnection.class.cast(proxyFactory.getProxy());
    }

上面代碼中創建了一個ConnectionSplittingInterceptor類用於攔截RedisConnection中的所有方法,ConnectionSplittingInterceptor中的核心代碼是intecepter方法

        @Override
        public Object intercept(Object obj, Method method, Object[] args, MethodProxy proxy) throws Throwable {

            RedisCommand commandToExecute = RedisCommand.failsafeCommandLookup(method.getName());
            //判斷命令是否為只讀命令,如果是則新開一個連接執行度操作,如果是寫命令則放在事務中執行
            if (isPotentiallyThreadBoundCommand(commandToExecute)) {

                if (log.isDebugEnabled()) {
                    log.debug(String.format("Invoke '%s' on bound conneciton", method.getName()));
                }

                return invoke(method, obj, args);
            }

            if (log.isDebugEnabled()) {
                log.debug(String.format("Invoke '%s' on unbound conneciton", method.getName()));
            }

            RedisConnection connection = factory.getConnection();

            try {
                return invoke(method, connection, args);
            } finally {
                // properly close the unbound connection after executing command
                if (!connection.isClosed()) {
                    connection.close();
                }
            }
        }

intecepter方法中會判斷這次執行的命令是否是讀命令。如果不是,會用當前線程中的連接執行也就是放在事務中執行,如果是讀操作,會創建一個新的連接執行,這樣就能立即獲得讀取的數據。

通過代碼可以看出出錯的大致流程:

  1. 調用沒有使用事務的reids操作
  2. 創建一個連接並綁定到當前線程中(由於沒有使用事務,不會創建連接的代理對象)
  3. 執行reids操作 (操作完成后並沒有把當前線程中的連接清除)
  4. 調用使用事務的redis操作(方法上添加了@Transactional注解)
  5. 獲取連接方向當前線程中已經存在了連接不再重新創建(獲取到的是沒有使用事務時創建的連接,此連接對象不是代理對象)
  6. 開啟事務
  7. 執行操作(如果執行的是讀操作,由於連接對象不是代理對象,讀操作並不會重新創建一個連接,而是使用當前連接,並且放在事務中運行,因此讀操作並不會立即執行而是等到事務提交時才能執行,導致讀操作讀取的結果為null)

解決方案:

     此問題關鍵在於如果執行了為使用事務的reids操作,在操作完成后要將當前線程中綁定的連接對象給清除掉,或者在使用的事務的reids操作之前,判斷獲取到的連接是否是代理對象,如果不是則清除掉,重新獲取連接。

方法一:

    既然它沒有執行釋放的動作,那我們幫他執行就好了。繼續閱讀TransactionSynchronizationManager的源碼,發現有TransactionSynchronizationManager.unbindResource(factory);這個方法,這個方法的內部就是將資源釋放,如果你的redisTemplate開啟了事務,在未標明@Transactional的方法內使用時,可以在redisTemplate操作redis之后立馬調用該方法,具體代碼如下:

public void getRedis() {
        Object testtredis = redisTemplate.opsForValue().get("testtredis");
        TransactionSynchronizationManager.unbindResource(redisTemplate.getConnectionFactory());
        System.out.println(testtredis);
    }

方法二:

在RedisTemplate的execute方法中我們看到了 reids為我們預留了兩個鈎子函數,

preProcessConnection(conn, existingConnection) 

postProcessResult(result, connToUse, existingConnection)

因此我們可以繼承RedisTemplate來對連接進行處理

public class CustomRedisTemplate<K, V> extends RedisTemplate<K, V> {
    private boolean enableTransactionSupport = false;

    private static boolean isActualNonReadonlyTransactionActive() {
        return TransactionSynchronizationManager.isActualTransactionActive()
                && !TransactionSynchronizationManager.isCurrentTransactionReadOnly();
    }

    /**
     * 解決 redis先非事務中運行,然后又在事務中運行,出現取到的連接還是非事務連接的問題
     * 在事務環境中用非事務連接,讀取操作無法馬上讀出數據
     *
     * @param connection
     * @param existingConnection
     * @return
     */
    @Override
    protected RedisConnection preProcessConnection(RedisConnection connection, boolean existingConnection) {
        if (existingConnection && !Proxy.isProxyClass(connection.getClass()) && isActualNonReadonlyTransactionActive()) {
            RedisConnectionUtils.unbindConnection(getConnectionFactory());
            List<TransactionSynchronization> list = new ArrayList<>(TransactionSynchronizationManager.getSynchronizations());
            TransactionSynchronizationManager.clearSynchronization();
            TransactionSynchronizationManager.initSynchronization();
            //移除最后一個回調(由於之前回去連接是會注冊一個事務回調,下面如果再獲取連接會導致注冊兩個事務回調。事務完成后會執行兩次回調,
            //回調中會清除資源,第一次已經清除,第二次再清的時候回拋出異常)
            list.remove(list.size() - 1);
            list.forEach(TransactionSynchronizationManager::registerSynchronization);
            connection = RedisConnectionUtils.bindConnection(getConnectionFactory(), enableTransactionSupport);
        }
        return connection;
    }

    @Override
    public void setEnableTransactionSupport(boolean enableTransactionSupport) {
        super.setEnableTransactionSupport(enableTransactionSupport);
        this.enableTransactionSupport = enableTransactionSupport;
    }
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM