前不久,Flink社區發布了FLink 1.9版本,在其中包含了一個很重要的新特性,即state processor api,這個框架支持對checkpoint和savepoint進行操作,包括讀取、變更、寫入等等。
savepoint的可操作帶來了很多的可能性:
- 作業遷移
1.跨類型作業,假如有一個storm作業,將狀態緩存在外部系統,希望更好的利用flink的狀態機制來增加作業的穩定和減少數據的延遲,但如果直接遷移,必然面臨狀態的丟失,這時,可以將外部系統的狀態轉換為flink作業的savepoint來啟動。
2.同類型作業,假如有一個flink作業已經在運行,一個新的flink作業希望復用之前的某些狀態,也可以將savepoint進行處理重新寫入,進而使得新的flink作業可以在某個基礎上運行。
- 作業升級
1.有UID升級,一般情況下,如果升級前的operator已經設置了uid,那么可以直接升級,但是如果希望在之前的狀態數據上做些變更,這里就提供了一種接口。
2.無UID升級,在特殊情況下,一開始編寫了沒有UID的作業,后來改成了標准的有UID的作業,反而無法在之前的savepoint上啟動了,這時也可以對savepoint同時做升級。
- 作業校驗
1.異步校驗,一般而言,flink作業的最終結果都會持久化輸出,但在面臨問題的時候,如何確定哪一級出現問題,state processor api也提供了一種可能,去檢驗state中的數據是否與預期的一致。
- 作業擴展
1.橫向擴展,如果在flink作業一開始運行的時候,因為面對的數據量較小,設置了比較小的最大並行度,但在數據量增大的時候,卻沒辦法從老的savepoint以一個比之前的最大並行度更大的並行度來啟動作業,這時,也需要復寫savepoint的同時更改最大並行度。
2.縱向擴展,在flink作業中新添加了一個operator,從savepoint啟動的時候這個operator默認無狀態,可以手動構造數據,使得這個operator的表現和其他operator保持一致。
可以對savepoint進行哪些操作?
- 讀取savepoint
1.驗證,讀取出來的savepoint會轉換為一個dataSet,隨后可以以標准批處理的方式來驗證你的業務預期;
2.source,也可以以savepoint作為數據源,來作為你另一個作業的輸入。
- 寫入savepoint
1.寫入新的savepoint,可以寫入一個全新的savepoint,這個savepoint是獨立的存在,他可以有新的operator uid,新的operator state,以及新的max parallism等等。
2.復用原來的savepoint,可以在原來的savepoint的基礎上加入新的operator的state,在新的savepoint被使用之前,老的savepoint不允許被刪除。
那么究竟哪些state是可讀的?有哪些接口了?
可以看到,主要提供對三種state的訪問,operator state和broadcast state,其中broadcast state是一種特殊的operator state,因為他也支持自定義的serializer。
通關程序
目前在社區或者網上並沒有完整的樣例供大家參考,下面這個例子是完全在測試環境中跑通的,所有的flink相關組件的版本依賴都是1.9.0。
下面我們說明如何使用這個框架。
1.首先我們創建一個樣例作業來生成savepoint
主類代碼
1 final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 2 env.enableCheckpointing(60*1000); 3 DataStream<Tuple2<Integer,Integer>> kafkaDataStream = 4 env.addSource(new SourceFunction<Tuple2<Integer,Integer>>() { 5 private boolean running = true; 6 private int key; 7 private int value; 8 private Random random = new Random(); 9 @Override 10 public void run(SourceContext<Tuple2<Integer,Integer>> sourceContext) throws Exception { 11 while (running){ 12 key = random.nextInt(5); 13 sourceContext.collect(new Tuple2<>(key,value++) ); 14 Thread.sleep(100); 15 } 16 } 17 18 @Override 19 public void cancel() { 20 running = false; 21 } 22 }).name("source").uid("source"); 23 24 25 kafkaDataStream 26 .keyBy(tuple -> tuple.f0) 27 .map(new StateTest.StateMap()).name("map").uid("map") 28 .print().name("print").uid("print");
在上面的代碼中,只需要注意在自定義的source中,發送tuple2消息,而做savepoint的
關鍵在於狀態,狀態在StateMap這個類中,如下:
1 public static class StateMap extends RichMapFunction<Tuple2<Integer,Integer>,String> { 2 private transient ListState<Integer> listState; 3 4 @Override 5 public void open(Configuration parameters) throws Exception { 6 ListStateDescriptor<Integer> lsd = 7 new ListStateDescriptor<>("list",TypeInformation.of(Integer.class)); 8 listState = getRuntimeContext().getListState(lsd); 9 } 10 11 @Override 12 public String map(Tuple2<Integer,Integer> value) throws Exception { 13 listState.add(value.f1); 14 return value.f0+"-"+value.f1; 15 } 16 17 @Override 18 public void close() throws Exception { 19 listState.clear(); 20 } 21 }
在上面的Map中,首先在open中聲明了一個ListState,然后在消息處理的邏輯中,也很簡單的只是把tuple2的值放進了listState中。然后提交作業,等作業運行一段時間之后,觸發一個savepoint,
並記錄savepoint的地址。至此,完成了state processor api驗證工作的數據准備。
2.利用state processor api讀取savepoint
這一步只是簡單驗證下savepoint是否能夠被正確讀取,代碼如下:
1 public class ReadListState { 2 protected static final Logger logger = LoggerFactory.getLogger(ReadListState.class); 3 4 public static void main(String[] args) throws Exception { 5 final String operatorUid = "map"; 6 final String savepointPath = 7 "hdfs://xxx/savepoint-41b05d-d517cafb61ba"; 8 9 final String checkpointPath = "hdfs://xxx/checkpoints"; 10 11 // set up the batch execution environment 12 final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); 13 14 RocksDBStateBackend db = new RocksDBStateBackend(checkpointPath); 15 DataSet<String> dataSet = Savepoint 16 .load(env, savepointPath, db) 17 .readKeyedState(operatorUid, new ReaderFunction()) 18 .flatMap(new FlatMapFunction<KeyedListState, String>() { 19 @Override 20 public void flatMap(KeyedListState keyedListState, Collector<String> collector) throws Exception { 21 keyedListState.value.forEach(new Consumer<Integer>() { 22 @Override 23 public void accept(Integer integer) { 24 collector.collect(keyedListState.key + "-" + integer); 25 } 26 }); 27 } 28 }); 29 30 dataSet.writeAsText("hdfs://xxx/test/savepoint/bravo"); 31 32 // execute program 33 env.execute("read the list state"); 34 } 35 36 static class KeyedListState { 37 Integer key; 38 List<Integer> value; 39 } 40 41 static class ReaderFunction extends KeyedStateReaderFunction<Integer, KeyedListState> { 42 private transient ListState<Integer> listState; 43 44 @Override 45 public void open(Configuration parameters) { 46 ListStateDescriptor<Integer> lsd = 47 new ListStateDescriptor<>("list", TypeInformation.of(Integer.class)); 48 listState = getRuntimeContext().getListState(lsd); 49 } 50 51 @Override 52 public void readKey( 53 Integer key, 54 Context ctx, 55 Collector<KeyedListState> out) throws Exception { 56 List<Integer> li = new ArrayList<>(); 57 listState.get().forEach(new Consumer<Integer>() { 58 @Override 59 public void accept(Integer integer) { 60 li.add(integer); 61 } 62 }); 63 64 KeyedListState kl = new KeyedListState(); 65 kl.key = key; 66 kl.value = li; 67 68 out.collect(kl); 69 } 70 } 71 }
在讀取了savepoint中的狀態之后,成功將其轉存為一個文件,文件的部分內容如下,每行的內容分別為key-value對:
3.利用state processor api重寫savepoint
savepoint是對程序某個運行時點的狀態的固化,方便程序在再次提交的時候進行接續,但有時候需要對savepoint中的狀態進行改寫,以方便從特定的狀態來啟動作業。
1 public class ReorganizeListState { 2 protected static final Logger logger = LoggerFactory.getLogger(ReorganizeListState.class); 3 public static void main(String[] args) throws Exception { 4 final String operatorUid = "map"; 5 final String savepointPath = 6 "hdfs://xxx/savepoint-41b05d-d517cafb61ba"; 7 8 final String checkpointPath = "hdfs://xxx/checkpoints"; 9 10 // set up the batch execution environment 11 final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); 12 13 RocksDBStateBackend db = new RocksDBStateBackend(checkpointPath); 14 DataSet<KeyedListState> dataSet = Savepoint 15 .load(env,savepointPath,db) 16 .readKeyedState(operatorUid,new ReaderFunction()) 17 .flatMap(new FlatMapFunction<KeyedListState, KeyedListState>() { 18 @Override 19 public void flatMap(KeyedListState keyedListState, Collector<KeyedListState> collector) throws Exception { 20 KeyedListState newState = new KeyedListState(); 21 newState.value = keyedListState.value.stream() 22 .map( x -> x+10000).collect(Collectors.toList()); 23 newState.key = keyedListState.key; 24 collector.collect(newState); 25 } 26 }); 27 28 BootstrapTransformation<KeyedListState> transformation = OperatorTransformation 29 .bootstrapWith(dataSet) 30 .keyBy(acc -> acc.key) 31 .transform(new KeyedListStateBootstrapper()); 32 33 Savepoint.create(db,128) 34 .withOperator(operatorUid,transformation) 35 .write("hdfs://xxx/test/savepoint/"); 36 37 // execute program 38 env.execute("read the list state"); 39 } 40 41 static class KeyedListState{ 42 Integer key; 43 List<Integer> value; 44 } 45 46 static class ReaderFunction extends KeyedStateReaderFunction<Integer, KeyedListState> { 47 private transient ListState<Integer> listState; 48 49 @Override 50 public void open(Configuration parameters) { 51 ListStateDescriptor<Integer> lsd = 52 new ListStateDescriptor<>("list",TypeInformation.of(Integer.class)); 53 listState = getRuntimeContext().getListState(lsd); 54 } 55 56 @Override 57 public void readKey( 58 Integer key, 59 Context ctx, 60 Collector<KeyedListState> out) throws Exception { 61 List<Integer> li = new ArrayList<>(); 62 listState.get().forEach(new Consumer<Integer>() { 63 @Override 64 public void accept(Integer integer) { 65 li.add(integer); 66 } 67 }); 68 69 KeyedListState kl = new KeyedListState(); 70 kl.key = key; 71 kl.value = li; 72 73 out.collect(kl); 74 } 75 } 76 77 static class KeyedListStateBootstrapper extends KeyedStateBootstrapFunction<Integer, KeyedListState> { 78 private transient ListState<Integer> listState; 79 80 @Override 81 public void open(Configuration parameters) { 82 ListStateDescriptor<Integer> lsd = 83 new ListStateDescriptor<>("list",TypeInformation.of(Integer.class)); 84 listState = getRuntimeContext().getListState(lsd); 85 } 86 87 @Override 88 public void processElement(KeyedListState value, Context ctx) throws Exception { 89 listState.addAll(value.value); 90 } 91 } 92 }
這里的關鍵在於根據上一步讀取出來dataSet,轉換的過程中將其值全部累加10000,然后將這個dataSet作為輸入來構建一個BootstrapTransformation,然后創建了一個空的savepoint,並把指定
operatorUid的狀態寫為一個savepoint,最終寫入成功,得到了一個新的savepoint,這個新的savepoint包含的狀態中的value相比原先的值發生了變化。
4.驗證新生產的savepoint是否可用
由於驗證用的state是ListState,換言之,是KeyedState,而KeyedState是屬於Flink托管的state,意味着Flink自己掌握狀態的保存和恢復的邏輯,所以為了驗證作業是否正確從新的savepoint
中啟動了,對之前的StateMap改寫如下:
1 public static class StateMap extends RichMapFunction<Tuple2<Integer,Integer>,String> { 2 private transient ListState<Integer> listState; 3 4 @Override 5 public void open(Configuration parameters) throws Exception { 6 ListStateDescriptor<Integer> lsd = 7 new ListStateDescriptor<>("list",TypeInformation.of(Integer.class)); 8 listState = getRuntimeContext().getListState(lsd); 9 } 10 11 @Override 12 public String map(Tuple2<Integer,Integer> value) throws Exception { 13 listState.add(value.f1); 14 log.info("get value:{}-{}",value.f0,value.f1); 15 StringBuilder sb = new StringBuilder(); 16 listState.get().forEach(new Consumer<Integer>() { 17 @Override 18 public void accept(Integer integer) { 19 sb.append(integer).append(";"); 20 } 21 }); 22 log.info("***********************taskNameAndSubTask:{},restored value:{}" 23 ,getRuntimeContext().getTaskNameWithSubtasks(),sb.toString()); 24 return value.f0+"-"+value.f1; 25 } 26 27 @Override 28 public void close() throws Exception { 29 listState.clear(); 30 } 31 }
由於無法在state恢復之后立刻就拿到相應恢復的數據,這里之后在每次消息達到的時候輸出下state中的內容,變通的看看是否恢復成功,結果如下:
可以對比看下上圖中key為4的輸出,可以看到輸出的值即為修改后的值,驗證成功。
5.結語
上面我們以一個keyedState來對state processor api做了驗證,但Flink的state分為KeyedState,OperatorState和BroadcastState,在state processor api中都提供相應的處理接口。
另外,對於keyedState,如果作業的並行度發生了變化會如何?如果Key發生了變化會如何?都需要進一步探究。
官方文檔參見:
https://flink.apache.org/feature/2019/09/13/state-processor-api.html
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/libs/state_processor_api.html