Redis介紹
redis是一個key-value存儲系統。和Memcached類似,它支持存儲的value類型相對更多,包括string(字符串)、list(鏈表)、set(集合)、zset(sorted set --有序集合)和hash(哈希類型)。這些數據類型都支持push/pop、add/remove及取交集並集和差集及更豐富的操作,而且這些操作都是原子性的。在此基礎上,redis支持各種不同方式的排序。與memcached一樣,為了保證效率,數據都是緩存在內存中。區別的是redis會周期性的把更新的數據寫入磁盤或者把修改操作寫入追加的記錄文件,並且在此基礎上實現了master-slave(主從)同步。
Redis的優缺點
優點
- 速度快,因為數據存在內存中,類似於HashMap,HashMap的優勢就是查找和操作的時間復雜度都是O(1)
- 支持豐富數據類型,支持string,list,set,sorted set,hash
- 支持事務,操作都是原子性,所謂的原子性就是對數據的更改要么全部執行,要么全部不執行
- 豐富的特性:可用於緩存,消息,按key設置過期時間,過期后將會自動刪除
缺點
在不使用框架的情況下使用起來較為麻煩
分布式Redis的搭建
搭建集群的第一件事情我們需要一些運行在集群模式的Redis實例. 這意味這集群並不是由一些普通的Redis實例組成的,集群模式需要通過配置啟用,開啟集群模式后的Redis實例便可以使用集群特有的命令和特性了.
下面是一個最少選項的集群的配置文件:
port 7000 cluster-enabled yes cluster-config-file nodes.conf cluster-node-timeout 5000 appendonly yes
文件中的 cluster-enabled 選項用於開實例的集群模式, 而 cluster-conf-file 選項則設定了保存節點配置文件的路徑, 默認值為 nodes.conf.節點配置文件無須人為修改, 它由 Redis 集群在啟動時創建, 並在有需要時自動進行更新。
要讓集群正常運作至少需要三個主節點,不過在剛開始試用集群功能時, 強烈建議使用六個節點: 其中三個為主節點, 而其余三個則是各個主節點的從節點。
為了方便測試,直接在同一台計算機內建立六個文件夾7000到7005,分別表示六個Redis實例,在文件夾 7000 至 7005 中, 各創建一個 redis.conf 文件, 文件的內容使用上面的示例配置文件, 但記得將配置中的端口號從 7000 改為與文件夾名字相同的號碼。
從 Redis Github 頁面 的 unstable 分支中取出最新的 Redis 源碼, 編譯出可執行文件 redis-server , 並將文件復制到 cluster-test 文件夾, 然后使用類似以下命令, 在每個標簽頁中打開一個實例:
/redis-server ./redis.conf
現在我們已經有了六個正在運行中的 Redis 實例, 接下來我們需要使用這些實例來創建集群, 並為每個節點編寫配置文件。
通過使用 Redis 集群命令行工具 redis-trib , 編寫節點配置文件的工作可以非常容易地完成: redis-trib 位於 Redis 源碼的 src 文件夾中, 它是一個 Ruby 程序, 這個程序通過向實例發送特殊命令來完成創建新集群, 檢查集群, 或者對集群進行重新分片(reshared)等工作。
./redis-trib.rb create --replicas 1 127.0.0.1:7000 127.0.0.1:7001
127.0.0.1:7002 127.0.0.1:7003 127.0.0.1:7004 127.0.0.1:7005
這個命令在這里用於創建一個新的集群, 選項–replicas 1 表示我們希望為集群中的每個主節點創建一個從節點。
之后跟着的其他參數則是這個集群實例的地址列表,3個master3個slave redis-trib 會打印出一份預想中的配置給你看, 如果你覺得沒問題的話, 就可以輸入 yes , redis-trib 就會將這份配置應用到集群當中,讓各個節點開始互相通訊,最后可以得到如下信息:
[OK] All 16384 slots covered
這表示集群中的 16384 個槽都有至少一個主節點在處理, 集群運作正常。
分布式Redis的原理
由上文可知,Redis是以哈希槽的形式對集群進行划分的,整個集群的哈希槽一共有16384個,在有3個Redis實例的情況下,節點A包含從0到5500的哈希槽,節點B包含從5501到11000 的哈希槽,節點C包含從11001到16384的哈希槽。當有新的節點添加進來的時候,會從當前的各個節點中選取一定的槽分配給新添加的節點,當有節點從集群中被刪除時,則會將當前節點的槽分配給集群中其他正在運行的節點。每當有新的key添加到Redis中時,會根據算法算出相應的哈希槽來找到對應的集群節點。
Redisson介紹
什么是Redisson
Redisson是一個在Redis的基礎上實現的Java駐內存數據網格(In-Memory Data Grid)。它不僅提供了一系列的分布式的Java常用對象,還提供了許多分布式服務。其中包括(BitSet, Set, Multimap, SortedSet, Map, List, Queue, BlockingQueue, Deque, BlockingDeque, Semaphore, Lock, AtomicLong, CountDownLatch, Publish / Subscribe, Bloom filter, Remote service, Spring cache, Executor service, Live Object service, Scheduler service) Redisson提供了使用Redis的最簡單和最便捷的方法。Redisson的宗旨是促進使用者對Redis的關注分離(Separation of Concern),從而讓使用者能夠將精力更集中地放在處理業務邏輯上。
Redisson的使用
Redisson配置(以spring XML為例)
Maven配置
<dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> <version>2.2.12</version> </dependency> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-annotations</artifactId> <version>2.6.0</version> </dependency>
Spring XML配置
<redisson:client id="redisson" name="redisson1,redisson2" threads="0" netty-threads="0" codec-ref="myCodec" transport-mode="NIO" redisson-reference-enabled="true" codec-provider-ref="myCodecProvider" resolver-provider-ref="myResolverProvider" executor-ref="myExecutor" event-loop-group-ref="myEventLoopGroup" > <!-- 這里的name屬性和qualifier子元素不能同時使用。 id和name的屬性都可以被用來作為qualifier的備選值。 --> <!--<qualifier value="redisson3"/>--> <redisson:cluster-servers idle-connection-timeout="10000" ping-timeout="1000" connect-timeout="10000" timeout="3000" retry-attempts="3" retry-interval="1500" reconnection-timeout="3000" failed-attempts="3" password="do_not_use_if_it_is_not_set" subscriptions-per-connection="5" client-name="none" load-balancer-ref="myLoadBalancer" subscription-connection-minimum-idle-size="1" subscription-connection-pool-size="50" slave-connection-minimum-idle-size="10" slave-connection-pool-size="64" master-connection-minimum-idle-size="10" master-connection-pool-size="64" read-mode="SLAVE" subscription-mode="SLAVE" scan-interval="1000" > <redisson:node-address value="redis://127.0.0.1:6379" /> <redisson:node-address value="redis://127.0.0.1:6380" /> <redisson:node-address value="redis://127.0.0.1:6381" /> </redisson:cluster-servers> </redisson:client>
<!-- 最基本配置 -->
<redisson:client>
<redisson:cluster-servers>
<redisson:node-address value="redis://127.0.0.1:6379" />
<redisson:node-address value="redis://127.0.0.1:6380" />
<redisson:node-address value="redis://127.0.0.1:6381" />
...
</redisson:cluster-servers>
</redisson:client>
Redisson實現分布式鎖
鎖的種類
可重入鎖
RLock lock = redisson.getLock("anyLock"); // 最常見的使用方法 lock.lock(); ... lock.unlock()
// 加鎖以后10秒鍾自動解鎖 // 無需調用unlock方法手動解鎖 lock.lock(10, TimeUnit.SECONDS); // 嘗試加鎖,最多等待100秒,上鎖以后10秒自動解鎖 boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS); ... lock.unlock();
公平鎖
RLock fairLock = redisson.getFairLock("anyLock"); // 最常見的使用方法 fairLock.lock();
// 10秒鍾以后自動解鎖 // 無需調用unlock方法手動解鎖 fairLock.lock(10, TimeUnit.SECONDS); // 嘗試加鎖,最多等待100秒,上鎖以后10秒自動解鎖 boolean res = fairLock.tryLock(100, 10, TimeUnit.SECONDS); ... fairLock.unlock();
Redisson同時還為分布式可重入公平鎖提供了異步執行的相關方法:
RLock fairLock = redisson.getFairLock("anyLock"); fairLock.lockAsync(); fairLock.lockAsync(10, TimeUnit.SECONDS); Future<Boolean> res = fairLock.tryLockAsync(100, 10, TimeUnit.SECONDS);
聯鎖
基於Redis的Redisson分布式聯鎖RedissonMultiLock對象可以將多個RLock對象關聯為一個聯鎖,每個RLock對象實例可以來自於不同的Redisson實例。
RLock lock1 = redissonInstance1.getLock("lock1"); RLock lock2 = redissonInstance2.getLock("lock2"); RLock lock3 = redissonInstance3.getLock("lock3"); RedissonMultiLock lock = new RedissonMultiLock(lock1, lock2, lock3); // 同時加鎖:lock1 lock2 lock3 // 所有的鎖都上鎖成功才算成功。 lock.lock(); ... lock.unlock();
紅鎖
基於Redis的Redisson紅鎖RedissonRedLock對象實現了Redlock介紹的加鎖算法。該對象也可以用來將多個RLock對象關聯為一個紅鎖,每個RLock對象實例可以來自於不同的Redisson實例。
RLock lock1 = redissonInstance1.getLock("lock1"); RLock lock2 = redissonInstance2.getLock("lock2"); RLock lock3 = redissonInstance3.getLock("lock3"); RedissonRedLock lock = new RedissonRedLock(lock1, lock2, lock3); // 同時加鎖:lock1 lock2 lock3 // 紅鎖在大部分節點上加鎖成功就算成功。 lock.lock(); ... lock.unlock();
另外Redisson還通過加鎖的方法提供了leaseTime的參數來指定加鎖的時間。超過這個時間后鎖便自動解開了。
RedissonRedLock lock = new RedissonRedLock(lock1, lock2, lock3); // 給lock1,lock2,lock3加鎖,如果沒有手動解開的話,10秒鍾后將會自動解開 lock.lock(10, TimeUnit.SECONDS); // 為加鎖等待100秒時間,並在加鎖成功10秒鍾后自動解開 boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS); ... lock.unlock();
讀寫鎖
RReadWriteLock rwlock = redisson.getLock("anyRWLock"); // 最常見的使用方法 rwlock.readLock().lock(); // 或 rwlock.writeLock().lock();
信號量
RSemaphore semaphore = redisson.getSemaphore("semaphore"); semaphore.acquire(); //或 semaphore.acquireAsync(); semaphore.acquire(23); semaphore.tryAcquire(); //或 semaphore.tryAcquireAsync(); semaphore.tryAcquire(23, TimeUnit.SECONDS); //或 semaphore.tryAcquireAsync(23, TimeUnit.SECONDS); semaphore.release(10); semaphore.release(); //或 semaphore.releaseAsync();
可過期性信號量
基於Redis的Redisson可過期性信號量(PermitExpirableSemaphore)是在RSemaphore對象的基礎上,為每個信號增加了一個過期時間。每個信號可以通過獨立的ID來辨識,釋放時只能通過提交這個ID才能釋放。
RPermitExpirableSemaphore semaphore = redisson.getPermitExpirableSemaphore("mySemaphore"); String permitId = semaphore.acquire(); // 獲取一個信號,有效期只有2秒鍾。 String permitId = semaphore.acquire(2, TimeUnit.SECONDS); // ... semaphore.release(permitId);
閉鎖
RCountDownLatch latch = redisson.getCountDownLatch("anyCountDownLatch"); latch.trySetCount(1); latch.await(); // 在其他線程或其他JVM里 RCountDownLatch latch = redisson.getCountDownLatch("anyCountDownLatch"); latch.countDown();
鎖的原理
在Redisson中,使用key來作為是否上鎖的標志,當通過getLock(String key)方法獲得相應的鎖之后,這個key即作為一個鎖存儲到Redis集群中,在接下來如果有其他的線程嘗試獲取名為key的鎖時,便會向集群中進行查詢,如果能夠查到這個鎖並發現相應的value的值不為0,則表示已經有其他線程申請了這個鎖同時還沒有釋放,則當前線程進入阻塞,否則由當前線程獲取這個鎖並將value值加一,如果是可重入鎖的話,則當前線程每獲得一個自身線程的鎖,就將value的值加一,而每釋放一個鎖則將value值減一,直到減至0,完全釋放這個鎖。因為底層是基於分布式的Redis集群,所以Redisson實現了分布式的鎖機制。
加鎖
在Redisson中,加鎖需要以下三個參數:
KEYS[1] :需要加鎖的key,這里需要是字符串類型。
ARGV[1] :鎖的超時時間,防止死鎖
ARGV[2] :鎖的唯一標識,id(UUID.randomUUID()) + “:” + threadId
Future tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId) { internalLockLeaseTime = unit.toMillis(leaseTime); return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_LONG, // 檢查是否key已經被占用,如果沒有則設置超時時間和唯一標識,初始化value=1 "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('hset', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + // 如果鎖重入,需要判斷鎖的key field 都一直情況下 value 加一 "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + //鎖重入重新設置超時時間 "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + // 返回剩余的過期時間 "return redis.call('pttl', KEYS[1]);", Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId)); }
解鎖
在Redisson中解鎖需要以下五個參數:
KEYS[1] :需要加鎖的key,這里需要是字符串類型。
KEYS[2] :redis消息的ChannelName,一個分布式鎖對應唯一的一個channelName:“redisson_lock__channel__{” + getName() + “}”
ARGV[1] :reids消息體,這里只需要一個字節的標記就可以,主要標記redis的key已經解鎖,再結合redis的Subscribe,能喚醒其他訂閱解鎖消息的客戶端線程申請鎖。
ARGV[2] :鎖的超時時間,防止死鎖
ARGV[3] :鎖的唯一標識,也就是剛才介紹的 id(UUID.randomUUID()) + “:” + threadId
public void unlock() { Boolean opStatus = commandExecutor.evalWrite(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, // 如果key已經不存在,說明已經被解鎖,直接發布(publihs)redis消息 "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1; " + "end;" + // key和field不匹配,說明當前客戶端線程沒有持有鎖,不能主動解鎖。 "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " + "return nil;" + "end; " + "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " + // 如果counter>0說明鎖在重入,不能刪除key "if (counter > 0) then " + "redis.call('pexpire', KEYS[1], ARGV[2]); " + "return 0; " + "else " + // 刪除key並且publish 解鎖消息 "redis.call('del', KEYS[1]); " + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1; "+ "end; " + "return nil;", Arrays.asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(Thread.currentThread().getId())); if (opStatus == null) { throw new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: " + id + " thread-id: " + Thread.currentThread().getId()); } // 解鎖成功之后取消更新鎖expire的時間任務 if (opStatus) { cancelExpirationRenewal(); } }
注意點
Redisson 默認的 CommandExecutor 實現是通過 eval 命令來執行 Lua 腳本,所以要求 Redis 的版本必須為 2.6 或以上,否則可能要自己來實現