1、Environment
1.1 getExecutionEnvironment
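- Creates an execution environment that represents the context in which the current program runs.
- If the program is invoked standalone, this method returns a local execution environment.
- If the program is invoked from the command-line client to be submitted to a cluster, this method returns that cluster's execution environment.
- In other words, getExecutionEnvironment decides which kind of environment to return based on how the program is run, which makes it the most common way to create an execution environment.
Batch environment: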
val env = ExecutionEnvironment.getExecutionEnvironment
Streaming environment:
val env = StreamExecutionEnvironment.getExecutionEnvironment
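If no parallelism is set in the program, a job submitted to a cluster uses the value configured in flink-conf.yaml (parallelism.default), which defaults to 1. A local environment instead defaults to the number of CPU cores.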
1.2 createLocalEnvironment
Returns a local execution environment. The default parallelism can be passed in the call.
val env = StreamExecutionEnvironment.createLocalEnvironment(1)
1.3 createRemoteEnvironment
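Returns a cluster execution environment and submits the Jar to the remote server.
The call must specify the JobManager's IP (or hostname) and port, along with the Jar to run on the cluster.
val env = ExecutionEnvironment.createRemoteEnvironment("jobmanager-hostname", 6123, "YOURPATH/wordcount.jar")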
2、Source
2.1 Reading data from a collection
import org.apache.flink.streaming.api.scala._

// Variant 1: build the collection by mapping tuples to WaterSensor
def main(args: Array[String]): Unit = {
  val env1: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  val dataList = List(
    ("1", 1231231L, 200),
    ("2", 1231231L, 201),
    ("3", 1231231L, 202)
  ).map { case (id, ts, vc) => WaterSensor(id, ts, vc) }
  val dataDS: DataStream[WaterSensor] = env1.fromCollection(dataList)
  dataDS.print()
  env1.execute()
}

case class WaterSensor(id: String, ts: Long, vc: Double)

// Variant 2: pass WaterSensor instances directly
def main(args: Array[String]): Unit = {
  val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  val sensorDS: DataStream[WaterSensor] = env.fromCollection(
    List(
      WaterSensor("ws_001", 1577844001, 45.0),
      WaterSensor("ws_002", 1577844015, 43.0),
      WaterSensor("ws_003", 1577844020, 42.0)
    )
  )
  sensorDS.print()
  env.execute("sensor")
}
2.2 Reading data from a file
// Read the source data from a file
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
// Relative path
//val fileDS: DataStream[String] = env.readTextFile("input/word.txt")
// Flink does not recognize the hdfs:// scheme by default; the relevant Hadoop jars must be added
val fileDS: DataStream[String] = env.readTextFile("hdfs://linux1:9000/directory/app-20191213160742-0000")
fileDS.print("file>>>>")
env.execute()
2.3 Reading data from Kafka
Add the Kafka connector dependency:
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka-0.11 -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
  <version>1.7.2</version>
</dependency>

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011

// Read the source data from Kafka
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
// Use Kafka as the data source
val properties = new java.util.Properties()
properties.setProperty("bootstrap.servers", "linux1:9092")
properties.setProperty("group.id", "consumer-group")
properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty("auto.offset.reset", "latest")
val kafkaDS = env.addSource(
  new FlinkKafkaConsumer011[String]("waterSensor", new SimpleStringSchema(), properties)
)
kafkaDS.print("kafka>>>>")
env.execute()
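2.4 Custom source
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._

def main(args: Array[String]): Unit = {
  // Read the source data from a custom source
  val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  env.addSource(new MySource()).print("mine>>>>")
  env.execute()
}

// Custom data source
// 1. Extend SourceFunction
// 2. Override its methods
class MySource extends SourceFunction[WaterSensor] {
  // cancel() is called from a different thread than run(), so the flag is volatile
  @volatile private var flg = true

  // Data collection logic
  override def run(ctx: SourceFunction.SourceContext[WaterSensor]): Unit = {
    while (flg) {
      // Emit the data collected from the source
      ctx.collect(WaterSensor("1", 1L, 1))
      Thread.sleep(200)
    }
  }

  // Stop collecting data
  override def cancel(): Unit = {
    flg = false
  }
}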
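The run/cancel contract of the custom source can be tried out without a Flink cluster. The following is a minimal sketch in plain Scala, not Flink's implementation: LoopingSource, runAndCancel, and the timing values are hypothetical names chosen for illustration. It shows a loop that keeps emitting on a worker thread until another thread flips a flag, which is the reason such a flag is usually marked @volatile.

```scala
import java.util.concurrent.atomic.AtomicInteger

object CancellableSourceSketch {
  // Hypothetical stand-in for a source: emits until cancel() flips the flag
  class LoopingSource(intervalMs: Long) {
    // Written by the cancelling thread, read by the emitting thread
    @volatile private var running = true

    def run(emit: Int => Unit): Unit = {
      var i = 0
      while (running) {
        emit(i)
        i += 1
        Thread.sleep(intervalMs)
      }
    }

    def cancel(): Unit = {
      running = false
    }
  }

  // Runs the source on a worker thread, cancels it from the calling thread,
  // and returns (number of emitted elements, whether the worker has stopped)
  def runAndCancel(intervalMs: Long, runForMs: Long): (Int, Boolean) = {
    val source = new LoopingSource(intervalMs)
    val emitted = new AtomicInteger(0)
    val worker = new Thread(new Runnable {
      override def run(): Unit = source.run(_ => emitted.incrementAndGet())
    })
    worker.start()
    Thread.sleep(runForMs)
    source.cancel()
    worker.join(5000)
    (emitted.get(), !worker.isAlive)
  }

  def main(args: Array[String]): Unit = {
    val (count, stopped) = runAndCancel(intervalMs = 20, runForMs = 200)
    println(s"emitted=$count stopped=$stopped")
  }
}
```

The exact number of emitted elements depends on thread scheduling, so only the fact that the worker stops after cancel() should be relied on.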
3、Sink
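- Flink has no equivalent of Spark's foreach method that lets users iterate over the data themselves.
- All output to external systems goes through a Sink.
- The final output of a job is wired up in a way similar to the following: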
stream.addSink(new MySink(xxxx))
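The print method is itself just a kind of Sink:
public DataStreamSink<T> print() {
  PrintSinkFunction<T> printFunction = new PrintSinkFunction<>();
  return addSink(printFunction).name("Print to Std. Out");
}
Flink ships sinks for a number of frameworks out of the box. For anything else, users need to implement a custom sink.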
3.1 Kafka
Add the dependency:
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
  <version>1.7.2</version>
</dependency>
Add the sink in the main function:
// Write data to Kafka
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val ds: DataStream[String] = env.readTextFile("input/word.txt")
ds.addSink(
  new FlinkKafkaProducer011[String](
    "linux1:9092",
    "waterSensor",
    new SimpleStringSchema()
  )
)
env.execute()
Check the result with the Kafka console consumer (the topic must match the one the sink writes to):
bin/kafka-console-consumer.sh --bootstrap-server linux1:9092 --topic waterSensor
3.2 Redis
Add the dependency:
<!-- https://mvnrepository.com/artifact/org.apache.bahir/flink-connector-redis -->
<dependency>
  <groupId>org.apache.bahir</groupId>
  <artifactId>flink-connector-redis_2.11</artifactId>
  <version>1.0</version>
</dependency>
Define a Redis mapper that specifies the command used when saving to Redis:
// Write data to Redis
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val ds: DataStream[String] = env.readTextFile("input/word.txt")
val conf = new FlinkJedisPoolConfig.Builder().setHost("linux4").setPort(6379).build()
ds.addSink(new RedisSink[String](conf, new RedisMapper[String] {
  // HSET into the hash named "word"
  override def getCommandDescription: RedisCommandDescription = {
    new RedisCommandDescription(RedisCommand.HSET, "word")
  }

  // The second token of the line becomes the hash field
  override def getKeyFromData(t: String): String = {
    t.split(" ")(1)
  }

  // The first token of the line becomes the value
  override def getValueFromData(t: String): String = {
    t.split(" ")(0)
  }
}))
env.execute()
Check the data from the Redis client (the hash name is the one passed to RedisCommandDescription):
HGETALL word
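To make the mapping concrete, the sketch below mirrors in plain Scala what the mapper above extracts from each input line. It is an illustration only and does not touch Redis: the object name, the toHset helper, and the sample line "3 hello" are assumptions, since the contents of input/word.txt are not shown.

```scala
object RedisMappingSketch {
  // Hash name passed as the additional key to RedisCommandDescription in the example
  val hashName: String = "word"

  // Mirrors getKeyFromData / getValueFromData:
  // the second token becomes the hash field, the first token becomes the value
  def toHset(line: String): (String, String, String) = {
    val parts = line.split(" ")
    (hashName, parts(1), parts(0))
  }

  def main(args: Array[String]): Unit = {
    val (hash, field, value) = toHset("3 hello")
    // Prints the Redis command this line would correspond to
    println(s"HSET $hash $field $value")
  }
}
```

For the sample line this prints `HSET word hello 3`. A line with fewer than two space-separated tokens would throw an ArrayIndexOutOfBoundsException, so real input may need validation before it reaches the mapper.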
3.3 Elasticsearch
Add the dependency:
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-elasticsearch6_2.11</artifactId>
  <version>1.7.2</version>
</dependency>
Call it in the main function:
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink
import org.apache.http.HttpHost
import org.elasticsearch.client.Requests

// Write data to Elasticsearch
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val list = List(
  WaterSensor("sensor_1", 150000L, 25),
  WaterSensor("sensor_1", 150001L, 27),
  WaterSensor("sensor_1", 150005L, 30),
  WaterSensor("sensor_1", 150007L, 40)
)
val waterSensorDS: DataStream[WaterSensor] = env.fromCollection(list)
val httpHosts = new java.util.ArrayList[HttpHost]()
httpHosts.add(new HttpHost("linux1", 9200))
val esSinkBuilder = new ElasticsearchSink.Builder[WaterSensor](
  httpHosts,
  new ElasticsearchSinkFunction[WaterSensor] {
    override def process(t: WaterSensor, runtimeContext: RuntimeContext, requestIndexer: RequestIndexer): Unit = {
      println("saving data: " + t)
      val json = new java.util.HashMap[String, String]()
      json.put("data", t.toString)
      // Index into "water" with document type "readingData"
      val indexRequest = Requests.indexRequest().index("water").`type`("readingData").source(json)
      requestIndexer.add(indexRequest)
      println("saved successfully")
    }
  }
)
waterSensorDS.addSink(esSinkBuilder.build())
env.execute()
Check the result in ES (the index name must match the one used in the sink):
- URL: http://linux1:9200/_cat/indices?v
- URL: http://linux1:9200/water/_search
3.4 JDBC
Add the dependency:
<!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
<dependency>
  <groupId>mysql</groupId>
  <artifactId>mysql-connector-java</artifactId>
  <version>5.1.44</version>
</dependency>
Add MyJDBCSink:
import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.streaming.api.scala._

def main(args: Array[String]): Unit = {
  // Write data out through JDBC
  val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  env.setParallelism(1)
  val list = List(
    WaterSensor("sensor_1", 150000L, 25),
    WaterSensor("sensor_1", 150001L, 27),
    WaterSensor("sensor_1", 150005L, 30),
    WaterSensor("sensor_1", 150007L, 40)
  )
  val waterSensorDS: DataStream[WaterSensor] = env.fromCollection(list)
  waterSensorDS.addSink(new MyJDBCSink)
  env.execute()
}

// Custom sink
// 1. Extend RichSinkFunction
// 2. Override its methods
class MyJDBCSink extends RichSinkFunction[WaterSensor] {
  private var conn: Connection = _
  private var pstat: PreparedStatement = _

  // Open the connection and prepare the statement once per sink instance
  override def open(parameters: Configuration): Unit = {
    //Class.forName()
    conn = DriverManager.getConnection("jdbc:mysql://linux1:3306/rdd", "root", "000000")
    pstat = conn.prepareStatement("insert into user (id, name, age) values (?, ?, ?)")
  }

  // Called once for every record
  override def invoke(ws: WaterSensor, context: SinkFunction.Context[_]): Unit = {
    pstat.setInt(1, 1)
    pstat.setString(2, ws.id)
    // vc is a Double, so it must be converted explicitly before calling setInt
    pstat.setInt(3, ws.vc.toInt)
    pstat.executeUpdate()
  }

  // Release resources; guard against open() having failed part-way
  override def close(): Unit = {
    if (pstat != null) pstat.close()
    if (conn != null) conn.close()
  }
}
3.5 HDFS
The BucketingSink has been deprecated since Flink 1.9 and will be removed in subsequent releases. Please use the StreamingFileSink instead.
3.5.1 BucketingSink
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-filesystem_2.11</artifactId>
  <version>1.10.0</version>
</dependency>

val input: DataStream[String] = ...
input.addSink(new BucketingSink[String]("/base/path"))
By default the bucketing sink will split by the current system time when elements arrive and will use the datetime pattern "yyyy-MM-dd--HH" to name the buckets.
There are two configuration options that specify when a part file should be closed and a new one started:
- By setting a batch size (the default part file size is 384 MB)
- By setting a batch rollover time interval (the default rollover interval is Long.MAX_VALUE)
// the SequenceFileWriter only works with Flink Tuples
import org.apache.flink.api.java.tuple.Tuple2

val input: DataStream[Tuple2[A, B]] = ...
val sink = new BucketingSink[Tuple2[IntWritable, Text]]("/base/path")
sink.setBucketer(new DateTimeBucketer("yyyy-MM-dd--HHmm", ZoneId.of("America/Los_Angeles")))
sink.setWriter(new SequenceFileWriter[IntWritable, Text])
sink.setBatchSize(1024 * 1024 * 400) // this is 400 MB
sink.setBatchRolloverInterval(20 * 60 * 1000) // this is 20 mins
input.addSink(sink)
This will create a sink that writes to bucket files that follow this schema:
/base/path/{date-time}/part-{parallel-task}-{count}
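The path schema above can be illustrated with plain java.time, without running Flink. This is a minimal sketch under stated assumptions: bucketPath and partFile are hypothetical helpers written for this example, not part of the Flink API, and they only imitate how a date-time pattern and a time zone turn an arrival time into a bucket directory.

```scala
import java.time.{Instant, ZoneId}
import java.time.format.DateTimeFormatter

object BucketPathSketch {
  // Formats the bucket directory for an element arriving at epochMillis,
  // using the given date-time pattern interpreted in the given zone
  def bucketPath(basePath: String, pattern: String, zone: ZoneId, epochMillis: Long): String = {
    val formatter = DateTimeFormatter.ofPattern(pattern).withZone(zone)
    s"$basePath/${formatter.format(Instant.ofEpochMilli(epochMillis))}"
  }

  // Appends the part file name: part-{parallel-task}-{count}
  def partFile(bucket: String, parallelTask: Int, count: Int): String =
    s"$bucket/part-$parallelTask-$count"

  def main(args: Array[String]): Unit = {
    val bucket = bucketPath("/base/path", "yyyy-MM-dd--HHmm", ZoneId.of("America/Los_Angeles"), 0L)
    println(partFile(bucket, parallelTask = 0, count = 0))
  }
}
```

With epoch millisecond 0 and the America/Los_Angeles zone this prints `/base/path/1969-12-31--1600/part-0-0`, while the same instant in UTC would land in the bucket `1970-01-01--0000`. The zone passed to the bucketer therefore decides which directory an element ends up in.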
3.5.2 StreamingFileSink
File Formats
The StreamingFileSink supports both row-wise and bulk encoding formats, such as Apache Parquet. These two variants come with their respective builders that can be created with the following static methods:
- Row-encoded sink: StreamingFileSink.forRowFormat(basePath, rowEncoder)
- Bulk-encoded sink: StreamingFileSink.forBulkFormat(basePath, bulkWriterFactory)
Row-encoded Formats
import java.util.concurrent.TimeUnit
import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy

val input: DataStream[String] = ...
val sink: StreamingFileSink[String] = StreamingFileSink
  .forRowFormat(new Path(outputPath), new SimpleStringEncoder[String]("UTF-8"))
  .withRollingPolicy(
    DefaultRollingPolicy.builder()
      .withRolloverInterval(TimeUnit.MINUTES.toMillis(15)) // roll after 15 minutes of data
      .withInactivityInterval(TimeUnit.MINUTES.toMillis(5)) // roll after 5 minutes without new records
      .withMaxPartSize(1024 * 1024 * 1024) // roll at 1 GB
      .build())
  .build()
input.addSink(sink)
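The three settings of the rolling policy combine as "roll when any one of them is hit". The following plain-Scala sketch models that decision with the values from the example above. It is a simplified model, not Flink's DefaultRollingPolicy: PartFileState, Policy, and shouldRoll are hypothetical names, and the exact boundary comparisons (>= versus >) are an assumption.

```scala
import java.util.concurrent.TimeUnit

object RollingPolicySketch {
  // Hypothetical snapshot of an in-progress part file
  final case class PartFileState(sizeBytes: Long, createdAtMs: Long, lastWriteMs: Long)

  // Hypothetical container for the three thresholds
  final case class Policy(maxPartSizeBytes: Long, rolloverIntervalMs: Long, inactivityIntervalMs: Long)

  // Same values as the example: 1 GB, 15 minutes, 5 minutes
  val examplePolicy: Policy = Policy(
    maxPartSizeBytes = 1024L * 1024 * 1024,
    rolloverIntervalMs = TimeUnit.MINUTES.toMillis(15),
    inactivityIntervalMs = TimeUnit.MINUTES.toMillis(5)
  )

  // A part file is rolled as soon as any single condition holds
  def shouldRoll(policy: Policy, state: PartFileState, nowMs: Long): Boolean = {
    val tooLarge = state.sizeBytes >= policy.maxPartSizeBytes
    val tooOld = nowMs - state.createdAtMs >= policy.rolloverIntervalMs
    val inactive = nowMs - state.lastWriteMs >= policy.inactivityIntervalMs
    tooLarge || tooOld || inactive
  }

  def main(args: Array[String]): Unit = {
    val minute = TimeUnit.MINUTES.toMillis(1)
    val state = PartFileState(sizeBytes = 10L, createdAtMs = 0L, lastWriteMs = 0L)
    println(shouldRoll(examplePolicy, state, nowMs = 4 * minute)) // small, young, recently written
    println(shouldRoll(examplePolicy, state, nowMs = 5 * minute)) // no write for 5 minutes
  }
}
```

The second call returns true only because of the inactivity threshold: the file is still tiny and far younger than the 15-minute rollover interval.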
Bulk-encoded Formats
Flink comes with three built-in BulkWriter factories:
- ParquetWriterFactory
- SequenceFileWriterFactory
- CompressWriterFactory
Parquet format
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-parquet_2.11</artifactId>
  <version>1.10.0</version>
</dependency>

import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord

val schema: Schema = ...
val input: DataStream[GenericRecord] = ...
val sink: StreamingFileSink[GenericRecord] = StreamingFileSink
  .forBulkFormat(outputBasePath, ParquetAvroWriters.forGenericRecord(schema))
  .build()
input.addSink(sink)
Hadoop SequenceFile format
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-sequence-file</artifactId>
  <version>1.10.0</version>
</dependency>

import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.configuration.GlobalConfiguration
import org.apache.flink.formats.sequencefile.SequenceFileWriterFactory
import org.apache.flink.runtime.util.HadoopUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.SequenceFile
import org.apache.hadoop.io.Text

val input: DataStream[(LongWritable, Text)] = ...
val hadoopConf: Configuration = HadoopUtils.getHadoopConfiguration(GlobalConfiguration.loadConfiguration())
val sink: StreamingFileSink[(LongWritable, Text)] = StreamingFileSink
  .forBulkFormat(
    outputBasePath,
    // Scala uses classOf[...] where Java uses .class
    new SequenceFileWriterFactory(hadoopConf, classOf[LongWritable], classOf[Text]))
  .build()
input.addSink(sink)