在流處理程序中,往往會涉及到關聯維表的操作
- 對於維表數據量較大的:我們可以使用異步IO+Guava Cache的方式,通過訪問本地緩存來關聯數據,緩存中沒有的數據通過異步IO去查詢,將查詢的結果put到Guava Cache中,通過Guava Cache的緩存失效來做到維表數據的定時更新。
- 對於維表數據量較小的表,可以嘗試全量加載到內存中,這樣會使我們程序的處理效率更快。
下面來介紹一下,自己在開發過程中的兩種方法:
Guava Cache:
第一種方式就是使用谷歌的Guava Cache,示例如下:
busCaches = CacheBuilder.newBuilder()
.maximumSize(50000)
.refreshAfterWrite(24, TimeUnit.HOURS)
.build(new CacheLoader<String, Map<String, String>>() {
@Override
public Map<String, String> load( @Nullable String key) {
return getBusCache();
}
});
簡單說下Guava Cache的緩存失效和刷新策略
- expireAfterWrite(long duration, TimeUnit unit):在元素 寫入或者更新 后的一段時間之后,下次訪問時,如果超過了這個時間,那么會回收該Key,然后同步執行 load() 。
- expireAfterAccess(long duration, TimeUnit unit):在元素 寫入或者更新或者讀 后的一段時間之后,下次訪問時,如果超過了這個時間,那么會回收該Key,然后同步執行 load() 。
- refreshAfterWrite(long duration, TimeUnit unit):在元素 寫入或者更新 后的一段時間之后,下次訪問時,如果超過了這個時間,會進行異步刷新,其他線程訪問舊值,當前線程更新值,能有效的避免阻塞,但缺點是可能訪問不到最新的值。
ScheduledExecutorService線程調度 + ReentrantLock鎖:
第二種方式是通過線程池調度的方式去定時更新,在進行更新操作的時候可以進行加鎖操作,在實際的獲取維表數據的方法里,也需要對應的加上鎖操作
ReentrantLock reentrantLock = new ReentrantLock();
ScheduledExecutorService poolExecutor;
long delay = Calculate.getScheduleTime(2,32);
long period = Calculate.getRefreshTime(24);
int poolSize = 1;
ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setNameFormat("threadName")
.build();
poolExecutor = new ScheduledThreadPoolExecutor(poolSize,threadFactory);
poolExecutor.scheduleAtFixedRate(() -> {
reentrantLock.lock();
try {
initData();
log.info("routePoint2idx:" + routePoint2idx.size());
log.info("routeGrid2key:" + routeGrid2key.size());
log.info("數據預處理over");
} catch (Exception e) {
log.error("定時更新線路數據異常:{}", e.getMessage());
}finally {
reentrantLock.unlock();
}
}, delay, period, TimeUnit.SECONDS);
注意點:這樣的操作我是寫在open方法中的,所以在高並行度的情況下,維表數據會加載n(並行度)份,有可能會使得taskmanager內存消耗增加。
