本文通過走讀Lettuce異步讀取(get)的源碼,梳理Lettuce連接的建立過程。
總體展示一個Lettuce異步get時序

通過時序圖可以發現MasterSlaveChannelWriter主要提供一個負載分配的功能,並不是真正的命令發送服務
下面通過源碼分析實現過程
/**
 * Opens a master/slave connection for the supplied URIs.
 * <p>
 * All arguments are validated first. The first URI decides the mode: when {@code isSentinel}
 * accepts it, only that URI is handed to {@code connectSentinel}; otherwise the complete list is
 * handed to {@code connectStaticMasterSlave}.
 *
 * @param redisClient the client used to create connections, must not be {@code null}
 * @param codec the codec used by the resulting connection, must not be {@code null}
 * @param redisURIs the URIs to connect to, must not be {@code null} or empty
 * @return the master/slave connection produced by the selected strategy
 */
public static <K, V> StatefulRedisMasterSlaveConnection<K, V> connect(RedisClient redisClient, RedisCodec<K, V> codec,
        Iterable<RedisURI> redisURIs) {
    LettuceAssert.notNull(redisClient, "RedisClient must not be null");
    LettuceAssert.notNull(codec, "RedisCodec must not be null");
    LettuceAssert.notNull(redisURIs, "RedisURIs must not be null");

    final List<RedisURI> candidates = LettuceLists.newList(redisURIs);
    LettuceAssert.isTrue(!candidates.isEmpty(), "RedisURIs must not be empty");

    // Only the first URI is inspected to choose between Sentinel and static topology.
    final RedisURI first = candidates.get(0);
    if (!isSentinel(first)) {
        return connectStaticMasterSlave(redisClient, codec, candidates);
    }
    return connectSentinel(redisClient, codec, first);
}
/**
 * Builds a master/slave connection whose topology comes from Redis Sentinel.
 * <p>
 * The known nodes are loaded once up front, then reloaded by a callback that is bound to the
 * {@code SentinelTopologyRefresh}. If registering or binding fails with a
 * {@link RuntimeException}, the new connection is closed and the exception is rethrown.
 *
 * @param redisClient the client used by the topology services and the connection provider
 * @param codec the codec used by the resulting connection
 * @param redisURI the Sentinel URI supplying the master id, the sentinels and the timeout
 * @return the newly created master/slave connection
 */
private static <K, V> StatefulRedisMasterSlaveConnection<K, V> connectSentinel(RedisClient redisClient,
        RedisCodec<K, V> codec, RedisURI redisURI) {

    // Topology provider backed by Sentinel.
    TopologyProvider provider = new SentinelTopologyProvider(redisURI.getSentinelMasterId(), redisClient, redisURI);

    // Sentinel-side refresh service; the refresh callback is bound to it further down.
    SentinelTopologyRefresh sentinelRefresh = new SentinelTopologyRefresh(redisClient,
            redisURI.getSentinelMasterId(), redisURI.getSentinels());

    // Master/slave topology refresh built from the client and the topology provider.
    MasterSlaveTopologyRefresh topologyRefresh = new MasterSlaveTopologyRefresh(redisClient, provider);

    // Connection provider, seeded with the nodes returned by the first topology lookup.
    MasterSlaveConnectionProvider<K, V> nodeConnections = new MasterSlaveConnectionProvider<>(redisClient, codec,
            redisURI, Collections.emptyMap());
    nodeConnections.setKnownNodes(topologyRefresh.getNodes(redisURI));

    // Channel writer that delegates to the connection provider.
    MasterSlaveChannelWriter<K, V> writer = new MasterSlaveChannelWriter<>(nodeConnections);

    StatefulRedisMasterSlaveConnectionImpl<K, V> masterSlaveConnection = new StatefulRedisMasterSlaveConnectionImpl<>(
            writer, codec, redisURI.getTimeout());
    masterSlaveConnection.setOptions(redisClient.getOptions());

    // Callback that reloads the node list; any failure is logged and not propagated.
    Runnable reloadTopology = () -> {
        try {
            LOG.debug("Refreshing topology");
            List<RedisNodeDescription> latest = topologyRefresh.getNodes(redisURI);

            if (latest.isEmpty()) {
                LOG.warn("Topology refresh returned no nodes from {}", redisURI);
            }

            LOG.debug("New topology: {}", latest);
            nodeConnections.setKnownNodes(latest);
        } catch (Exception e) {
            LOG.error("Error during background refresh", e);
        }
    };

    try {
        // Register the Sentinel refresh service as a closeable of the connection.
        masterSlaveConnection.registerCloseables(new ArrayList<>(), sentinelRefresh);
        // Bind the reload callback to the Sentinel refresh service.
        sentinelRefresh.bind(reloadTopology);
    } catch (RuntimeException e) {
        masterSlaveConnection.close();
        throw e;
    }

    return masterSlaveConnection;
}
/**
 * Creates the connection and eagerly builds its asynchronous, synchronous and reactive command
 * facades.
 *
 * @param writer the channel writer, passed to the superclass constructor
 * @param codec the codec stored on this connection
 * @param timeout the timeout, passed to the superclass constructor
 */
public StatefulRedisConnectionImpl(RedisChannelWriter writer, RedisCodec<K, V> codec, Duration timeout) {
super(writer, timeout);
this.codec = codec;
// Build the asynchronous command API.
// NOTE(review): the factory calls below are kept in this order; the later ones may rely on the earlier ones — confirm.
this.async = newRedisAsyncCommandsImpl();
// Build the synchronous command API.
this.sync = newRedisSyncCommandsImpl();
// Build the reactive command API.
this.reactive = newRedisReactiveCommandsImpl();
}
/**
 * Creates the asynchronous command API bound to this connection and its codec.
 *
 * @return a new {@code RedisAsyncCommandsImpl} constructed with {@code this} and {@code codec}
 */
protected RedisAsyncCommandsImpl<K, V> newRedisAsyncCommandsImpl() {
    // The command object receives this connection; presumably it issues commands through it.
    RedisAsyncCommandsImpl<K, V> asyncCommands = new RedisAsyncCommandsImpl<>(this, codec);
    return asyncCommands;
}
/**
 * Creates the asynchronous command API; both arguments are forwarded unchanged to the superclass.
 *
 * @param connection the connection passed to the superclass constructor
 * @param codec the codec passed to the superclass constructor
 */
public RedisAsyncCommandsImpl(StatefulRedisConnection<K, V> connection, RedisCodec<K, V> codec) {
super(connection, codec);
}
/**
 * Stores the connection and codec and creates the command builder for that codec.
 *
 * @param connection the connection stored on this instance
 * @param codec the codec stored on this instance and passed to the {@code RedisCommandBuilder}
 */
public AbstractRedisAsyncCommands(StatefulConnection<K, V> connection, RedisCodec<K, V> codec) {
    // Codec-dependent state first, then the connection reference.
    this.codec = codec;
    this.commandBuilder = new RedisCommandBuilder<>(codec);

    this.connection = connection;
}
StatefulRedisConnectionImpl
/**
 * Pre-processes the command, dispatches it through the superclass and, for a {@code MULTI}
 * command, makes sure a {@code MultiOutput} exists afterwards.
 *
 * @param command the command to dispatch
 * @return the command returned by the superclass dispatch
 */
@Override
public <T> RedisCommand<K, V, T> dispatch(RedisCommand<K, V, T> command) {

    // Pre-processing may substitute the command that is actually sent.
    final RedisCommand<K, V, T> prepared = preProcessCommand(command);

    try {
        // Delegate to the parent implementation.
        return super.dispatch(prepared);
    } finally {
        // Runs even when dispatch throws: create the MULTI output once, on the first MULTI.
        final boolean isMulti = command.getType().name().equals(MULTI.name());
        if (isMulti && multi == null) {
            multi = new MultiOutput<>(codec);
        }
    }
}
/**
 * Writes the command to the channel writer, logging it first when debug logging is enabled.
 *
 * @param cmd the command to send
 * @return the command returned by {@code channelWriter.write}
 */
protected <T> RedisCommand<K, V, T> dispatch(RedisCommand<K, V, T> cmd) {

    if (debugEnabled) {
        logger.debug("dispatching command {}", cmd);
    }

    // The actual sending is delegated to the channel writer.
    final RedisCommand<K, V, T> written = channelWriter.write(cmd);
    return written;
}
MasterSlaveChannelWriter
/**
 * Routes a command to a node connection chosen by the command's intent and dispatches it there.
 *
 * @param command the command to write, must not be {@code null}
 * @return the command returned by the selected connection's {@code dispatch}
 * @throws RedisException if this writer is closed
 */
@Override
public <K, V, T> RedisCommand<K, V, T> write(RedisCommand<K, V, T> command) {

    LettuceAssert.notNull(command, "Command must not be null");

    if (closed) {
        throw new RedisException("Connection is closed");
    }

    // Derive the intent from the command type, then ask the provider for a matching connection.
    final Intent commandIntent = getIntent(command.getType());
    final StatefulRedisConnection<K, V> target = (StatefulRedisConnection) masterSlaveConnectionProvider
            .getConnection(commandIntent);

    // The selected node connection performs the actual dispatch.
    return target.dispatch(command);
}
/**
 * Returns a connection suitable for the given intent.
 * <p>
 * When {@code readFrom} is set and the intent is {@code READ}, {@code readFrom} selects
 * candidates from the known nodes. The first candidate whose connection is open is returned; if
 * none is open, the connection of the first candidate is returned. In every other case the
 * master connection is returned.
 *
 * @param intent the intent of the command
 * @return the connection to use
 * @throws RedisException if {@code readFrom} selects no node, or if obtaining a candidate
 *         connection fails with a {@link RuntimeException} (wrapped as the cause)
 */
public StatefulRedisConnection<K, V> getConnection(Intent intent) {

    if (debugEnabled) {
        logger.debug("getConnection(" + intent + ")");
    }

    // No ReadFrom configured, or not a read: use the master.
    if (readFrom == null || intent != Intent.READ) {
        return getConnection(getMaster());
    }

    // View over the currently known nodes, handed to the ReadFrom strategy.
    final ReadFrom.Nodes nodesView = new ReadFrom.Nodes() {

        @Override
        public List<RedisNodeDescription> getNodes() {
            return knownNodes;
        }

        @Override
        public Iterator<RedisNodeDescription> iterator() {
            return knownNodes.iterator();
        }
    };

    final List<RedisNodeDescription> selection = readFrom.select(nodesView);

    if (selection.isEmpty()) {
        throw new RedisException(String.format("Cannot determine a node to read (Known nodes: %s) with setting %s",
                knownNodes, readFrom));
    }

    try {
        // Prefer the first candidate whose connection is open.
        for (RedisNodeDescription candidate : selection) {
            final StatefulRedisConnection<K, V> candidateConnection = getConnection(candidate);
            if (candidateConnection.isOpen()) {
                return candidateConnection;
            }
        }

        // Nothing is open: fall back to the first selected node.
        return getConnection(selection.get(0));
    } catch (RuntimeException e) {
        throw new RedisException(e);
    }
}
/**
 * Returns the cached connection for the node's host and port, creating it through
 * {@code connectionFactory} when no entry exists yet.
 *
 * @param redisNodeDescription the node whose URI host and port form the cache key
 * @return the cached or newly created connection
 */
protected StatefulRedisConnection<K, V> getConnection(RedisNodeDescription redisNodeDescription) {

    final ConnectionKey cacheKey = new ConnectionKey(redisNodeDescription.getUri().getHost(),
            redisNodeDescription.getUri().getPort());

    // computeIfAbsent only invokes the factory when the key has no mapping.
    return connections.computeIfAbsent(cacheKey, connectionFactory);
}
創建實際的connection
/**
 * Creates the connection for a node key.
 * <p>
 * The node URI uses the key's host and port, and copies the SSL, peer-verification and StartTLS
 * flags, the database and, when present, the password and client name from
 * {@code initialRedisUri}.
 *
 * @param key the host and port of the node to connect to
 * @return the new connection, with auto-flush set from {@code autoFlushCommands}
 */
@Override
public StatefulRedisConnection<K, V> apply(ConnectionKey key) {

    // Node address plus the transport settings of the initial URI.
    RedisURI.Builder nodeUri = RedisURI.Builder
            .redis(key.host, key.port)
            .withSsl(initialRedisUri.isSsl())
            .withVerifyPeer(initialRedisUri.isVerifyPeer())
            .withStartTls(initialRedisUri.isStartTls());

    // Credentials and client name are copied only when the initial URI carries them.
    final boolean passwordPresent = initialRedisUri.getPassword() != null
            && initialRedisUri.getPassword().length != 0;
    if (passwordPresent) {
        nodeUri.withPassword(initialRedisUri.getPassword());
    }

    final boolean clientNamePresent = initialRedisUri.getClientName() != null;
    if (clientNamePresent) {
        nodeUri.withClientName(initialRedisUri.getClientName());
    }

    nodeUri.withDatabase(initialRedisUri.getDatabase());

    final StatefulRedisConnection<K, V> nodeConnection = redisClient.connect(redisCodec, nodeUri.build());

    // Apply the auto-flush setting while holding the state lock.
    synchronized (stateLock) {
        nodeConnection.setAutoFlushCommands(autoFlushCommands);
    }

    return nodeConnection;
}
/**
 * Opens a standalone connection to the given URI, using the URI's own timeout.
 *
 * @param codec the codec used by the connection
 * @param redisURI the server to connect to; checked by {@code assertNotNull}
 * @return the connection returned by {@code connectStandalone}
 */
public <K, V> StatefulRedisConnection<K, V> connect(RedisCodec<K, V> codec, RedisURI redisURI) {

    assertNotNull(redisURI);

    final StatefulRedisConnection<K, V> standalone = connectStandalone(codec, redisURI, redisURI.getTimeout());
    return standalone;
}
/**
 * Starts an asynchronous standalone connect and resolves it through {@code getConnection}.
 *
 * @param codec the codec used by the connection
 * @param redisURI the server to connect to
 * @param timeout the timeout forwarded to the asynchronous connect
 * @return the connection obtained from the connection future
 */
private <K, V> StatefulRedisConnection<K, V> connectStandalone(RedisCodec<K, V> codec, RedisURI redisURI, Duration timeout) {
    // Kick off the asynchronous connect and hand its future to getConnection.
    return getConnection(connectStandaloneAsync(codec, redisURI, timeout));
}
/**
 * Creates a standalone connection object and connects it asynchronously.
 * <p>
 * A fresh {@code DefaultEndpoint} and {@code StatefulRedisConnectionImpl} are created, then
 * {@code connectStatefulAsync} performs the connect. If the resulting future completes
 * exceptionally, the connection object is closed.
 *
 * @param codec the codec used by the connection; checked by {@code assertNotNull}
 * @param redisURI the server to connect to; checked by {@code checkValidRedisURI}
 * @param timeout the timeout given to the new connection object
 * @return the future returned by {@code connectStatefulAsync}
 */
private <K, V> ConnectionFuture<StatefulRedisConnection<K, V>> connectStandaloneAsync(RedisCodec<K, V> codec,
        RedisURI redisURI, Duration timeout) {

    assertNotNull(codec);
    checkValidRedisURI(redisURI);

    logger.debug("Trying to get a Redis connection for: " + redisURI);

    final DefaultEndpoint commandEndpoint = new DefaultEndpoint(clientOptions);
    final StatefulRedisConnectionImpl<K, V> pending = newStatefulRedisConnection(commandEndpoint, codec, timeout);

    // Each call of the supplier creates a new CommandHandler bound to the same endpoint.
    final ConnectionFuture<StatefulRedisConnection<K, V>> connectFuture = connectStatefulAsync(pending,
            commandEndpoint, redisURI, () -> new CommandHandler(clientOptions, clientResources, commandEndpoint));

    // Close the connection object if the connect attempt fails.
    connectFuture.whenComplete((ignored, failure) -> {
        if (failure != null) {
            pending.close();
        }
    });

    return connectFuture;
}
/**
 * Configures a {@code ConnectionBuilder} for the given connection, starts the asynchronous
 * channel initialization and chains the post-connect steps (AUTH, client name, SELECT).
 * <p>
 * Every post-connect step is chained onto the returned future. The client-name step used to be
 * attached as a detached stage whose result was discarded, so the returned future neither waited
 * for it nor saw its failure; it is now reassigned to {@code future} like the other two steps.
 *
 * @param connection the connection object to wire to the channel
 * @param endpoint the endpoint registered on the builder
 * @param redisURI supplies SSL mode, password, client name and database
 * @param commandHandlerSupplier supplier of the {@code CommandHandler} for the channel
 * @return a future that yields {@code connection} once the channel is initialized and all
 *         applicable post-connect steps have run
 */
private <K, V, S> ConnectionFuture<S> connectStatefulAsync(StatefulRedisConnectionImpl<K, V> connection,
        DefaultEndpoint endpoint, RedisURI redisURI, Supplier<CommandHandler> commandHandlerSupplier) {

    // Choose the builder flavour: SSL-aware when the URI requests SSL, plain otherwise.
    ConnectionBuilder connectionBuilder;
    if (redisURI.isSsl()) {
        SslConnectionBuilder sslConnectionBuilder = SslConnectionBuilder.sslConnectionBuilder();
        sslConnectionBuilder.ssl(redisURI);
        connectionBuilder = sslConnectionBuilder;
    } else {
        connectionBuilder = ConnectionBuilder.connectionBuilder();
    }

    // Connection, client options and client resources.
    connectionBuilder.connection(connection);
    connectionBuilder.clientOptions(clientOptions);
    connectionBuilder.clientResources(clientResources);
    // Command handler supplier and endpoint.
    connectionBuilder.commandHandler(commandHandlerSupplier).endpoint(endpoint);

    // Populate the builder via the shared helper, then set the channel type.
    connectionBuilder(getSocketAddressSupplier(redisURI), connectionBuilder, redisURI);
    channelType(connectionBuilder, redisURI);

    // With ping-before-activate enabled, use the AUTH variant when the URI has a password.
    if (clientOptions.isPingBeforeActivateConnection()) {
        if (hasPassword(redisURI)) {
            connectionBuilder.enableAuthPingBeforeConnect();
        } else {
            connectionBuilder.enablePingBeforeConnect();
        }
    }

    // Start the asynchronous channel initialization.
    ConnectionFuture<RedisChannelHandler<K, V>> future = initializeChannelAsync(connectionBuilder);

    // Ping-before-activate is NOT enabled but a password is set: send AUTH explicitly.
    if (!clientOptions.isPingBeforeActivateConnection() && hasPassword(redisURI)) {
        future = future.thenApplyAsync(channelHandler -> {
            connection.async().auth(new String(redisURI.getPassword()));
            return channelHandler;
        }, clientResources.eventExecutorGroup());
    }

    // Set the client name as part of the chain so the returned future covers this step.
    if (LettuceStrings.isNotEmpty(redisURI.getClientName())) {
        future = future.thenApply(channelHandler -> {
            connection.setClientName(redisURI.getClientName());
            return channelHandler;
        });
    }

    // Select the database when it is not the default 0.
    if (redisURI.getDatabase() != 0) {
        future = future.thenApplyAsync(channelHandler -> {
            connection.async().select(redisURI.getDatabase());
            return channelHandler;
        }, clientResources.eventExecutorGroup());
    }

    return future.thenApply(channelHandler -> (S) connection);
}
/**
 * Connects to the socket address of the {@code connectionBuilder} and initializes a channel
 * through the builder's channel initializer.
 * <p>
 * The returned future completes with the builder's connection once both the TCP connect and the
 * channel initialization succeed. On failure the endpoint is reset to its initial state and the
 * future completes exceptionally. A {@code TimeoutException} that is not already a
 * {@code RedisConnectionException} is wrapped in a {@code RedisConnectionException}; every other
 * failure is passed through unchanged.
 *
 * @param connectionBuilder the fully configured connection builder
 * @return a future yielding the connection held by the builder
 * @throws IllegalStateException if the event executor group is shutting down
 */
@SuppressWarnings("unchecked")
protected <K, V, T extends RedisChannelHandler<K, V>> ConnectionFuture<T> initializeChannelAsync(
        ConnectionBuilder connectionBuilder) {

    SocketAddress redisAddress = connectionBuilder.socketAddress();

    // Refuse to connect once the executor group is shutting down.
    if (clientResources.eventExecutorGroup().isShuttingDown()) {
        throw new IllegalStateException("Cannot connect, Event executor group is terminated.");
    }

    logger.debug("Connecting to Redis at {}", redisAddress);

    CompletableFuture<Channel> channelReadyFuture = new CompletableFuture<>();

    // Install the Redis channel initializer as the bootstrap handler.
    Bootstrap redisBootstrap = connectionBuilder.bootstrap();
    RedisChannelInitializer initializer = connectionBuilder.build();
    redisBootstrap.handler(initializer);

    // Customization hook for the bootstrap.
    clientResources.nettyCustomizer().afterBootstrapInitialized(redisBootstrap);

    CompletableFuture<Boolean> initFuture = initializer.channelInitialized();
    ChannelFuture connectFuture = redisBootstrap.connect(redisAddress);

    connectFuture.addListener(future -> {

        // The TCP connect itself failed.
        if (!future.isSuccess()) {
            logger.debug("Connecting to Redis at {}: {}", redisAddress, future.cause());
            connectionBuilder.endpoint().initialState();
            channelReadyFuture.completeExceptionally(future.cause());
            return;
        }

        // Connected; now wait for the channel initialization to finish.
        initFuture.whenComplete((success, throwable) -> {

            if (throwable == null) {
                logger.debug("Connecting to Redis at {}: Success", redisAddress);
                RedisChannelHandler<?, ?> connection = connectionBuilder.connection();
                connection.registerCloseables(closeableResources, connection);
                channelReadyFuture.complete(connectFuture.channel());
                return;
            }

            logger.debug("Connecting to Redis at {}, initialization: {}", redisAddress, throwable);
            connectionBuilder.endpoint().initialState();

            // Only a timeout that is not already a RedisConnectionException gets wrapped.
            Throwable failure = throwable;
            if (!(throwable instanceof RedisConnectionException) && throwable instanceof TimeoutException) {
                failure = new RedisConnectionException("Could not initialize channel within "
                        + connectionBuilder.getTimeout(), throwable);
            }

            channelReadyFuture.completeExceptionally(failure);
        });
    });

    return new DefaultConnectionFuture<T>(redisAddress, channelReadyFuture.thenApply(channel -> (T) connectionBuilder
            .connection()));
}
connectionBuilder.build()
/**
 * Creates the channel initializer for this builder.
 *
 * @return a {@code PlainChannelInitializer} built from the ping command supplier, a supplier
 *         referencing {@code buildHandlers}, the client resources and the timeout
 */
public RedisChannelInitializer build() {
    // buildHandlers is passed as a method reference, so it is not called here.
    final RedisChannelInitializer channelInitializer = new PlainChannelInitializer(pingCommandSupplier,
            this::buildHandlers, clientResources, timeout);
    return channelInitializer;
}
/**
 * Builds the ordered list of channel handlers for a new channel.
 * <p>
 * Order: channel group listener, command encoder, command handler, connection event trigger
 * and, when auto-reconnect is enabled, the connection watchdog.
 * <p>
 * The watchdog is added exactly once. {@code createConnectionWatchdog()} returns a cached
 * instance, so the two calls made previously put the same handler into the list twice.
 *
 * @return the handlers to add to the channel pipeline
 * @throws IllegalStateException presumably raised by {@code LettuceAssert.assertState} when a
 *         required collaborator is unset — confirm against {@code LettuceAssert}
 */
protected List<ChannelHandler> buildHandlers() {

    LettuceAssert.assertState(channelGroup != null, "ChannelGroup must be set");
    LettuceAssert.assertState(connectionEvents != null, "ConnectionEvents must be set");
    LettuceAssert.assertState(connection != null, "Connection must be set");
    LettuceAssert.assertState(clientResources != null, "ClientResources must be set");
    LettuceAssert.assertState(endpoint != null, "Endpoint must be set");

    List<ChannelHandler> handlers = new ArrayList<>();

    // Propagate the client options to the connection.
    connection.setOptions(clientOptions);

    // Channel group listener.
    handlers.add(new ChannelGroupListener(channelGroup));
    // Command encoder.
    handlers.add(new CommandEncoder());
    // Command handler.
    handlers.add(commandHandlerSupplier.get());
    // Connection event trigger.
    handlers.add(new ConnectionEventTrigger(connectionEvents, connection, clientResources.eventBus()));

    // With auto-reconnect enabled, install the watchdog (once) after the event trigger.
    if (clientOptions.isAutoReconnect()) {
        handlers.add(createConnectionWatchdog());
    }

    return handlers;
}
看門狗處理器的作用就是在通道斷開時進行重連
/**
 * Returns the connection watchdog of this builder, creating and caching it on first use.
 * <p>
 * On creation the watchdog is registered with the endpoint. Later calls return the cached
 * instance without repeating the state assertions.
 *
 * @return the cached or newly created {@code ConnectionWatchdog}
 */
protected ConnectionWatchdog createConnectionWatchdog() {

    if (connectionWatchdog == null) {

        LettuceAssert.assertState(bootstrap != null, "Bootstrap must be set for autoReconnect=true");
        LettuceAssert.assertState(timer != null, "Timer must be set for autoReconnect=true");
        LettuceAssert.assertState(socketAddressSupplier != null, "SocketAddressSupplier must be set for autoReconnect=true");

        final ConnectionWatchdog created = new ConnectionWatchdog(clientResources.reconnectDelay(), clientOptions,
                bootstrap, timer, clientResources.eventExecutorGroup(), socketAddressSupplier, reconnectionListener,
                connection);

        // Register with the endpoint before caching the instance.
        endpoint.registerConnectionWatchdog(created);
        connectionWatchdog = created;
    }

    return connectionWatchdog;
}
lass PlainChannelInitializer extends io.netty.channel.ChannelInitializer<Channel> implements RedisChannelInitializer {
//不ping
final static Supplier<AsyncCommand<?, ?, ?>> NO_PING = () -> null;
//處理器提供器
private final Supplier<List<ChannelHandler>> handlers;
//ping命令提供器
private final Supplier<AsyncCommand<?, ?, ?>> pingCommandSupplier;
private final ClientResources clientResources;
//超時時間
private final Duration timeout;
private volatile CompletableFuture<Boolean> initializedFuture = new CompletableFuture<>();
PlainChannelInitializer(Supplier<AsyncCommand<?, ?, ?>> pingCommandSupplier, Supplier<List<ChannelHandler>> handlers,
ClientResources clientResources, Duration timeout) {
this.pingCommandSupplier = pingCommandSupplier;
this.handlers = handlers;
this.clientResources = clientResources;
this.timeout = timeout;
}
@Override
protected void initChannel(Channel channel) throws Exception {
//如果pipeline中沒有配置channelActivator則需要添加channelActivator處理器
if (channel.pipeline().get("channelActivator") == null) {
channel.pipeline().addLast("channelActivator", new RedisChannelInitializerImpl() {
private AsyncCommand<?, ?, ?> pingCommand;
@Override
public CompletableFuture<Boolean> channelInitialized() {
return initializedFuture;
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
//如果通道斷開連接
clientResources.eventBus().publish(new DisconnectedEvent(local(ctx), remote(ctx)));
//如果初始化沒有完成則拋出異常
if (!initializedFuture.isDone()) {
initializedFuture.completeExceptionally(new RedisConnectionException("Connection closed prematurely"));
}
initializedFuture = new CompletableFuture<>();
pingCommand = null;
super.channelInactive(ctx);
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof ConnectionEvents.Activated) {
if (!initializedFuture.isDone()) {
initializedFuture.complete(true);
clientResources.eventBus().publish(new ConnectionActivatedEvent(local(ctx), remote(ctx)));
}
}
super.userEventTriggered(ctx, evt);
}
@Override
public void channelActive(final ChannelHandlerContext ctx) throws Exception {
//通過事件總線發送連接事件
clientResources.eventBus().publish(new ConnectedEvent(local(ctx), remote(ctx)));
//如果ping命令提供器不是NO_PING則發送執行ping
if (pingCommandSupplier != NO_PING) {
pingCommand = pingCommandSupplier.get();
pingBeforeActivate(pingCommand, initializedFuture, ctx, clientResources, timeout);
} else {
super.channelActive(ctx);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
if (!initializedFuture.isDone()) {
initializedFuture.completeExceptionally(cause);
}
super.exceptionCaught(ctx, cause);
}
});
}
//將hanler提供器提供的處理器添加到channel中
for (ChannelHandler handler : handlers.get()) {
channel.pipeline().addLast(handler);
}
clientResources.nettyCustomizer().afterChannelInitialized(channel);
}
static void pingBeforeActivate(AsyncCommand<?, ?, ?> cmd, CompletableFuture<Boolean> initializedFuture,
ChannelHandlerContext ctx, ClientResources clientResources, Duration timeout) throws Exception {
ctx.fireUserEventTriggered(new PingBeforeActivate(cmd));
Runnable timeoutGuard = () -> {
if (cmd.isDone() || initializedFuture.isDone()) {
return;
}
initializedFuture.completeExceptionally(new RedisCommandTimeoutException(String.format(
"Cannot initialize channel (PING before activate) within %s", timeout)));
};
Timeout timeoutHandle = clientResources.timer().newTimeout(t -> {
if (clientResources.eventExecutorGroup().isShuttingDown()) {
timeoutGuard.run();
return;
}
clientResources.eventExecutorGroup().submit(timeoutGuard);
}, timeout.toNanos(), TimeUnit.NANOSECONDS);
cmd.whenComplete((o, throwable) -> {
timeoutHandle.cancel();
if (throwable == null) {
ctx.fireChannelActive();
initializedFuture.complete(true);
} else {
initializedFuture.completeExceptionally(throwable);
}
});
}
@Override
public CompletableFuture<Boolean> channelInitialized() {
return initializedFuture;
}
}
