在流处理程序中,往往会涉及到关联维表的操作
- 对于维表数据量较大的:我们可以使用异步IO+Guava Cache的方式,通过访问本地缓存来关联数据,缓存中没有的数据通过异步IO去查询,将查询的结果put到Guava Cache中,通过Guava Cache的缓存失效来做到维表数据的定时更新。
- 对于维表数据量较小的表,可以尝试全量加载到内存中,这样会使我们程序的处理效率更快。
下面来介绍一下,自己在开发过程中的两种方法:
Guava Cache:
第一种方式就是使用谷歌的Guava Cache,示例如下:
busCaches = CacheBuilder.newBuilder() .maximumSize(50000) .refreshAfterWrite(24, TimeUnit.HOURS) .build(new CacheLoader<String, Map<String, String>>() { @Override public Map<String, String> load( @Nullable String key) { return getBusCache(); } });
简单说下Guava Cache的缓存失效和刷新策略
- expireAfterWrite(long duration, TimeUnit unit):在元素 写入或者更新 后的一段时间之后,下次访问时,如果超过了这个时间,那么会回收该Key,然后同步执行 load() 。
- expireAfterAccess(long duration, TimeUnit unit):在元素 写入或者更新或者读 后的一段时间之后,下次访问时,如果超过了这个时间,那么会回收该Key,然后同步执行 load() 。
- refreshAfterWrite(long duration, TimeUnit unit):在元素 写入或者更新 后的一段时间之后,下次访问时,如果超过了这个时间,会进行异步刷新,其他线程访问旧值,当前线程更新值,能有效的避免阻塞,但缺点是可能访问不到最新的值。
ScheduledExecutorService线程调度 + ReentrantLock锁:
第二种方式是通过线程池调度的方式去定时更新,在进行更新操作的时候可以进行加锁操作,在实际的获取维表数据的方法里,也需要对应的加上锁操作
ReentrantLock reentrantLock = new ReentrantLock(); ScheduledExecutorService poolExecutor; long delay = Calculate.getScheduleTime(2,32); long period = Calculate.getRefreshTime(24); int poolSize = 1; ThreadFactory threadFactory = new ThreadFactoryBuilder() .setNameFormat("threadName") .build(); poolExecutor = new ScheduledThreadPoolExecutor(poolSize,threadFactory); poolExecutor.scheduleAtFixedRate(() -> { reentrantLock.lock(); try { initData(); log.info("routePoint2idx:" + routePoint2idx.size()); log.info("routeGrid2key:" + routeGrid2key.size()); log.info("数据预处理over"); } catch (Exception e) { log.error("定时更新线路数据异常:{}", e.getMessage()); }finally { reentrantLock.unlock(); } }, delay, period, TimeUnit.SECONDS);
注意点:这样的操作我是写在open方法中的,所以在高并行度的情况下,维表数据会加载n(并行度)份,有可能会使得taskmanager内存消耗增加。