Flink StreamExecutionEnvironment API


Flink streaming API flow diagram

# Create the stream execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

getExecutionEnvironment() creates an execution environment that represents the context of the current program, similar to a SparkContext.

If the program is invoked standalone, this method returns a local execution environment; if the program is invoked from the command-line client to be submitted to a cluster, this method returns the execution environment of that cluster.
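
The environment can also be created explicitly when needed. A minimal sketch (the JobManager host, port, and jar path below are placeholders, not taken from the original example):

// Explicit local environment: runs the job inside the current JVM (useful for IDE debugging)
StreamExecutionEnvironment localEnv = StreamExecutionEnvironment.createLocalEnvironment();

// Explicit remote environment: submits to a specific JobManager (placeholder host/port/jar path)
StreamExecutionEnvironment remoteEnv = StreamExecutionEnvironment.createRemoteEnvironment(
        "jobmanager-host", 8081, "path/to/job.jar");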

Example: reading temperature (Celsius) data

Source: reading data from a collection and from individual elements

// Sensor reading POJO: public class, public no-arg constructor, getters/setters for every field (see the POJO rules below)
public class SensorReading {
    private String id;
    private Long ts;
    private Double temperature;

    public SensorReading() {
    }

    public SensorReading(String id, Long ts, Double temperature) {
        this.id = id;
        this.ts = ts;
        this.temperature = temperature;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public Long getTs() {
        return ts;
    }

    public void setTs(Long ts) {
        this.ts = ts;
    }

    public Double getTemperature() {
        return temperature;
    }

    public void setTemperature(Double temperature) {
        this.temperature = temperature;
    }

    @Override
    public String toString() {
        return "SensorReading{id='" + id + "', ts=" + ts + ", temperature=" + temperature + '}';
    }
}
# Main program

import com.patrick.examples.apitest.beans.SensorReading;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Arrays;

/**
 * @ClassName CollectionSourceTest
 * @Description Read data from a collection
 * @Author Administrator
 * @Version 1.0
 **/
public class CollectionSourceTest {

    public static void main(String[] args) throws Exception {

        // Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Read data from a collection
        DataStream<SensorReading> dataStream = env.fromCollection(Arrays.asList(
                new SensorReading("sensor_1", 1547718199L, 35.80018327300259),
                new SensorReading("sensor_6", 1547718201L, 15.402984393403084),
                new SensorReading("sensor_7", 1547718202L, 6.720945201171228),
                new SensorReading("sensor_10", 1547718205L, 38.101067604893444)
        ));

        // Read elements directly
        DataStream<Integer> integerDataStream = env.fromElements(10, 9, 2, 19, 87);

        // Print; set a prefix label for each stream
        dataStream.print("data");
        integerDataStream.print("int");

        // Execute; set the streaming job name
        env.execute("CollectionSourceTest");
    }
}

Execution results:

POJO and JavaBean conventions:

  1) The JavaBean conventions are:

  a. Implement the java.io.Serializable interface.

  b. Be a public class with a public no-argument constructor, and expose its properties through matching setXxx() and getXxx() methods.

  2) Flink's POJO rules are:

  a. The class is public and standalone (no non-static inner class).

  b. The class has a public no-argument constructor.

  c. All non-static, non-transient fields of the class (and of all superclasses) are either public (and non-final), or have public getters and setters that follow the JavaBean naming conventions.
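
A quick way to check whether Flink actually treats a class as a POJO is to inspect its TypeInformation. This is an illustrative sketch, not part of the original article:

import com.patrick.examples.apitest.beans.SensorReading;
import org.apache.flink.api.common.typeinfo.TypeInformation;

public class PojoCheck {
    public static void main(String[] args) {
        // Prints a PojoTypeInfo if SensorReading satisfies the rules above;
        // otherwise Flink falls back to GenericTypeInfo and serializes the class with Kryo.
        TypeInformation<SensorReading> typeInfo = TypeInformation.of(SensorReading.class);
        System.out.println(typeInfo);
    }
}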

Example: reading data from a file

import com.patrick.examples.apitest.beans.SensorReading;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @ClassName FileSourceTest
 * @Description Read data from a file
 * @Author Administrator
 * @Version 1.0
 **/
public class FileSourceTest {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        String fpath = "F:\\FlinkDemo\\src\\main\\resources\\sensor.txt";

        // Read data from the file
        DataStream<String> dataStream = env.readTextFile(fpath);

        // Map each line into a SensorReading
        DataStream<SensorReading> sensorReadingDataStream = dataStream.map(line -> {
            String[] split = line.split(",");
            return new SensorReading(split[0], Long.valueOf(split[1]), Double.valueOf(split[2]));
        });

        sensorReadingDataStream.print();

        env.execute();
    }
}

Execution results:

Reading data from Kafka

Flink provides dedicated connectors for different Kafka versions; see here.

This example uses Flink 1.10.1 and Kafka 1.1.0.

# pom
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka_2.11</artifactId>
  <version>1.10.1</version>
</dependency>
# Main program
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

/**
 * @ClassName KafkaSourceTest
 * @Description Consume messages from Kafka
 * @Author Administrator
 * @Version 1.0
 **/
public class KafkaSourceTest {

    public static void main(String[] args) throws Exception {

        // Create the streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Kafka connection properties
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "hadoop1:9092,hadoop2:9092,hadoop3:9092");
        properties.setProperty("group.id", "FlinkKafkaSourceTest");

        // Add the Kafka source
        DataStream<String> kafkaDataStream = env.addSource(new FlinkKafkaConsumer<String>("patrick", new SimpleStringSchema(), properties))
                .setParallelism(6);

        System.out.println("kafkaDataStream parallelism: "+kafkaDataStream.getParallelism());

        kafkaDataStream.print();

        env.execute();
    }
}

Create the consumer and check the consumption:

# Note
# Create a topic with 3 partitions and a replication factor of 2
kafka-topics.sh --zookeeper hadoop1:2181 --create --replication-factor 2 --partitions 3 --topic patrick

# Produce messages from the console
kafka-console-producer.sh --broker-list hadoop1:9092 --topic patrick

 

Custom source

A custom source needs to implement the interface org.apache.flink.streaming.api.functions.source.SourceFunction.

import com.patrick.examples.apitest.beans.SensorReading;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.*;
import java.util.concurrent.TimeUnit;

/**
 * @ClassName CustomSourceTest
 * @Description Custom source
 * @Author Administrator
 * @Version 1.0
 **/
public class CustomSourceTest {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStream<SensorReading> dataStream = env.addSource(new MySensorSourceFunction(), "MySensorSourceFunction");

        dataStream.print();

        env.execute();
    }

    /**
     * Generates sensor readings
     */
    public static class MySensorSourceFunction implements SourceFunction<SensorReading>{

        private boolean running = true;

        /**
         * Produce the data
         * @param ctx
         * @throws Exception
         */
        @Override
        public void run(SourceContext<SensorReading> ctx) throws Exception {
            Random random = new Random();

            // Emit readings ordered by the numeric suffix of the sensor id, so use a TreeMap with a custom comparator
            Map<String, Double> sensorTemp = new TreeMap<String, Double>((o1, o2)->{
                Integer s1 = Integer.valueOf(o1.split("_")[1]);
                Integer s2 = Integer.valueOf(o2.split("_")[1]);
                return s1.compareTo(s2);
            });

            // Define 10 sensors and give each one an initial temperature
            for (int i=0; i<10; ++i){
                sensorTemp.put("sensor_"+(i+1), 60+random.nextGaussian()*20);
            }

            while (running){
                for(String sensorId : sensorTemp.keySet()){
                    Double newTemp = sensorTemp.get(sensorId)+random.nextGaussian();
                    sensorTemp.put(sensorId, newTemp);
                    // Emit the record
                    ctx.collect(new SensorReading(sensorId, System.currentTimeMillis()/1000, newTemp));

                }
                System.out.println("--------------------------------");
                // Control the emit frequency
                TimeUnit.SECONDS.sleep(3);
            }

        }

        @Override
        public void cancel() {
            running=false;
        }
    }

}

 

 

Transform

Basic operators (map, flatMap, filter)

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * @ClassName BaseTransformTest
 * @Description Basic operators: map, flatMap, filter
 * @Author Administrator
 * @Version 1.0
 **/
public class BaseTransformTest {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        String fpath = "F:\\FlinkDemo\\src\\main\\resources\\sensor.txt";

        // Read data from the file
        DataStream<String> inputStream = env.readTextFile(fpath);

        // 1. map: convert each line into its string length
        DataStream<Integer> mapStream = inputStream.map(value -> value.length());

        // 2. flatMap: split each line by comma and emit every field; the second argument specifies the return type of the lambda
        DataStream<String> flatMapStream = inputStream.flatMap((String value, Collector<String> out) -> {
            for (String field : value.split(",")) {
                out.collect(field);
            }
        }, TypeInformation.of(String.class));

        // 3. filter: keep only records whose sensor id is an odd number
        DataStream<String> filterStream = inputStream.filter(value -> Integer.valueOf(value.split(",")[0].split("_")[1]) % 2 != 0);

        // Print
        mapStream.print("map");
        flatMapStream.print("flatMap");
        filterStream.print("filter");

        env.execute();
    }
}

 

 

Grouping (keyBy) + rolling aggregation operators (Rolling Aggregation)

  1) The grouping operator:

  DataStream -> KeyedStream: logically splits a stream into disjoint partitions, where each partition contains the elements with the same key; internally this is implemented with hashing.

  Only a KeyedStream has the aggregation operators (its parallelism is inherited from the preceding DataStream and cannot be set separately); a plain DataStream does not, although KeyedStream still extends DataStream.

  2) The rolling aggregation operators:

  These operators aggregate each partition of a KeyedStream:

  sum(), min(), max(), minBy(), maxBy()

import com.patrick.examples.apitest.beans.SensorReading;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @ClassName RollTransformTest
 * @Description keyBy + RollingAggregation
 * @Author Administrator
 * @Version 1.0
 **/
public class RollTransformTest {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        String fpath = "F:\\FlinkDemo\\src\\main\\resources\\sensor.txt";

        // Read data from the file
        DataStream<String> inputStream = env.readTextFile(fpath);

        // First map each line into a SensorReading
        DataStream<SensorReading> dataStream = inputStream.map(value -> {
            String[] fields = value.split(",");
            return new SensorReading(fields[0], Long.valueOf(fields[1]), Double.valueOf(fields[2]));
        });

        // keyBy returns a KeyedStream; the aggregation API only exists on KeyedStream. A positional (numeric) key is only allowed when the input type is a Tuple
        KeyedStream<SensorReading, Tuple> keyedStream = dataStream.keyBy("id");
//        KeyedStream<SensorReading, String> keyedStream = dataStream.keyBy(data -> data.getId());
//        KeyedStream<SensorReading, String> keyedStream = dataStream.keyBy(SensorReading::getId);


        // Rolling aggregation: get the maximum temperature
        // max only updates the temperature field and leaves the other fields unchanged
        // maxBy returns the whole record that holds the maximum temperature
        DataStream<SensorReading> resultMaxStream = keyedStream.max("temperature");
        DataStream<SensorReading> resultMaxByStream = keyedStream.maxBy("temperature");
//
        resultMaxStream.print("resultMaxStream");
        resultMaxByStream.print("resultMaxByStream");
        // Group by id and compute a running sum of the temperature
        dataStream.keyBy("id").sum("temperature").map(data->data.getId()+"\t"+data.getTemperature()).print();
//        // Sum all temperatures globally; this is only for practicing the API
//        dataStream.map(data->new Tuple2("", data), Types.TUPLE(Types.STRING, Types.POJO(SensorReading.class)))
//                .keyBy(0).sum("f1.temperature")
//                .map(d->((SensorReading)d.f1).getTemperature())
//                .print();
        env.execute();

    }
}

Reduce operator

KeyedStream -> DataStream: an aggregation over a keyed stream. It merges the current element with the previous aggregation result and produces a new value. The returned stream contains the result of every aggregation step, not just the final result of the last aggregation.

import com.patrick.examples.apitest.beans.SensorReading;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @ClassName ReduceTransformTest
 * @Description Reduce operator
 * @Author Administrator
 * @Version 1.0
 **/
public class ReduceTransformTest {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        String fpath = "F:\\FlinkDemo\\src\\main\\resources\\sensor.txt";

        // Read data from the file
        DataStream<String> inputStream = env.readTextFile(fpath);

        // First map each line into a SensorReading
        DataStream<SensorReading> dataStream = inputStream.map(value -> {
            String[] fields = value.split(",");
            return new SensorReading(fields[0], Long.valueOf(fields[1]), Double.valueOf(fields[2]));
        });

        // keyBy returns a KeyedStream; the aggregation API only exists on KeyedStream. A positional (numeric) key is only allowed when the input type is a Tuple
        KeyedStream<SensorReading, Tuple> keyedStream = dataStream.keyBy("id");

        // reduce: keep the latest timestamp together with the maximum temperature seen so far
        DataStream<SensorReading> resultStream = keyedStream.reduce(new ReduceFunction<SensorReading>() {
            @Override
            public SensorReading reduce(SensorReading value1, SensorReading value2) throws Exception {
                return new SensorReading(value1.getId(), value2.getTs(), Math.max(value1.getTemperature(), value2.getTemperature()));
            }
        });
        
        // The same logic written as a lambda
        DataStream<SensorReading> resultLambdaStream = keyedStream.reduce((currState, newValue) -> new SensorReading(currState.getId(), newValue.getTs(), Math.max(currState.getTemperature(), newValue.getTemperature())));

        resultStream.print("resultStream");
        resultLambdaStream.print("resultLambdaStream");

        env.execute();

    }
}

 

 

Multi-stream operators: Split + Select

  1) The Split operator

  DataStream -> SplitStream: splits one DataStream into two or more DataStreams according to some criteria.

  2) The Select operator

  SplitStream -> DataStream: selects one or more DataStreams from a SplitStream. Requirement for the example below: split the sensor data into two streams by temperature, with 37 degrees as the threshold.

 Connect + map(CoMapFunction)

  1) The Connect operator

  DataStream, DataStream -> ConnectedStreams: connects two data streams while keeping their types. After the connect, the two streams are merely placed in the same stream; internally each keeps its own data and form, and the two remain independent of each other.

  2) The map(CoMapFunction) operator

  ConnectedStreams -> DataStream: works on ConnectedStreams with the same purpose as map and flatMap, applying a separate map/flatMap function to each of the two streams.

Union

DataStream -> DataStream: unions two or more DataStreams, producing a new DataStream that contains all elements of the input streams.

  Differences between Connect and Union:

  1) Union requires the streams to have the same type; Connect allows different types, which can be reconciled later in the CoMap.

  2) Connect can only combine two streams; Union can combine more than two.

import com.patrick.examples.apitest.beans.SensorReading;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.SplitStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;

import java.util.Collections;

/**
 * @ClassName MultipleTransformTest
 * @Description Split / Connect / Union operations
 * @Author Administrator
 * @Version 1.0
 **/
public class MultipleTransformTest {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        String fpath = "F:\\FlinkDemo\\src\\main\\resources\\sensor.txt";

        // Read data from the file
        DataStream<String> inputStream = env.readTextFile(fpath);

        // First map each line into a SensorReading
        DataStream<SensorReading> dataStream = inputStream.map(value -> {
            String[] fields = value.split(",");
            return new SensorReading(fields[0], Long.valueOf(fields[1]), Double.valueOf(fields[2]));
        });

        // 1.1 split: in plain terms, tags each record; a record may carry multiple tags
        SplitStream<SensorReading> splitStream = dataStream.split(new OutputSelector<SensorReading>() {
            @Override
            public Iterable<String> select(SensorReading value) {
                return value.getTemperature()>37? Collections.singletonList("high") : Collections.singletonList("low");
            }
        });

        // 1.2 select: pick the streams with the given tags from the SplitStream
        DataStream<SensorReading> highStream = splitStream.select("high");
        DataStream<SensorReading> lowStream = splitStream.select("low");
        DataStream<SensorReading> allStream = splitStream.select("high", "low");

        // Print
//        highStream.print("high");
//        lowStream.print("low");
//        allStream.print("all");


        // 2.1 connect: combines streams of possibly different element types, but only two at a time
        // Convert the high-temperature stream into Tuple2<String, Double>
        DataStream<Tuple2<String, Double>> warningStream = highStream.map(new MapFunction<SensorReading, Tuple2<String, Double>>() {
            @Override
            public Tuple2<String, Double> map(SensorReading value) throws Exception {
                return new Tuple2<String, Double>(value.getId(), value.getTemperature());
            }
        });
        ConnectedStreams<Tuple2<String, Double>, SensorReading> connectedStream = warningStream.connect(lowStream);

        // 2.2 map(CoMapFunction): process the two connected streams
        DataStream<Object> coMapStream = connectedStream.map(new CoMapFunction<Tuple2<String, Double>, SensorReading, Object>() {
            @Override
            public Object map1(Tuple2<String, Double> value) throws Exception {
                return new Tuple3<String, Double, String>(value.f0, value.f1, "warning");
            }

            @Override
            public Object map2(SensorReading value) throws Exception {
                return new Tuple2<String, String>(value.getId(), "normal");
            }
        });

//        coMapStream.print();


        // 3. union: merge two or more streams of the same element type
        highStream.union(lowStream, allStream).print();

        env.execute();

    }
}

 

 Stream transformation relationship diagram:

Sink

Flink does not offer a Spark-style foreach method that lets users perform arbitrary per-record output. All output to external systems has to go through a Sink, and the final output of a job is added with a call like stream.addSink(new MySink(xxxx)).
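
As an illustration only (a minimal sketch, not part of the original article), a custom sink just implements SinkFunction and overrides invoke(), which is called once per record:

import com.patrick.examples.apitest.beans.SensorReading;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

// A minimal custom sink that simply writes every record to stdout.
public class MySink implements SinkFunction<SensorReading> {
    @Override
    public void invoke(SensorReading value, Context context) throws Exception {
        System.out.println("sink received: " + value);
    }
}

// Usage: dataStream.addSink(new MySink());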

 

Kafka

import com.patrick.examples.apitest.beans.SensorReading;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

import java.util.Properties;

/**
 * @ClassName KafkaSinkTest
 * @Description Kafka sink test
 * @Author Administrator
 * @Version 1.0
 **/
public class KafkaSinkTest {

    public static void main(String[] args) throws Exception {

        // Create the streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Kafka connection properties
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "hadoop1:9092,hadoop2:9092,hadoop3:9092");
        properties.setProperty("group.id", "FlinkKafkaSourceTest");

        // Add the Kafka source
        DataStream<String> kafkaDataStream = env.addSource(new FlinkKafkaConsumer<String>("patrick", new SimpleStringSchema(), properties))
                ;


        // Map each line into a SensorReading and then back to a String
        DataStream<String> dataStream = kafkaDataStream.map(value -> {
            String[] fields = value.split(",");
            return new SensorReading(fields[0], Long.valueOf(fields[1]), Double.valueOf(fields[2])).toString();
        });


        dataStream.addSink(new FlinkKafkaProducer<String>(
                "hadoop1:9092,hadoop2:9092,hadoop3:9092",
                "sinktest",
                new SimpleStringSchema()));

        env.execute();
    }
}
# Produce messages from the console
kafka-console-producer.sh --broker-list hadoop1:9092 --topic patrick

# Consume messages from the command line
kafka-console-consumer.sh --bootstrap-server hadoop1:9092 --group sinktest_group --topic sinktest

The execution results are shown below. This Flink program effectively performs ETL work, moving data from one Kafka topic to another.

Redis

# pom
<dependency>
    <groupId>org.apache.bahir</groupId>
    <artifactId>flink-connector-redis_2.11</artifactId>
    <version>1.0</version>
</dependency>
# Main program
import com.patrick.examples.apitest.beans.SensorReading;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;

/**
 * @ClassName RedisSinkTest
 * @Description Sink to Redis
 * @Author Administrator
 * @Version 1.0
 **/
public class RedisSinkTest {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        String fpath = "F:\\FlinkDemo\\src\\main\\resources\\sensor.txt";

        // Read data from the file
        DataStream<String> inputStream = env.readTextFile(fpath);

        // First map each line into a SensorReading
        DataStream<SensorReading> dataStream = inputStream.map(value -> {
            String[] fields = value.split(",");
            return new SensorReading(fields[0], Long.valueOf(fields[1]), Double.valueOf(fields[2]));
        });

        // Define the Jedis connection configuration
        FlinkJedisPoolConfig jedisPoolConfig = new FlinkJedisPoolConfig.Builder()
                .setHost("hadoop1")
                .setPort(6379)
                .build();


        dataStream.addSink( new RedisSink<>(jedisPoolConfig, new MyRedisMapper()));

        env.execute();
    }


    public static class MyRedisMapper implements RedisMapper<SensorReading>{
        // Store each sensor's temperature in a Redis hash: hset key field value
        @Override
        public RedisCommandDescription getCommandDescription() {
            return new RedisCommandDescription(RedisCommand.HSET, "sensor_temp");
        }

        @Override
        public String getKeyFromData(SensorReading data) {
            return data.getId();
        }

        @Override
        public String getValueFromData(SensorReading data) {
            return data.getTemperature().toString();
        }
    }
}

 

 

 

ElasticSearch

# pom
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch7_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
import com.patrick.examples.apitest.beans.SensorReading;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.http.HttpHost;
import org.elasticsearch.client.Requests;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;

/**
 * @ClassName EsSinkTest
 * @Description Sink to Elasticsearch
 * @Author Administrator
 * @Version 1.0
 **/
public class EsSinkTest {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        String fpath = "F:\\FlinkDemo\\src\\main\\resources\\sensor.txt";

        // Read data from the file
        DataStream<String> inputStream = env.readTextFile(fpath);

        // First map each line into a SensorReading
        DataStream<SensorReading> dataStream = inputStream.map(value -> {
            String[] fields = value.split(",");
            return new SensorReading(fields[0], Long.valueOf(fields[1]), Double.valueOf(fields[2]));
        });
        
        // Define the Elasticsearch connection configuration
        List<HttpHost> httpHosts = new ArrayList<>();
        httpHosts.add(new HttpHost("master", 9200));
        dataStream.addSink(new ElasticsearchSink.Builder<>(httpHosts, new MyEsSinkFunction()).build());

        env.execute();
    }
    
    public static class MyEsSinkFunction implements ElasticsearchSinkFunction<SensorReading>{
        @Override
        public void process(SensorReading element, RuntimeContext ctx, RequestIndexer indexer) {
            HashMap<String, String> sourceMap = new HashMap<>();
            sourceMap.put("id", element.getId());
            sourceMap.put("ts", element.getTs().toString());
            sourceMap.put("temperature", element.getTemperature().toString());

            indexer.add(Requests.indexRequest("sensor_temp")
                    // Set the document ID explicitly; if omitted, ES generates one by default
                    .id(element.getId())
                    .source(sourceMap));
        }
    }
}

 

 

JDBC

For efficiency this example uses the Druid connection pool.
Note, however, that since Flink 1.11 there is an official JDBC connector, sketched below.
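
For reference, a minimal sketch of that official connector (dependency flink-connector-jdbc, Flink 1.11+); the SQL, URL, and credentials simply mirror the custom-sink example below and are placeholders:

import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;

// The connector manages connections and batching; we only supply the SQL and how to fill the prepared statement.
dataStream.addSink(JdbcSink.sink(
        "INSERT INTO sensor_temp (id, ts, temperature) VALUES (?, ?, ?)",
        (statement, reading) -> {
            statement.setString(1, reading.getId());
            statement.setLong(2, reading.getTs());
            statement.setDouble(3, reading.getTemperature());
        },
        new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                .withUrl("jdbc:mysql://hadoop1:3306/test")
                .withDriverName("com.mysql.jdbc.Driver")
                .withUsername("root")
                .withPassword("000000")
                .build()));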

# pom
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>druid</artifactId>
    <version>1.1.10</version>
</dependency>
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.6</version>
</dependency>
import com.alibaba.druid.pool.DruidDataSource;
import com.alibaba.druid.pool.DruidDataSourceFactory;
import com.patrick.examples.apitest.beans.SensorReading;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

/**
 * @ClassName JdbcSinkTest
 * @Description Custom sink writing to MySQL
 * @Author Administrator
 * @Version 1.0
 **/
public class JdbcSinkTest {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        String fpath = "F:\\FlinkDemo\\src\\main\\resources\\sensor.txt";

        // Read data from the file
        DataStream<String> inputStream = env.readTextFile(fpath);

        // First map each line into a SensorReading
        DataStream<SensorReading> dataStream = inputStream.map(value -> {
            String[] fields = value.split(",");
            return new SensorReading(fields[0], Long.valueOf(fields[1]), Double.valueOf(fields[2]));
        });

        dataStream.addSink(new MyJDBCSink()).setParallelism(5).name("JDBCSink");

        env.execute();
    }

    public static class MyJDBCSink extends RichSinkFunction<SensorReading>{

        private DruidDataSource dataSource;

        public static final String INSERT_SQL = "INSERT INTO `sensor_temp` (`id`, `ts`, `temperature`) VALUES (?, ?, ?)";

        @Override
        public void open(Configuration parameters) throws Exception {
            // Create the JDBC data source
            Properties properties = new Properties();
            properties.setProperty(DruidDataSourceFactory.PROP_URL, "jdbc:mysql://hadoop1:3306/test?useUnicode=true&characterEncoding=utf-8");
            properties.setProperty(DruidDataSourceFactory.PROP_USERNAME, "root");
            properties.setProperty(DruidDataSourceFactory.PROP_PASSWORD, "000000");
            properties.setProperty(DruidDataSourceFactory.PROP_DRIVERCLASSNAME, "com.mysql.jdbc.Driver");
            // Use the Druid connection pool so that obtaining a connection per event is thread-safe and reasonably efficient
            // Each parallel sink subtask runs in its own thread and calls its own open(), so this state is per-subtask
            dataSource = (DruidDataSource)DruidDataSourceFactory.createDataSource(properties);
            System.out.println(getRuntimeContext().getTaskNameWithSubtasks()+"        "+dataSource);
            System.out.println(getRuntimeContext().getTaskNameWithSubtasks()+"        "+dataSource.getClass());
            // Only the first subtask clears the table
            if(getRuntimeContext().getIndexOfThisSubtask()==0){
                deleteRecords();
            }else{
                // This only tries to let the delete run first; in production it offers no real guarantee, because the subtasks run concurrently and their ordering is not controlled
                TimeUnit.SECONDS.sleep(5);
            }

        }

        @Override
        public void close() throws Exception {

        }

        @Override
        public void invoke(SensorReading value, Context context) throws Exception {
            insertSensorReading(value);
        }

        // Delete all rows from the table
        private void deleteRecords() throws SQLException {
            try(Connection connection = dataSource.getConnection();
                PreparedStatement preparedStatement = connection.prepareStatement("delete from sensor_temp");
            ){
                boolean execute = preparedStatement.execute();
            }
        }

        // Insert a record
        private void insertSensorReading(SensorReading value) throws SQLException {
            try(Connection connection = dataSource.getConnection();
                PreparedStatement preparedStatement = connection.prepareStatement(INSERT_SQL);
                ){
                preparedStatement.setString(1, value.getId());
                preparedStatement.setTimestamp(2, new Timestamp(value.getTs()));
                preparedStatement.setDouble(3, value.getTemperature());

                boolean execute = preparedStatement.executeUpdate() == 1;

                System.out.println(getRuntimeContext().getTaskNameWithSubtasks() + " executed SQL for " + value + ", result: " + execute + ", connection: " + connection);

            }

        }
    }
}

 

 
