一致性Hash與Redis集群數據分片


一、Hash算法引入--分布式緩存

有一個電商平台,需要使用Redis存儲商品的圖片資源,key為圖片名稱,value為圖片所在服務器的路徑。利用隨機分配的規則進行分庫。總量3000w,以每台服務器存500w的數量,部署12台緩存服務器,並且進行主從復制,架構圖如下圖:

由於規則是隨機分配的,那么我們不知道圖片存儲在哪台服務器上,這就需要遍歷所有redis服務器進行查詢。這顯然不是我們想要的,所以引入下面的Hash算法來優化。

1.Hash算法優化

目的是為了每張圖片在進行分庫時都可以得到特定的服務器。我們共有六台主服務器,計算的公式為:hash(milk.png) % 6 = 5 這樣就不需要進行遍歷redis主從服務器,節省時間,提升性能。

2.Hash算法弊端
當Redis緩存服務器的節點數發生變化時,所有的緩存位置都會發生改變。
假如我們增加2台Redis緩存服務器,計算公式從`hash(milk.png) % 6 = 5`變成了`hash(milk.png) % 8 = ?` 結果肯定不是5。
再或者,6台服務器集群中,當某個主從出現故障無法進行緩存,故障機器需要移除,取余數從6變成5,也會導致緩存位置更改。
為了解決上述問題,又引入了Hash一致性算法

二、一致性Hash算法引入

再說一致性Hash算法原理之前,首先要知道我們現在在做的是要優化我們的分配規則,這個優化就是通過(一致性hash算法)進行優化,算法如何實現的不去深究,只要知道算法最終達到的效果即可。

1.分配規則
一致性Hash是由一個固定長度的Hash環構成,大小為2的32次方.
一般用在服務器集群的增刪節點的處理上,根據節點名稱的Hash值(其分布為[0, 2^32-1])將服務器節點放置在這個Hash環上,然后根據數據的Key值計算得到其Hash值(其分布也為[0, 232-1]),接着在Hash環上順時針查找距離這個Key值的Hash值最近的服務器節點,完成Key到服務器的映射查找!

節點分配規則

圖1為hash環的整體布局,數值為[0, 2^32-1]
圖2中A、B、C三個節點對其ip:port經過hash計算之后得到三個值,對應hash環上ABC三個點,因此將整個hash環分成3部分
圖2中1234代表4個值經過hash計算后的位置,那么依據順時針找服務器節點的方式 1,2->A節點, 4->B節點, 3->C節點

增加和減少節點的影響

圖1為去掉C節點后的分布,如圖所示,按照規則,3->B節點,A節點無影響
圖2增加D節點,1->D節點,2->A節點,B和C節點無影響

2.虛擬節點

如上圖所示已經幫我們解決了最初Redis分布式緩存集群在增加節點和減少節點的情況下,找不到圖片路徑的問題。但是由上面的增減節點,我們會發現一個問題---節點分布不均勻,會導致各個節點負載不均衡。又引出虛擬節點的概念。

圖1中只有AB倆個節點且分布不均衡 1->A,2345->B
圖2中采取增加副本數為2的算法,那么總共就是4個節點 A1&A2->A,B1&B2->B,結果1->A1,5->A2,2&3->B2,4->B1,即1&5->A,2&3&4->B

給出以下參數建議

橫軸表示需要為每台服務器擴展的虛擬節點數,縱軸表示的是實際物理服務器數。
可以看出,物理服務器很少,需要更大的虛擬節點;反之物理服務器比較多,虛擬節點就可以少一些。
比如有10台物理服務器,那么差不多需要為每台服務器增加100~200個虛擬節點才可以達到真正的負載均衡

3.結論
在一致性Hash算法中,冗災和擴容對數據的影響
1.冗災:如果一台服務器不可用,則受影響的數據僅僅是此服務器到其環空間中前一台服務器(即沿着逆時針方向行走遇到的第一台服務器)之間數據,所以把影響范圍控制在一個很小的范圍內
2.擴容:如果增加一台服務器,則受影響的數據僅僅是新服務器到其環空間中前一台服務器(即沿着逆時針方向行走遇到的第一台服務器)之間數據,其它數據也不會受到影響,這就大大增加了系統的魯棒性
3.但是仍然可能會有部分數據丟失的風險,一致性hash仍然會導致緩存丟失

參考文章:一致性Hash算法:https://my.oschina.net/u/3768341/blog/2251129

三、Redis集群分片

1.Redis 集群協議中的客戶端和服務器端

1.1服務端節點功能,為滿足下面的功能,集群節點之間通過gossip協議傳播信息,監測心跳等功能

1.存儲數據
2.記錄集群的狀態(包括鍵值到正確節點的映射)
3.自動發現其他節點,檢測出沒正常工作的節點, 並且在需要的時候在從節點中推選出主節點

以下是節點文件nodes.conf中的節點信息參數含義:

epoch 從節點選舉機制:

epoch是一個集群中的邏輯時鍾,主要用於從節點提升的過程,這個值在節點進行ping-pong時進行傳輸,在節點最初創建的時候,每個節點都會產生一個epoch值定義為currentepoch,如上圖所示按節點順序及主從順序,產生了epoch的值1 2 3 4 5 6,以下是理解案例,只需要"大多數主節點"同意即可,案例只為演示epoch值增加的一個過程,正常是不需要請求從節點的同意的
主節點1宕掉 從節點1要想執行failover指令,要獲得其他節點的同意,而其他節點只支持epoch的值最大的節點所提出的選舉
    從節點1 -> 主節點2 發送ping epoch4 - 回復pong epoch4
    從節點1 -> 主節點3 發送ping epoch4 - 回復pong epoch4
    從節點1 -> 從節點2 發送ping epoch4 - 回復pong currentepoch5
    從節點1 -> 從節點2 發送ping currentepoch(5+1) - 回復pong epoch6
    從節點1 -> 從節點2 發送ping epoch6 - 回復pong currentepoch6
    從節點1 -> 從節點2 發送ping currentepoch(6+1) - 回復pong epoch7
 此時,從節點1獲取最大的epoch值,可以執行failover


1.2客戶端功能:

由於集群節點不能代理(proxy)請求,所以客戶端在接收到重定向錯誤(redirections errors) -MOVED 和 -ASK 的時候, 
將命令重定向到其他節點。理論上來說,客戶端是可以自由地向集群中的所有節點發送請求,在需要的時候把請求重定向到其他節點,所以客戶端是不需要保存集群狀態。 
不過客戶端可以緩存鍵值和節點之間的映射關系,這樣能明顯提高命令執行的效率。    

1.在 Redis 集群中節點並不是把命令轉發到管理所給出的鍵值的正確節點上,而是把客戶端重定向到服務一定范圍內的鍵值的節點上。
 最終客戶端獲得一份最新的集群信息,包含哪些節點服務及其對應的鍵值子集,所以在正常操作中客戶端是直接聯系到對應的節點並把給定的命令發過去。
2.同樣,由於一些命令不支持操作多個鍵值,如果不是槽重新分配(resharding),那么數據是永遠不會在節點間移動的。
3.普通操作是可以被處理得跟在單一 Redis 上一樣的。
這意味着,在一個擁有 N 個主節點的 Redis 集群中,由於 Redis 的設計是支持線性擴展的,所以你可以認為同樣的操作在集群上的表現會跟在單一 Redis 上的表現乘以 N 一樣。
同時,客戶端會保持跟節點持續不斷的連接,所以延遲數據跟在單一 Reids 上是一樣的。
2.鍵分布模型

鍵空間被分割為 16384 槽(slot),所有的主節點都負責 16384 個哈希槽中的一部分。
當集群處於穩定狀態時,集群中沒有在執行重配置(reconfiguration)操作,每個哈希槽都只由一個節點進行處理
以下是用來把鍵映射到哈希槽的算法(下一段哈希標簽例外就是按照這個規則):

HASH_SLOT = CRC16(key) mod 16384


通過客戶端查詢結果:

四、客戶端加載參數緩存

springboot使用rediscluster時,會將集群的節點及對應的槽信息加入緩存,提高操作效率,流程按照注釋1-10進行加載

//1.項目啟動會先加載autoconfiguration包下的spring.factories文件,文件中包含了各個需要加載的類
org.springframwork.boot:spring-boot-test-autoconfiguration:2.1.5.RELEASE
    spring.factories
    	org.springframework.boot.autoconfigure.data.redis.RedisAutoConfiguration
    
@Configuration
@ConditionalOnClass(RedisOperations.class)
@EnableConfigurationProperties(RedisProperties.class) //yml映射文件
@Import({ LettuceConnectionConfiguration.class, JedisConnectionConfiguration.class })//2.引入lettuce jedis連接
public class RedisAutoConfiguration {

	@Bean
	@ConditionalOnMissingBean(name = "redisTemplate")
	public RedisTemplate<Object, Object> redisTemplate(
			RedisConnectionFactory redisConnectionFactory) throws UnknownHostException {
		RedisTemplate<Object, Object> template = new RedisTemplate<>();
		template.setConnectionFactory(redisConnectionFactory);
		return template;
	}

	@Bean
	@ConditionalOnMissingBean
	public StringRedisTemplate stringRedisTemplate(
			RedisConnectionFactory redisConnectionFactory) throws UnknownHostException {
		StringRedisTemplate template = new StringRedisTemplate();
		template.setConnectionFactory(redisConnectionFactory);
		return template;
	}

}


@Configuration //配置類
@ConditionalOnClass({ GenericObjectPool.class, JedisConnection.class, Jedis.class })
class JedisConnectionConfiguration extends RedisConnectionConfiguration {

	private final RedisProperties properties;
	private final ObjectProvider<JedisClientConfigurationBuilderCustomizer> builderCustomizers;

	JedisConnectionConfiguration(RedisProperties properties,
			ObjectProvider<RedisSentinelConfiguration> sentinelConfiguration,
			ObjectProvider<RedisClusterConfiguration> clusterConfiguration,
			ObjectProvider<JedisClientConfigurationBuilderCustomizer> builderCustomizers) {
		super(properties, sentinelConfiguration, clusterConfiguration);
		this.properties = properties;
		this.builderCustomizers = builderCustomizers;
	}

	@Bean //3.注入類創建jedisConnnectionFactory
	@ConditionalOnMissingBean(RedisConnectionFactory.class)
	public JedisConnectionFactory redisConnectionFactory() throws UnknownHostException {
		return createJedisConnectionFactory();
	}

	private JedisConnectionFactory createJedisConnectionFactory() {
		JedisClientConfiguration clientConfiguration = getJedisClientConfiguration();
		if (getSentinelConfig() != null) {
			return new JedisConnectionFactory(getSentinelConfig(), clientConfiguration);
		}
		if (getClusterConfiguration() != null) {
			return new JedisConnectionFactory(getClusterConfiguration(),
					clientConfiguration);
		}
		return new JedisConnectionFactory(getStandaloneConfig(), clientConfiguration);
	}
}

//4.JedisConnectionFactory 實現 InitializingBean 接口並重寫方法afterPropertiesSet()
public class JedisConnectionFactory implements InitializingBean, DisposableBean, RedisConnectionFactory {
    
    public void afterPropertiesSet() { //重寫initializingBean中的方法
		if (shardInfo == null && clientConfiguration instanceof MutableJedisClientConfiguration) {
			providedShardInfo = false;
			shardInfo = new JedisShardInfo(getHostName(), getPort(), isUseSsl(), 
					clientConfiguration.getSslSocketFactory().orElse(null), 
					clientConfiguration.getSslParameters().orElse(null), 
					clientConfiguration.getHostnameVerifier().orElse(null));
			getRedisPassword().map(String::new).ifPresent(shardInfo::setPassword);
			int readTimeout = getReadTimeout();
			if (readTimeout > 0) {
				shardInfo.setSoTimeout(readTimeout);
			}
			getMutableConfiguration().setShardInfo(shardInfo);
		}
		if (getUsePool() && !isRedisClusterAware()) {
			this.pool = createPool();
		}
		if (isRedisClusterAware()) {
			this.cluster = createCluster();  //創建集群調用createCluster()
		}
	}
     private JedisCluster createCluster() {
		JedisCluster cluster = createCluster((RedisClusterConfiguration) this.configuration, getPoolConfig());
		JedisClusterTopologyProvider topologyProvider = new JedisClusterTopologyProvider(cluster);
         //6.調用靜態類JedisClusterConnection.JedisClusterNodeResourceProvider(cluster, topologyProvider)
		this.clusterCommandExecutor = new ClusterCommandExecutor(topologyProvider,
				new JedisClusterConnection.JedisClusterNodeResourceProvider(cluster, topologyProvider), EXCEPTION_TRANSLATION);
		return cluster;
	}
}

//靜態類方法
public class JedisClusterConnection implements DefaultedRedisClusterConnection {

	static class JedisClusterNodeResourceProvider implements ClusterNodeResourceProvider {

		private final JedisCluster cluster;
		private final ClusterTopologyProvider topologyProvider;
		private final JedisClusterConnectionHandler connectionHandler;

		JedisClusterNodeResourceProvider(JedisCluster cluster, ClusterTopologyProvider topologyProvider) {
			this.cluster = cluster;
			this.topologyProvider = topologyProvider;
			if (cluster != null) {
				PropertyAccessor accessor = new DirectFieldAccessFallbackBeanWrapper(cluster);
                //7.獲取JedisClusterConnectionHandler
				this.connectionHandler = accessor.isReadableProperty("connectionHandler") 
						? (JedisClusterConnectionHandler) accessor.getPropertyValue("connectionHandler") 
						: null;
			} else {
				this.connectionHandler = null;
			}
		}
    }
}

//JedisClusterConnectionHandler
public abstract class JedisClusterConnectionHandler implements Closeable {
	protected final JedisClusterInfoCache cache; 
     //8.初始化JedisClusterConnectionHandler調用initializeSlotsCache方法
    public JedisClusterConnectionHandler(Set<HostAndPort> nodes,
             final GenericObjectPoolConfig poolConfig, int connectionTimeout, int soTimeout, String password) {
    	this(nodes, poolConfig, connectionTimeout, soTimeout, password, null);//8.1
    }
    public JedisClusterConnectionHandler(Set<HostAndPort> nodes,//8.2
          final GenericObjectPoolConfig poolConfig, int connectionTimeout, int soTimeout, String password, String clientName) {
        this.cache = new JedisClusterInfoCache(poolConfig, connectionTimeout, soTimeout, password, clientName);
        initializeSlotsCache(nodes, poolConfig, connectionTimeout, soTimeout, password, clientName);//8.3
	}
    //8.4
    private void initializeSlotsCache(Set<HostAndPort> startNodes, GenericObjectPoolConfig poolConfig,
                                    int connectionTimeout, int soTimeout, String password, String clientName) {
        for (HostAndPort hostAndPort : startNodes) {
          Jedis jedis = null;
          try {
            jedis = new Jedis(hostAndPort.getHost(), hostAndPort.getPort(), connectionTimeout, soTimeout);
            if (password != null) {
              jedis.auth(password);
            }
            if (clientName != null) {
              jedis.clientSetname(clientName);
            }
            cache.discoverClusterNodesAndSlots(jedis);  //9.調用緩存方法discoverClusterNodesAndSlots發現集群節點和對應槽數量
            break;
          } catch (JedisConnectionException e) {
            // try next nodes
          } finally {
            if (jedis != null) {
              jedis.close();
            }
          }
        }
  	} 
}

//集群信息緩存類  包含了對節點和槽操作的所有方法
public class JedisClusterInfoCache {
    public void discoverClusterNodesAndSlots(Jedis jedis) {
        w.lock();
        try {
          reset();
          /** slots.size()為3,共三個主節點
           [[0,5460,["10.19.185.17",6023,""],["10.19.185.18",6027,""]],
            [10923,16383,["10.19.185.19",6025,""],["10.19.185.17",6026,""]],
            [5461,10922,["10.19.185.18",6024,""],["10.19.185.19",6028,""]]]
           **/ 
          List<Object> slots = jedis.clusterSlots(); //TODO 通信協議需要再研究一下

           //遍歷三個節點
          for (Object slotInfoObj : slots) {
            List<Object> slotInfo = (List<Object>) slotInfoObj;

            if (slotInfo.size() <= MASTER_NODE_INDEX) {
              continue;
            }
			/**
				0-5460
				5461-10922
				10923-16383
			**/
            List<Integer> slotNums = getAssignedSlotArray(slotInfo);

            int size = slotInfo.size();
            for (int i = MASTER_NODE_INDEX; i < size; i++) {
              List<Object> hostInfos = (List<Object>) slotInfo.get(i);
              if (hostInfos.size() <= 0) {
                continue;
              }

              HostAndPort targetNode = generateHostAndPort(hostInfos);
              setupNodeIfNotExist(targetNode);
              if (i == MASTER_NODE_INDEX) {
                assignSlotsToNode(slotNums, targetNode);//10.將槽和節點對應
              }
            }
          }
        } finally {
          w.unlock();
        }
    }
    
    private List<Integer> getAssignedSlotArray(List<Object> slotInfo) {
    List<Integer> slotNums = new ArrayList<Integer>();
    for (int slot = ((Long) slotInfo.get(0)).intValue(); slot <= ((Long) slotInfo.get(1))
        .intValue(); slot++) {
      slotNums.add(slot);
    }
    return slotNums;
  }
}

以下為應用啟動時可進入斷點查看緩存信息:



下圖將原集群中的6024節點移除,槽遷移到6023上,訪問客戶端,可以看到節點與槽的對應關系。
后再將6023上的槽遷移到6024上,訪問客戶端,可以看到節點與槽之間的對應關系緩存已重新加載


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM