第三集:分布式Ehcache緩存改造
前言
好久沒有寫博客了,大有半途而廢的趨勢。忙不是借口,這個好習慣還是要繼續堅持。前面我承諾的第一期的DIY分布式,是時候上終篇了---DIY分布式緩存。
探索之路
在前面的文章中,我給大家大致說過項目背景:項目中的緩存使用的是Ehcache。因為前面使用Ehcache的應用就一台,所以這種單機的Ehcache並不會有什么問題。現在分布式部署之后,如果各個應用之間的緩存不能共享,那么其實各自就是一個孤島。可能在一個業務跑下來,請求了不同的應用,結果在緩存中取出來的值不一樣,
造成數據不一致。所以需要重新設計緩存的實現。
因為盡量不要引入新的中間件,所以改造仍然是圍繞Ehcache來進行的。搜集了各種資料之后,發現Ehcache實現分布式緩存基本有以下兩種思路:
客戶端實現分布式算法: 在使用Ehcache的客戶端自己實現分布式算法。
算法的基本思路就是取模:即假設有三台應用(編號假設分別為0,1,2),對於一個要緩存的對象,首先計算其key的hash值,然后將hash值模3,得到的余數是幾,就將數據緩存到哪台機器。
同步冗余數據: Ehcache是支持集群配置的,集群的各個節點之間支持按照一定的協議進行數據同步。這樣每台應用其實緩存了一整份數據,不同節點之間的數據是一致的。
雖然冗余的辦法顯得有點浪費資源,但是我最終還是選擇了冗余。具體原因有以下幾點:
- 分布式算法的復雜性: 前面所講的分布式算法只是最基本的實現。事實上實現要比這個復雜的多。需要考慮增加或者刪除節點的情況,需要使用更加復雜的一致性hash算法
- 可能導致整個應用不可用: 當刪除節點之后,如果算法不能夠感知進行自動調整,仍然去請求那個已經被刪除的節點,可能導致整個系統不可用。
Demo
最終我的實現采用RMI的方式進行同步
配置ehcache
spring-ehcache-cache.xml
<?xml version="1.0" encoding="UTF-8"?>
<ehcache xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:noNamespaceSchemaLocation="http://ehcache.org/ehcache.xsd" name="businessCaches">
<diskStore path="java.io.tmpdir/ehcache"/>
<cache name="business1Cache"
maxElementsInMemory="10000000"
eternal="true"
overflowToDisk="false"
memoryStoreEvictionPolicy="LRU">
<cacheEventListenerFactory
class="net.sf.ehcache.distribution.RMICacheReplicatorFactory"/>
</cache>
<cache name="business2Cache"
maxElementsInMemory="100"
eternal="true"
overflowToDisk="false"
memoryStoreEvictionPolicy="LRU">
<cacheEventListenerFactory
class="net.sf.ehcache.distribution.RMICacheReplicatorFactory"/>
</cache>
<!-- cache發布信息配置,人工發現peerDiscovery=manual,cacheNames可配置多個緩存名稱,以|分割 ) -->
<cacheManagerPeerProviderFactory
class="com.rampage.cache.distribute.factory.DisRMICacheManagerPeerProviderFactory"
properties="peerDiscovery=manual, cacheNames=business1Cache|business2Cache" />
<!-- 接收同步cache信息的地址 -->
<cacheManagerPeerListenerFactory
class="com.rampage.cache.distribute.factory.DisRMICacheManagerPeerListenerFactory"
properties="socketTimeoutMillis=2000" />
</ehcache>
spring-cache.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:cache="http://www.springframework.org/schema/cache"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:aop="http://www.springframework.org/schema/aop"
xsi:schemaLocation="http://www.springframework.org/schema/cache http://www.springframework.org/schema/cache/spring-cache.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.2.xsd
http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.2.xsd"
default-autowire="byName">
<!-- 包掃描 -->
<context:component-scan base-package="com.rampage.cache" />
<!-- 啟用Cache注解 -->
<cache:annotation-driven cache-manager="cacheManager"
key-generator="keyGenerator" proxy-target-class="true" />
<!-- 自定義的緩存key生成類,需實現org.springframework.cache.interceptor.KeyGenerator接口 -->
<bean id="keyGenerator" class="com.rampage.cache.support.CustomKeyGenerator" />
<!-- 替換slite的ehcache實現 -->
<bean id="ehCacheManagerFactory" class="org.springframework.cache.ehcache.EhCacheManagerFactoryBean">
<property name="configLocation" value="classpath:spring/cache/sppay-ehcache-cache.xml"/>
<!-- value對應前面ehcache文件定義的manager名稱 -->
<property name="cacheManagerName" value="businessCaches" />
</bean>
<bean id="ehCacheManager" class="org.springframework.cache.ehcache.EhCacheCacheManager">
<property name="cacheManager" ref="ehCacheManagerFactory"/>
</bean>
<bean id="cacheManager" class="org.springframework.cache.support.CompositeCacheManager">
<property name="cacheManagers">
<list>
<ref bean="ehCacheManager" />
</list>
</property>
<property name="fallbackToNoOpCache" value="true" />
</bean>
</beans>
實現自定義轉發和監聽
細心的讀者應該不難發現,前面xml配置中cacheManagerPeerProviderFactory
和cacheManagerPeerListenerFactory
我使用的都是自定義的類。之所以使用自定義的類,是為了在初始化的時候發布的地址和端口,監聽的地址端口可以在配置文件配置。具體類的實現如下:
/**
* 分布式EhCache監聽工廠
* @author secondWorld
*
*/
public class DisRMICacheManagerPeerListenerFactory extends RMICacheManagerPeerListenerFactory {
private static final Logger LOGGER = LoggerFactory.getLogger(DisRMICacheManagerPeerListenerFactory.class);
/**
* 配置文件中配置的監聽地址,可以不配置,默認為本機地址
*/
private static final String LISTEN_HOST = "distribute.ehcache.listenIP";
/**
* 配置文件中配置的監聽端口
*/
private static final String LISTEN_PORT = "distribute.ehache.listenPort";
@Override
protected CacheManagerPeerListener doCreateCachePeerListener(String hostName, Integer port,
Integer remoteObjectPort, CacheManager cacheManager, Integer socketTimeoutMillis) {
// xml中hostName為空,則讀取配置文件(app-config.properties)中的值
if (StringUtils.isEmpty(hostName)) {
String propHost = AppConfigPropertyUtils.get(LISTEN_HOST);
if (StringUtils.isNotEmpty(propHost)) {
hostName = propHost;
}
}
// 端口采用默認端口0,則去讀取配置文件(app-config.properties)中的值
if (port != null && port == 0) {
Integer propPort = null;
try {
propPort = Integer.parseInt(AppConfigPropertyUtils.get(LISTEN_PORT));
} catch (NumberFormatException e) {
}
if (propPort != null) {
port = propPort;
}
}
LOGGER.info(
"Initiliazing DisRMICacheManagerPeerListenerFactory:cacheManager[{}], hostName[{}], port[{}], remoteObjectPort[{}], socketTimeoutMillis[{}]......",
cacheManager, hostName, port, remoteObjectPort, socketTimeoutMillis);
return super.doCreateCachePeerListener(hostName, port, remoteObjectPort, cacheManager, socketTimeoutMillis);
}
}
/**
* 分布式EhCache發布工廠
*
* @author secondWorld
*
*/
public class DisRMICacheManagerPeerProviderFactory extends RMICacheManagerPeerProviderFactory {
private static final Logger LOGGER = LoggerFactory.getLogger(DisRMICacheManagerPeerProviderFactory.class);
private static final String CACHENAME_DELIMITER = "|";
private static final String PROVIDER_ADDRESSES = "distribute.ehcache.providerAddresses";
private static final String CACHE_NAMES = "cacheNames";
/**
* rmi地址格式: //127.0.0.1:4447/Cache1|//127.0.0.1:4447/Cache2
*/
@Override
protected CacheManagerPeerProvider createManuallyConfiguredCachePeerProvider(Properties properties) {
// 從app-config.properties中讀取發布地址列表
String providerAddresses = AppConfigPropertyUtils.get(PROVIDER_ADDRESSES, StringUtils.EMPTY);
// 從ehcache配置文件讀取緩存名稱
String cacheNames = PropertyUtil.extractAndLogProperty(CACHE_NAMES, properties);
// 參數校驗,這里發布地址和緩存名稱都不能為空
if (StringUtils.isEmpty(providerAddresses) || StringUtils.isEmpty(cacheNames)) {
throw new IllegalArgumentException("Elements \"providerAddresses\" and \"cacheNames\" are needed!");
}
// 解析地址列表
List<String> cachesNameList = getCacheNameList(cacheNames);
List<String> providerAddressList = getProviderAddressList(providerAddresses);
// 注冊發布節點
RMICacheManagerPeerProvider rmiPeerProvider = new ManualRMICacheManagerPeerProvider();
StringBuilder sb = new StringBuilder();
for (String cacheName : cachesNameList) {
for (String providerAddress : providerAddressList) {
sb.setLength(0);
sb.append("//").append(providerAddress).append("/").append(cacheName);
rmiPeerProvider.registerPeer(sb.toString());
LOGGER.info("Registering peer provider [{}]", sb);
}
}
return rmiPeerProvider;
}
/**
* 得到發布地址列表
* @param providerAddresses 發布地址字符串
* @return 發布地址列表
*/
private List<String> getProviderAddressList(String providerAddresses) {
StringTokenizer stringTokenizer = new StringTokenizer(providerAddresses,
AppConfigPropertyUtils.APP_ITEM_DELIMITER);
List<String> ProviderAddressList = new ArrayList<String>(stringTokenizer.countTokens());
while (stringTokenizer.hasMoreTokens()) {
String providerAddress = stringTokenizer.nextToken();
providerAddress = providerAddress.trim();
ProviderAddressList.add(providerAddress);
}
return ProviderAddressList;
}
/**
* 得到緩存名稱列表
* @param cacheNames 緩存名稱字符串
* @return 緩存名稱列表
*/
private List<String> getCacheNameList(String cacheNames) {
StringTokenizer stringTokenizer = new StringTokenizer(cacheNames, CACHENAME_DELIMITER);
List<String> cacheNameList = new ArrayList<String>(stringTokenizer.countTokens());
while (stringTokenizer.hasMoreTokens()) {
String cacheName = stringTokenizer.nextToken();
cacheName = cacheName.trim();
cacheNameList.add(cacheName);
}
return cacheNameList;
}
@Override
protected CacheManagerPeerProvider createAutomaticallyConfiguredCachePeerProvider(CacheManager cacheManager,
Properties properties) throws IOException {
throw new UnsupportedOperationException("Not supported automatic distribute cache!");
}
}
配置
假設有三台機器,則他們分別得配置如下:
#應用1,在4447端口監聽
#緩存同步消息發送地址(如果同步到多台需要配置多台地址,多台地址用英文逗號分隔)
distribute.ehcache.providerAddresses=127.0.0.1:4446,127.0.0.1:4448
#緩存同步監聽端口和IP
distribute.ehache.listenPort=4447
distribute.ehcache.listenIP=localhost
#應用2,在4448端口監聽
#緩存同步消息發送地址(如果同步到多台需要配置多台地址,多台地址用英文逗號分隔)
distribute.ehcache.providerAddresses=127.0.0.1:4446,127.0.0.1:4447
#緩存同步監聽端口和IP
distribute.ehache.listenPort=4448
distribute.ehcache.listenIP=localhost
#應用3,在4446端口監聽
#緩存同步消息發送地址(如果同步到多台需要配置多台地址,多台地址用英文逗號分隔)
distribute.ehcache.providerAddresses=127.0.0.1:4447,127.0.0.1:4448
#緩存同步監聽端口和IP
distribute.ehache.listenPort=4446
distribute.ehcache.listenIP=localhost
使用
使用的時候直接通過Spring的緩存注解即可。簡單的示例如下:
@CacheConfig("business1Cache")
@Component
public class Business1 {
@Cacheable
public String getData(String key) {
// TODO:...
}
}
說明
前面的實現是通過RMI的方式來實現緩存同步的,相對來說RMI的效率還是很快的。所以如果不需要實時的緩存一致性,允許少許延遲,那么這種方式的實現足夠。
總結
到這篇完成,分布式改造的第一章算是告一段落了。對於分布式,如果可以選擇,必然要選擇現在成熟的框架。但是項目有很多時候,由於各種歷史原因,必須要在原來的基礎上改造。這個時候,希望我寫的這個系列對大家有所幫助。造輪子有時候就是這么簡單。