Flink廣播流Demo


廣播狀態
從版本1.5.0開始,Apache Flink具有一種新的狀態,稱為廣播狀態。

三種應用場景

  1. 動態配置更新
  2. 規則改變
  3. 類似開關的功能
    假設場景,
    有兩條流,一條是普通的流,另一條是控制流,如果需要動態調整代碼邏輯時,可以使用廣播狀態
package com.haoziqi.chapter_09;

import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.BroadcastConnectedStream;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

import java.nio.file.attribute.UserPrincipalLookupService;

/**
 * description
 * created by A on 2021/3/16
 */
public class State_BroadcastState {
    public static void main(String[] args) {
        //控制流發送到普通流后,普通流會收到一個廣播狀態
        //1.創建環境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<String> inputDS = env.socketTextStream("localhost", 8888);
        DataStreamSource<String> controlDS = env.socketTextStream("localhost", 9999);
        //TODO 1.把其中一條流(控制流) 廣播出去
        //定義一個Map狀態描述器,控制流會把這個狀態廣播出去
        MapStateDescriptor<String, String> broadcast = new MapStateDescriptor<>("boradcast-state", Types.STRING, Types.STRING);
        BroadcastStream<String> contrlBS = controlDS.broadcast(broadcast);

        //TODO 2.把另一條流和廣播流關聯起來
        BroadcastConnectedStream<String, String> inputBCS = inputDS.connect(contrlBS);
        
        //TODO 3.調用Process

        inputBCS.process(
                new BroadcastProcessFunction<String, String, String>() {
                    /*
                        獲取廣播狀態,獲取數據進行處理
                     */
                    @Override
                    public void processElement(String value, ReadOnlyContext ctx, Collector<String> out) throws Exception {
                        //TODO 5.通過上下文獲取廣播狀態,取出里面的值
                        ReadOnlyBroadcastState<String, String> broadcastState = ctx.getBroadcastState(broadcast);
                        String aSwitch = broadcastState.get("switch");
                        if("1".equals(aSwitch)){
                            out.collect("切換到1的邏輯");
                        }else if("2".equals(aSwitch)){
                            out.collect("切換到2的邏輯");
                        }


                    }

                    /**
                     * 處理廣播流的數據:這里主要定義,什么數據往廣播狀態存
                     * @param value
                     * @param ctx
                     * @param out
                     * @throws Exception
                     */
                    @Override
                    public void processBroadcastElement(String value, Context ctx, Collector<String> out) throws Exception {
                        // TODO 4.通過上下文獲取廣播狀態,並往廣播狀態里存數據
                        BroadcastState<String, String> broadcastState = ctx.getBroadcastState(broadcast);
                        broadcastState.put("switch",value);
                    }
                }
        ).print();
        //提交任務
        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}



免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM