flink中定時加載維表


在流處理程序中,往往會涉及到關聯維表的操作

  • 對於維表數據量較大的:我們可以使用異步IO+Guava Cache的方式,通過訪問本地緩存來關聯數據,緩存中沒有的數據通過異步IO去查詢,將查詢的結果put到Guava Cache中,通過Guava Cache的緩存失效來做到維表數據的定時更新。
  • 對於維表數據量較小的表,可以嘗試全量加載到內存中,這樣會使我們程序的處理效率更快。

下面來介紹一下,自己在開發過程中的兩種方法:

Guava Cache:

第一種方式就是使用谷歌的Guava Cache,示例如下:

busCaches = CacheBuilder.newBuilder()
        .maximumSize(50000)
        .refreshAfterWrite(24, TimeUnit.HOURS)
        .build(new CacheLoader<String, Map<String, String>>() {
            @Override
            public Map<String, String> load( @Nullable String key) {
                return getBusCache();
            }
        });

簡單說下Guava Cache的緩存失效和刷新策略

  • expireAfterWrite(long duration, TimeUnit unit):在元素 寫入或者更新 后的一段時間之后,下次訪問時,如果超過了這個時間,那么會回收該Key,然后同步執行 load() 。
  • expireAfterAccess(long duration, TimeUnit unit):在元素 寫入或者更新或者讀 后的一段時間之后,下次訪問時,如果超過了這個時間,那么會回收該Key,然后同步執行 load() 。
  • refreshAfterWrite(long duration, TimeUnit unit):在元素 寫入或者更新 后的一段時間之后,下次訪問時,如果超過了這個時間,會進行異步刷新,其他線程訪問舊值,當前線程更新值,能有效的避免阻塞,但缺點是可能訪問不到最新的值

ScheduledExecutorService線程調度 + ReentrantLock鎖:

第二種方式是通過線程池調度的方式去定時更新,在進行更新操作的時候可以進行加鎖操作,在實際的獲取維表數據的方法里,也需要對應的加上鎖操作

ReentrantLock reentrantLock = new ReentrantLock();
ScheduledExecutorService poolExecutor;
long delay = Calculate.getScheduleTime(2,32);
long period = Calculate.getRefreshTime(24);
int poolSize = 1;
ThreadFactory threadFactory = new ThreadFactoryBuilder()
                .setNameFormat("threadName")
                .build();
poolExecutor = new ScheduledThreadPoolExecutor(poolSize,threadFactory);
poolExecutor.scheduleAtFixedRate(() -> {
    reentrantLock.lock();
    try {
        initData();
        log.info("routePoint2idx:" + routePoint2idx.size());
        log.info("routeGrid2key:" + routeGrid2key.size());
        log.info("數據預處理over");
    } catch (Exception e) {
        log.error("定時更新線路數據異常:{}", e.getMessage());
    }finally {
        reentrantLock.unlock();
    }
}, delay, period, TimeUnit.SECONDS);

注意點:這樣的操作我是寫在open方法中的,所以在高並行度的情況下,維表數據會加載n(並行度)份,有可能會使得taskmanager內存消耗增加。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM