本文通過走讀Lettuce異步讀取(get)的源碼,梳理Lettuce連接的建立過程
總體展示一個Lettuce異步get時序
通過時序圖可以發現MasterSlaveChannelWriter主要提供一個負載分配的功能,並不是真正的命令發送服務
下面通過源碼分析實現過程
/**
 * Opens a master/slave connection for the supplied Redis URIs.
 *
 * <p>Only the first URI decides the mode: when it is a Sentinel URI the connection is
 * created through {@code connectSentinel} using that URI alone; otherwise the whole URI
 * list is handed to {@code connectStaticMasterSlave}.
 *
 * @param redisClient client used to create the connection, must not be {@code null}
 * @param codec codec for keys and values, must not be {@code null}
 * @param redisURIs candidate URIs, must not be {@code null} or empty
 * @return the master/slave connection
 */
public static <K, V> StatefulRedisMasterSlaveConnection<K, V> connect(RedisClient redisClient, RedisCodec<K, V> codec,
        Iterable<RedisURI> redisURIs) {

    LettuceAssert.notNull(redisClient, "RedisClient must not be null");
    LettuceAssert.notNull(codec, "RedisCodec must not be null");
    LettuceAssert.notNull(redisURIs, "RedisURIs must not be null");

    List<RedisURI> uris = LettuceLists.newList(redisURIs);
    LettuceAssert.isTrue(!uris.isEmpty(), "RedisURIs must not be empty");

    // Static topology: every supplied URI takes part.
    if (!isSentinel(uris.get(0))) {
        return connectStaticMasterSlave(redisClient, codec, uris);
    }

    // Sentinel-managed topology: only the first URI is used.
    return connectSentinel(redisClient, codec, uris.get(0));
}
private static <K, V> StatefulRedisMasterSlaveConnection<K, V> connectSentinel(RedisClient redisClient, RedisCodec<K, V> codec, RedisURI redisURI) { //創建拓撲提供者為哨兵拓撲 TopologyProvider topologyProvider = new SentinelTopologyProvider(redisURI.getSentinelMasterId(), redisClient, redisURI); //創建哨兵拓撲刷新服務 SentinelTopologyRefresh sentinelTopologyRefresh = new SentinelTopologyRefresh(redisClient, redisURI.getSentinelMasterId(), redisURI.getSentinels()); //利用拓撲提供者和redisClient創建主備拓撲刷新服務 MasterSlaveTopologyRefresh refresh = new MasterSlaveTopologyRefresh(redisClient, topologyProvider); //創建主備連接提供者 MasterSlaveConnectionProvider<K, V> connectionProvider = new MasterSlaveConnectionProvider<>(redisClient, codec, redisURI, Collections.emptyMap()); //使用主備拓撲刷新服務獲取所有節點將其設置到連接提供者中 connectionProvider.setKnownNodes(refresh.getNodes(redisURI)); //使用連接提供者創建主備通道寫入器 MasterSlaveChannelWriter<K, V> channelWriter = new MasterSlaveChannelWriter<>(connectionProvider); //創建連接 StatefulRedisMasterSlaveConnectionImpl<K, V> connection = new StatefulRedisMasterSlaveConnectionImpl<>(channelWriter, codec, redisURI.getTimeout()); connection.setOptions(redisClient.getOptions()); Runnable runnable = () -> { try { LOG.debug("Refreshing topology"); List<RedisNodeDescription> nodes = refresh.getNodes(redisURI); if (nodes.isEmpty()) { LOG.warn("Topology refresh returned no nodes from {}", redisURI); } LOG.debug("New topology: {}", nodes); connectionProvider.setKnownNodes(nodes); } catch (Exception e) { LOG.error("Error during background refresh", e); } }; try { //向連接注冊可關閉服務 connection.registerCloseables(new ArrayList<>(), sentinelTopologyRefresh); //綁定哨兵拓撲結構變化執行邏輯 sentinelTopologyRefresh.bind(runnable); } catch (RuntimeException e) { connection.close(); throw e; } return connection; }
/**
 * Initializes the connection and creates its asynchronous, synchronous and reactive
 * command API objects.
 *
 * @param writer channel writer handed to the superclass; commands are dispatched through it
 * @param codec codec for keys and values
 * @param timeout timeout handed to the superclass
 */
public StatefulRedisConnectionImpl(RedisChannelWriter writer, RedisCodec<K, V> codec, Duration timeout) {

    super(writer, timeout);

    this.codec = codec;

    // Creation order is kept as-is: asynchronous first, then synchronous, then reactive.
    this.async = this.newRedisAsyncCommandsImpl();
    this.sync = this.newRedisSyncCommandsImpl();
    this.reactive = this.newRedisReactiveCommandsImpl();
}
protected RedisAsyncCommandsImpl<K, V> newRedisAsyncCommandsImpl() { //使用裝飾器模式對當前實例進行增強 return new RedisAsyncCommandsImpl<>(this, codec); }
/**
 * Creates the asynchronous command API by passing the connection and codec on to the
 * superclass constructor.
 *
 * @param connection connection the commands are issued on
 * @param codec codec for keys and values
 */
public RedisAsyncCommandsImpl(StatefulRedisConnection<K, V> connection, RedisCodec<K, V> codec) {
    super(connection, codec);
}
/**
 * Stores the connection and codec and creates the command builder for that codec.
 *
 * @param connection connection the commands are issued on
 * @param codec codec for keys and values; also used to construct the {@link RedisCommandBuilder}
 */
public AbstractRedisAsyncCommands(StatefulConnection<K, V> connection, RedisCodec<K, V> codec) {
    this.commandBuilder = new RedisCommandBuilder<>(codec);
    this.codec = codec;
    this.connection = connection;
}
StatefulRedisConnectionImpl
@Override public <T> RedisCommand<K, V, T> dispatch(RedisCommand<K, V, T> command) { //前置處理 RedisCommand<K, V, T> toSend = preProcessCommand(command); try { //通過父類進行派發,父類中對writer為當前類對構造方法對入參 return super.dispatch(toSend); } finally { if (command.getType().name().equals(MULTI.name())) { multi = (multi == null ? new MultiOutput<>(codec) : multi); } } }
protected <T> RedisCommand<K, V, T> dispatch(RedisCommand<K, V, T> cmd) { if (debugEnabled) { logger.debug("dispatching command {}", cmd); } //將發送命令對處理委派給channelWriter處理 return channelWriter.write(cmd); }
MasterSlaveChannelWriter
@Override public <K, V, T> RedisCommand<K, V, T> write(RedisCommand<K, V, T> command) { LettuceAssert.notNull(command, "Command must not be null"); if (closed) { throw new RedisException("Connection is closed"); } //獲取命令意圖 Intent intent = getIntent(command.getType()); //根據讀寫意圖獲取連接 StatefulRedisConnection<K, V> connection = (StatefulRedisConnection) masterSlaveConnectionProvider .getConnection(intent); //通過這個connection派發命令 return connection.dispatch(command); }
//根據意圖獲取連接 public StatefulRedisConnection<K, V> getConnection(Intent intent) { if (debugEnabled) { logger.debug("getConnection(" + intent + ")"); } //如果readFrom不為null且是READ if (readFrom != null && intent == Intent.READ) { //根據readFrom配置從已知節點中選擇可用節點描述 List<RedisNodeDescription> selection = readFrom.select(new ReadFrom.Nodes() { @Override public List<RedisNodeDescription> getNodes() { return knownNodes; } @Override public Iterator<RedisNodeDescription> iterator() { return knownNodes.iterator(); } }); //如果可選擇節點集合為空則拋出異常 if (selection.isEmpty()) { throw new RedisException(String.format("Cannot determine a node to read (Known nodes: %s) with setting %s", knownNodes, readFrom)); } try { //遍歷所有可用節點 for (RedisNodeDescription redisNodeDescription : selection) { //獲取節點連接 StatefulRedisConnection<K, V> readerCandidate = getConnection(redisNodeDescription); //如果節點連接不是打開到連接則繼續查找下一個連接 if (!readerCandidate.isOpen()) { continue; } //返回可用連接 return readerCandidate; } //如果沒有找到可用連接,默認返回第一個 return getConnection(selection.get(0)); } catch (RuntimeException e) { throw new RedisException(e); } } //如果沒有配置readFrom或者不是READ 則返回master連接 return getConnection(getMaster()); }
protected StatefulRedisConnection<K, V> getConnection(RedisNodeDescription redisNodeDescription) { //如果沒有則創建新節點,並添加到緩存中 return connections.computeIfAbsent( new ConnectionKey(redisNodeDescription.getUri().getHost(), redisNodeDescription.getUri().getPort()), connectionFactory); }
創建實際的connection
@Override public StatefulRedisConnection<K, V> apply(ConnectionKey key) { //構建URI RedisURI.Builder builder = RedisURI.Builder .redis(key.host, key.port) .withSsl(initialRedisUri.isSsl()) .withVerifyPeer(initialRedisUri.isVerifyPeer()) .withStartTls(initialRedisUri.isStartTls()); if (initialRedisUri.getPassword() != null && initialRedisUri.getPassword().length != 0) { builder.withPassword(initialRedisUri.getPassword()); } if (initialRedisUri.getClientName() != null) { builder.withClientName(initialRedisUri.getClientName()); } builder.withDatabase(initialRedisUri.getDatabase()); //創建連接 StatefulRedisConnection<K, V> connection = redisClient.connect(redisCodec, builder.build()); //設置是否自動提交 synchronized (stateLock) { connection.setAutoFlushCommands(autoFlushCommands); } return connection; }
/**
 * Opens a standalone connection to the given URI, using the URI's own timeout.
 *
 * @param codec codec for keys and values
 * @param redisURI target URI, checked with {@code assertNotNull}
 * @return the connection
 */
public <K, V> StatefulRedisConnection<K, V> connect(RedisCodec<K, V> codec, RedisURI redisURI) {

    assertNotNull(redisURI);

    StatefulRedisConnection<K, V> standalone = connectStandalone(codec, redisURI, redisURI.getTimeout());
    return standalone;
}
private <K, V> StatefulRedisConnection<K, V> connectStandalone(RedisCodec<K, V> codec, RedisURI redisURI, Duration timeout) { //單機異步 ConnectionFuture<StatefulRedisConnection<K, V>> future = connectStandaloneAsync(codec, redisURI, timeout); //獲取連接 return getConnection(future); }
private <K, V> ConnectionFuture<StatefulRedisConnection<K, V>> connectStandaloneAsync(RedisCodec<K, V> codec, RedisURI redisURI, Duration timeout) { assertNotNull(codec); //檢查URI是否有效 checkValidRedisURI(redisURI); logger.debug("Trying to get a Redis connection for: " + redisURI); //創建DefaultEndpoint DefaultEndpoint endpoint = new DefaultEndpoint(clientOptions); //創建connection StatefulRedisConnectionImpl<K, V> connection = newStatefulRedisConnection(endpoint, codec, timeout); //創建連接 ConnectionFuture<StatefulRedisConnection<K, V>> future = connectStatefulAsync(connection, endpoint, redisURI, () -> new CommandHandler(clientOptions, clientResources, endpoint)); future.whenComplete((channelHandler, throwable) -> { if (throwable != null) { connection.close(); } }); return future; }
/**
 * Configures a {@link ConnectionBuilder} for the given connection and URI, initializes the
 * channel asynchronously and chains the post-connect steps (AUTH, client name, SELECT).
 *
 * @param connection connection object to attach to the channel
 * @param endpoint endpoint set on the connection builder
 * @param redisURI target URI (SSL flag, password, client name and database are read from it)
 * @param commandHandlerSupplier supplier of the command handler
 * @return future that yields {@code connection}, cast to {@code S}, once the chained steps have run
 */
private <K, V, S> ConnectionFuture<S> connectStatefulAsync(StatefulRedisConnectionImpl<K, V> connection,
        DefaultEndpoint endpoint, RedisURI redisURI, Supplier<CommandHandler> commandHandlerSupplier) {

    // Pick the builder: the SSL variant when the URI requests SSL, the plain one otherwise.
    ConnectionBuilder connectionBuilder;
    if (redisURI.isSsl()) {
        SslConnectionBuilder sslConnectionBuilder = SslConnectionBuilder.sslConnectionBuilder();
        sslConnectionBuilder.ssl(redisURI);
        connectionBuilder = sslConnectionBuilder;
    } else {
        connectionBuilder = ConnectionBuilder.connectionBuilder();
    }

    // Attach the connection object.
    connectionBuilder.connection(connection);
    // Client options.
    connectionBuilder.clientOptions(clientOptions);
    // Client resources.
    connectionBuilder.clientResources(clientResources);
    // Command handler supplier and endpoint.
    connectionBuilder.commandHandler(commandHandlerSupplier).endpoint(endpoint);

    // Populate the remaining builder settings for this URI.
    connectionBuilder(getSocketAddressSupplier(redisURI), connectionBuilder, redisURI);
    // Select the channel type.
    channelType(connectionBuilder, redisURI);

    // With ping-before-activate enabled, the builder is switched to an auth-ping when the
    // URI has a password and to a plain ping otherwise.
    if (clientOptions.isPingBeforeActivateConnection()) {
        if (hasPassword(redisURI)) {
            connectionBuilder.enableAuthPingBeforeConnect();
        } else {
            connectionBuilder.enablePingBeforeConnect();
        }
    }

    // Initialize the channel asynchronously.
    ConnectionFuture<RedisChannelHandler<K, V>> future = initializeChannelAsync(connectionBuilder);

    // Ping-before-activate is NOT enabled but a password is set: issue AUTH after the
    // channel is ready, on the event executor group.
    if (!clientOptions.isPingBeforeActivateConnection() && hasPassword(redisURI)) {
        future = future.thenApplyAsync(channelHandler -> {
            connection.async().auth(new String(redisURI.getPassword()));
            return channelHandler;
        }, clientResources.eventExecutorGroup());
    }

    // Set the client name when the URI provides one.
    // NOTE(review): the stage returned by thenApply is not assigned back to `future`, so
    // the returned future does not depend on this step — confirm this is intended.
    if (LettuceStrings.isNotEmpty(redisURI.getClientName())) {
        future.thenApply(channelHandler -> {
            connection.setClientName(redisURI.getClientName());
            return channelHandler;
        });
    }

    // Non-zero database: issue SELECT on the event executor group.
    if (redisURI.getDatabase() != 0) {
        future = future.thenApplyAsync(channelHandler -> {
            connection.async().select(redisURI.getDatabase());
            return channelHandler;
        }, clientResources.eventExecutorGroup());
    }

    return future.thenApply(channelHandler -> (S) connection);
}
/** * 連接同時通過connectionBuilder初始化一個通道 */ @SuppressWarnings("unchecked") protected <K, V, T extends RedisChannelHandler<K, V>> ConnectionFuture<T> initializeChannelAsync( ConnectionBuilder connectionBuilder) { //獲取socketAddress SocketAddress redisAddress = connectionBuilder.socketAddress(); //如果線程池關閉則拋出異常 if (clientResources.eventExecutorGroup().isShuttingDown()) { throw new IllegalStateException("Cannot connect, Event executor group is terminated."); } logger.debug("Connecting to Redis at {}", redisAddress); CompletableFuture<Channel> channelReadyFuture = new CompletableFuture<>(); //獲取bootstrap Bootstrap redisBootstrap = connectionBuilder.bootstrap(); //創建redis通道初始化器 RedisChannelInitializer initializer = connectionBuilder.build(); //設置netty的處理器 redisBootstrap.handler(initializer); //netty自定設置處理 clientResources.nettyCustomizer().afterBootstrapInitialized(redisBootstrap); CompletableFuture<Boolean> initFuture = initializer.channelInitialized(); ChannelFuture connectFuture = redisBootstrap.connect(redisAddress); //增加監聽器 connectFuture.addListener(future -> { if (!future.isSuccess()) { logger.debug("Connecting to Redis at {}: {}", redisAddress, future.cause()); connectionBuilder.endpoint().initialState(); channelReadyFuture.completeExceptionally(future.cause()); return; } initFuture.whenComplete((success, throwable) -> { if (throwable == null) { logger.debug("Connecting to Redis at {}: Success", redisAddress); RedisChannelHandler<?, ?> connection = connectionBuilder.connection(); connection.registerCloseables(closeableResources, connection); channelReadyFuture.complete(connectFuture.channel()); return; } logger.debug("Connecting to Redis at {}, initialization: {}", redisAddress, throwable); connectionBuilder.endpoint().initialState(); Throwable failure; if (throwable instanceof RedisConnectionException) { failure = throwable; } else if (throwable instanceof TimeoutException) { failure = new RedisConnectionException("Could not initialize channel within " + 
connectionBuilder.getTimeout(), throwable); } else { failure = throwable; } channelReadyFuture.completeExceptionally(failure); CompletableFuture<Boolean> response = new CompletableFuture<>(); response.completeExceptionally(failure); }); }); return new DefaultConnectionFuture<T>(redisAddress, channelReadyFuture.thenApply(channel -> (T) connectionBuilder .connection())); }
connectionBuilder.build()
/**
 * Creates the channel initializer for this builder.
 *
 * @return a {@link PlainChannelInitializer} configured with the ping command supplier, the
 *         handler supplier ({@code buildHandlers}), the client resources and the timeout
 */
public RedisChannelInitializer build() {
    RedisChannelInitializer channelInitializer = new PlainChannelInitializer(pingCommandSupplier,
            this::buildHandlers, clientResources, timeout);
    return channelInitializer;
}
protected List<ChannelHandler> buildHandlers() { LettuceAssert.assertState(channelGroup != null, "ChannelGroup must be set"); LettuceAssert.assertState(connectionEvents != null, "ConnectionEvents must be set"); LettuceAssert.assertState(connection != null, "Connection must be set"); LettuceAssert.assertState(clientResources != null, "ClientResources must be set"); LettuceAssert.assertState(endpoint != null, "Endpoint must be set"); List<ChannelHandler> handlers = new ArrayList<>(); //設置clientOptions connection.setOptions(clientOptions); //添加channel監聽器 handlers.add(new ChannelGroupListener(channelGroup)); //添加命令編碼器 handlers.add(new CommandEncoder()); //添加commandHander handlers.add(commandHandlerSupplier.get()); //如果設置自動重連,則設置看門狗處理器 if (clientOptions.isAutoReconnect()) { handlers.add(createConnectionWatchdog()); } //設置connectionEvenTrigger handlers.add(new ConnectionEventTrigger(connectionEvents, connection, clientResources.eventBus())); if (clientOptions.isAutoReconnect()) { handlers.add(createConnectionWatchdog()); } return handlers; }
看門狗處理器的作用就是在通道斷開時進行重連
protected ConnectionWatchdog createConnectionWatchdog() { //如果看門狗不為null直接返回 if (connectionWatchdog != null) { return connectionWatchdog; } LettuceAssert.assertState(bootstrap != null, "Bootstrap must be set for autoReconnect=true"); LettuceAssert.assertState(timer != null, "Timer must be set for autoReconnect=true"); LettuceAssert.assertState(socketAddressSupplier != null, "SocketAddressSupplier must be set for autoReconnect=true"); //創建連接看門狗 ConnectionWatchdog watchdog = new ConnectionWatchdog(clientResources.reconnectDelay(), clientOptions, bootstrap, timer, clientResources.eventExecutorGroup(), socketAddressSupplier, reconnectionListener, connection); //向endpoint注冊看門狗 endpoint.registerConnectionWatchdog(watchdog); connectionWatchdog = watchdog; return watchdog; }
lass PlainChannelInitializer extends io.netty.channel.ChannelInitializer<Channel> implements RedisChannelInitializer { //不ping final static Supplier<AsyncCommand<?, ?, ?>> NO_PING = () -> null; //處理器提供器 private final Supplier<List<ChannelHandler>> handlers; //ping命令提供器 private final Supplier<AsyncCommand<?, ?, ?>> pingCommandSupplier; private final ClientResources clientResources; //超時時間 private final Duration timeout; private volatile CompletableFuture<Boolean> initializedFuture = new CompletableFuture<>(); PlainChannelInitializer(Supplier<AsyncCommand<?, ?, ?>> pingCommandSupplier, Supplier<List<ChannelHandler>> handlers, ClientResources clientResources, Duration timeout) { this.pingCommandSupplier = pingCommandSupplier; this.handlers = handlers; this.clientResources = clientResources; this.timeout = timeout; } @Override protected void initChannel(Channel channel) throws Exception { //如果pipeline中沒有配置channelActivator則需要添加channelActivator處理器 if (channel.pipeline().get("channelActivator") == null) { channel.pipeline().addLast("channelActivator", new RedisChannelInitializerImpl() { private AsyncCommand<?, ?, ?> pingCommand; @Override public CompletableFuture<Boolean> channelInitialized() { return initializedFuture; } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { //如果通道斷開連接 clientResources.eventBus().publish(new DisconnectedEvent(local(ctx), remote(ctx))); //如果初始化沒有完成則拋出異常 if (!initializedFuture.isDone()) { initializedFuture.completeExceptionally(new RedisConnectionException("Connection closed prematurely")); } initializedFuture = new CompletableFuture<>(); pingCommand = null; super.channelInactive(ctx); } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof ConnectionEvents.Activated) { if (!initializedFuture.isDone()) { initializedFuture.complete(true); clientResources.eventBus().publish(new ConnectionActivatedEvent(local(ctx), remote(ctx))); } } 
super.userEventTriggered(ctx, evt); } @Override public void channelActive(final ChannelHandlerContext ctx) throws Exception { //通過事件總線發送連接事件 clientResources.eventBus().publish(new ConnectedEvent(local(ctx), remote(ctx))); //如果ping命令提供器不是NO_PING則發送執行ping if (pingCommandSupplier != NO_PING) { pingCommand = pingCommandSupplier.get(); pingBeforeActivate(pingCommand, initializedFuture, ctx, clientResources, timeout); } else { super.channelActive(ctx); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { if (!initializedFuture.isDone()) { initializedFuture.completeExceptionally(cause); } super.exceptionCaught(ctx, cause); } }); } //將hanler提供器提供的處理器添加到channel中 for (ChannelHandler handler : handlers.get()) { channel.pipeline().addLast(handler); } clientResources.nettyCustomizer().afterChannelInitialized(channel); } static void pingBeforeActivate(AsyncCommand<?, ?, ?> cmd, CompletableFuture<Boolean> initializedFuture, ChannelHandlerContext ctx, ClientResources clientResources, Duration timeout) throws Exception { ctx.fireUserEventTriggered(new PingBeforeActivate(cmd)); Runnable timeoutGuard = () -> { if (cmd.isDone() || initializedFuture.isDone()) { return; } initializedFuture.completeExceptionally(new RedisCommandTimeoutException(String.format( "Cannot initialize channel (PING before activate) within %s", timeout))); }; Timeout timeoutHandle = clientResources.timer().newTimeout(t -> { if (clientResources.eventExecutorGroup().isShuttingDown()) { timeoutGuard.run(); return; } clientResources.eventExecutorGroup().submit(timeoutGuard); }, timeout.toNanos(), TimeUnit.NANOSECONDS); cmd.whenComplete((o, throwable) -> { timeoutHandle.cancel(); if (throwable == null) { ctx.fireChannelActive(); initializedFuture.complete(true); } else { initializedFuture.completeExceptionally(throwable); } }); } @Override public CompletableFuture<Boolean> channelInitialized() { return initializedFuture; } }