Redission源碼


redisson用的是netty的io框架,邏輯在channel的handler中

先看配置,以常用的哨兵模式為例,config.useSentinelServers()+Redisson.create(config)。Redisson的構造方法中,最重要的是給connectionManager屬性初始化,ConfigSupport.createConnectionManager(configCopy)---new SentinelConnectionManager,其中initTimer中有subscribeService的初始化,這個是訂閱用的。再往下,是先去哨兵地址的第一個,發請求獲取master的地址+其他哨兵的地址+salve的地址。再往下有initSingleEntry方法,這里實例化了一個MasterSlaveEntry,並為它實例化了一個RedisClient,這個client顧名思義是和master通信的。之后把這個entry放到connectionManager的client2entry和slotEntry中,到這里配置邏輯基本完成。

下面以加鎖的lock方法為例,看看請求是怎么發出去的。進入到tryAcquire方法,tryAcquireAsync返回一個future,這個future在get的時候會await,其實它是tryLockInnerAsync方法的返回值:ttlRemainingFuture,而一路點下去,又是evalAsync方法中的mainPromise,這個mainPromise會在響應到來之后執行mainPromise.trySuccess(res)方法,並在DefaultPromise的checkNotifyWaiters方法中notifyAll,喚醒剛才被await的線程(至於響應到來怎么讓mainPromise執行這個回調方法,以后再說)。redisson的源碼中頻繁的用到了異步的調用,看習慣了也就適應了,script用eval一次執行多個語句這個不說了,看evalAsync方法的async ---> executor.execute(),先用getConnection獲取連接 --- connectionManager.connectionWriteOp,先是獲取entry,從上面的slotEntry中取出上面說的MasterSlaveEntry---一直到ConnectionPool.acquireConnection,獲取freeConnectionsCounter的許可,獲取之后在回調里執行connectTo方法,從entry的freeConnections中試圖取出空閑連接,如果沒有,用client連接生成一個(至於為什么返回值是RedisConnection類型而不是channel類型,以后再說)。連接成功之后,執行回調,RedisExecutor的connectionFuture.onComplete方法,其實這時候返回的future已經是isDone的,所以剛添加到listener中的觀察者會被立即執行,執行到sendCommand方法,把attemptPromise封裝成CommandData,channel.writeAndFlush(data),到這里請求基本結束了。

至於響應的處理,和RocketMQ不同,前者發出請求前用請求ID作為key把回調放入map中,而redisson是用信號量來控制總訪問數,在到達最大值之前新建或者獲取空閑連接,響應處理完再還回去,這種處理方式個人感覺比較占資源。一個請求發出之后,所經過的handler都在RedisChannelInitializer的initChannel方法中,在其中的CommandsQueue的write---sendData中, ch.attr(CURRENT_COMMAND).set(data)一句把command放到ch的屬性中,而在響應到來的處理是在CommandDecoder中,decode方法中有QueueCommand data = ctx.channel().attr(CommandsQueue.CURRENT_COMMAND).get(),取出剛才的command,執行decodeXXX一直到handleResult,result = data.getCommand().getConvertor().convert(result);先取出convertor轉化結果,completeResponse方法再調用data.getPromise().trySuccess(result)喚醒線程並返回結果。這里的promise就是之前說的ttlRemainingFuture。

剛才說的client.connect返回一個RedisConnection類型,這個的邏輯還是去handler里面找,不過這次不是read和write了,而是channelRegistered,在RedisConnectionHandler的父類BaseConnectionHandler的channelRegistered方法中,createConnection新建一個RedisConnection,channel.attr(CONNECTION).set(this),把RedisConnection,放到channel的屬性中,而redisClient在connect的時候會取出這個RedisConnection,調用返回的future的trySuccess方法,把值設置進去。

再看subscribe,先判斷有沒有entryName(connectionManagerId + lockName)對應的RedissonLockEntry,如果有,說明已經訂閱直接放行進入下面的while。如果沒有生成一個新的,封裝一下,然后createListener生成一個listener並傳到PublishSubscribeService.subscribe方法中,其實這個方法才是關鍵,因為subscribe並不僅僅在lock方法中,redissonClient是隨時可以訂閱頻道的,比如getTopic.addListener就可以把自己的回調方法封裝成listener,傳到PublishSubscribeService.subscribe中。進入service.subscribe,先檢查PubSubConnectionEntry,同樣的,如果有了connEntry,那么說明已經訂閱了,直接把listener添加到entry的conn屬性中。如果沒有connEntry,那么需要在獲取freePubSubLock(限額1)的同步下,建立新鏈接,和tryaquire方法一樣,用的是connect方法----nextPubSubConnection----pubSubConnectionPool.get()----acquireConnection----connectTo---存在空閑的、已經和master建立起來的連接,就取出來,如果沒有,就createConnection----connectPubSub----connectPubSubAsync---- pubSubBootstrap.connect,這里和tryquire用的bootstrap是兩個strap,其他的邏輯都是一樣的。訂閱的話,會收到兩種消息,一個是訂閱成功的消息,一個是釋放鎖之后發布的消息。和tryaquire一樣,都要走encoder,不過此時的encoder是CommandPubSubDecoder了,在decodeCommand---decode,之前tryAquire的code是+,現在是*,所以進入decodeList----decodeResult----根據message的類型,放入隊列然后取出執行,如果是PubSubStatusMessage(subscribe/unsubscibe成功的響應),取出conn中的所有listener,執行onStatus方法,進入上面說的封裝的listener,發現onStatus是取出RedissonLockEntry中的promise的trySuccess方法,這個正好對應lock方法中的commandExecutor.syncSubscription(future)這一句同步方法的釋放;而PubSubMessage調用的是封裝的listener的onMessage方法---LockPubSub.onMessage---value.getLatch().release(),這里又正好對應lock方法中的getEntry(threadId).getLatch().tryAcquire的放行。

所以整個lock的邏輯就是:先去獲取鎖,如果獲取不到,就會獲取到當前鎖的過期時長,那么開始訂閱相應的channel,訂閱成功后進入while循環,進入之后,先再嘗試去獲取鎖,獲取不到就會返回當前鎖的過期時長,以此做為最大時長掛起當前線程,等待鎖釋放消息的發布,收到之后,再去獲取鎖……如此反復直到獲取到為止。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM