Redis實現分布式鎖的原理
前面講了Redis在實際業務場景中的應用,那么下面再來了解一下Redisson功能性場景的應用,也就是大家經常使用的分布式鎖的實現場景。
-
引入redisson依賴
<dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> <version>3.16.0</version> </dependency>
-
編寫簡單的測試代碼
public class RedissonTest { private static RedissonClient redissonClient; static { Config config=new Config(); config.useSingleServer().setAddress("redis://192.168.221.128:6379"); redissonClient=Redisson.create(config); } public static void main(String[] args) throws InterruptedException { RLock rLock=redissonClient.getLock("updateOrder"); //最多等待100秒、上鎖10s以后自動解鎖 if(rLock.tryLock(100,10,TimeUnit.SECONDS)){ System.out.println("獲取鎖成功"); } Thread.sleep(2000); rLock.unlock(); redissonClient.shutdown(); } }
Redisson分布式鎖的實現原理
你們會發現,通過redisson,非常簡單就可以實現我們所需要的功能,當然這只是redisson的冰山一角,redisson最強大的地方就是提供了分布式特性的常用工具類。使得原本作為協調單機多線程並發程序的並發程序的工具包獲得了協調分布式多級多線程並發系統的能力,降低了程序員在分布式環境下解決分布式問題的難度,下面分析一下RedissonLock的實現原理
RedissonLock.tryLock
@Override
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
long time = unit.toMillis(waitTime);
long current = System.currentTimeMillis();
long threadId = Thread.currentThread().getId();
//通過tryAcquire方法嘗試獲取鎖
Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
// lock acquired
if (ttl == null) { //表示成功獲取到鎖,直接返回
return true;
}
//省略部分代碼....
}
tryAcquire
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
RFuture<Long> ttlRemainingFuture;
//leaseTime就是租約時間,就是redis key的過期時間。
if (leaseTime != -1) { //如果設置過期時間
ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
} else {//如果沒設置了過期時間,則從配置中獲取key超時時間,默認是30s過期
ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
}
//當tryLockInnerAsync執行結束后,觸發下面回調
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e != null) { //說明出現異常,直接返回
return;
}
// lock acquired
if (ttlRemaining == null) { //表示第一次設置鎖鍵
if (leaseTime != -1) { //表示設置過超時時間,更新internalLockLeaseTime,並返回
internalLockLeaseTime = unit.toMillis(leaseTime);
} else { //leaseTime=-1,啟動Watch Dog
scheduleExpirationRenewal(threadId);
}
}
});
return ttlRemainingFuture;
}
tryLockInnerAsync
通過lua腳本來實現加鎖的操作
-
判斷lock鍵是否存在,不存在直接調用hset存儲當前線程信息並且設置過期時間,返回nil,告訴客戶端直接獲取到鎖。
-
判斷lock鍵是否存在,存在則將重入次數加1,並重新設置過期時間,返回nil,告訴客戶端直接獲取到鎖。
-
被其它線程已經鎖定,返回鎖有效期的剩余時間,告訴客戶端需要等待。
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
}
關於Lua腳本,我們稍后再解釋。
unlock釋放鎖流程
釋放鎖的流程,腳本看起來會稍微復雜一點
-
如果lock鍵不存在,通過
publish
指令發送一個消息表示鎖已經可用。 -
如果鎖不是被當前線程鎖定,則返回nil
-
由於支持可重入,在解鎖時將重入次數需要減1
-
如果計算后的重入次數>0,則重新設置過期時間
-
如果計算后的重入次數<=0,則發消息說鎖已經可用
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end; " +
"return nil;",
Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}
RedissonLock有競爭的情況
有競爭的情況在redis端的lua腳本是相同的,只是不同的條件執行不同的redis命令。當通過tryAcquire發現鎖被其它線程申請時,需要進入等待競爭邏輯中
-
this.await返回false,說明等待時間已經超出獲取鎖最大等待時間,取消訂閱並返回獲取鎖失敗
-
this.await返回true,進入循環嘗試獲取鎖。
繼續看RedissonLock.tryLock后半部分代碼如下:
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
//省略部分代碼
time -= System.currentTimeMillis() - current;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
current = System.currentTimeMillis();
// 訂閱監聽redis消息,並且創建RedissonLockEntry
RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
// 阻塞等待subscribe的future的結果對象,如果subscribe方法調用超過了time,說明已經超過了客戶端設置的最大wait time,則直接返回false,取消訂閱,不再繼續申請鎖了。
if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {
if (!subscribeFuture.cancel(false)) { //取消訂閱
subscribeFuture.onComplete((res, e) -> {
if (e == null) {
unsubscribe(subscribeFuture, threadId);
}
});
}
acquireFailed(waitTime, unit, threadId); //表示搶占鎖失敗
return false; //返回false
}
try {
//判斷是否超時,如果等待超時,返回獲的鎖失敗
time -= System.currentTimeMillis() - current;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
//通過while循環再次嘗試競爭鎖
while (true) {
long currentTime = System.currentTimeMillis();
ttl = tryAcquire(waitTime, leaseTime, unit, threadId); //競爭鎖,返回鎖超時時間
// lock acquired
if (ttl == null) { //如果超時時間為null,說明獲得鎖成功
return true;
}
//判斷是否超時,如果超時,表示獲取鎖失敗
time -= System.currentTimeMillis() - currentTime;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
// 通過信號量(共享鎖)阻塞,等待解鎖消息. (減少申請鎖調用的頻率)
// 如果剩余時間(ttl)小於wait time ,就在 ttl 時間內,從Entry的信號量獲取一個許可(除非被中斷或者一直沒有可用的許可)。
// 否則就在wait time 時間范圍內等待可以通過信號量
currentTime = System.currentTimeMillis();
if (ttl >= 0 && ttl < time) {
subscribeFuture.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
subscribeFuture.getNow().getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
}
// 更新等待時間(最大等待時間-已經消耗的阻塞時間)
time -= System.currentTimeMillis() - currentTime;
if (time <= 0) { //獲取鎖失敗
acquireFailed(waitTime, unit, threadId);
return false;
}
}
} finally {
unsubscribe(subscribeFuture, threadId); //取消訂閱
}
// return get(tryLockAsync(waitTime, leaseTime, unit));
}
鎖過期了怎么辦?
一般來說,我們去獲得分布式鎖時,為了避免死鎖的情況,我們會對鎖設置一個超時時間,但是有一種情況是,如果在指定時間內當前線程沒有執行完,由於鎖超時導致鎖被釋放,那么其他線程就會拿到這把鎖,從而導致一些故障。
為了避免這種情況,Redisson引入了一個Watch Dog機制,這個機制是針對分布式鎖來實現鎖的自動續約,簡單來說,如果當前獲得鎖的線程沒有執行完,那么Redisson會自動給Redis中目標key延長超時時間。
默認情況下,看門狗的續期時間是30s,也可以通過修改Config.lockWatchdogTimeout來另行指定。
@Override
public boolean tryLock(long waitTime, TimeUnit unit) throws InterruptedException {
return tryLock(waitTime, -1, unit); //leaseTime=-1
}
實際上,當我們通過tryLock方法沒有傳遞超時時間時,默認會設置一個30s的超時時間,避免出現死鎖的問題。
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
RFuture<Long> ttlRemainingFuture;
if (leaseTime != -1) {
ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
} else { //當leaseTime為-1時,leaseTime=internalLockLeaseTime,默認是30s,表示當前鎖的過期時間。
//this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
}
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e != null) { //說明出現異常,直接返回
return;
}
// lock acquired
if (ttlRemaining == null) { //表示第一次設置鎖鍵
if (leaseTime != -1) { //表示設置過超時時間,更新internalLockLeaseTime,並返回
internalLockLeaseTime = unit.toMillis(leaseTime);
} else { //leaseTime=-1,啟動Watch Dog
scheduleExpirationRenewal(threadId);
}
}
});
return ttlRemainingFuture;
}
由於默認設置了一個30s的過期時間,為了防止過期之后當前線程還未執行完,所以通過定時任務對過期時間進行續約。
- 首先,會先判斷在expirationRenewalMap中是否存在了entryName,這是個map結構,主要還是判斷在這個服務實例中的加鎖客戶端的鎖key是否存在,
- 如果已經存在了,就直接返回;主要是考慮到RedissonLock是可重入鎖。
protected void scheduleExpirationRenewal(long threadId) {
ExpirationEntry entry = new ExpirationEntry();
ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
if (oldEntry != null) {
oldEntry.addThreadId(threadId);
} else {// 第一次加鎖的時候會調用,內部會啟動WatchDog
entry.addThreadId(threadId);
renewExpiration();
}
}
定義一個定時任務,該任務中調用renewExpirationAsync
方法進行續約。
private void renewExpiration() {
ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ee == null) {
return;
}
//用到了時間輪機制
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ent == null) {
return;
}
Long threadId = ent.getFirstThreadId();
if (threadId == null) {
return;
}
// renewExpirationAsync續約租期
RFuture<Boolean> future = renewExpirationAsync(threadId);
future.onComplete((res, e) -> {
if (e != null) {
log.error("Can't update lock " + getRawName() + " expiration", e);
EXPIRATION_RENEWAL_MAP.remove(getEntryName());
return;
}
if (res) {
// reschedule itself
renewExpiration();
}
});
}
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);//每次間隔租期的1/3時間執行
ee.setTimeout(task);
}
執行Lua腳本,對指定的key進行續約。
protected RFuture<Boolean> renewExpirationAsync(long threadId) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;",
Collections.singletonList(getRawName()),
internalLockLeaseTime, getLockName(threadId));
}
Lua腳本
Lua是一個高效的輕量級腳本語言(和JavaScript類似),用標准C語言編寫並以源代碼形式開放, 其設計目的是為了嵌入應用程序中,從而為應用程序提供靈活的擴展和定制功能。Lua在葡萄牙語中是“月亮”的意思,它的logo形式衛星,寓意是Lua是一個“衛星語言”,能夠方便地嵌入到其他語言中使用;其實在很多常見的框架中,都有嵌入Lua腳本的功能,比如OpenResty、Redis等。
使用Lua腳本的好處:
-
減少網絡開銷,在Lua腳本中可以把多個命令放在同一個腳本中運行
-
原子操作,redis會將整個腳本作為一個整體執行,中間不會被其他命令插入。換句話說,編寫腳本的過程中無需擔心會出現競態條件
-
復用性,客戶端發送的腳本會永遠存儲在redis中,這意味着其他客戶端可以復用這一腳本來完成同樣的邏輯
Lua的下載和安裝
Lua是一個獨立的腳本語言,所以它有專門的編譯執行工具,下面簡單帶大家安裝一下。
-
下載Lua源碼包: https://www.lua.org/download.html
-
安裝步驟如下
tar -zxvf lua-5.4.3.tar.gz cd lua-5.4.3 make linux make install
如果報錯,說找不到readline/readline.h, 可以通過yum命令安裝
yum -y install readline-devel ncurses-devel
最后,直接輸入lua
命令即可進入lua的控制台。Lua腳本有自己的語法、變量、邏輯運算符、函數等,這塊我就不在這里做過多的說明,用過JavaScript的同學,應該只需要花幾個小時就可以全部學完,簡單演示兩個案例如下。
array = {"Lua", "mic"}
for i= 0, 2 do
print(array[i])
end
array = {"mic", "redis"}
for key,value in ipairs(array)
do
print(key, value)
end
Redis與Lua
Redis中集成了Lua的編譯和執行器,所以我們可以在Redis中定義Lua腳本去執行。同時,在Lua腳本中,可以直接調用Redis的命令,來操作Redis中的數據。
redis.call(‘set’,'hello','world')
local value=redis.call(‘get’,’hello’)
redis.call 函數的返回值就是redis命令的執行結果,前面我們介紹過redis的5中類型的數據返回的值的類型也都不一樣,redis.call函數會將這5種類型的返回值轉化對應的Lua的數據類型
在很多情況下我們都需要腳本可以有返回值,畢竟這個腳本也是一個我們所編寫的命令集,我們可以像調用其他redis內置命令一樣調用我們自己寫的腳本,所以同樣redis會自動將腳本返回值的Lua數據類型轉化為Redis的返回值類型。 在腳本中可以使用return 語句將值返回給redis客戶端,通過return語句來執行,如果沒有執行return,默認返回為nil。
Redis中執行Lua腳本相關的命令
編寫完腳本后最重要的就是在程序中執行腳本。Redis提供了EVAL命令可以使開發者像調用其他Redis內置命令一樣調用腳本。
EVAL命令-執行腳本
[EVAL] [腳本內容] [key參數的數量] [key …] [arg …]
可以通過key和arg這兩個參數向腳本中傳遞數據,他們的值可以在腳本中分別使用KEYS和ARGV 這兩個類型的全局變量訪問。
比如我們通過腳本實現一個set命令,通過在redis客戶端中調用,那么執行的語句是:
eval "return redis.call('set',KEYS[1],ARGV[1])" 1 lua hello
上述腳本相當於使用Lua腳本調用了Redis的set
命令,存儲了一個key=lua,value=hello到Redis中。
EVALSHA命令
考慮到我們通過eval執行lua腳本,腳本比較長的情況下,每次調用腳本都需要把整個腳本傳給redis,比較占用帶寬。為了解決這個問題,redis提供了EVALSHA命令允許開發者通過腳本內容的SHA1摘要來執行腳本。該命令的用法和EVAL一樣,只不過是將腳本內容替換成腳本內容的SHA1摘要
-
Redis在執行EVAL命令時會計算腳本的SHA1摘要並記錄在腳本緩存中
-
執行EVALSHA命令時Redis會根據提供的摘要從腳本緩存中查找對應的腳本內容,如果找到了就執行腳本,否則返回“NOSCRIPT No matching script,Please use EVAL”
# 將腳本加入緩存並生成sha1命令
script load "return redis.call('get','lua')"
# ["13bd040587b891aedc00a72458cbf8588a27df90"]
# 傳遞sha1的值來執行該命令
evalsha "13bd040587b891aedc00a72458cbf8588a27df90" 0
Redisson執行Lua腳本
通過lua腳本來實現一個訪問頻率限制功能。
思路,定義一個key,key中包含ip地址。 value為指定時間內的訪問次數,比如說是10秒內只能訪問3次。
-
定義Lua腳本。
local times=redis.call('incr',KEYS[1]) -- 如果是第一次進來,設置一個過期時間 if times == 1 then redis.call('expire',KEYS[1],ARGV[1]) end -- 如果在指定時間內訪問次數大於指定次數,則返回0,表示訪問被限制 if times > tonumber(ARGV[2]) then return 0 end -- 返回1,允許被訪問 return 1
-
定義controller,提供訪問測試方法
@RestController public class RedissonController { @Autowired RedissonClient redissonClient; private final String LIMIT_LUA= "local times=redis.call('incr',KEYS[1])\n" + "if times == 1 then\n" + " redis.call('expire',KEYS[1],ARGV[1])\n" + "end\n" + "if times > tonumber(ARGV[2]) then\n" + " return 0\n" + "end\n" + "return 1"; @GetMapping("/lua/{id}") public String lua(@PathVariable("id") Integer id) throws ExecutionException, InterruptedException { List<Object> keys= Arrays.asList("LIMIT:"+id); RFuture<Object> future=redissonClient.getScript(). evalAsync(RScript.Mode.READ_WRITE,LIMIT_LUA, RScript.ReturnType.INTEGER,keys,10,3); return future.get().toString(); } }
需要注意,上述腳本執行的時候會有問題,因為redis默認的序列化方式導致value的值在傳遞到腳本中時,轉成了對象類型,需要修改redisson.yml
文件,增加codec的序列化方式。
-
application.yml
spring: redis: redisson: file: classpath:redisson.yml
-
redisson.yml
singleServerConfig: address: redis://192.168.221.128:6379 codec: !<org.redisson.codec.JsonJacksonCodec> {}
Lua腳本的原子性
redis的腳本執行是原子的,即腳本執行期間Redis不會執行其他命令。所有的命令必須等待腳本執行完以后才能執行。為了防止某個腳本執行時間過程導致Redis無法提供服務。Redis提供了lua-time-limit參數限制腳本的最長運行時間。默認是5秒鍾。
非事務性操作
當腳本運行時間超過這個限制后,Redis將開始接受其他命令但不會執行(以確保腳本的原子性),而是返回BUSY的錯誤,下面演示一下這種情況。
打開兩個客戶端窗口,在第一個窗口中執行lua腳本的死循環
eval "while true do end" 0
在第二個窗口中運行get lua
,會得到如下的異常。
(error) BUSY Redis is busy running a script. You can only call SCRIPT KILL or SHUTDOWN NOSAVE.
我們會發現執行結果是Busy, 接着我們通過script kill 的命令終止當前執行的腳本,第二個窗口的顯示又恢復正常了。
存在事務性操作
如果當前執行的Lua腳本對Redis的數據進行了修改(SET、DEL等),那么通過SCRIPT KILL 命令是不能終止腳本運行的,因為要保證腳本運行的原子性,如果腳本執行了一部分終止,那就違背了腳本原子性的要求。最終要保證腳本要么都執行,要么都不執行
同樣打開兩個窗口,第一個窗口運行如下命令
eval "redis.call('set','name','mic') while true do end" 0
在第二個窗口運行
get lua
結果一樣,仍然是busy,但是這個時候通過script kill命令,會發現報錯,沒辦法kill。
(error) UNKILLABLE Sorry the script already executed write commands against the dataset. You can either wait the script termination or kill the server in a hard way using the SHUTDOWN NOSAVE command.
遇到這種情況,只能通過shutdown nosave命令來強行終止redis。
shutdown nosave和shutdown的區別在於 shutdown nosave不會進行持久化操作,意味着發生在上一次快照后的數據庫修改都會丟失。
Redisson的Lua腳本
了解了lua之后,我們再回過頭來看看Redisson的Lua腳本,就不難理解了。
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
}
Redis中的Pub/Sub機制
下面是Redisson中釋放鎖的代碼,在代碼中我們發現一個publish的指令redis.call('publish', KEYS[2], ARGV[1])
,這個指令是干啥的呢?
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end; " +
"return nil;",
Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}
Redis提供了一組命令可以讓開發者實現“發布/訂閱”模式(publish/subscribe) . 該模式同樣可以實現進程間的消息傳遞,它的實現原理是:
-
發布/訂閱模式包含兩種角色,分別是發布者和訂閱者。訂閱者可以訂閱一個或多個頻道,而發布者可以向指定的頻道發送消息,所有訂閱此頻道的訂閱者都會收到該消息
-
發布者發布消息的命令是PUBLISH, 用法是
PUBLISH channel message
比如向channel.1發一條消息:hello
PUBLISH channel.1 “hello”
這樣就實現了消息的發送,該命令的返回值表示接收到這條消息的訂閱者數量。因為在執行這條命令的時候還沒有訂閱者訂閱該頻道,所以返回為0. 另外值得注意的是消息發送出去不會持久化,如果發送之前沒有訂閱者,那么后續再有訂閱者訂閱該頻道,之前的消息就收不到了
訂閱者訂閱消息的命令是:
SUBSCRIBE channel [channel …]
該命令同時可以訂閱多個頻道,比如訂閱channel.1的頻道:SUBSCRIBE channel.1,執行SUBSCRIBE命令后客戶端會進入訂閱狀態。
一般情況下,我們不會用pub/sub來做消息發送機制,畢竟有這么多MQ技術在。
關注[跟着Mic學架構]公眾號,獲取更多精品原創