Lettuce是一個可伸縮線程安全的Redis客戶端。多個線程可以共享同一個RedisConnection.本文是基於Lettuce5,主要介紹的知識點如下:
- Lettuce在Spring Boot中的配置
- Lettuce的同步,異步,響應式使用方式
- 事件的訂閱
- 發布自定義事件
- 讀寫分離
- 讀寫分離策略實現源碼
- 客戶端分片實現
@Configuration
public class LettuceConfig {
/**
* 配置客戶端資源
* @return
*/
@Bean(destroyMethod = "shutdown")
ClientResources clientResources() {
return DefaultClientResources.builder().ioThreadPoolSize(8).computationThreadPoolSize(10).build();
}
/**
* 配置Socket選項
* keepAlive=true
* tcpNoDelay=true
* connectionTimeout=5秒
* @return
*/
@Bean
SocketOptions socketOptions(){
return SocketOptions.builder().keepAlive(true).tcpNoDelay(true).connectTimeout(Duration.ofSeconds(5)).build();
}
/**
* 配置客戶端選項
* @return
*/
@Bean
ClientOptions clientOptions(SocketOptions socketOptions) {
return ClientOptions.builder().socketOptions(socketOptions).build();
}
/**
* 創建RedisClient
* @param clientResources 客戶端資源
* @param clientOptions 客戶端選項
* @return
*/
@Bean(destroyMethod = "shutdown")
RedisClient redisClient(ClientResources clientResources, ClientOptions clientOptions) {
RedisURI uri = RedisURI.builder().withSentinel("xx.xx.xx.xx", 26009).withPassword("abcd1234").withSentinelMasterId("xxx").build();
RedisClient client = RedisClient.create(clientResources, uri);
client.setOptions(clientOptions);
return client;
}
/**
* 創建連接
* @param redisClient
* @return
*/
@Bean(destroyMethod = "close")
StatefulRedisConnection<String, String> connection(RedisClient redisClient) {
return redisClient.connect();
}
}
基本使用
public Mono<ServerResponse> hello(ServerRequest request) throws Exception {
//響應式使用
Mono<String> resp = redisConnection.reactive().get("gxt_new");
//同步使用
redisConnection.sync().get("test");
redisConnection.async().get("test").get(5, TimeUnit.SECONDS);
return ServerResponse.ok().body(resp, String.class);
}
客戶端訂閱事件
客戶端使用事件總線傳輸運行期間產生的事件;EventBus可以從客戶端資源進行配置和獲取,並用於客戶端和自定義事件。
如下事件可以被客戶端發送:
- 連接事件
- 測量事件
- 集群拓撲事件
client.getResources().eventBus().get().subscribe(e -> {
System.out.println("client 訂閱事件: " + e);
});
client 訂閱事件: ConnectionActivatedEvent [/xx:49910 -> /xx:6008] client 訂閱事件: ConnectionActivatedEvent [/xx:49911 -> /xx:6018] client 訂閱事件: ConnectedEvent [/xx:49912 -> /xx:6018]
發布事件
發布使用也是通過使用eventBus進行發布事件,Event接口只是一個標簽接口
eventBus.publish(new Event() {
@Override
public String toString() {
return "自定義事件";
}
});
訂閱者就可以訂閱到這個自定義事件了
client 訂閱事件: 自定義事件
讀寫分離
@Bean(destroyMethod = "close")
StatefulRedisMasterSlaveConnection<String, String> statefulRedisMasterSlaveConnection(RedisClient redisClient, RedisURI redisURI) {
StatefulRedisMasterSlaveConnection connection = MasterSlave.connect(redisClient, new Utf8StringCodec(), redisURI);
connection.setReadFrom(ReadFrom.NEAREST);
return connection;
}
}
StatefulRedisMasterSlaveConnection 支持讀寫分離,通過設置ReadFrom控制讀是從哪個節點讀取.
| 參數 | 含義 |
| MASTER | 從master節點讀取 |
| SLAVE | 從slave節點讀取 |
MASTER_PREFERRED |
從master節點讀取,如果master節點不可以則從slave節點讀取 |
SLAVE_PREFERRED |
從slave節點讀取,如果slave節點不可用則倒退到master節點讀取 |
NEAREST |
從最近到節點讀取 |
具體是如何實現到呢? 下面看一下MasterSlaveConnectionProvider相關源碼
//根據意圖獲取連接
public StatefulRedisConnection<K, V> getConnection(Intent intent) {
if (debugEnabled) {
logger.debug("getConnection(" + intent + ")");
}
//如果readFrom不為null且是READ
if (readFrom != null && intent == Intent.READ) {
//根據readFrom配置從已知節點中選擇可用節點描述
List<RedisNodeDescription> selection = readFrom.select(new ReadFrom.Nodes() {
@Override
public List<RedisNodeDescription> getNodes() {
return knownNodes;
}
@Override
public Iterator<RedisNodeDescription> iterator() {
return knownNodes.iterator();
}
});
//如果可選擇節點集合為空則拋出異常
if (selection.isEmpty()) {
throw new RedisException(String.format("Cannot determine a node to read (Known nodes: %s) with setting %s",
knownNodes, readFrom));
}
try {
//遍歷所有可用節點
for (RedisNodeDescription redisNodeDescription : selection) {
//獲取節點連接
StatefulRedisConnection<K, V> readerCandidate = getConnection(redisNodeDescription);
//如果節點連接不是打開到連接則繼續查找下一個連接
if (!readerCandidate.isOpen()) {
continue;
}
//返回可用連接
return readerCandidate;
}
//如果沒有找到可用連接,默認返回第一個
return getConnection(selection.get(0));
} catch (RuntimeException e) {
throw new RedisException(e);
}
}
//如果沒有配置readFrom或者不是READ 則返回master連接
return getConnection(getMaster());
}
我們可以看到選擇連接到邏輯是通用的,不同的處理就是在selection的處理上,下面看一下不同readFrom策略對於selection的處理
ReadFromSlavePerferred和ReadFromMasterPerferred都是有優先級到概念,看看相關邏輯的處理
static final class ReadFromSlavePreferred extends ReadFrom {
@Override
public List<RedisNodeDescription> select(Nodes nodes) {
List<RedisNodeDescription> result = new ArrayList<>(nodes.getNodes().size());
//優先添加slave節點
for (RedisNodeDescription node : nodes) {
if (node.getRole() == RedisInstance.Role.SLAVE) {
result.add(node);
}
}
//最后添加master節點
for (RedisNodeDescription node : nodes) {
if (node.getRole() == RedisInstance.Role.MASTER) {
result.add(node);
}
}
return result;
}
static final class ReadFromMasterPreferred extends ReadFrom {
@Override
public List<RedisNodeDescription> select(Nodes nodes) {
List<RedisNodeDescription> result = new ArrayList<>(nodes.getNodes().size());
//優先添加master節點
for (RedisNodeDescription node : nodes) {
if (node.getRole() == RedisInstance.Role.MASTER) {
result.add(node);
}
}
//其次在添加slave節點
for (RedisNodeDescription node : nodes) {
if (node.getRole() == RedisInstance.Role.SLAVE) {
result.add(node);
}
}
return result;
}
}
對於ReadFromMaster和ReadFromSlave都是獲取指定角色的節點
static final class ReadFromSlave extends ReadFrom {
@Override
public List<RedisNodeDescription> select(Nodes nodes) {
List<RedisNodeDescription> result = new ArrayList<>(nodes.getNodes().size());
//只獲取slave節點
for (RedisNodeDescription node : nodes) {
if (node.getRole() == RedisInstance.Role.SLAVE) {
result.add(node);
}
}
return result;
}
}
static final class ReadFromMaster extends ReadFrom {
@Override
public List<RedisNodeDescription> select(Nodes nodes) {
for (RedisNodeDescription node : nodes) {
if (node.getRole() == RedisInstance.Role.MASTER) {
return LettuceLists.newList(node);
}
}
return Collections.emptyList();
}
}
獲取最近的節點這個就有點特殊了,它對已知對節點沒有做處理,直接返回了它們的節點描述,也就是誰在前面就優先使用誰
static final class ReadFromNearest extends ReadFrom {
@Override
public List<RedisNodeDescription> select(Nodes nodes) {
return nodes.getNodes();
}
}
在SentinelTopologyProvider中可以發現,獲取nodes節點總是優先獲取Master節點,其次是slave節點,這樣Nearest效果就等效與MasterPreferred
public List<RedisNodeDescription> getNodes() {
logger.debug("lookup topology for masterId {}", masterId);
try (StatefulRedisSentinelConnection<String, String> connection = redisClient.connectSentinel(CODEC, sentinelUri)) {
RedisFuture<Map<String, String>> masterFuture = connection.async().master(masterId);
RedisFuture<List<Map<String, String>>> slavesFuture = connection.async().slaves(masterId);
List<RedisNodeDescription> result = new ArrayList<>();
try {
Map<String, String> master = masterFuture.get(timeout.toNanos(), TimeUnit.NANOSECONDS);
List<Map<String, String>> slaves = slavesFuture.get(timeout.toNanos(), TimeUnit.NANOSECONDS);
//添加master節點
result.add(toNode(master, RedisInstance.Role.MASTER));
//添加所有slave節點
result.addAll(slaves.stream().filter(SentinelTopologyProvider::isAvailable)
.map(map -> toNode(map, RedisInstance.Role.SLAVE)).collect(Collectors.toList()));
} catch (ExecutionException | InterruptedException | TimeoutException e) {
throw new RedisException(e);
}
return result;
}
}
自定義負載均衡
通過上文可以發現只需要實現 ReadFrom接口,就可以通過該接口實現Master,Slave負載均衡;下面的示例是通過將nodes節點進行打亂,進而實現
@Bean(destroyMethod = "close")
StatefulRedisMasterSlaveConnection<String, String> statefulRedisMasterSlaveConnection(RedisClient redisClient, RedisURI redisURI) {
StatefulRedisMasterSlaveConnection connection = MasterSlave.connect(redisClient, new Utf8StringCodec(), redisURI);
connection.setReadFrom(new ReadFrom() {
@Override
public List<RedisNodeDescription> select(Nodes nodes) {
List<RedisNodeDescription> list = nodes.getNodes();
Collections.shuffle(list);
return list;
}
});
return connection;
}
在大規模使用的時候會使用多組主備服務,可以通過客戶端分片的方式將部分請求路由到指定的服務器上,但是Lettuce沒有提供這樣的支持,下面是自定義的實現:
public class Sharded< C extends StatefulRedisConnection,V> {
private TreeMap<Long, String> nodes;
private final Hashing algo = Hashing.MURMUR_HASH;
private final Map<String, StatefulRedisConnection> resources = new LinkedHashMap<>();
private RedisClient redisClient;
private String password;
private Set<HostAndPort> sentinels;
private RedisCodec<String, V> codec;
public Sharded(List<String> masters, RedisClient redisClient, String password, Set<HostAndPort> sentinels, RedisCodec<String, V> codec) {
this.redisClient = redisClient;
this.password = password;
this.sentinels = sentinels;
this.codec = codec;
initialize(masters);
}
private void initialize(List<String> masters) {
nodes = new TreeMap<>();
for (int i = 0; i != masters.size(); ++i) {
final String master = masters.get(i);
for (int n = 0; n < 160; n++) {
nodes.put(this.algo.hash("SHARD-" + i + "-NODE-" + n), master);
}
RedisURI.Builder builder = RedisURI.builder();
for (HostAndPort hostAndPort : sentinels) {
builder.withSentinel(hostAndPort.getHostText(), hostAndPort.getPort());
}
RedisURI redisURI = builder.withPassword(password).withSentinelMasterId(master).build();
resources.put(master, MasterSlave.connect(redisClient, codec, redisURI));
}
}
public StatefulRedisConnection getConnectionBy(String key) {
return resources.get(getShardInfo(SafeEncoder.encode(key)));
}
public Collection<StatefulRedisConnection> getAllConnection(){
return Collections.unmodifiableCollection(resources.values());
}
public String getShardInfo(byte[] key) {
SortedMap<Long, String> tail = nodes.tailMap(algo.hash(key));
if (tail.isEmpty()) {
return nodes.get(nodes.firstKey());
}
return tail.get(tail.firstKey());
}
public void close(){
for(StatefulRedisConnection connection: getAllConnection()){
connection.close();
}
}
private static class SafeEncoder {
static byte[] encode(final String str) {
try {
if (str == null) {
throw new IllegalArgumentException("value sent to redis cannot be null");
}
return str.getBytes("UTF-8");
} catch (UnsupportedEncodingException e) {
throw new RuntimeException(e);
}
}
}
private interface Hashing {
Hashing MURMUR_HASH = new MurmurHash();
long hash(String key);
long hash(byte[] key);
}
private static class MurmurHash implements Hashing {
static long hash64A(byte[] data, int seed) {
return hash64A(ByteBuffer.wrap(data), seed);
}
static long hash64A(ByteBuffer buf, int seed) {
ByteOrder byteOrder = buf.order();
buf.order(ByteOrder.LITTLE_ENDIAN);
long m = 0xc6a4a7935bd1e995L;
int r = 47;
long h = seed ^ (buf.remaining() * m);
long k;
while (buf.remaining() >= 8) {
k = buf.getLong();
k *= m;
k ^= k >>> r;
k *= m;
h ^= k;
h *= m;
}
if (buf.remaining() > 0) {
ByteBuffer finish = ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN);
// for big-endian version, do this first:
// finish.position(8-buf.remaining());
finish.put(buf).rewind();
h ^= finish.getLong();
h *= m;
}
h ^= h >>> r;
h *= m;
h ^= h >>> r;
buf.order(byteOrder);
return h;
}
public long hash(byte[] key) {
return hash64A(key, 0x1234ABCD);
}
public long hash(String key) {
return hash(SafeEncoder.encode(key));
}
}
}
@Bean(destroyMethod = "close")
Sharded<StatefulRedisMasterSlaveConnection,String> sharded(RedisClient redisClient) {
Set<HostAndPort> hostAndPorts=new HashSet<>();
hostAndPorts.add(HostAndPort.parse("1xx:26009"));
hostAndPorts.add(HostAndPort.parse("1xx:26009"));
return new Sharded<>(Arrays.asList("te009","test68","test67"),redisClient,"password",hostAndPorts, new Utf8StringCodec());
}
使用方式
//只從slave節點中讀取
StatefulRedisMasterSlaveConnection redisConnection = (StatefulRedisMasterSlaveConnection) sharded.getConnectionBy("key");
//使用異步模式獲取緩存值
System.out.println(redisConnection.sync().get("key"));
