在流处理程序中,往往会涉及到关联维表的操作
- 对于维表数据量较大的:我们可以使用异步IO+Guava Cache的方式,通过访问本地缓存来关联数据,缓存中没有的数据通过异步IO去查询,将查询的结果put到Guava Cache中,通过Guava Cache的缓存失效来做到维表数据的定时更新。
- 对于维表数据量较小的表,可以尝试全量加载到内存中,这样会使我们程序的处理效率更快。
下面来介绍一下,自己在开发过程中的两种方法:
Guava Cache:
第一种方式就是使用谷歌的Guava Cache,示例如下:
busCaches = CacheBuilder.newBuilder()
.maximumSize(50000)
.refreshAfterWrite(24, TimeUnit.HOURS)
.build(new CacheLoader<String, Map<String, String>>() {
@Override
public Map<String, String> load( @Nullable String key) {
return getBusCache();
}
});
简单说下Guava Cache的缓存失效和刷新策略
- expireAfterWrite(long duration, TimeUnit unit):在元素 写入或者更新 后的一段时间之后,下次访问时,如果超过了这个时间,那么会回收该Key,然后同步执行 load() 。
- expireAfterAccess(long duration, TimeUnit unit):在元素 写入或者更新或者读 后的一段时间之后,下次访问时,如果超过了这个时间,那么会回收该Key,然后同步执行 load() 。
- refreshAfterWrite(long duration, TimeUnit unit):在元素 写入或者更新 后的一段时间之后,下次访问时,如果超过了这个时间,会进行异步刷新,其他线程访问旧值,当前线程更新值,能有效的避免阻塞,但缺点是可能访问不到最新的值。
ScheduledExecutorService线程调度 + ReentrantLock锁:
第二种方式是通过线程池调度的方式去定时更新,在进行更新操作的时候可以进行加锁操作,在实际的获取维表数据的方法里,也需要对应的加上锁操作
ReentrantLock reentrantLock = new ReentrantLock();
ScheduledExecutorService poolExecutor;
long delay = Calculate.getScheduleTime(2,32);
long period = Calculate.getRefreshTime(24);
int poolSize = 1;
ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setNameFormat("threadName")
.build();
poolExecutor = new ScheduledThreadPoolExecutor(poolSize,threadFactory);
poolExecutor.scheduleAtFixedRate(() -> {
reentrantLock.lock();
try {
initData();
log.info("routePoint2idx:" + routePoint2idx.size());
log.info("routeGrid2key:" + routeGrid2key.size());
log.info("数据预处理over");
} catch (Exception e) {
log.error("定时更新线路数据异常:{}", e.getMessage());
}finally {
reentrantLock.unlock();
}
}, delay, period, TimeUnit.SECONDS);
注意点:这样的操作我是写在open方法中的,所以在高并行度的情况下,维表数据会加载n(并行度)份,有可能会使得taskmanager内存消耗增加。
