flink數據廣播場景總結


數據集廣播,主要分為廣播變量,廣播維表(數據集)兩種,一種為變量,一種為常量(抽象的說法);

一.數據廣播背景

對於小變量,小數據集,需要和大數據集,大流進行聯合計算的時候,往往把小數據集廣播出去,整體直接和大數據集(流)的分布式最小粒度數據進行計算,最后把計算結果合並,這樣效率更高,省去分布式節點之間的數據傳輸及二次計算。

例如:在Flink使用場景中,外部的配置文件或計算規則及維表等進行預加載,並定期更新,流式計算中廣播小變量等場景。

數據集的廣播,主要有以下幾種方式可以實現

1.預加載

在算子的open()方法中讀取MySQL或其他存儲介質,獲取全量維表信息比如在算子RichMapFunction的open()方法中獲取全部數據,然后在算子中進行使用,這種方法的缺點是如果外部數據更新了Flink是沒法知道的,這就需要在開啟一個定時任務定時從MySQL中獲取最新的數據。

2.外部查詢

數據不需要存儲,僅需要用到外部數據的時候去進行查詢,可以保證查詢到的數據是最新的,但是對於吞吐量較高的場景,可能與外部(比如MySQL)交互就變成了 Flink任務的瓶頸,雖然可以設置為異步I/O的形式進行交互優化,但優化程度一般有限。

3.本地緩存

需要設置過期時間或者定時更新數據,當數據到達過期時間后從新從外部獲取,或者定時從外部撈取數據進行更新,不能在外部數據發生變動時,及時更新到Flink程序中。

預加載和本地緩存難以應對當外部數據發生變化時,數據實時在Flink中保持更新。

二.什么是廣播

類似於全局性共享的數據,詳見官方文檔

https://flink.apache.org/2019/06/26/broadcast-state.html

https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/stream/state/broadcast_state.html

 

廣播的優勢

廣播變量創建后,它可以運行在集群中的任何function上,而不需要多次傳遞給集群節點,可以直接在內存中拿數據,避免了大量的shuffle,導致集群性能下降。我們可以把一個dataset或者不變的緩存對象(例如maplist集合對象等)數據集廣播出去,然后不同的任務在節點上都能夠獲取到,並在每個節點上只會存在一份,而不是在每個並發線程中存在。

如果不使用broadcast,則在每個節點中的每個任務中都需要拷貝一份dataset數據集,比較浪費內存(也就是一個節點中可能會存在多份dataset數據)。廣播變量,可以借助下圖輔助理解。

 

三.廣播的使用

根據廣播使用場景將廣播的類型分為廣播變量和廣播流(其實廣播原理是一樣的)。

 

1.廣播變量

將廣播的數據作為一個整體或對象廣播,比如從MySQL中一次獲取全部數據,然后廣播出去,因為數據在MySql中,如果MySql中某條記錄發生變動,Flink的souce是沒法知道,也不會廣播。所以只能在souce中定時從MySql中獲取全部數據,然后廣播更新。

示例數據格式:

kafka源流數據,只有itemid,沒有ip和port
{"host":"orcl", "itemid":"7875", "value":1}
{"host":"orcl2", "itemid":"7876", "value":2}
規則數據集在MySql,itemid關聯ip和port
itemid  ip              port
7875 192.168.199.105 1521
7876 192.168.199.106 1526

自定義MySql source, 定時 從Mysql獲取全部數據

@Override
public void run(SourceContext<HashMap<String, Tuple2<String, Integer>>> ctx) {
try {
 while (isRunning) {
  HashMap<String, Tuple2<String, Integer>> output = new HashMap<>();
  ResultSet resultSet = preparedStatement.executeQuery();
  //每隔60s獲取全部外部數據集
  while (resultSet.next()) {
   String itemid = resultSet.getString("itemid");
   String ip = resultSet.getString("ip");
   int port = resultSet.getInt("port");
   output.put(itemid, new Tuple2<>(ip, port));
  }
  ctx.collect(output);
  Thread.sleep(1000 * 60);
  }
 } catch (Exception ex) {
  log.error("從Mysql獲取配置異常...", ex);
 }
}

廣播代碼實現:

public void processElement(Map<String,Object> value, ReadOnlyContext ctx, Collector<Map<String,Object>> out) throws Exception {
   //從廣播中獲取全量數據
  ReadOnlyBroadcastState<Void, Map<String, Tuple2<String, Integer>>>      broadcastState = ctx.getBroadcastState(ruleStateDescriptor);
 //獲取全部規則數據進行匹配

Map<String, Tuple2<String, Integer>>  itemrules=  broadcastState.get(null);
  //規則數據集為空跳過
    if(itemrules==null) {
     return;
    }
    //事件流中的itemid
    Object itemidObj = value.get("itemid"); // value kafka流中數據獲取itemid
if (itemidObj == null) {
 return;
}
    Tuple2<String, Integer> itemruld = itemrules.get(itemidObj.toString());
    if(itemruld!=null){
     //匹配成功增加ip,port字段
     value.put("ip", itemruld.f0);
     value.put("port", itemruld.f1);
        out.collect(value);
    }
}
@Override
public void processBroadcastElement(HashMap<String, Tuple2<String, Integer>> value, Context ctx, Collector<Map<String,Object>> out) throws Exception {
   //數據全部更新
BroadcastState<Void, Map<String, Tuple2<String, Integer>>> broadcastState = ctx
  .getBroadcastState(ruleStateDescriptor);
//每次更新全部規則數據
broadcastState.put(null, value);
    System.out.println("規則全部更新成功,更新item規則:" + value);
}

執行結果:

//itemid=7875關聯ip=192.168.199.104
{host=orcl, itemid=7875, value=1, ip=192.168.199.104, port=1521}
//手動將mysql中的ip=192.168.199.104改為ip=192.168.199.105,在source 休眠結束后將會更新數據
規則更新成功,更新item規則:{7875=(192.168.199.105,1521), 7876=(192.168.199.106,1526)}
//itemid=7875關聯ip=192.168.199.105
{host=orcl, itemid=7875, value=1, ip=192.168.199.105, port=1521}

這種方式和預加載很像,都是通過定時任務加載全部數據,只不過是方法的位置不同,一個是在自定義source中設置休眠時間,另外一個是在算子的open方法中設置定時任務,廣播變量的方式同樣無法做到數據修改后實時更新。

 

2.廣播流

當數據來源於kafka時,Flink消費kafka獲取流,將流數據存儲在廣播狀態中,稱之為廣播流,不同於廣播變量一次獲取全部數據,廣播流是kafka新增一條記錄就將這條記錄存儲到廣播中,那廣播流如何實現外部數據的新增和更新?

kafka源流數據,只有itemid,沒有ip和port
{"host":"orcl", "itemid":"7875", "value":1}
{"host":"orcl2", "itemid":"7876", "value":2}
規則數據集在kafka,itemid關聯ip和port
{"itemid":"7875","ip":"192.168.199.104","port":1521}
{"itemid":"7876","ip":"192.168.199.106","port":1526}

2.1 外部數據新增和修改記錄

// 廣播狀態底層結構是Map結構
//kafka中的數據,flink消費后存儲到廣播狀態,在廣播狀態中以itemid為key進行存儲
{"itemid":"7875","ip":"192.168.199.104","port":1521}
{"itemid":"7876","ip":"192.168.199.106","port":1526}
//新增  往kafka寫入新記錄(key不相同),flink會持續消費kafka並將數據通過Map的put()方法存入廣播狀態
//修改  往kafka寫入新記錄(key相同),put()方法覆蓋之前的這條記錄以達到更新的目的
//比如需要更新itemid的ip和port值,要求往kafka中寫入一條新數據,比如更新itemid 7875的ip和port
{"itemid":"7875","ip":"192.168.199.105","port":1525}

2.2 刪除記錄

//kafka中的數據,flink消費后存儲到廣播狀態,以itemid為key進行存儲
{"itemid":"7875","ip":"192.168.199.104","port":1521}
{"itemid":"7876","ip":"192.168.199.106","port":1526}
//如果需要刪除某條記錄,往kafka中寫入帶有key的數據和刪除標記即可
//比如刪除itemid為7875的記錄,要求往kafka中寫入一條新數據,程序刪除廣播中itemid7875的記錄
{"itemid":"7875","isRemove":true}

由於消費kafka流是實時的,kafka的新記錄會實時進行消費,根據新記錄的內容對廣播數據實時的進行新增,修改或刪除

同時由於kafka中的數據是不可變的,當程序需要重啟時,只需從頭消費kafka即可,由於具有冪等性,最終的廣播數據是不會變的。

示例代碼

//Flink消費外部kafka規則數據作為流
FlinkKafkaConsumer<ItemRuleEntiy> ruleKafkaConsumer = new FlinkKafkaConsumer<ItemRuleEntiy>("topic",new ItemRuleEntiyPojoSchema(),properties);
DataStream<ItemRuleEntiy> ruleStream = env.addSource(ruleKafkaConsumer);

//廣播方法
@Override
public void processBroadcastElement(ItemRuleEntiy value,
 BroadcastProcessFunction<Map<String, Object>, ItemRuleEntiy, Map<String, Object>>.Context ctx,
 Collector<Map<String, Object>> out) throws Exception {
BroadcastState<String, ItemRuleEntiy> broadcastState = ctx.getBroadcastState(ruleStateDescriptor);
if (StringUtils.isNoneBlank(value.getItemid())) {
 System.out.println("獲取到新的廣播規則:" + value);
 //相比廣播變量,這里每次只存一條規則,相同key則覆蓋修改
 broadcastState.put(value.getItemid(), value); // 存放數據到廣播
}
}
@Override
public void processElement(Map<String, Object> value,
 BroadcastProcessFunction<Map<String, Object>, ItemRuleEntiy, Map<String, Object>>.ReadOnlyContext ctx,
 Collector<Map<String, Object>> out) throws Exception {
ReadOnlyBroadcastState<String, ItemRuleEntiy> broadcastState = ctx.getBroadcastState(ruleStateDescriptor);
Object itemidObj = value.get("itemid"); // 源kafka流中數據獲取itemid
if (itemidObj == null) {
 return;
}
// 根據item從廣播數據中查找規則,能查到,則增加ip,port字段
ItemRuleEntiy itemRule = broadcastState.get(itemidObj.toString());
if (itemRule != null) { // 從廣播中撈取到數據時
 value.put("ip", itemRule.getIp());
 value.put("port", itemRule.getPort());
 out.collect(value);
}
}

執行結果

//  所有廣播規則數據:
7875={itemid=7875, ip=192.168.199.104, port=1521}
7876={itemid=7876, ip=192.168.199.106, port=1526}
//itemid=7875關聯ip=192.168.199.104
({host=orcl, itemid=7875,value=1, ip=192.168.199.104, port=1521},7875)
//kafka寫入{itemid=7875, ip=192.168.199.105, port=1521}
 獲取到新的廣播規則:{itemid=7875, ip=192.168.199.105, port=1521}
 //itemid=7875關聯ip=192.168.199.105
{host=orcl,itemid=7875, value=1, ip=192.168.199.105, port=1521},7875)

 

四.總結

 

通過kafka廣播流的方式最終實現了Flink與外部數據交互的實時更新,不僅是kafka,還有MQ,甚至文件格式都可以作為廣播流,廣播流要求數據不能從內部更改(無法作為流消息被實時消費),只能通過新增的方式進行修改和刪除(新增記錄中key相同的表示覆蓋修改,帶key和刪除標記的表示刪除)

相比表(mysql,oracle)只是結果的呈現,日志(kafka或其它隊列)是一種帶有時間維度(或先后順序)信息的存儲,可以說表是二維的,日志是三維的,通過日志可以復原每個時間點的表,但是表不能還原日志。

廣播作為一種流(流明顯帶有時間特性),所以當不帶時間維度的表作為流時,是沒法形成真正意義上的流,只能通過定時獲取表的全部數據作為偽流,流中每個時間點的數據也只能是全量表數據,同時定時也就沒法做到實時獲取。只有帶時間維度的日志作為流時,才能做到實時獲取,而且每次只獲取最新的一條記錄即可,不用每次獲取全部數據。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM