flink中定时加载维表


在流处理程序中,往往会涉及到关联维表的操作

  • 对于维表数据量较大的:我们可以使用异步IO+Guava Cache的方式,通过访问本地缓存来关联数据,缓存中没有的数据通过异步IO去查询,将查询的结果put到Guava Cache中,通过Guava Cache的缓存失效来做到维表数据的定时更新。
  • 对于维表数据量较小的表,可以尝试全量加载到内存中,这样会使我们程序的处理效率更快。

下面来介绍一下,自己在开发过程中的两种方法:

Guava Cache:

第一种方式就是使用谷歌的Guava Cache,示例如下:

busCaches = CacheBuilder.newBuilder()
        .maximumSize(50000)
        .refreshAfterWrite(24, TimeUnit.HOURS)
        .build(new CacheLoader<String, Map<String, String>>() {
            @Override
            public Map<String, String> load( @Nullable String key) {
                return getBusCache();
            }
        });

简单说下Guava Cache的缓存失效和刷新策略

  • expireAfterWrite(long duration, TimeUnit unit):在元素 写入或者更新 后的一段时间之后,下次访问时,如果超过了这个时间,那么会回收该Key,然后同步执行 load() 。
  • expireAfterAccess(long duration, TimeUnit unit):在元素 写入或者更新或者读 后的一段时间之后,下次访问时,如果超过了这个时间,那么会回收该Key,然后同步执行 load() 。
  • refreshAfterWrite(long duration, TimeUnit unit):在元素 写入或者更新 后的一段时间之后,下次访问时,如果超过了这个时间,会进行异步刷新,其他线程访问旧值,当前线程更新值,能有效的避免阻塞,但缺点是可能访问不到最新的值

ScheduledExecutorService线程调度 + ReentrantLock锁:

第二种方式是通过线程池调度的方式去定时更新,在进行更新操作的时候可以进行加锁操作,在实际的获取维表数据的方法里,也需要对应的加上锁操作

ReentrantLock reentrantLock = new ReentrantLock();
ScheduledExecutorService poolExecutor;
long delay = Calculate.getScheduleTime(2,32);
long period = Calculate.getRefreshTime(24);
int poolSize = 1;
ThreadFactory threadFactory = new ThreadFactoryBuilder()
                .setNameFormat("threadName")
                .build();
poolExecutor = new ScheduledThreadPoolExecutor(poolSize,threadFactory);
poolExecutor.scheduleAtFixedRate(() -> {
    reentrantLock.lock();
    try {
        initData();
        log.info("routePoint2idx:" + routePoint2idx.size());
        log.info("routeGrid2key:" + routeGrid2key.size());
        log.info("数据预处理over");
    } catch (Exception e) {
        log.error("定时更新线路数据异常:{}", e.getMessage());
    }finally {
        reentrantLock.unlock();
    }
}, delay, period, TimeUnit.SECONDS);

注意点:这样的操作我是写在open方法中的,所以在高并行度的情况下,维表数据会加载n(并行度)份,有可能会使得taskmanager内存消耗增加。


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM