實現SourceFunction接口
package com.kong.flink;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
/**
 * Custom Flink source that emits one randomly chosen word roughly once per second.
 *
 * <p>Implements {@link SourceFunction} by overriding {@code run()} and {@code cancel()}.
 * The same pattern can be used to emit instances of a user-defined class; this example
 * simply picks a random word from a small fixed list.
 */
public class MyStreamingSource implements SourceFunction<String> {

    /**
     * Loop flag for {@link #run}. Declared volatile so that the write performed by
     * {@link #cancel()} is visible to the emitting loop even when the two methods are
     * invoked from different threads.
     */
    private volatile boolean isRunning = true;

    /**
     * Emits a random word from the fixed list once per second until {@link #cancel()} is called.
     *
     * @param sourceContext context used to emit elements downstream
     * @throws Exception if the thread is interrupted while sleeping between emissions
     */
    @Override
    public void run(SourceContext<String> sourceContext) throws Exception {
        List<String> words = Arrays.asList("flink", "ksw", "hbase", "spark");
        // One generator for the whole run; no need to allocate a new one per element.
        Random random = new Random();
        while (isRunning) {
            // Bound the index by the list size so the word list can change freely.
            int index = random.nextInt(words.size());
            sourceContext.collect(words.get(index));
            // Produce one record per second.
            Thread.sleep(1000);
        }
    }

    /** Requests that the emitting loop in {@link #run} stop after its current iteration. */
    @Override
    public void cancel() {
        isRunning = false;
    }
}
流wordcount示例
package com.kong.flink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
/**
 * Streaming word-count example: counts words emitted by {@link MyStreamingSource}
 * over a time window and prints the per-word totals.
 *
 * <p>Note on keys: specifying keys by field position is only supported for tuple types.
 * Doing so on a non-tuple stream fails with
 * {@code org.apache.flink.api.common.InvalidProgramException: Specifying keys via field
 * positions is only valid for tuple data types}. This job therefore keys the stream by
 * the {@code word} field of {@link WordWithCount} instead.
 */
public class FlinkStreamingDemo2 {

    /**
     * Builds and runs the streaming job.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if job execution fails
     */
    public static void main(String[] args) throws Exception {
        // Create the execution environment.
        StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();

        // Use the custom streaming source defined in this package.
        DataStreamSource<String> source =
                streamEnv.addSource(new MyStreamingSource()).setParallelism(1);

        // Map each word to a (word, 1) record (like map in Spark), key by word with a
        // type-safe selector, then sum the counts per window
        // (timeWindow arguments: 5 seconds and 3 seconds).
        SingleOutputStreamOperator<WordWithCount> windowCounts = source
                .map(word -> new WordWithCount(word, 1L))
                .keyBy(wordWithCount -> wordWithCount.word)
                .timeWindow(Time.seconds(5), Time.seconds(3))
                .sum("count");

        windowCounts.print().setParallelism(1);

        // Give the job a name and start it.
        streamEnv.execute("job1");
    }

    /**
     * A word together with its occurrence count.
     *
     * <p>The fields are public and a no-argument constructor is provided; the
     * {@code sum("count")} aggregation refers to the {@code count} field by name.
     */
    public static class WordWithCount {
        /** The word being counted. */
        public String word;
        /** Number of occurrences accumulated for {@link #word}. */
        public long count;

        /** Creates an empty instance. */
        public WordWithCount() {
        }

        /**
         * Creates an instance with the given word and count.
         *
         * @param word the word
         * @param count the initial count
         */
        public WordWithCount(String word, long count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return "WordWithCount{" +
                    "word='" + word + '\'' +
                    ", count=" + count +
                    '}';
        }
    }
}