spring-data-redis的事務操作深度解析--原來客戶端庫還可以攢夠了事務命令再發?


一、官方文檔

簡單介紹下redis的幾個事務命令:

redis事務四大指令: MULTI、EXEC、DISCARD、WATCH。

這四個指令構成了redis事務處理的基礎。

1.MULTI用來組裝一個事務;
2.EXEC用來執行一個事務;
3.DISCARD用來取消一個事務;

4.WATCH類似於樂觀鎖機制里的版本號。

被WATCH的key如果在事務執行過程中被並發修改,則事務失敗。需要重試或取消。

以后單獨介紹。

 

下面是最新版本的spring-data-redis(2.1.3)的官方手冊。

https://docs.spring.io/spring-data/redis/docs/2.1.3.RELEASE/reference/html/#tx

 

這里,我們注意這么一句話:

Redis provides support for transactions through the multiexec, and discard commands. These operations are available on RedisTemplate. However, RedisTemplate is not guaranteed to execute all operations in the transaction with the same connection. 

意思是redis服務器通過multi,exec,discard提供事務支持。這些操作在RedisTemplate中已經實現。然而,RedisTemplate不保證在同一個連接中執行所有的這些一個事務中的操作。

 

另外一句話:

Spring Data Redis provides the SessionCallback interface for use when multiple operations need to be performed with the same connection, such as when using Redis transactions. The following example uses the multi method:

意思是:spring-data-redis也提供另外一種方式,這種方式可以保證多個操作(比如使用redis事務)可以在同一個連接中進行。示例如下:

//execute a transaction
List<Object> txResults = redisTemplate.execute(new SessionCallback<List<Object>>() {
  public List<Object> execute(RedisOperations operations) throws DataAccessException {
    operations.multi();
    operations.opsForSet().add("key", "value1");

    // This will contain the results of all operations in the transaction
    return operations.exec();
  }
});
System.out.println("Number of items added to set: " + txResults.get(0));

 

二、實現事務的方式--RedisTemplate直接操作

在前言中我們說,通過RedisTemplate直接調用multi,exec,discard,不能保證在同一個連接中進行。

這幾個操作都會調用RedisTemplate#execute(RedisCallback<T>, boolean),比如multi:

    public void multi() {
        execute(connection -> {
            connection.multi();
            return null;
        }, true);
    }

 

我們看看RedisTemplate的execute方法的源碼:

 1 public <T> T execute(RedisCallback<T> action, boolean exposeConnection, boolean pipeline) {
 2 
 3         Assert.isTrue(initialized, "template not initialized; call afterPropertiesSet() before using it");
 4         Assert.notNull(action, "Callback object must not be null");
 5 
 6         RedisConnectionFactory factory = getRequiredConnectionFactory();
 7         RedisConnection conn = null;
 8         try {
 9             --開啟了enableTransactionSupport選項,則會將獲取到的連接綁定到當前線程
10             if (enableTransactionSupport) {
11                 // only bind resources in case of potential transaction synchronization
12                 conn = RedisConnectionUtils.bindConnection(factory, enableTransactionSupport);
13             } else {
-- 未開啟,就會去獲取新的連接
14 conn = RedisConnectionUtils.getConnection(factory); 15 } 16 17 boolean existingConnection = TransactionSynchronizationManager.hasResource(factory); 18 19 RedisConnection connToUse = preProcessConnection(conn, existingConnection);
。。。忽略無關代碼。。。
26 RedisConnection connToExpose = (exposeConnection ? connToUse : createRedisConnectionProxy(connToUse)); 27 T result = action.doInRedis(connToExpose); -- 使用獲取到的連接,執行定義在業務回調中的代碼 28 。。。忽略無關代碼。。。 33 34 // TODO: any other connection processing? 35 return postProcessResult(result, connToUse, existingConnection); 36 } finally { 37 RedisConnectionUtils.releaseConnection(conn, factory); 38 } 39 }

 

查看以上源碼,我們發現,

  • 不啟用enableTransactionSupport,默認每次獲取新連接,代碼如下:
RedisTemplate<String, Object> template = new RedisTemplate<>();
template.multi();

template.opsForValue().set("test_long", 1);

template.opsForValue().increment("test_long", 1);

template.exec();

 

  • 啟用enableTransactionSupport,每次獲取與當前線程綁定的連接,代碼如下:
RedisTemplate<String, Object> template = new RedisTemplate<>();

template.setEnableTransactionSupport(true); 
template.multi();

template.opsForValue().set("test_long", 1);

template.opsForValue().increment("test_long", 1);

template.exec();  

 

 

三、實現事務的方式--SessionCallback

 采用這種方式,默認就會將所有操作放在同一個連接,因為在execute(SessionCallback<T> session)(注意,這里是重載函數,參數和上面不一樣)源碼中:

	public <T> T execute(SessionCallback<T> session) {

		Assert.isTrue(initialized, "template not initialized; call afterPropertiesSet() before using it");
		Assert.notNull(session, "Callback object must not be null");

		RedisConnectionFactory factory = getRequiredConnectionFactory();
		//在執行業務回調前,手動進行了綁定
		RedisConnectionUtils.bindConnection(factory, enableTransactionSupport);
		try {   // 業務回調
			return session.execute(this);
		} finally {
			RedisConnectionUtils.unbindConnection(factory);
		}
	}

  

四、SessionCallback方式的示例代碼:

 1         RedisStandaloneConfiguration configuration = new RedisStandaloneConfiguration("192.168.19.90");
 2         JedisConnectionFactory factory = new JedisConnectionFactory(configuration);
 3         factory.afterPropertiesSet();
 4 
 5         RedisTemplate<String, Object> template = new RedisTemplate<>();
 6         template.setConnectionFactory(factory);
 7         template.setDefaultSerializer(new GenericFastJsonRedisSerializer());
 8         StringRedisSerializer serializer = new StringRedisSerializer();
 9         template.setKeySerializer(serializer);
10         template.setHashKeySerializer(serializer);
11 
12         template.afterPropertiesSet();
14 
15         try {
16             List<Object> txResults = template.execute(new SessionCallback<List<Object>>() {
17                 @Override
18                 public List<Object> execute(RedisOperations operations) throws DataAccessException {
19 
20                     operations.multi();
21 
22                     operations.opsForValue().set("test_long", 1);
23                     int i = 1/0;
24                     operations.opsForValue().increment("test_long", 1);
25 
26                     // This will contain the results of all ops in the transaction
27                     return operations.exec();
28                 }
29             });
30 
31         } catch (Exception e) {
32             System.out.println("error");
33             e.printStackTrace();
34         }

 

有幾個值得注意的點:

1、為什么加try catch

先說結論:只是為了防止調用的主線程失敗。

 

因為事務里運行到23行,(int i = 1/0)時,會拋出異常。

但是在 template.execute(SessionCallback<T> session)中未對其進行捕獲,只在finally塊進行了連接釋放。

所以會導致調用線程(這里是main線程)中斷。

 

 2.try-catch了,事務到底得到保證了沒

我們來測試下,測試需要,省略非關鍵代碼

2.1 事務執行過程,拋出異常的情況:

 

            List<Object> txResults = template.execute(new SessionCallback<List<Object>>() {
                @Override
                public List<Object> execute(RedisOperations operations) throws DataAccessException {

                    operations.multi();

                    operations.opsForValue().set("test_long", 1);
                    int i = 1/0;
                    operations.opsForValue().increment("test_long", 1);

                    // This will contain the results of all ops in the transaction
                    return operations.exec();
                }
            });

  執行上述代碼,執行到int i = 1/0時,會拋出異常。我們需要檢查,拋出異常后,是否發送了“discard”命令給redis 服務器?

下面是我的執行結果,從最后的抓包可以看到,是發送了discard命令的:    

 

2.2 事務執行過程,不拋出異常的情況:

 這次我們注釋了拋錯的那行,可以看到“EXEC”命令已經發出去了:

 

3 拋出異常,不捕獲異常的情況:

有些同學可能比較奇怪,為啥網上那么多教程,都是沒有捕獲異常的,我這里要捕獲呢?

其實我也奇怪,但在我目前測試來看,不捕獲的話,執行線程就中斷了,因為template.execute是同步執行的。

來,看看:

 

從上圖可以看到,主線程被未捕獲的異常給中斷了,但是,查看網絡抓包,發現“DISCARD”命令還是發出去了的。

 

4.總結

從上面可以看出來,不管捕獲異常沒,事務都能得到保證。只是不捕獲異常,會導致主線程中斷。

不保證所有版本如此,在我這,spring-data-redis 2.1.3是這樣的。

我跟了n趟代碼,發現:

1、在執行sessionCallBack中的代碼時,我們一般會先執行multi命令。

multi命令的代碼如下:

    public void multi() {
        execute(connection -> {
 connection.multi(); return null;
        }, true);
    }

即調用了當前線程綁定的connection的multi方法。

進入JedisConnection的multi方法,可以看到:

private @Nullable Transaction transaction;

public void multi() { if (isQueueing()) { return; } try { if (isPipelined()) { getRequiredPipeline().multi(); return; }
//賦值給了connection的實例變量
this.transaction = jedis.multi(); } catch (Exception ex) { throw convertJedisAccessException(ex); } }

 

2、在有異常拋出時,直接進入finally塊,會去關閉connection,當然,這里的關閉只是還回到連接池。

大概的邏輯如下:

 

3.在沒有異常拋出時,執行exec,在exec中會先將狀態變量修改,后邊進入finally的時候,就不會發送discard命令了。

 

 最后的結論就是:

所有這一切的前提是,共有同一個連接。(使用SessionCallBack的方式就能保證,總是共用同一個連接),否則multi用到的連接1里transcation是有值的,但是后面獲取到的其他連接2,3,4,里面的transaction是空的,

還怎么保證事務呢?

 

五、思考

在不開啟redisTemplate的enableTransactionSupport選項時,每執行一次redis操作,就會向服務器發送相應的命令。

但是,在開啟了redisTemplate的enableTransactionSupport選項,或者使用SessionCallback方式時,會像下面這樣發送命令:

 

 

 后來,我在《redis實戰》這本書里的4.4節,Redis事務這一節里,找到了答案:

 

歸根到底呢,因為重用同一個連接,所以可以延遲發;如果每次都不一樣的連接,只能馬上發了。

 

 這里另外說一句,不是所有客戶端都這樣,redis自帶的redis-cli是不會延遲發送的。

 

六、源碼

https://github.com/cctvckl/work_util/tree/master/spring-redis-template-2.1.3

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM