Lettuce是一個高級的Redis客戶端,下面通過對其創建連接過程的源碼進行走讀
下面看看RedisClient是如何創建單機模式的異步連接的, 首先從RedisClient中的connectAsync看起,在該方法中並沒有什么特別的地方,在對RedisURI進行非空校驗后就直接調用了內部方法
public <K, V> StatefulRedisConnection<K, V> connect(RedisCodec<K, V> codec, RedisURI redisURI) {
assertNotNull(redisURI);
return connectStandalone(codec, redisURI, redisURI.getTimeout());
}
在內部方法中首先通過一個異步方式創建連接,在從ConnectionFuture中獲取連接
/**
* 獲取單機連接
*/
private <K, V> StatefulRedisConnection<K, V> connectStandalone(RedisCodec<K, V> codec, RedisURI redisURI, Duration timeout) {
//單機異步連接
ConnectionFuture<StatefulRedisConnection<K, V>> future = connectStandaloneAsync(codec, redisURI, timeout);
//獲取連接
return getConnection(future);
}
那么異步創建連接的過程又是什么樣子的呢?下面就通過其代碼進行分析一下
private <K, V> ConnectionFuture<StatefulRedisConnection<K, V>> connectStandaloneAsync(RedisCodec<K, V> codec,
RedisURI redisURI, Duration timeout) {
//編解碼器不能為null
assertNotNull(codec);
//檢查URI是否有效
checkValidRedisURI(redisURI);
logger.debug("Trying to get a Redis connection for: " + redisURI);
//創建DefaultEndpoint
DefaultEndpoint endpoint = new DefaultEndpoint(clientOptions);
//創建connection,該connection是一個真正有效的connection其它的都是再此基礎上進行增強
StatefulRedisConnectionImpl<K, V> connection = newStatefulRedisConnection(endpoint, codec, timeout);
//異步方式創建連接
ConnectionFuture<StatefulRedisConnection<K, V>> future = connectStatefulAsync(connection, endpoint, redisURI,
() -> new CommandHandler(clientOptions, clientResources, endpoint));
//注冊監聽器,在結束時觸發
future.whenComplete((channelHandler, throwable) -> {
//如果異常不為null則表示連接創建異常,則需要關閉連接
if (throwable != null) {
connection.close();
}
});
//返回
return future;
}
在newStatefulRedisConnection中只是創建了連接對象,此時還不是一個可用連接
protected <K, V> StatefulRedisConnectionImpl<K, V> newStatefulRedisConnection(RedisChannelWriter channelWriter,
RedisCodec<K, V> codec, Duration timeout) {
return new StatefulRedisConnectionImpl<>(channelWriter, codec, timeout);
}
可以看到在創建 StatefulRedisConnectionImpl實例的時候實際上是創建了多種方式連接,異步連接,同步連接響應式連接
/**
* 初始化一個新的連接
*/
public StatefulRedisConnectionImpl(RedisChannelWriter writer, RedisCodec<K, V> codec, Duration timeout) {
super(writer, timeout);
this.codec = codec;
//創建異步步連接
this.async = newRedisAsyncCommandsImpl();
//創建同步連接
this.sync = newRedisSyncCommandsImpl();
//創建響應式連接
this.reactive = newRedisReactiveCommandsImpl();
}
其中異步連接的方法如下:
protected RedisAsyncCommandsImpl<K, V> newRedisAsyncCommandsImpl() {
//使用裝飾器模式對當前實例進行增強
return new RedisAsyncCommandsImpl<>(this, codec);
}
此時創建的連接對象還不是一個可用連接,關鍵邏輯還是在connectionStatefulAsync中實現
private <K, V, S> ConnectionFuture<S> connectStatefulAsync(StatefulRedisConnectionImpl<K, V> connection,
DefaultEndpoint endpoint, RedisURI redisURI, Supplier<CommandHandler> commandHandlerSupplier) {
//connetion構造器,在Lettuce中對於構造器模式運用很多
ConnectionBuilder connectionBuilder;
//根據是否是SSL選擇不同構造器
if (redisURI.isSsl()) {
SslConnectionBuilder sslConnectionBuilder = SslConnectionBuilder.sslConnectionBuilder();
sslConnectionBuilder.ssl(redisURI);
connectionBuilder = sslConnectionBuilder;
} else {
connectionBuilder = ConnectionBuilder.connectionBuilder();
}
//設置connection
connectionBuilder.connection(connection);
//設置客戶端選項
connectionBuilder.clientOptions(clientOptions);
//設置客戶端資源
connectionBuilder.clientResources(clientResources);
//設置命令處理器以及endpoint
connectionBuilder.commandHandler(commandHandlerSupplier).endpoint(endpoint);
//填充連接構造器,
connectionBuilder(getSocketAddressSupplier(redisURI), connectionBuilder, redisURI);
//設置頻道類型,同時根據頻道類型設置客戶端NIO線程組
channelType(connectionBuilder, redisURI);
//在連接生效前是否需要ping
if (clientOptions.isPingBeforeActivateConnection()) {
if (hasPassword(redisURI)) {
connectionBuilder.enableAuthPingBeforeConnect();
} else {
connectionBuilder.enablePingBeforeConnect();
}
}
//創建異步通道
ConnectionFuture<RedisChannelHandler<K, V>> future = initializeChannelAsync(connectionBuilder);
//如果客戶端選項配置了pingBeforeActivateConnection同時有密碼
if (!clientOptions.isPingBeforeActivateConnection() && hasPassword(redisURI)) {
future = future.thenApplyAsync(channelHandler -> {
connection.async().auth(new String(redisURI.getPassword()));
return channelHandler;
}, clientResources.eventExecutorGroup());
}
if (LettuceStrings.isNotEmpty(redisURI.getClientName())) {
future.thenApply(channelHandler -> {
connection.setClientName(redisURI.getClientName());
return channelHandler;
});
}
if (redisURI.getDatabase() != 0) {
future = future.thenApplyAsync(channelHandler -> {
connection.async().select(redisURI.getDatabase());
return channelHandler;
}, clientResources.eventExecutorGroup());
}
return future.thenApply(channelHandler -> (S) connection);
}
在connectionBuilder方法中創建了Netty的客戶端Bootstrap
protected void connectionBuilder(Supplier<SocketAddress> socketAddressSupplier, ConnectionBuilder connectionBuilder,
RedisURI redisURI) {
//創建Bootstrap netty啟動器
Bootstrap redisBootstrap = new Bootstrap();
//設置channel選項
redisBootstrap.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 32 * 1024);
redisBootstrap.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 8 * 1024);
redisBootstrap.option(ChannelOption.ALLOCATOR, BUF_ALLOCATOR);
//獲取套接字選項
SocketOptions socketOptions = getOptions().getSocketOptions();
//設置連接超時時間
redisBootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS,
Math.toIntExact(socketOptions.getConnectTimeout().toMillis()));
//如果redisURI中沒有socket選擇參數則根據clientresouce設置
if (LettuceStrings.isEmpty(redisURI.getSocket())) {
//是否保持長連接
redisBootstrap.option(ChannelOption.SO_KEEPALIVE, socketOptions.isKeepAlive());
//是否要求TCP低延遲
redisBootstrap.option(ChannelOption.TCP_NODELAY, socketOptions.isTcpNoDelay());
}
//設置超時時間
connectionBuilder.timeout(redisURI.getTimeout());
//設置密碼
connectionBuilder.password(redisURI.getPassword());
//設置bootstrap
connectionBuilder.bootstrap(redisBootstrap);
connectionBuilder.channelGroup(channels).connectionEvents(connectionEvents).timer(timer);
connectionBuilder.socketAddressSupplier(socketAddressSupplier);
}
在channelType方法中設置了EeventLoopGroup
protected void channelType(ConnectionBuilder connectionBuilder, ConnectionPoint connectionPoint) {
LettuceAssert.notNull(connectionPoint, "ConnectionPoint must not be null");
//設置客戶端線程組,EventLoopGroup用來處理所有頻道事件
connectionBuilder.bootstrap().group(getEventLoopGroup(connectionPoint));
if (connectionPoint.getSocket() != null) {
NativeTransports.assertAvailable();
connectionBuilder.bootstrap().channel(NativeTransports.domainSocketChannelClass());
} else {
connectionBuilder.bootstrap().channel(Transports.socketChannelClass());
}
}
/**
* 異步處理連接同時通過connectionBuilder初始化一個通道
*/
@SuppressWarnings("unchecked")
protected <K, V, T extends RedisChannelHandler<K, V>> ConnectionFuture<T> initializeChannelAsync(
ConnectionBuilder connectionBuilder) {
//獲取socketAddress
SocketAddress redisAddress = connectionBuilder.socketAddress();
//如果線程池關閉則拋出異常
if (clientResources.eventExecutorGroup().isShuttingDown()) {
throw new IllegalStateException("Cannot connect, Event executor group is terminated.");
}
logger.debug("Connecting to Redis at {}", redisAddress);
//頻道准備就緒future
CompletableFuture<Channel> channelReadyFuture = new CompletableFuture<>();
//獲取bootstrap
Bootstrap redisBootstrap = connectionBuilder.bootstrap();
//創建redis通道初始化器
RedisChannelInitializer initializer = connectionBuilder.build();
//設置netty的處理器
redisBootstrap.handler(initializer);
//netty自定設置處理
clientResources.nettyCustomizer().afterBootstrapInitialized(redisBootstrap);
CompletableFuture<Boolean> initFuture = initializer.channelInitialized();
//連接Redis服務器,在該處才是真正和服務器創建連接
ChannelFuture connectFuture = redisBootstrap.connect(redisAddress);
//增加監聽器
connectFuture.addListener(future -> {
//沒有成功
if (!future.isSuccess()) {
logger.debug("Connecting to Redis at {}: {}", redisAddress, future.cause());
connectionBuilder.endpoint().initialState();
//通過准備就緒異步結果異常結束
channelReadyFuture.completeExceptionally(future.cause());
return;
}
//completableFuture特性,在future結束的時候執行
initFuture.whenComplete((success, throwable) -> {
//如果throwable不為null表示存在異常
if (throwable == null) {
logger.debug("Connecting to Redis at {}: Success", redisAddress);
//獲取RedisChannelHandler
RedisChannelHandler<?, ?> connection = connectionBuilder.connection();
//注冊可關閉資源,在connection關閉的時候關閉可關閉資源
connection.registerCloseables(closeableResources, connection);
//頻道准備就緒
channelReadyFuture.complete(connectFuture.channel());
return;
}
logger.debug("Connecting to Redis at {}, initialization: {}", redisAddress, throwable);
connectionBuilder.endpoint().initialState();
Throwable failure;
if (throwable instanceof RedisConnectionException) {
failure = throwable;
} else if (throwable instanceof TimeoutException) {
failure = new RedisConnectionException("Could not initialize channel within "
+ connectionBuilder.getTimeout(), throwable);
} else {
failure = throwable;
}
channelReadyFuture.completeExceptionally(failure);
CompletableFuture<Boolean> response = new CompletableFuture<>();
response.completeExceptionally(failure);
});
});
//針對connectionBuilder.connection()的結果進行裝飾,增加獲取remoteAddress功能
return new DefaultConnectionFuture<T>(redisAddress, channelReadyFuture.thenApply(channel -> (T) connectionBuilder
.connection()));
}
public RedisChannelInitializer build() {
return new PlainChannelInitializer(pingCommandSupplier, this::buildHandlers, clientResources, timeout);
}
在buildHandlers中創建了一些處理器,這些處理器都是有序的
- 命令編碼器,用戶將命令編碼為Redis通信協議規定的格式
- CammandHanler lettuce核心功能
- ConnectionWatchDog 用於自動重連
- ConnectionEventTriger 用於發布connection事件
protected List<ChannelHandler> buildHandlers() {
LettuceAssert.assertState(channelGroup != null, "ChannelGroup must be set");
LettuceAssert.assertState(connectionEvents != null, "ConnectionEvents must be set");
LettuceAssert.assertState(connection != null, "Connection must be set");
LettuceAssert.assertState(clientResources != null, "ClientResources must be set");
LettuceAssert.assertState(endpoint != null, "Endpoint must be set");
List<ChannelHandler> handlers = new ArrayList<>();
//設置clientOptions
connection.setOptions(clientOptions);
//添加頻道監控,如果頻道有效則將頻道添加到頻道組中,如果頻道無效則從頻道組中刪除
handlers.add(new ChannelGroupListener(channelGroup));
//添加命令編碼器
handlers.add(new CommandEncoder());
//添加commandHander
handlers.add(commandHandlerSupplier.get());
//如果設置自動重連,則設置看門狗處理器
if (clientOptions.isAutoReconnect()) {
handlers.add(createConnectionWatchdog());
}
//設置connectionEvenTrigger
handlers.add(new ConnectionEventTrigger(connectionEvents, connection, clientResources.eventBus()));
if (clientOptions.isAutoReconnect()) {
handlers.add(createConnectionWatchdog());
}
return handlers;
}
@Override
protected void initChannel(Channel channel) throws Exception {
//如果pipeline中沒有配置channelActivator則需要添加channelActivator處理器
if (channel.pipeline().get("channelActivator") == null) {
channel.pipeline().addLast("channelActivator", new RedisChannelInitializerImpl() {
private AsyncCommand<?, ?, ?> pingCommand;
@Override
public CompletableFuture<Boolean> channelInitialized() {
return initializedFuture;
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
//如果通道斷開連接
clientResources.eventBus().publish(new DisconnectedEvent(local(ctx), remote(ctx)));
//如果初始化沒有完成則拋出異常
if (!initializedFuture.isDone()) {
initializedFuture.completeExceptionally(new RedisConnectionException("Connection closed prematurely"));
}
initializedFuture = new CompletableFuture<>();
pingCommand = null;
super.channelInactive(ctx);
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof ConnectionEvents.Activated) {
if (!initializedFuture.isDone()) {
initializedFuture.complete(true);
clientResources.eventBus().publish(new ConnectionActivatedEvent(local(ctx), remote(ctx)));
}
}
super.userEventTriggered(ctx, evt);
}
@Override
public void channelActive(final ChannelHandlerContext ctx) throws Exception {
//通過事件總線發送連接事件
clientResources.eventBus().publish(new ConnectedEvent(local(ctx), remote(ctx)));
//如果ping命令提供器不是NO_PING則發送執行ping
if (pingCommandSupplier != NO_PING) {
pingCommand = pingCommandSupplier.get();
pingBeforeActivate(pingCommand, initializedFuture, ctx, clientResources, timeout);
} else {
super.channelActive(ctx);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
if (!initializedFuture.isDone()) {
initializedFuture.completeExceptionally(cause);
}
super.exceptionCaught(ctx, cause);
}
});
}
//將hanler提供器提供的的處理器添加到該頻道的管道中
for (ChannelHandler handler : handlers.get()) {
channel.pipeline().addLast(handler);
}
//擴展點,用戶可以對向pipline中添加自定義的channel
clientResources.nettyCustomizer().afterChannelInitialized(channel);
}
