5.10.1. @Transactional Support
Transaction Support is disabled by default and has to be explicitly enabled for each RedisTemplate in use by setting setEnableTransactionSupport(true).
This will force binding the RedisConnection in use to the current Thread triggering MULTI.
If the transaction finishes without errors, EXEC is called, otherwise DISCARD.
Once in MULTI, RedisConnection would queue write operations, all readonly operations, such as KEYS are piped to a fresh (non thread bound) RedisConnection.
/** Sample Configuration **/ @Configuration public class RedisTxContextConfiguration { @Bean public StringRedisTemplate redisTemplate() { StringRedisTemplate template = new StringRedisTemplate(redisConnectionFactory()); // explicitly enable transaction support template.setEnableTransactionSupport(true); return template; } @Bean public PlatformTransactionManager transactionManager() throws SQLException { return new DataSourceTransactionManager(dataSource()); } @Bean public RedisConnectionFactory redisConnectionFactory( // jedis, lettuce, srp,... ); @Bean public DataSource dataSource() throws SQLException { // ... } } /** Usage Constrainsts **/ // executed on thread bound connection template.opsForValue().set("foo", "bar"); // read operation executed on a free (not tx-aware) connection template.keys("*"); // returns null as values set within transaction are not visible template.opsForValue().get("foo");
public Long leftPush(V value) { return this.ops.leftPush(this.getKey(), value); }
public Long leftPush(K key, V value) { final byte[] rawKey = this.rawKey(key); final byte[] rawValue = this.rawValue(value); return (Long)this.execute(new RedisCallback() { public Long doInRedis(RedisConnection connection) { return connection.lPush(rawKey, new byte[][]{rawValue}); } }, true); }
<T> T execute(RedisCallback<T> callback, boolean b) { return this.template.execute(callback, b); }
public <T> T execute(RedisCallback<T> action, boolean exposeConnection) { return this.execute(action, exposeConnection, false); }
public <T> T execute(RedisCallback<T> action, boolean exposeConnection, boolean pipeline) { Assert.isTrue(this.initialized, "template not initialized; call afterPropertiesSet() before using it"); Assert.notNull(action, "Callback object must not be null"); RedisConnectionFactory factory = this.getConnectionFactory(); RedisConnection conn = null; Object var11; try { if(this.enableTransactionSupport) { conn = RedisConnectionUtils.bindConnection(factory, this.enableTransactionSupport); } else { conn = RedisConnectionUtils.getConnection(factory); } boolean existingConnection = TransactionSynchronizationManager.hasResource(factory); RedisConnection connToUse = this.preProcessConnection(conn, existingConnection); boolean pipelineStatus = connToUse.isPipelined(); if(pipeline && !pipelineStatus) { connToUse.openPipeline(); } RedisConnection connToExpose = exposeConnection?connToUse:this.createRedisConnectionProxy(connToUse); Object result = action.doInRedis(connToExpose); if(pipeline && !pipelineStatus) { connToUse.closePipeline(); } var11 = this.postProcessResult(result, connToUse, existingConnection); } finally { if(!this.enableTransactionSupport) { RedisConnectionUtils.releaseConnection(conn, factory); } } return var11; }
序
本文主要講述如何在java里頭使用redis進行cas操作。其實呢,redis不像memcached那樣顯示地支持cas操作,不過它有事務的概念。
准備
redis的樂觀鎖支持
Redis通過使用WATCH, MULTI, and EXEC組成的事務來實現樂觀鎖(注意沒有用DISCARD
),Redis事務沒有回滾操作。在SpringDataRedis當中通過RedisTemplate的SessionCallback中來支持(否則事務不生效
)。discard的話不需要自己代碼處理,callback返回null,成的話,返回非null,依據這個來判斷事務是否成功(沒有拋異常
)。
實例
@Test public void cas() throws InterruptedException, ExecutionException { String key = "test-cas-1"; ValueOperations<String, String> strOps = redisTemplate.opsForValue(); strOps.set(key, "hello"); ExecutorService pool = Executors.newCachedThreadPool(); List<Callable<Object>> tasks = new ArrayList<>(); for(int i=0;i<5;i++){ final int idx = i; tasks.add(new Callable() { @Override public Object call() throws Exception { return redisTemplate.execute(new SessionCallback() { @Override public Object execute(RedisOperations operations) throws DataAccessException { operations.watch(key); String origin = (String) operations.opsForValue().get(key); operations.multi(); operations.opsForValue().set(key, origin + idx); Object rs = operations.exec(); System.out.println("set:"+origin+idx+" rs:"+rs); return rs; } }); } }); } List<Future<Object>> futures = pool.invokeAll(tasks); for(Future<Object> f:futures){ System.out.println(f.get()); } pool.shutdown(); pool.awaitTermination(1000, TimeUnit.MILLISECONDS); }
輸出
set:hello2 rs:null set:hello3 rs:[] set:hello1 rs:null set:hello4 rs:null set:hello0 rs:null
查看該值
127.0.0.1:6379> get test-cas-1 "\"hello3\""
坑
SessionCallback
沒有在SessionCallback里頭執行watch、multi、exec,而是自己單獨寫
與數據庫事務的混淆
template.setEnableTransactionSupport(true);
這個應該是支持數據庫的事務成功才執行的意思。因為Spring默認的事務,都是基於DB事務的
/** * Gets a Redis connection. Is aware of and will return any existing corresponding connections bound to the current * thread, for example when using a transaction manager. Will create a new Connection otherwise, if * {@code allowCreate} is <tt>true</tt>. * * @param factory connection factory for creating the connection * @param allowCreate whether a new (unbound) connection should be created when no connection can be found for the * current thread * @param bind binds the connection to the thread, in case one was created * @param enableTransactionSupport * @return an active Redis connection */ public static RedisConnection doGetConnection(RedisConnectionFactory factory, boolean allowCreate, boolean bind, boolean enableTransactionSupport) { Assert.notNull(factory, "No RedisConnectionFactory specified"); RedisConnectionHolder connHolder = (RedisConnectionHolder) TransactionSynchronizationManager.getResource(factory); if (connHolder != null) { if (enableTransactionSupport) { potentiallyRegisterTransactionSynchronisation(connHolder, factory); } return connHolder.getConnection(); } if (!allowCreate) { throw new IllegalArgumentException("No connection found and allowCreate = false"); } if (log.isDebugEnabled()) { log.debug("Opening RedisConnection"); } RedisConnection conn = factory.getConnection(); if (bind) { RedisConnection connectionToBind = conn; if (enableTransactionSupport && isActualNonReadonlyTransactionActive()) { connectionToBind = createConnectionProxy(conn, factory); } connHolder = new RedisConnectionHolder(connectionToBind); TransactionSynchronizationManager.bindResource(factory, connHolder); if (enableTransactionSupport) { potentiallyRegisterTransactionSynchronisation(connHolder, factory); } return connHolder.getConnection(); } return conn; }
不要跟本文的樂觀鎖說的事務混淆在一起。
參考
https://segmentfault.com/a/1190000004393573
org.springframework.data.redis.core.RedisTemplate
public <T> T execute(RedisCallback<T> action, boolean exposeConnection, boolean pipeline) { Assert.isTrue(this.initialized, "template not initialized; call afterPropertiesSet() before using it"); Assert.notNull(action, "Callback object must not be null"); RedisConnectionFactory factory = this.getConnectionFactory(); RedisConnection conn = null; Object var11; try { if(this.enableTransactionSupport) { conn = RedisConnectionUtils.bindConnection(factory, this.enableTransactionSupport);
//這個是否支持的開關可以在@Configuration中配置: } else { conn = RedisConnectionUtils.getConnection(factory); } boolean existingConnection = TransactionSynchronizationManager.hasResource(factory); RedisConnection connToUse = this.preProcessConnection(conn, existingConnection); boolean pipelineStatus = connToUse.isPipelined(); if(pipeline && !pipelineStatus) { connToUse.openPipeline(); } RedisConnection connToExpose = exposeConnection?connToUse:this.createRedisConnectionProxy(connToUse); Object result = action.doInRedis(connToExpose); if(pipeline && !pipelineStatus) { connToUse.closePipeline(); } var11 = this.postProcessResult(result, connToUse, existingConnection); } finally { if(!this.enableTransactionSupport) { RedisConnectionUtils.releaseConnection(conn, factory); } } return var11; }
@Bean public RedisTemplate redisTemplate(){ RedisTemplate<StringRedisSerializer, Serializable> rt = new RedisTemplate<>(); rt.setConnectionFactory(jedisConnectionFactory()); StringRedisSerializer stringSerializer = new StringRedisSerializer(); JdkSerializationRedisSerializer jdkSerializationRedisSerializer = new JdkSerializationRedisSerializer(); RedisHashKeySerializer redisHashKeySerializer = new RedisHashKeySerializer(); rt.setKeySerializer(stringSerializer); rt.setValueSerializer(jdkSerializationRedisSerializer); rt.setHashKeySerializer(redisHashKeySerializer); rt.setHashValueSerializer(jdkSerializationRedisSerializer); rt.afterPropertiesSet(); rt.setEnableTransactionSupport(true); return rt; }
org.springframework.data.redis.core.RedisConnectionUtils
public static RedisConnection doGetConnection(RedisConnectionFactory factory, boolean allowCreate, boolean bind, boolean enableTransactionSupport) { Assert.notNull(factory, "No RedisConnectionFactory specified"); RedisConnectionUtils.RedisConnectionHolder connHolder = (RedisConnectionUtils.RedisConnectionHolder)TransactionSynchronizationManager.getResource(factory); if(connHolder != null) { if(enableTransactionSupport) { potentiallyRegisterTransactionSynchronisation(connHolder, factory); } return connHolder.getConnection(); } else if(!allowCreate) { throw new IllegalArgumentException("No connection found and allowCreate = false"); } else { if(log.isDebugEnabled()) { log.debug("Opening RedisConnection"); } RedisConnection conn = factory.getConnection(); if(bind) { RedisConnection connectionToBind = conn; if(enableTransactionSupport && isActualNonReadonlyTransactionActive()) { connectionToBind = createConnectionProxy(conn, factory); } connHolder = new RedisConnectionUtils.RedisConnectionHolder(connectionToBind); TransactionSynchronizationManager.bindResource(factory, connHolder); if(enableTransactionSupport) { potentiallyRegisterTransactionSynchronisation(connHolder, factory); } return connHolder.getConnection(); } else { return conn; } } }
private static void potentiallyRegisterTransactionSynchronisation(RedisConnectionUtils.RedisConnectionHolder connHolder, RedisConnectionFactory factory) { if(isActualNonReadonlyTransactionActive() && !connHolder.isTransactionSyncronisationActive()) { connHolder.setTransactionSyncronisationActive(true); RedisConnection conn = connHolder.getConnection(); conn.multi(); TransactionSynchronizationManager.registerSynchronization(new RedisConnectionUtils.RedisTransactionSynchronizer(connHolder, conn, factory)); } }
RedisTemplate api詳解
1. RedisTemplate的事務
private boolean enableTransactionSupport = false; private boolean exposeConnection = false; private boolean initialized = false; private boolean enableDefaultSerializer = true; private RedisSerializer<?> defaultSerializer = new JdkSerializationRedisSerializer(); private RedisSerializer keySerializer = null; private RedisSerializer valueSerializer = null; private RedisSerializer hashKeySerializer = null; private RedisSerializer hashValueSerializer = null; private RedisSerializer<String> stringSerializer = new StringRedisSerializer(); private ScriptExecutor<K> scriptExecutor; // cache singleton objects (where possible) private ValueOperations<K, V> valueOps; private ListOperations<K, V> listOps; private SetOperations<K, V> setOps; private ZSetOperations<K, V> zSetOps;
enableTransactionSupport:是否啟用事務支持。
在代碼中搜索下用到這個變量的地方,會看到,在調用RedisCallback之前,有一行代碼是如果啟用事務支持,那么conn = RedisConnectionUtils.bindConnection(factory, enableTransactionSupport),
也就是說,系統自動幫我們拿到了事務中綁定的連接。
可以在一個方法的多次對Redis增刪該查中,始終使用同一個連接。但是,即使使用了同樣的連接,沒有進行connection.multi()和connection.exec(),依然是無法啟用事務的。
我沒有仔細的查閱代碼,但是可以知道的是,Spring已經對這個,給了我們一個更好的支持:@Transactional
在調用RedisTempalte中的execute()方法的地方,加入這個注解(是spring包下面提供的,不要引用成rt包下的注解),能讓這個方法中的所有execute,自動加入multi()以及異常的回滾或者是正常運行時候的提交!
2. RedisTempalte的Serializer
用過jedis操作的都知道,所有connection的操作方法,都是傳入字節數組。那么,將一個對象和字節相互轉換,就需要通過序列化和反序列化。
模版方法中,Spring提供了默認的StringSerializer和JdkSerializer,第一個很簡單,就是通過String.getBytes()來實現的。而且在Redis中,所有存儲的值都是字符串類型的。所以這種方法保存后,通過Redis-cli控制台,是可以清楚的查看到我們保存了什么key,value是什么。但是對於JdkSerializationRedisSerializer來說,這個序列化方法就是Jdk提供的了。首先要求我們要被序列化的類繼承自Serializeable接口,然后通過,然后通過Jdk對象序列化的方法保存。(注:這個序列化保存的對象,即使是個String類型的,在redis控制台,也是看不出來的,因為它保存了一些對象的類型什么的額外信息,)
這么一長串,其實就是一個int類型的123。
keySerializer:這個是對key的默認序列化器。默認值是StringSerializer。
valueSerializer:這個是對value的默認序列化器,默認值是取自DefaultSerializer的JdkSerializationRedisSerializer。
hashKeySerializer:對hash結構數據的hashkey序列化器,默認值是取自DefaultSerializer的JdkSerializationRedisSerializer。
hashValueSerializer:對hash結構數據的hashvalue序列化器,默認值是取自DefaultSerializer的JdkSerializationRedisSerializer。
除此之外,我們在該類中,還發現了valueOps和hashOps等操作類,這是spring給我們提供的可以直接使用來操作Redis的類,非常方便。下一篇我們將講解這些類。
http://www.cnblogs.com/luochengqiuse/p/4640932.html?utm_source=tuicool&utm_medium=referral
把 Redis 當作數據庫的用例
現在我們來看看在服務器端 Java 企業版系統中把 Redis 當作數據庫的各種用法吧。無論用例的簡繁,Redis 都能幫助用戶優化性能、處理能力和延遲,讓常規 Java 企業版技術棧望而卻步。
1. 全局唯一增量計數器
我們先從一個相對簡單的用例開始吧:一個增量計數器,可顯示某網站受到多少次點擊。Spring Data Redis 有兩個適用於這一實用程序的類:RedisAtomicInteger 和 RedisAtomicLong。和 Java 並發包中的 AtomicInteger 和 AtomicLong 不同的是,這些 Spring 類能在多個 JVM 中發揮作用。
列表 1:全局唯一增量計數器
RedisAtomicLong counter = new RedisAtomicLong("UNIQUE_COUNTER_NAME", redisTemplate.getConnectionFactory()); Long myCounter = counter.incrementAndGet();// return the incremented value
請注意整型溢出並謹記,在這兩個類上進行操作需要付出相對較高的代價。
2. 全局悲觀鎖
時不時的,用戶就得應對服務器集群的爭用。假設你從一個服務器集群運行一個預定作業。在沒有全局鎖的情況下,集群中的節點會發起冗余作業實例。假設某個聊天室分區可容納 50 人。如果聊天室已滿,就需要創建新的聊天室實例來容納另外 50 人。
如果檢測到聊天室已滿但沒有全局鎖,集群中的各個節點就會創建自有的聊天室實例,為整個系統帶來不可預知的因素。列表 2 介紹了應當如何充分利用 SETNX(SET if **N**ot e**X**ists:如果不存在,則設置)這一 Redis 命令來執行全局悲觀鎖。
列表2:全局悲觀鎖
import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.BoundValueOperations; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Service; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; @Service public class DistributedLockRedis { public static final Logger LOGGER = LoggerFactory.getLogger(DistributedLockRedis.class); public static final String DISTRIBUTED_LOCK_REDIS_KEY = "distributed:lock:redis:"; @Autowired private RedisTemplate<String, Long> redisTemplate; /** * 由於Redis是單線程模型,命令操作原子性,所以利用這個特性可以很容易的實現分布式鎖。 * 獲得一個鎖 * * @param bizName 業務名 * @param lockTimeout 線程占用鎖的時間 * @param unit 單位 * @throws InterruptedException 鎖可以被中斷 */ public void lock(String bizName, int lockTimeout, TimeUnit unit) throws InterruptedException { // redisTemplate.getConnectionFactory().getConnection().setNX() String redisKey; if (StringUtils.isBlank(bizName)) { LOGGER.warn("is not recommended!"); redisKey = DISTRIBUTED_LOCK_REDIS_KEY; } else { redisKey = DISTRIBUTED_LOCK_REDIS_KEY + bizName.trim(); } BoundValueOperations<String, Long> valueOps = redisTemplate.boundValueOps(redisKey); while (true) { // https://redis.io/commands/setnx long currentTimeMillis = System.currentTimeMillis(); long releaseLockTime = currentTimeMillis + unit.toMillis(lockTimeout) + 1; //這兩個if else不能混寫,因為多個相同類型的線程競爭鎖時,在鎖超時時,設置的超時時間是一樣的 if (valueOps.setIfAbsent(releaseLockTime)) {//第一次獲取鎖 redisTemplate.expire(redisKey, lockTimeout, unit); return; } else if (currentTimeMillis > valueOps.get()) {//鎖已經超時 //如果其它線程占用鎖,再重新設置的時間和原來時間的時間差,可以忽略 Long lockCurrentValue = valueOps.getAndSet(releaseLockTime); //如果當前時間小於LockKey存放的時間,說明已經有其它線程加鎖 if (currentTimeMillis > lockCurrentValue) { redisTemplate.expire(redisKey, lockTimeout, unit); return; } } else { TimeUnit.MILLISECONDS.sleep(ThreadLocalRandom.current().nextInt(100, 1000)); } } } }
Set key to hold string value if key does not exist. In that case, it is equal to SET. When key already holds a value, no operation is performed. SETNX is short for "SET if Not eXists".
Return value
Integer reply, specifically:
1 if the key was set
0 if the key was not set
Examples
redis> SETNX mykey "Hello"
(integer) 1
redis> SETNX mykey "World"
(integer) 0
redis> GET mykey
"Hello"
redis>
How to do set nx option using RedisTemplate?
The method name is setIfAbsent
setIfAbsent
Boolean setIfAbsent(V value)
Set the bound key to hold the string value if the bound key is absent.
Parameters:
value -
See Also:
Redis Documentation: SETNX
http://docs.spring.io/spring-data/redis/docs/current/api/org/springframework/data/redis/core/BoundValueOperations.html#setIfAbsent-V-
用以下Python代碼來實現上述的使用 SETNX 命令作分布式鎖的算法。
LOCK_TIMEOUT = 3 lock = 0 lock_timeout = 0 lock_key = 'lock.foo' # 獲取鎖 while lock != 1: now = int(time.time()) lock_timeout = now + LOCK_TIMEOUT + 1 lock = redis_client.setnx(lock_key, lock_timeout) if lock == 1 or (now > int(redis_client.get(lock_key))) and now > int(redis_client.getset(lock_key, lock_timeout)): break else: time.sleep(0.001) # 已獲得鎖 do_job() # 釋放鎖 now = int(time.time()) if now < lock_timeout: redis_client.delete(lock_key)
http://blog.csdn.net/lihao21/article/details/49104695
如果使用關系數據庫,一旦最先生成鎖的程序意外退出,鎖就可能永遠得不到釋放。Redis 的 EXPIRE 設置可確保在任何情況下釋放鎖。
3. 位屏蔽(Bit Mask)
假設 web 客戶端需要輪詢一台 web 服務器,針對某個數據庫中的多個表查詢客戶指定更新內容。如果盲目地查詢所有相應的表以尋找潛在更新,成本較高。為了避免這一做法,可以嘗試在 Redis 中給每個客戶端保存一個整型作為臟指標,整型的每個數位表示一個表。該表中存在客戶所需更新時,設置數位。輪詢期間,不會觸發對表的查詢,除非設置了相應數位。就獲取並將這樣的位屏蔽設置為 STRING 而言,Redis 非常高效。
4. 排行榜(Leaderboard)
Redis 的 ZSET 數據結構為游戲玩家排行榜提供了簡潔的解決方案。ZSET 的工作方式有些類似於 Java 中的 PriorityQueue,各個對象均為經過排序的數據結構,井井有條。可以按照分數排出游戲玩家在排行榜上的位置。Redis 的 ZSET 定義了一份內容豐富的命令列表,支持靈活有效的查詢。例如,ZRANGE(包括 ZREVRANGE)可返回有序集內的指定范圍要素。
你可以使用這一命令列出排行榜前 100 名玩家。ZRANGEBYSCORE 返回指定分數范圍內的要素(例如列出得分為 1000 至 2000 之間的玩家),ZRNK 則返回有序集內的要素的排名,諸如此類。
5. 布隆(Bloom)過濾器
布隆過濾器 (Bloom filter) 是一種空間利用率較高的概率數據結構,用來測試某元素是否某個集的一員。可能會出現誤報匹配,但不會漏報。查詢可返回“可能在集內”或“肯定不在集內”。
就在線服務和離線服務包括大數據分析等方面,布隆過濾器數據結構都能派上很多用場。Facebook 利用布隆過濾器進行輸入提示搜索,為用戶輸入的查詢提取朋友和朋友的朋友。Apache HBase 則利用布隆過濾器過濾掉不包含特殊行或列的 HFile 塊磁盤讀取,使讀取速度得到明顯提升。Bitly 用布隆過濾器來避免將用戶重定向到惡意網站,而 Quara 則在訂閱后端執行了一個切分的布隆過濾器,用來過濾掉之前查看過的內容。在我自己的項目里,我用布隆過濾器追蹤用戶對各個主題的投票情況。
借助出色的速度和處理能力,Redis 極好地融合了布隆過濾器。搜索 GitHub,就能發現很多 Redis 布隆過濾器項目,其中一些還支持可調諧精度。
6. 高效的全局通知:發布/訂閱渠道
Redis 發布/訂閱渠道的工作方式類似於一個扇出消息傳遞系統,或 JMS 語義中的一個主題。JMS 主題和 Redis 發布/訂閱渠道的一個區別是,通過 Redis 發布的消息並不持久。消息被推送給所有相連的客戶端后,Redis 上就會刪除這一消息。換句話說,訂閱者必須一直在線才能接收新消息。Redis 發布/訂閱渠道的典型用例包括實時配置分布、簡單的聊天服務器等。
在 web 服務器集群中,每個節點都可以是 Redis 發布/訂閱渠道的一個訂閱者。發布到渠道上的消息也會被即時推送到所有相連節點。這一消息可以是某種配置更改,也可以是針對所有在線用戶的全局通知。和恆定輪詢相比,這種推送溝通模式顯然極為高效。
Redis 性能優化
Redis 非常強大,但也可以從整體上和根據特定編程場景做出進一步優化。可以考慮以下技巧。
存活時間
所有 Redis 數據結構都具備存活時間 (TTL) 屬性。當你設置這一屬性時,數據結構會在過期后自動刪除。充分利用這一功能,可以讓 Redis 保持較低的內存損耗。
管道技術
在一條請求中向 Redis 發送多個命令,這種方法叫做管道技術。這一技術節省了網絡往返的成本,這一點非常重要,因為網絡延遲可能比 Redis 延遲要高上好幾個量級。但這里存在一個陷阱:管道中的 Redis 命令列表必須預先確定,並且應當彼此獨立。如果一個命令的參數是由先前命令的結果計算得出,管道技術就不起作用。列表 3 給出了 Redis 管道技術的一個示例。
列表 3:管道技術
@Override public List<LeaderboardEntry> fetchLeaderboard(String key, String... playerIds) { final List<LeaderboardEntry> entries = new ArrayList<>(); redisTemplate.executePipelined(new RedisCallback<Object>() { // enable Redis Pipeline @Override public Object doInRedis(RedisConnection connection) throws DataAccessException { for(String playerId : playerIds) { Long rank = connection.zRevRank(key.getBytes(), playerId.getBytes()); Double score = connection.zScore(key.getBytes(), playerId.getBytes()); LeaderboardEntry entry = new LeaderboardEntry(playerId, score!=null?score.intValue():-1, rank!=null?rank.intValue():-1); entries.add(entry); } return null; } }); return entries; }
副本集和切分
Redis 支持主從副本配置。和 MongoDB 一樣,副本集也是不對稱的,因為從節點是只讀的,以便共享讀取工作量。
我在文章開頭提到過,也可以執行切分來橫向擴展 Redis 的處理能力和存儲容量。
事實上,Redis 非常強大,據亞馬遜公司的內部基准顯示,類型 r3.4xlarge 的一個 EC2 實例每秒可輕松處理 100000 次請求。傳說還有把每秒 700000 次請求作為基准的。
對於中小型應用程序,通常無需考慮 Redis 切分。
(請參見這篇非常出色的文章《運行中的 Redis》https://www.manning.com/books/redis-in-action,進一步了解 Redis 的性能優化和切分http://www.oneapm.com/brand/apm.html。)
Redis 中的事務
Redis 並不像關系數據庫管理系統那樣能支持全面的 ACID 事務,但其自有的事務也非常有效。從本質上來說,Redis 事務是管道、樂觀鎖、確定提交和回滾的結合。其思想是執行一個管道中的一個命令列表,然后觀察某一關鍵記錄的潛在更新(樂觀鎖)。根據所觀察的記錄是否會被另一個進程更新,該命令列表或整體確定提交,或完全回滾。
下面以某個拍賣網站上的賣方庫存為例。買方試圖從賣方處購買某件商品時,你負責觀察 Redis 事務內的賣方庫存變化。同時,你要從同一個庫存中刪除此商品。事務關閉前,如果庫存被一個以上進程觸及(例如,如果兩個買方同時購買了同一件商品),事務將回滾,否則事務會確定提交。回滾后可開始重試。
Spring Data Redis 中的事務陷阱
我在 Spring 的 RedisTemplate
類 redisTemplate.setEnableTransactionSupport(true);
中啟用 Redis 事務時得到一個慘痛的教訓:Redis 會在運行幾天后開始返回垃圾數據,導致數據嚴重損壞。StackOverflow上也報道了類似情況。
在運行一個 monitor
命令后,我的團隊發現,在進行 Redis 操作或 RedisCallback
后,Spring 並沒有自動關閉 Redis 連接,而事實上它是應該關閉的。如果再次使用未關閉的連接,可能會從意想不到的 Redis 密鑰返回垃圾數據。有意思的是,如果在 RedisTemplate
中把事務支持設為 false,這一問題就不會出現了。
我們發現,我們可以先在 Spring 語境里配置一個 PlatformTransactionManager
(例如 DataSourceTransactionManager
),然后再用 @Transactional
注釋來聲明 Redis 事務的范圍,讓 Spring 自動關閉 Redis 連接。
根據這一經驗,我們相信,在 Spring 語境里配置兩個單獨的 RedisTemplate
是很好的做法:其中一個 RedisTemplates 的事務設為 false,用於大多數 Redis 操作,另一個 RedisTemplates 的事務已激活,僅用於 Redis 事務。當然必須要聲明 PlatformTransactionManager
和 @Transactional
,以防返回垃圾數值。
另外,我們還發現了 Redis 事務和關系數據庫事務(在本例中,即 JDBC)相結合的不利之處。混合型事務的表現和預想的不太一樣。
I learned a hard lesson when enabling Redis transactions in the Spring RedisTemplate class redisTemplate.setEnableTransactionSupport(true);: Redis started returning junk data after running for a few days, causing serious data corruption. A similar case was reported on StackOverflow. By running a monitor command, my team discovered that after a Redis operation or RedisCallback, Spring doesn't close the Redis connection automatically, as it should do. Reusing an unclosed connection may return junk data from an unexpected key in Redis. Interestingly, this issue doesn't show up when transaction support is set to false in RedisTemplate. We discovered that we could make Spring close Redis connections automatically by configuring a PlatformTransactionManager (such as DataSourceTransactionManager) in the Spring context, then using the @Transactional annotation to declare the scope of Redis transactions. Based on this experience, we believe it's good practice to configure two separate RedisTemplates in the Spring context: One with transaction set to false is used on most Redis operations; the other with transaction enabled is only applied to Redis transactions. Of course PlatformTransactionManager and @Transactional must be declared to prevent junk values from being returned. Moreover, we learned the downside of mixing a Redis transaction with a relational database transaction, in this case JDBC. Mixed transactions do not behave as you would expect.
結論
我希望通過這篇文章向其他 Java 企業開發師介紹 Redis 的強大之處,尤其是將 Redis 用作遠程數據緩存和用於易揮發數據時。
在這里我介紹了 Redis 的六個有效用例,分享了一些性能優化技巧,還說明了我的 Glu Mobile 團隊怎樣解決了 Spring Data Redis 事務配置不當造成的垃圾數據問題。
我希望這篇文章能夠激發你對 Redis NoSQL 的好奇心,讓你能夠受到啟發,在自己的 Java 企業版系統里創造出一番天地。
http://blog.oneapm.com/apm-tech/778.html
http://www.javaworld.com/article/3062899/big-data/lightning-fast-nosql-with-spring-data-redis.html?page=2
https://docs.spring.io/spring-data/redis/docs/1.8.6.RELEASE/reference/html/#tx.spring
如果是使用編程的方式(通常是基於 Spring Boot 項目)配置 RedisTemplate 的話只需在你的配置類(被@Configuration
注解修飾的類)中顯式創建 RedisTemplate
Bean,設置 Serializer
即可。
@Bean public RedisTemplate redisTemplate(RedisConnectionFactory redisConnectionFactory) { RedisTemplate redisTemplate = new RedisTemplate(); redisTemplate.setConnectionFactory(redisConnectionFactory); GenericFastJsonRedisSerializer fastJsonRedisSerializer = new GenericFastJsonRedisSerializer(); redisTemplate.setDefaultSerializer(fastJsonRedisSerializer);//設置默認的Serialize,包含 keySerializer & valueSerializer //redisTemplate.setKeySerializer(fastJsonRedisSerializer);//單獨設置keySerializer //redisTemplate.setValueSerializer(fastJsonRedisSerializer);//單獨設置valueSerializer return redisTemplate; }
https://github.com/alibaba/fastjson/wiki/%E5%9C%A8-Spring-%E4%B8%AD%E9%9B%86%E6%88%90-Fastjson