Flink Streaming API flow diagram
# Create the stream execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
getExecutionEnvironment() creates an execution environment that represents the context of the current program, similar to a SparkContext.
If the program is invoked standalone, this method returns a local execution environment; if the program is submitted to a cluster through the command-line client, it returns the execution environment of that cluster.
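For comparison, the environment can also be created explicitly; a minimal sketch, where the host, port and jar path are placeholders rather than values from this project:

// Explicit variants of environment creation (illustrative only)
StreamExecutionEnvironment localEnv = StreamExecutionEnvironment.createLocalEnvironment();
// "jobmanager-host", 8081 and the jar path below are placeholders
StreamExecutionEnvironment remoteEnv = StreamExecutionEnvironment.createRemoteEnvironment(
        "jobmanager-host", 8081, "path/to/job.jar");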
Example: reading temperature data (in Celsius)
Source: reading data from collections and elements
// SensorReading POJO used throughout the examples
public class SensorReading {
    private String id;
    private Long ts;
    private Double temperature;

    public SensorReading() { }

    public SensorReading(String id, Long ts, Double temperature) {
        this.id = id;
        this.ts = ts;
        this.temperature = temperature;
    }

    public String getId() { return id; }
    public void setId(String id) { this.id = id; }

    public Long getTs() { return ts; }
    public void setTs(Long ts) { this.ts = ts; }

    public Double getTemperature() { return temperature; }
    public void setTemperature(Double temperature) { this.temperature = temperature; }

    @Override
    public String toString() {
        return "SensorReading{id='" + id + "', ts=" + ts + ", temperature=" + temperature + "}";
    }
}
# Main program
import com.patrick.examples.apitest.beans.SensorReading;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Arrays;

/**
 * @ClassName CollectionSourceTest
 * @Description Read data from a collection
 * @Author Administrator
 * @Version 1.0
 **/
public class CollectionSourceTest {
    public static void main(String[] args) throws Exception {
        // Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Read data from a collection
        DataStream<SensorReading> dataStream = env.fromCollection(Arrays.asList(
                new SensorReading("sensor_1", 1547718199L, 35.80018327300259),
                new SensorReading("sensor_6", 1547718201L, 15.402984393403084),
                new SensorReading("sensor_7", 1547718202L, 6.720945201171228),
                new SensorReading("sensor_10", 1547718205L, 38.101067604893444)
        ));

        // Read elements directly
        DataStream<Integer> integerDataStream = env.fromElements(10, 9, 2, 19, 87);

        // Print, with a prefix identifying each stream
        dataStream.print("data");
        integerDataStream.print("int");

        // Execute, giving the streaming job a name
        env.execute("CollectionSourceTest");
    }
}
Execution result:
POJO and JavaBean conventions:
1) The JavaBean conventions are:
a. Implement the java.io.Serializable interface.
b. Be a public class with a public no-argument constructor, and provide setXxx()/getXxx() methods to access its properties.
2) Flink's POJO rules are as follows (a minimal sketch follows this list):
a. The class is public and standalone (no non-static inner class).
b. The class has a public no-argument constructor.
c. All non-static, non-transient fields in the class (and in all its superclasses) are either public (and non-final), or have public getters and setters that follow the JavaBean naming conventions.
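A minimal sketch of a class that satisfies these POJO rules; the class name and fields here are made up purely for illustration:

// Illustrative only: satisfies the Flink POJO rules listed above
public class WordCount {
    public String word;   // public, non-final, non-transient field
    public long count;

    public WordCount() { }   // public no-argument constructor

    public WordCount(String word, long count) {
        this.word = word;
        this.count = count;
    }
}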
Example: reading data from a file
import com.patrick.examples.apitest.beans.SensorReading;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @ClassName FileSourceTest
 * @Description Read data from a file
 * @Author Administrator
 * @Version 1.0
 **/
public class FileSourceTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        String fpath = "F:\\FlinkDemo\\src\\main\\resources\\sensor.txt";
        // Read data from the file
        DataStream<String> dataStream = env.readTextFile(fpath);

        // Convert to SensorReading
        DataStream<SensorReading> sensorReadingDataStream = dataStream.map(line -> {
            String[] split = line.split(",");
            return new SensorReading(split[0], Long.valueOf(split[1]), Double.valueOf(split[2]));
        });

        sensorReadingDataStream.print();

        env.execute();
    }
}
Run result:
Reading data from Kafka
Flink provides matching connectors for different Kafka versions; refer to the Flink Kafka connector documentation for details.
Versions used in this example: Flink 1.10.1, Kafka 1.1.0.
# pom
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.10.1</version>
</dependency>
# Main program
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

/**
 * @ClassName KafkaSourceTest
 * @Description Consume Kafka messages
 * @Author Administrator
 * @Version 1.0
 **/
public class KafkaSourceTest {
    public static void main(String[] args) throws Exception {
        // Create the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Kafka properties
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "hadoop1:9092,hadoop2:9092,hadoop3:9092");
        properties.setProperty("group.id", "FlinkKafkaSourceTest");

        // Add the Kafka source
        DataStream<String> kafkaDataStream = env
                .addSource(new FlinkKafkaConsumer<String>("patrick", new SimpleStringSchema(), properties))
                .setParallelism(6);
        System.out.println("kafkaDataStream parallelism: " + kafkaDataStream.getParallelism());

        kafkaDataStream.print();

        env.execute();
    }
}
Create the topic, produce messages from the console, and check that they are consumed:
# Note
# Create a topic with 3 partitions and a replication factor of 2
kafka-topics.sh --zookeeper hadoop1:2181 --create --replication-factor 2 --partitions 3 --topic patrick
# Produce messages from the console
kafka-console-producer.sh --broker-list hadoop1:9092 --topic patrick
Custom source
A custom source needs to implement the interface org.apache.flink.streaming.api.functions.source.SourceFunction.
import com.patrick.examples.apitest.beans.SensorReading;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.*;
import java.util.concurrent.TimeUnit;

/**
 * @ClassName CustomSourceTest
 * @Description Custom source
 * @Author Administrator
 * @Version 1.0
 **/
public class CustomSourceTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStream<SensorReading> dataStream = env.addSource(new MySensorSourceFunction(), "MySensorSourceFunction");

        dataStream.print();

        env.execute();
    }

    /**
     * Generates sensor data
     */
    public static class MySensorSourceFunction implements SourceFunction<SensorReading> {

        private boolean running = true;

        /**
         * Produce data
         * @param ctx
         * @throws Exception
         */
        @Override
        public void run(SourceContext<SensorReading> ctx) throws Exception {
            Random random = new Random();

            // To emit readings ordered by the numeric suffix of the sensor id,
            // use a TreeMap with a custom comparator
            Map<String, Double> sensorTemp = new TreeMap<String, Double>((o1, o2) -> {
                Integer s1 = Integer.valueOf(o1.split("_")[1]);
                Integer s2 = Integer.valueOf(o2.split("_")[1]);
                return s1.compareTo(s2);
            });

            // Define 10 sensors and give each an initial temperature
            for (int i = 0; i < 10; ++i) {
                sensorTemp.put("sensor_" + (i + 1), 60 + random.nextGaussian() * 20);
            }

            while (running) {
                for (String sensorId : sensorTemp.keySet()) {
                    Double newTemp = sensorTemp.get(sensorId) + random.nextGaussian();
                    sensorTemp.put(sensorId, newTemp);
                    // Emit the reading
                    ctx.collect(new SensorReading(sensorId, System.currentTimeMillis() / 1000, newTemp));
                }
                System.out.println("--------------------------------");
                // Control the emission rate
                TimeUnit.SECONDS.sleep(3);
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }
}
Transform
Basic operators (map, flatMap, filter)
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * @ClassName BaseTransformTest
 * @Description Basic operators: map, flatMap, filter
 * @Author Administrator
 * @Version 1.0
 **/
public class BaseTransformTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        String fpath = "F:\\FlinkDemo\\src\\main\\resources\\sensor.txt";
        // Read data from the file
        DataStream<String> inputStream = env.readTextFile(fpath);

        // 1. map: convert each line into its string length
        DataStream<Integer> mapStream = inputStream.map(value -> value.length());

        // 2. flatMap: split each line on commas and emit every field;
        //    the second argument specifies the return type of the flatMap
        DataStream<String> flatMapStream = inputStream.flatMap((String value, Collector<String> out) -> {
            for (String field : value.split(",")) {
                out.collect(field);
            }
        }, TypeInformation.of(String.class));

        // 3. filter: only keep records whose sensor id is odd
        DataStream<String> filterStream = inputStream.filter(value ->
                Integer.valueOf(value.split(",")[0].split("_")[1]) % 2 != 0);

        // Print
        mapStream.print("map");
        flatMapStream.print("flatMap");
        filterStream.print("filter");

        env.execute();
    }
}
Grouping (keyBy) + rolling aggregation operators
1) The grouping operator:
DataStream -> KeyedStream: logically partitions a stream into disjoint partitions, where each partition contains the elements with the same key; internally this is implemented with hash partitioning.
Only a KeyedStream has the aggregation operators (its parallelism depends on the preceding DataStream and cannot be set separately); a plain DataStream does not, although KeyedStream still extends DataStream.
2) The rolling aggregation operators:
These operators aggregate each sub-stream of a KeyedStream:
sum(), min(), max(), minBy(), maxBy()
import com.patrick.examples.apitest.beans.SensorReading;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @ClassName RollTransformTest
 * @Description keyBy + rolling aggregation
 * @Author Administrator
 * @Version 1.0
 **/
public class RollTransformTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        String fpath = "F:\\FlinkDemo\\src\\main\\resources\\sensor.txt";
        // Read data from the file
        DataStream<String> inputStream = env.readTextFile(fpath);

        // First map into SensorReading
        DataStream<SensorReading> dataStream = inputStream.map(value -> {
            String[] fields = value.split(",");
            return new SensorReading(fields[0], Long.valueOf(fields[1]), Double.valueOf(fields[2]));
        });

        // keyBy returns a KeyedStream; the aggregation API only exists on KeyedStream.
        // Keying by position number is only possible when the input type is a Tuple.
        KeyedStream<SensorReading, Tuple> keyedStream = dataStream.keyBy("id");
        // KeyedStream<SensorReading, String> keyedStream = dataStream.keyBy(data -> data.getId());
        // KeyedStream<SensorReading, String> keyedStream = dataStream.keyBy(SensorReading::getId);

        // Rolling aggregation: find the maximum temperature.
        // max only updates the temperature field and leaves the other fields unchanged;
        // maxBy returns the whole record that has the maximum temperature.
        DataStream<SensorReading> resultMaxStream = keyedStream.max("temperature");
        DataStream<SensorReading> resultMaxByStream = keyedStream.maxBy("temperature");

        // resultMaxStream.print("resultMaxStream");
        resultMaxByStream.print("resultMaxByStream");

        // Sum the temperature per key
        dataStream.keyBy("id").sum("temperature")
                .map(data -> data.getId() + "\t" + data.getTemperature())
                .print();

        // // Sum all temperatures globally (API practice only)
        // dataStream.map(data -> new Tuple2("", data), Types.TUPLE(Types.STRING, Types.POJO(SensorReading.class)))
        //         .keyBy(0).sum("f1.temperature")
        //         .map(d -> ((SensorReading) d.f1).getTemperature())
        //         .print();

        env.execute();
    }
}
The reduce operator
KeyedStream -> DataStream: an aggregation over a keyed stream. It combines the current element with the previous aggregated result to produce a new value. The returned stream contains the result of every aggregation step, not only the final result of the last aggregation.
import com.patrick.examples.apitest.beans.SensorReading;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @ClassName ReduceTransformTest
 * @Description Reduce operator
 * @Author Administrator
 * @Version 1.0
 **/
public class ReduceTransformTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        String fpath = "F:\\FlinkDemo\\src\\main\\resources\\sensor.txt";
        // Read data from the file
        DataStream<String> inputStream = env.readTextFile(fpath);

        // First map into SensorReading
        DataStream<SensorReading> dataStream = inputStream.map(value -> {
            String[] fields = value.split(",");
            return new SensorReading(fields[0], Long.valueOf(fields[1]), Double.valueOf(fields[2]));
        });

        // keyBy returns a KeyedStream; the aggregation API only exists on KeyedStream.
        // Keying by position number is only possible when the input type is a Tuple.
        KeyedStream<SensorReading, Tuple> keyedStream = dataStream.keyBy("id");

        // reduce: keep the latest timestamp together with the maximum temperature
        DataStream<SensorReading> resultStream = keyedStream.reduce(new ReduceFunction<SensorReading>() {
            @Override
            public SensorReading reduce(SensorReading value1, SensorReading value2) throws Exception {
                return new SensorReading(value1.getId(), value2.getTs(),
                        Math.max(value1.getTemperature(), value2.getTemperature()));
            }
        });

        // The same reduce written as a lambda
        DataStream<SensorReading> resultLambdaStream = keyedStream.reduce((currState, newValue) ->
                new SensorReading(currState.getId(), newValue.getTs(),
                        Math.max(currState.getTemperature(), newValue.getTemperature())));

        resultStream.print("resultStream");
        resultLambdaStream.print("resultLambdaStream");

        env.execute();
    }
}
Multi-stream operators: split + select
1) The split operator
DataStream -> SplitStream: splits one DataStream into two or more DataStreams according to some criteria.
2) The select operator
SplitStream -> DataStream: retrieves one or more DataStreams from a SplitStream. Requirement: split the sensor data into two streams by temperature, using 37 degrees as the threshold.
Connect+map(CoMapFunction)
1) The connect operator
DataStream, DataStream -> ConnectedStreams: connects two data streams while preserving their types. After the two streams are connected they are merely placed in the same stream; internally each keeps its own data and form, and the two streams stay independent of each other.
2) The map(CoMapFunction) operator
ConnectedStreams -> DataStream: applied to ConnectedStreams, it works like map and flatMap, applying a separate map or flatMap to each of the streams inside the ConnectedStreams.
Union
DataStream -> DataStream: unions two or more DataStreams, producing a new DataStream that contains the elements of all of them.
Differences between connect and union:
1) Union requires the streams to have the same type; connect allows different types, which can then be unified in the subsequent CoMap.
2) Connect can only operate on two streams; union can take more than two.
import com.patrick.examples.apitest.beans.SensorReading;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.SplitStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;

import java.util.Collections;

/**
 * @ClassName MultipleTransformTest
 * @Description Splitting and merging streams
 * @Author Administrator
 * @Version 1.0
 **/
public class MultipleTransformTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        String fpath = "F:\\FlinkDemo\\src\\main\\resources\\sensor.txt";
        // Read data from the file
        DataStream<String> inputStream = env.readTextFile(fpath);

        // First map into SensorReading
        DataStream<SensorReading> dataStream = inputStream.map(value -> {
            String[] fields = value.split(",");
            return new SensorReading(fields[0], Long.valueOf(fields[1]), Double.valueOf(fields[2]));
        });

        // 1.1 split: effectively tags each record; a record may carry several tags
        SplitStream<SensorReading> splitStream = dataStream.split(new OutputSelector<SensorReading>() {
            @Override
            public Iterable<String> select(SensorReading value) {
                return value.getTemperature() > 37
                        ? Collections.singletonList("high")
                        : Collections.singletonList("low");
            }
        });

        // 1.2 select: pull the streams with the given tags out of the SplitStream
        DataStream<SensorReading> highStream = splitStream.select("high");
        DataStream<SensorReading> lowStream = splitStream.select("low");
        DataStream<SensorReading> allStream = splitStream.select("high", "low");

        // Print
        // highStream.print("high");
        // lowStream.print("low");
        // allStream.print("all");

        // 2.1 connect: merges exactly two streams, which may have different data types.
        // Convert the high-temperature stream into Tuple2<String, Double> first.
        DataStream<Tuple2<String, Double>> warningStream = highStream.map(new MapFunction<SensorReading, Tuple2<String, Double>>() {
            @Override
            public Tuple2<String, Double> map(SensorReading value) throws Exception {
                return new Tuple2<String, Double>(value.getId(), value.getTemperature());
            }
        });
        ConnectedStreams<Tuple2<String, Double>, SensorReading> connectedStream = warningStream.connect(lowStream);

        // 2.2 map(CoMapFunction): merge the connected streams
        DataStream<Object> coMapStream = connectedStream.map(new CoMapFunction<Tuple2<String, Double>, SensorReading, Object>() {
            @Override
            public Object map1(Tuple2<String, Double> value) throws Exception {
                return new Tuple3<String, Double, String>(value.f0, value.f1, "warning");
            }

            @Override
            public Object map2(SensorReading value) throws Exception {
                return new Tuple2<String, String>(value.getId(), "normal");
            }
        });
        // coMapStream.print();

        // 3. union: merge two or more streams of the same data type
        highStream.union(lowStream, allStream).print();

        env.execute();
    }
}
Stream transformation relationship diagram:
Sink
Flink has no equivalent of Spark's foreach method that lets users iterate over the data themselves. All output to external systems goes through a sink, added at the end of the pipeline with something like stream.addSink(new MySink(xxxx)), which performs the job's final output.
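As a minimal illustration of the addSink pattern (not one of the connectors covered below), a sketch of a hypothetical custom sink that simply prints every record:

// Illustrative sketch only: a custom sink that prints each SensorReading
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

public class MyPrintSink implements SinkFunction<SensorReading> {
    @Override
    public void invoke(SensorReading value, Context context) {
        System.out.println("sink received: " + value);
    }
}
// usage: dataStream.addSink(new MyPrintSink());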
Kafka
import com.patrick.examples.apitest.beans.SensorReading;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

import java.util.Properties;

/**
 * @ClassName KafkaSinkTest
 * @Description Kafka sink test
 * @Author Administrator
 * @Version 1.0
 **/
public class KafkaSinkTest {
    public static void main(String[] args) throws Exception {
        // Create the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Kafka properties
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "hadoop1:9092,hadoop2:9092,hadoop3:9092");
        properties.setProperty("group.id", "FlinkKafkaSourceTest");

        // Add the Kafka source
        DataStream<String> kafkaDataStream = env.addSource(
                new FlinkKafkaConsumer<String>("patrick", new SimpleStringSchema(), properties));

        // Map each line into the string form of a SensorReading
        DataStream<String> dataStream = kafkaDataStream.map(value -> {
            String[] fields = value.split(",");
            return new SensorReading(fields[0], Long.valueOf(fields[1]), Double.valueOf(fields[2])).toString();
        });

        // Write to another Kafka topic
        dataStream.addSink(new FlinkKafkaProducer<String>(
                "hadoop1:9092,hadoop2:9092,hadoop3:9092",
                "sinktest",
                new SimpleStringSchema()));

        env.execute();
    }
}
# Produce messages from the console
kafka-console-producer.sh --broker-list hadoop1:9092 --topic patrick
# Consume messages from the command line
kafka-console-consumer.sh --bootstrap-server hadoop1:9092 --group sinktest_group --topic sinktest
The execution result is shown below. This Flink program effectively performs an ETL job, moving data from one Kafka topic to another.
Redis
# pom
<dependency>
    <groupId>org.apache.bahir</groupId>
    <artifactId>flink-connector-redis_2.11</artifactId>
    <version>1.0</version>
</dependency>
# Main program
import com.patrick.examples.apitest.beans.SensorReading;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;

/**
 * @ClassName RedisSinkTest
 * @Description Write to Redis
 * @Author Administrator
 * @Version 1.0
 **/
public class RedisSinkTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        String fpath = "F:\\FlinkDemo\\src\\main\\resources\\sensor.txt";
        // Read data from the file
        DataStream<String> inputStream = env.readTextFile(fpath);

        // First map into SensorReading
        DataStream<SensorReading> dataStream = inputStream.map(value -> {
            String[] fields = value.split(",");
            return new SensorReading(fields[0], Long.valueOf(fields[1]), Double.valueOf(fields[2]));
        });

        // Define the Jedis connection configuration
        FlinkJedisPoolConfig jedisPoolConfig = new FlinkJedisPoolConfig.Builder()
                .setHost("hadoop1")
                .setPort(6379)
                .build();

        dataStream.addSink(new RedisSink<>(jedisPoolConfig, new MyRedisMapper()));

        env.execute();
    }

    public static class MyRedisMapper implements RedisMapper<SensorReading> {

        // Store each sensor's temperature in a hash: hset key field value
        @Override
        public RedisCommandDescription getCommandDescription() {
            return new RedisCommandDescription(RedisCommand.HSET, "sensor_temp");
        }

        @Override
        public String getKeyFromData(SensorReading data) {
            return data.getId();
        }

        @Override
        public String getValueFromData(SensorReading data) {
            return data.getTemperature().toString();
        }
    }
}
ElasticSearch
# pom
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch7_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
import com.patrick.examples.apitest.beans.SensorReading;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.http.HttpHost;
import org.elasticsearch.client.Requests;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;

/**
 * @ClassName EsSinkTest
 * @Description Write to Elasticsearch
 * @Author Administrator
 * @Version 1.0
 **/
public class EsSinkTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        String fpath = "F:\\FlinkDemo\\src\\main\\resources\\sensor.txt";
        // Read data from the file
        DataStream<String> inputStream = env.readTextFile(fpath);

        // First map into SensorReading
        DataStream<SensorReading> dataStream = inputStream.map(value -> {
            String[] fields = value.split(",");
            return new SensorReading(fields[0], Long.valueOf(fields[1]), Double.valueOf(fields[2]));
        });

        // Define the ES connection configuration
        List<HttpHost> httpHosts = new ArrayList<>();
        httpHosts.add(new HttpHost("master", 9200));

        dataStream.addSink(new ElasticsearchSink.Builder<>(httpHosts, new MyEsSinkFunction()).build());

        env.execute();
    }

    public static class MyEsSinkFunction implements ElasticsearchSinkFunction<SensorReading> {
        @Override
        public void process(SensorReading element, RuntimeContext ctx, RequestIndexer indexer) {
            HashMap<String, String> sourceMap = new HashMap<>();
            sourceMap.put("id", element.getId());
            sourceMap.put("ts", element.getTs().toString());
            sourceMap.put("temperature", element.getTemperature().toString());

            indexer.add(Requests.indexRequest("sensor_temp")
                    // Specify the document id; if omitted, ES generates one
                    .id(element.getId())
                    .source(sourceMap));
        }
    }
}
JDBC
For efficiency, this example uses the Druid connection pool. Note that since version 1.11 Flink has an official JDBC connector (a rough sketch of it follows at the end of this section).
# pom
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>druid</artifactId>
    <version>1.1.10</version>
</dependency>
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.6</version>
</dependency>
import com.alibaba.druid.pool.DruidDataSource;
import com.alibaba.druid.pool.DruidDataSourceFactory;
import com.patrick.examples.apitest.beans.SensorReading;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

/**
 * @ClassName JdbcSinkTest
 * @Description Custom sink that writes to MySQL
 * @Author Administrator
 * @Version 1.0
 **/
public class JdbcSinkTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        String fpath = "F:\\FlinkDemo\\src\\main\\resources\\sensor.txt";
        // Read data from the file
        DataStream<String> inputStream = env.readTextFile(fpath);

        // First map into SensorReading
        DataStream<SensorReading> dataStream = inputStream.map(value -> {
            String[] fields = value.split(",");
            return new SensorReading(fields[0], Long.valueOf(fields[1]), Double.valueOf(fields[2]));
        });

        dataStream.addSink(new MyJDBCSink()).setParallelism(5).name("JDBCSink");

        env.execute();
    }

    public static class MyJDBCSink extends RichSinkFunction<SensorReading> {

        private DruidDataSource dataSource;

        public static final String INSERT_SQL = "INSERT INTO `sensor_temp` (`id`, `ts`, `temperature`) VALUES (?, ?, ?)";

        @Override
        public void open(Configuration parameters) throws Exception {
            // Create the JDBC connection pool
            Properties properties = new Properties();
            properties.setProperty(DruidDataSourceFactory.PROP_URL, "jdbc:mysql://hadoop1:3306/test?useUnicode=true&characterEncoding=utf-8");
            properties.setProperty(DruidDataSourceFactory.PROP_USERNAME, "root");
            properties.setProperty(DruidDataSourceFactory.PROP_PASSWORD, "000000");
            properties.setProperty(DruidDataSourceFactory.PROP_DRIVERCLASSNAME, "com.mysql.jdbc.Driver");
            // Using the Druid pool means getting a connection per event is thread-safe and reasonably efficient.
            // Each parallel subtask of the JDBC sink runs in its own thread and calls its own open().
            dataSource = (DruidDataSource) DruidDataSourceFactory.createDataSource(properties);
            System.out.println(getRuntimeContext().getTaskNameWithSubtasks() + " " + dataSource);
            System.out.println(getRuntimeContext().getTaskNameWithSubtasks() + " " + dataSource.getClass());

            // Only the first subtask truncates the table
            if (getRuntimeContext().getIndexOfThisSubtask() == 0) {
                deleteRecords();
            } else {
                // This only tries to make sure the table is cleared first; in production it is not reliable,
                // because the subtasks run concurrently and their ordering is not guaranteed.
                TimeUnit.SECONDS.sleep(5);
            }
        }

        @Override
        public void close() throws Exception {
            // Release the connection pool
            if (dataSource != null) {
                dataSource.close();
            }
        }

        @Override
        public void invoke(SensorReading value, Context context) throws Exception {
            insertSensorReading(value);
        }

        // Clear the table
        private void deleteRecords() throws SQLException {
            try (Connection connection = dataSource.getConnection();
                 PreparedStatement preparedStatement = connection.prepareStatement("delete from sensor_temp");
            ) {
                boolean execute = preparedStatement.execute();
            }
        }

        // Insert a record
        private void insertSensorReading(SensorReading value) throws SQLException {
            try (Connection connection = dataSource.getConnection();
                 PreparedStatement preparedStatement = connection.prepareStatement(INSERT_SQL);
            ) {
                preparedStatement.setString(1, value.getId());
                preparedStatement.setTimestamp(2, new Timestamp(value.getTs()));
                preparedStatement.setDouble(3, value.getTemperature());
                boolean execute = preparedStatement.executeUpdate() == 1;
                System.out.println(getRuntimeContext().getTaskNameWithSubtasks()
                        + " executed SQL for " + value + ", result: " + execute
                        + ", connection: " + connection.toString());
            }
        }
    }
}
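For reference, a rough sketch of how the official JDBC connector mentioned above might replace this hand-rolled sink. It assumes the flink-connector-jdbc dependency from Flink 1.11+ and reuses the same placeholder URL and credentials from the example, so the details may differ in practice:

// Sketch only (Flink 1.11+ flink-connector-jdbc); not part of the example project above
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;

dataStream.addSink(JdbcSink.sink(
        "INSERT INTO sensor_temp (id, ts, temperature) VALUES (?, ?, ?)",
        (ps, value) -> {
            ps.setString(1, value.getId());
            ps.setLong(2, value.getTs());
            ps.setDouble(3, value.getTemperature());
        },
        new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                .withUrl("jdbc:mysql://hadoop1:3306/test")
                .withDriverName("com.mysql.jdbc.Driver")
                .withUsername("root")
                .withPassword("000000")
                .build()));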