Flink之state processor api實踐


 
        

    

    前不久,Flink社區發布了FLink 1.9版本,在其中包含了一個很重要的新特性,即state processor api,這個框架支持對checkpoint和savepoint進行操作,包括讀取、變更、寫入等等。

savepoint的可操作帶來了很多的可能性:

  • 作業遷移

  1.跨類型作業,假如有一個storm作業,將狀態緩存在外部系統,希望更好的利用flink的狀態機制來增加作業的穩定和減少數據的延遲,但如果直接遷移,必然面臨狀態的丟失,這時,可以將外部系統的狀態轉換為flink作業的savepoint來啟動。
  2.同類型作業,假如有一個flink作業已經在運行,一個新的flink作業希望復用之前的某些狀態,也可以將savepoint進行處理重新寫入,進而使得新的flink作業可以在某個基礎上運行。

  • 作業升級

  1.有UID升級,一般情況下,如果升級前的operator已經設置了uid,那么可以直接升級,但是如果希望在之前的狀態數據上做些變更,這里就提供了一種接口。
  2.無UID升級,在特殊情況下,一開始編寫了沒有UID的作業,后來改成了標准的有UID的作業,反而無法在之前的savepoint上啟動了,這時也可以對savepoint同時做升級。

  • 作業校驗

  1.異步校驗,一般而言,flink作業的最終結果都會持久化輸出,但在面臨問題的時候,如何確定哪一級出現問題,state processor api也提供了一種可能,去檢驗state中的數據是否與預期的一致。

  • 作業擴展

  1.橫向擴展,如果在flink作業一開始運行的時候,因為面對的數據量較小,設置了比較小的最大並行度,但在數據量增大的時候,卻沒辦法從老的savepoint以一個比之前的最大並行度更大的並行度來啟動作業,這時,也需要復寫savepoint的同時更改最大並行度。
  2.縱向擴展,在flink作業中新添加了一個operator,從savepoint啟動的時候這個operator默認無狀態,可以手動構造數據,使得這個operator的表現和其他operator保持一致。

可以對savepoint進行哪些操作?

  • 讀取savepoint

  1.驗證,讀取出來的savepoint會轉換為一個dataSet,隨后可以以標准批處理的方式來驗證你的業務預期;
  2.source,也可以以savepoint作為數據源,來作為你另一個作業的輸入。

  • 寫入savepoint

  1.寫入新的savepoint,可以寫入一個全新的savepoint,這個savepoint是獨立的存在,他可以有新的operator uid,新的operator state,以及新的max parallism等等。
  2.復用原來的savepoint,可以在原來的savepoint的基礎上加入新的operator的state,在新的savepoint被使用之前,老的savepoint不允許被刪除。

那么究竟哪些state是可讀的?有哪些接口了?

可以看到,主要提供對三種state的訪問,operator state和broadcast state,其中broadcast state是一種特殊的operator state,因為他也支持自定義的serializer。

通關程序

目前在社區或者網上並沒有完整的樣例供大家參考,下面這個例子是完全在測試環境中跑通的,所有的flink相關組件的版本依賴都是1.9.0。

下面我們說明如何使用這個框架。

1.首先我們創建一個樣例作業來生成savepoint
主類代碼
 1 final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 2         env.enableCheckpointing(60*1000);
 3         DataStream<Tuple2<Integer,Integer>> kafkaDataStream =
 4                 env.addSource(new SourceFunction<Tuple2<Integer,Integer>>() {
 5             private boolean running = true;
 6             private int key;
 7             private int value;
 8             private Random random = new Random();
 9             @Override
10             public void run(SourceContext<Tuple2<Integer,Integer>> sourceContext) throws Exception {
11                 while (running){
12                     key = random.nextInt(5);
13                     sourceContext.collect(new Tuple2<>(key,value++) );
14                     Thread.sleep(100);
15                 }
16             }
17 
18             @Override
19             public void cancel() {
20                 running = false;
21             }
22         }).name("source").uid("source");
23 
24 
25         kafkaDataStream
26                 .keyBy(tuple -> tuple.f0)
27                 .map(new StateTest.StateMap()).name("map").uid("map")
28                 .print().name("print").uid("print");

在上面的代碼中,只需要注意在自定義的source中,發送tuple2消息,而做savepoint的
關鍵在於狀態,狀態在StateMap這個類中,如下:

 1 public static class StateMap extends RichMapFunction<Tuple2<Integer,Integer>,String>  {
 2         private transient ListState<Integer> listState;
 3 
 4         @Override
 5         public void open(Configuration parameters) throws Exception {
 6             ListStateDescriptor<Integer> lsd =
 7                     new ListStateDescriptor<>("list",TypeInformation.of(Integer.class));
 8             listState = getRuntimeContext().getListState(lsd);
 9         }
10 
11         @Override
12         public String map(Tuple2<Integer,Integer> value) throws Exception {
13             listState.add(value.f1);
14             return value.f0+"-"+value.f1;
15         }
16 
17         @Override
18         public void close() throws Exception {
19             listState.clear();
20         }
21     }

在上面的Map中,首先在open中聲明了一個ListState,然后在消息處理的邏輯中,也很簡單的只是把tuple2的值放進了listState中。然后提交作業,等作業運行一段時間之后,觸發一個savepoint,
並記錄savepoint的地址。至此,完成了state processor api驗證工作的數據准備。

2.利用state processor api讀取savepoint
這一步只是簡單驗證下savepoint是否能夠被正確讀取,代碼如下:

 1 public class ReadListState {
 2     protected static final Logger logger = LoggerFactory.getLogger(ReadListState.class);
 3 
 4     public static void main(String[] args) throws Exception {
 5         final String operatorUid = "map";
 6         final String savepointPath =
 7                 "hdfs://xxx/savepoint-41b05d-d517cafb61ba";
 8 
 9         final String checkpointPath = "hdfs://xxx/checkpoints";
10 
11         // set up the batch execution environment
12         final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
13 
14         RocksDBStateBackend db = new RocksDBStateBackend(checkpointPath);
15         DataSet<String> dataSet = Savepoint
16                 .load(env, savepointPath, db)
17                 .readKeyedState(operatorUid, new ReaderFunction())
18                 .flatMap(new FlatMapFunction<KeyedListState, String>() {
19                     @Override
20                     public void flatMap(KeyedListState keyedListState, Collector<String> collector) throws Exception {
21                         keyedListState.value.forEach(new Consumer<Integer>() {
22                             @Override
23                             public void accept(Integer integer) {
24                                 collector.collect(keyedListState.key + "-" + integer);
25                             }
26                         });
27                     }
28                 });
29 
30         dataSet.writeAsText("hdfs://xxx/test/savepoint/bravo");
31 
32         // execute program
33         env.execute("read the list state");
34     }
35 
36     static class KeyedListState {
37         Integer key;
38         List<Integer> value;
39     }
40 
41     static class ReaderFunction extends KeyedStateReaderFunction<Integer, KeyedListState> {
42         private transient ListState<Integer> listState;
43 
44         @Override
45         public void open(Configuration parameters) {
46             ListStateDescriptor<Integer> lsd =
47                     new ListStateDescriptor<>("list", TypeInformation.of(Integer.class));
48             listState = getRuntimeContext().getListState(lsd);
49         }
50 
51         @Override
52         public void readKey(
53                 Integer key,
54                 Context ctx,
55                 Collector<KeyedListState> out) throws Exception {
56             List<Integer> li = new ArrayList<>();
57             listState.get().forEach(new Consumer<Integer>() {
58                 @Override
59                 public void accept(Integer integer) {
60                     li.add(integer);
61                 }
62             });
63 
64             KeyedListState kl = new KeyedListState();
65             kl.key = key;
66             kl.value = li;
67 
68             out.collect(kl);
69         }
70     }
71 }

在讀取了savepoint中的狀態之后,成功將其轉存為一個文件,文件的部分內容如下,每行的內容分別為key-value對:

3.利用state processor api重寫savepoint

savepoint是對程序某個運行時點的狀態的固化,方便程序在再次提交的時候進行接續,但有時候需要對savepoint中的狀態進行改寫,以方便從特定的狀態來啟動作業。

 1 public class ReorganizeListState {
 2     protected static final Logger logger = LoggerFactory.getLogger(ReorganizeListState.class);
 3     public static void main(String[] args) throws Exception {
 4         final String operatorUid = "map";
 5         final String savepointPath =
 6                 "hdfs://xxx/savepoint-41b05d-d517cafb61ba";
 7 
 8         final String checkpointPath = "hdfs://xxx/checkpoints";
 9 
10         // set up the batch execution environment
11         final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
12 
13         RocksDBStateBackend db = new RocksDBStateBackend(checkpointPath);
14         DataSet<KeyedListState> dataSet = Savepoint
15                 .load(env,savepointPath,db)
16                 .readKeyedState(operatorUid,new ReaderFunction())
17                 .flatMap(new FlatMapFunction<KeyedListState, KeyedListState>() {
18                     @Override
19                     public void flatMap(KeyedListState keyedListState, Collector<KeyedListState> collector) throws Exception {
20                         KeyedListState newState = new KeyedListState();
21                         newState.value = keyedListState.value.stream()
22                         .map( x -> x+10000).collect(Collectors.toList());
23                         newState.key = keyedListState.key;
24                         collector.collect(newState);
25                     }
26                 });
27         
28         BootstrapTransformation<KeyedListState> transformation = OperatorTransformation
29                 .bootstrapWith(dataSet)
30                 .keyBy(acc -> acc.key)
31                 .transform(new KeyedListStateBootstrapper());
32 
33         Savepoint.create(db,128)
34                 .withOperator(operatorUid,transformation)
35                 .write("hdfs://xxx/test/savepoint/");
36 
37         // execute program
38         env.execute("read the list state");
39     }
40 
41     static class KeyedListState{
42         Integer key;
43         List<Integer> value;
44     }
45 
46     static class ReaderFunction extends KeyedStateReaderFunction<Integer, KeyedListState> {
47         private transient ListState<Integer> listState;
48 
49         @Override
50         public void open(Configuration parameters) {
51             ListStateDescriptor<Integer> lsd =
52                     new ListStateDescriptor<>("list",TypeInformation.of(Integer.class));
53             listState = getRuntimeContext().getListState(lsd);
54         }
55 
56         @Override
57         public void readKey(
58                 Integer key,
59                 Context ctx,
60                 Collector<KeyedListState> out) throws Exception {
61             List<Integer> li = new ArrayList<>();
62             listState.get().forEach(new Consumer<Integer>() {
63                 @Override
64                 public void accept(Integer integer) {
65                     li.add(integer);
66                 }
67             });
68 
69             KeyedListState kl = new KeyedListState();
70             kl.key = key;
71             kl.value = li;
72 
73             out.collect(kl);
74         }
75     }
76 
77     static class KeyedListStateBootstrapper extends KeyedStateBootstrapFunction<Integer, KeyedListState> {
78         private transient ListState<Integer> listState;
79 
80         @Override
81         public void open(Configuration parameters) {
82             ListStateDescriptor<Integer> lsd =
83                     new ListStateDescriptor<>("list",TypeInformation.of(Integer.class));
84             listState = getRuntimeContext().getListState(lsd);
85         }
86 
87         @Override
88         public void processElement(KeyedListState value, Context ctx) throws Exception {
89             listState.addAll(value.value);
90         }
91     }
92 }

這里的關鍵在於根據上一步讀取出來dataSet,轉換的過程中將其值全部累加10000,然后將這個dataSet作為輸入來構建一個BootstrapTransformation,然后創建了一個空的savepoint,並把指定
operatorUid的狀態寫為一個savepoint,最終寫入成功,得到了一個新的savepoint,這個新的savepoint包含的狀態中的value相比原先的值發生了變化。

4.驗證新生產的savepoint是否可用

由於驗證用的state是ListState,換言之,是KeyedState,而KeyedState是屬於Flink托管的state,意味着Flink自己掌握狀態的保存和恢復的邏輯,所以為了驗證作業是否正確從新的savepoint
中啟動了,對之前的StateMap改寫如下:

 1 public static class StateMap extends RichMapFunction<Tuple2<Integer,Integer>,String>  {
 2         private transient ListState<Integer> listState;
 3 
 4         @Override
 5         public void open(Configuration parameters) throws Exception {
 6             ListStateDescriptor<Integer> lsd =
 7                     new ListStateDescriptor<>("list",TypeInformation.of(Integer.class));
 8             listState = getRuntimeContext().getListState(lsd);
 9         }
10 
11         @Override
12         public String map(Tuple2<Integer,Integer> value) throws Exception {
13             listState.add(value.f1);
14             log.info("get value:{}-{}",value.f0,value.f1);
15             StringBuilder sb = new StringBuilder();
16             listState.get().forEach(new Consumer<Integer>() {
17                 @Override
18                 public void accept(Integer integer) {
19                     sb.append(integer).append(";");
20                 }
21             });
22             log.info("***********************taskNameAndSubTask:{},restored value:{}"
23                     ,getRuntimeContext().getTaskNameWithSubtasks(),sb.toString());
24             return value.f0+"-"+value.f1;
25         }
26 
27         @Override
28         public void close() throws Exception {
29             listState.clear();
30         }
31     }

由於無法在state恢復之后立刻就拿到相應恢復的數據,這里之后在每次消息達到的時候輸出下state中的內容,變通的看看是否恢復成功,結果如下:

可以對比看下上圖中key為4的輸出,可以看到輸出的值即為修改后的值,驗證成功。

5.結語
上面我們以一個keyedState來對state processor api做了驗證,但Flink的state分為KeyedState,OperatorState和BroadcastState,在state processor api中都提供相應的處理接口。
另外,對於keyedState,如果作業的並行度發生了變化會如何?如果Key發生了變化會如何?都需要進一步探究。

官方文檔參見:
https://flink.apache.org/feature/2019/09/13/state-processor-api.html
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/libs/state_processor_api.html




免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM