利用Flink提供的接口產生實時數據源


實現SourceFunction接口

package com.kong.flink;
​
import org.apache.flink.streaming.api.functions.source.SourceFunction;
​
import java.util.Arrays;
import java.util.List;
import java.util.Random;
​
/**
 * 利用 Flink 提供的自定義 Source 功能來實現一個自定義的實時數據源
 * 實現SourceFunction接口,重寫run()和cancel()
 * 也可以生成自己定義的類數據,這里簡單的隨機獲取word
 */
public class MyStreamingSource implements SourceFunction<String> {
​
    private volatile boolean isRunning = true;
​
    @Override
    public void run(SourceContext<String> sourceContext) throws Exception {
        List<String> list = Arrays.asList("flink", "ksw", "hbase", "spark");
        while (isRunning) {
            int i = new Random().nextInt(4);
            sourceContext.collect(list.get(i));
            //每秒產生一條數據
            Thread.sleep(1000);
        }
    }
​
    @Override
    public void cancel() {
        isRunning = false;
    }
​
}

 

流wordcount示例

package com.kong.flink;
​
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
​
/**
 * org.apache.flink.api.common.InvalidProgramException: Specifying keys via field positions is only valid for tuple data types
 * 通過下標指定數據,只支持tuple類型
 */
public class FlinkStreamingDemo2 {
    public static void main(String[] args) throws Exception {
​
        //創建執行環境
        StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        //使用我們定義的流數據
        DataStreamSource<String> source = streamEnv.addSource(new MyStreamingSource2()).setParallelism(1);
        //map操作,轉換一下數據格式,跟spark中map一樣
        SingleOutputStreamOperator<WordWithCount> windowCounts = source
                .map(word -> new WordWithCount(word, 1L)).keyBy("word")
                .timeWindow(Time.seconds(5), Time.seconds(3)).sum("count");
​
        windowCounts.print().setParallelism(1);
​
        //定義一個job名字
        streamEnv.execute("job1");
    }
​
    public static class WordWithCount {
        public String word;
        public long count;
​
        public WordWithCount() {
        }
​
        public WordWithCount(String word, long count) {
            this.word = word;
            this.count = count;
        }
​
        @Override
        public String toString() {
            return "WordWithCount{" +
                    "word='" + word + '\'' +
                    ", count=" + count +
                    '}';
        }
    }
​
}

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯繫本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM