2. Flink 的 DataSource 數據源
4) 自定義 Source
當然也可以自定義數據源,有兩種方式實現:
- 通過實現 SourceFunction 接口來自定義無並行度(也就是並行度只能為 1)的 Source。
- 通過實現 ParallelSourceFunction 接口或者繼承 RichParallelSourceFunction 來自定義有並行度的數據源。
代碼示例:
1 package com.it.flink.source 2 3 import java.util.Properties 4 5 import org.apache.flink.api.common.typeinfo.TypeInformation 6 import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment, createTuple2TypeInformation, createTypeInformation} 7 import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, KafkaDeserializationSchema} 8 import org.apache.kafka.clients.consumer.ConsumerRecord 9 import org.apache.kafka.common.serialization.StringDeserializer 10 11 object SourceFromKafkaByKeyValue { 12 def main(args: Array[String]): Unit = { 13 14 // 1. 初始化流計算的環境 15 val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment 16 streamEnv.setParallelism(1) 17 18 val properties: Properties = new Properties() 19 properties.setProperty("bootstrap.servers", "node1:9092,node2:9092,node3:9092") 20 properties.setProperty("group.id", "fink02") 21 properties.setProperty("key.deserializer", classOf[StringDeserializer].getName) 22 properties.setProperty("value.deserializer", classOf[StringDeserializer].getName) 23 properties.setProperty("auto.offset.reset", "latest") 24 25 val stream: DataStream[(String, String)] = streamEnv.addSource( 26 new FlinkKafkaConsumer[(String, String)]("topic2", new MyKafkaReader, properties)) 27 stream.print() 28 streamEnv.execute("SourceFromKafkaByKeyValue") 29 } 30 } 31 32 class MyKafkaReader extends KafkaDeserializationSchema[(String, String)] { 33 override def isEndOfStream(t: (String, String)): Boolean = false 34 35 override def deserialize(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]]): (String, String) = { 36 if (consumerRecord == null) { 37 return ("null", "null") 38 } 39 var key = "" 40 var value = "" 41 if (consumerRecord.key() != null) { 42 key = new String(consumerRecord.key(), "UTF-8") 43 } 44 if (consumerRecord.value() != null) { 45 value = new String(consumerRecord.value(), "UTF-8") 46 } 47 (key, value) 48 } 49 50 override def getProducedType: TypeInformation[(String, String)] = { 51 createTuple2TypeInformation(createTypeInformation[String], createTypeInformation[String]) 52 } 53 }
3. Flink 的 Sink 數據目標
Flink 針對 DataStream 提供了大量的已經實現的數據目標(Sink),包括文件、Kafka、Redis、HDFS、Elasticsearch 等等。
1) 基於 HDFS 的 Sink
首先配置支持 Hadoop FileSystem 的連接器依賴。
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-filesystem_2.12</artifactId> <version>1.11.2</version> </dependency>
Streaming File Sink 能把數據寫入 HDFS 中,還可以支持分桶寫入,每一個分桶就對應 HDFS 中的一個目錄。默認按照小時來分桶,在一個桶內部,會進一步將輸出基於滾動策略切分成更小的文件。這有助於防止桶文件變得過大。滾動策略也是可以配置的,默認 策略會根據文件大小和超時時間來滾動文件,超時時間是指沒有新數據寫入部分文件(part file)的時間。
代碼示例:
package com.it.sink import com.it.flink.source.{MyCustomerSource, StationLog} import org.apache.flink.api.common.serialization.SimpleStringEncoder import org.apache.flink.api.scala.createTypeInformation import org.apache.flink.core.fs.Path import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment} /** * 每隔10s向hdfs中生成一條數據 */ object HdfsSink { def main(args: Array[String]): Unit = { // 1. 初始化流計算的環境 val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment streamEnv.setParallelism(1) // 讀取數據源 val stream: DataStream[StationLog] = streamEnv.addSource(new MyCustomerSource) // 默認一個小時一個目錄(分桶) // 設置一個滾動策略 val rollingPolicy: DefaultRollingPolicy[StationLog, String] = DefaultRollingPolicy.builder() .withInactivityInterval(2000) //不活動的分桶等待時間 .withRolloverInterval(10000) // 每隔10秒生成一個文件 .build() // 創建HDFS的sink val hdfsSink: StreamingFileSink[StationLog] = StreamingFileSink.forRowFormat[StationLog]( new Path("hdfs://node1/temp/MySink001"), new SimpleStringEncoder[StationLog]("UTF-8") ).withRollingPolicy(rollingPolicy) .withBucketCheckInterval(1000) // 檢查時間間隔 .build() stream.addSink(hdfsSink) streamEnv.execute() } }
2) 基於 Redis 的 Sink
https://bahir.apache.org/docs/flink/current/flink-streaming-redis/
Flink 除了內置的連接器外,還有一些額外的連接器通過 Apache Bahir 發布,包括:
- Apache ActiveMQ (source/sink)
- Apache Flume (sink)
- Redis (sink)
- Akka (sink)
- Netty (source)
這里我用 Redis 來舉例,首先需要配置 Redis 連接器的依賴:
<dependency> <groupId>org.apache.bahir</groupId> <artifactId>flink-connector-redis_2.12</artifactId> <version>1.1-SNAPSHOT</version> </dependency>
接下來我們可以把 WordCount 的結果寫入 Redis 中:
package com.it.sink import org.apache.flink.api.scala.createTypeInformation import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment} import org.apache.flink.streaming.connectors.redis.RedisSink import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper} /** * 把netcat作為數據源,統計單詞數量並存入redis */ object RedisSink { def main(args: Array[String]): Unit = { // 1. 初始化流計算的環境 val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment streamEnv.setParallelism(1) val stream: DataStream[String] = streamEnv.socketTextStream("node1", 8888) val result: DataStream[(String, Int)] = stream.flatMap(_.split(" ")) .map((_, 1)) .keyBy(0) .sum(1) val config: FlinkJedisPoolConfig = new FlinkJedisPoolConfig.Builder() .setDatabase(3) .setHost("node3") .setPort(6379) .build() result.addSink(new RedisSink[(String, Int)](config, new RedisMapper[(String, Int)] { // 設置redis的命令 override def getCommandDescription: RedisCommandDescription = { new RedisCommandDescription(RedisCommand.HSET, "t_wc") } override def getKeyFromData(t: (String, Int)): String = { t._1 } override def getValueFromData(t: (String, Int)): String = { t._2.toString } })) streamEnv.execute() } }
3) 基於 Kafka 的 Sink
由於前面有的課程已經講過 Flink 的 Kafka 連接器,所以還是一樣需要配置 Kafka 連接器的依賴配置,接下我們還是把 WordCout 的結果寫入 Kafka:
本案例有netcat作為數據源,將kafka的生產者作為flink的sink
package com.it.sink import org.apache.flink.api.common.serialization.SimpleStringSchema import org.apache.flink.api.scala.createTypeInformation import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment} import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer object KafkaSinkByString { def main(args: Array[String]): Unit = { val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment streamEnv.setParallelism(1) val stream: DataStream[String] = streamEnv.socketTextStream("node1", 8888) val words: DataStream[String] = stream.flatMap(_.split(" ")) words.addSink(new FlinkKafkaProducer[String]("node1:9092,node2:9092,node3:9092", "t_2020", new SimpleStringSchema())) streamEnv.execute("KafkaSinkByString") } }
得到結果:
當然 生產中我們更多的是將KeyValue格式的數據寫入kafka:
package com.it.sink import java.lang import java.util.Properties import org.apache.flink.api.scala.createTypeInformation import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment} import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaProducer, KafkaSerializationSchema} import org.apache.kafka.clients.producer.ProducerRecord /** * kafka作為sink的第二種 (KV) */ object KafkaSinkByKeyValue { def main(args: Array[String]): Unit = { val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment streamEnv.setParallelism(1) val stream: DataStream[String] = streamEnv.socketTextStream("node1", 8888) val properties: Properties = new Properties() properties.setProperty("bootstrap.servers", "node1:9092,node2:9092,node3:9092") val result: DataStream[(String, Int)] = stream.flatMap(_.split(" ")) .map((_, 1)) .keyBy(0) .sum(1) result.addSink(new FlinkKafkaProducer[(String, Int)]( "t_2020", new KafkaSerializationSchema[(String, Int)] { override def serialize(t: (String, Int), aLong: lang.Long): ProducerRecord[Array[Byte], Array[Byte]] = { new ProducerRecord("t_2020", t._1.getBytes(), t._2.toString.getBytes()) } }, properties, FlinkKafkaProducer.Semantic.EXACTLY_ONCE)) streamEnv.execute() } }
結果展示,注意加參數:
4) 自定義的 Sink
生產中會用,不太常用
當然你可以自己定義 Sink,有兩種實現方式:1、實現 SinkFunction 接口。2、實現RichSinkFunction 類。后者增加了生命周期的管理功能。比如需要在 Sink 初始化的時候創建連接對象,則最好使用第二種。案例需求:把 StationLog 對象寫入 Mysql 數據庫中。
<dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>8.0.17</version> </dependency>
package com.it.sink import java.sql.{Connection, DriverManager, PreparedStatement} import com.it.flink.source.{MyCustomerSource, StationLog} import org.apache.flink.api.scala.createTypeInformation import org.apache.flink.configuration.Configuration import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction} import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment} /** * 隨機生成stationLog對象,寫入mysql數據庫的表(t_station_log)中
* 需要自己手動創建表 */ object CustomerJdbcSink { def main(args: Array[String]): Unit = { val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) val stream: DataStream[StationLog] = env.addSource(new MyCustomerSource) stream.addSink(new MyCustomerJdbcSink) env.execute() } } class MyCustomerJdbcSink extends RichSinkFunction[StationLog] { var conn: Connection = _ var pst: PreparedStatement = _ //生命周期管理,在Sink初始化的時候調用 override def open(parameters: Configuration): Unit = { conn = DriverManager.getConnection("jdbc:mysql://node1/test", "root", "12345678") pst = conn.prepareStatement( "insert into t_station_log(sid, call_out, call_in, call_type, call_time, duration) " + "values(?, ?, ?, ?, ?, ?)") } override def invoke(value: StationLog, context: SinkFunction.Context[_]): Unit = { pst.setString(1, value.sid) pst.setString(2, value.callOut) pst.setString(3, value.callIn) pst.setString(4, value.callType) pst.setLong(5, value.callTime) pst.setLong(6, value.duration) pst.executeUpdate() } override def close(): Unit = { pst.close() conn.close() } }
4. DataStream 轉換算子
即通過從一個或多個 DataStream 生成新的 DataStream 的過程被稱為 Transformation操作。在轉換過程中,每種操作類型被定義為不同的 Operator,Flink 程序能夠將多個
Transformation 組成一個 DataFlow 的拓撲。
1) Map [DataStream->DataStream]
調 用 用 戶 定 義 的 MapFunction 對 DataStream[T] 數 據 進 行 處 理 , 形 成 新 的Data-Stream[T],其中數據格式可能會發生變化,常用作對數據集內數據的清洗和轉換。例如將輸入數據集中的每個數值全部加 1 處理,並且將數據輸出到下游數據集。
Map DataStream → DataStream |
Takes one element and produces one element. A map function that doubles the values of the input stream: |
2) FlatMap [DataStream->DataStream]
該算子主要應用處理輸入一個元素產生一個或者多個元素的計算場景,比較常見的是在經典例子 WordCount 中,將每一行的文本數據切割,生成單詞序列如在圖所示,對於輸入DataStream[String]通過 FlatMap 函數進行處理,字符串數字按逗號切割,然后形成新的整數數據集。FlatMap可以替換Map
FlatMap DataStream → DataStream |
Takes one element and produces zero, one, or more elements. A flatmap function that splits sentences to words: |
3) Filter [DataStream->DataStream]
該算子將按照條件對輸入數據集進行篩選操作,將符合條件的數據集輸出,將不符合條件的數據過濾掉。
Filter DataStream → DataStream |
Evaluates a boolean function for each element and retains those for which the function returns true. A filter that filters out zero values: |
4) KeyBy [DataStream->KeyedStream]
該算子根據指定的 Key 將輸入的 DataStream[T]數據格式轉換為 KeyedStream[T],也就是在數據集中執行 Partition 操作,將相同的 Key 值的數據放置在相同的分區中。如下圖所
示,將白色方塊和灰色方塊通過顏色的 Key 值重新分區,將數據集分為具有灰色方塊的數據集合。
KeyBy DataStream → KeyedStream |
Logically partitions a stream into disjoint partitions, each partition containing elements of the same key. Internally, this is implemented with hash partitioning. See keys on how to specify keys. This transformation returns a KeyedStream. |
將數據集中第一個參數作為 Key,對數據集進行 KeyBy 函數操作,形成根據 id 分區的KeyedStream 數據集。其中 keyBy 方法輸入為 DataStream[T]數據集。
5) Reduce [KeyedStream->DataStream]
Reduce KeyedStream → DataStream |
A "rolling" reduce on a keyed data stream. Combines the current element with the last reduced value and emits the new value. |
該算子和 MapReduce 中 Reduce 原理基本一致,主要目的是將輸入的 KeyedStream 通過傳 入 的 用 戶 自 定 義 的 ReduceFunction 滾 動 地 進 行 數 據 聚 合 處 理 , 其 中 定 義 的
ReduceFunciton 必須滿足運算結合律和交換律。如下代碼對傳入 keyedStream 數據集中相同的 key 值的數據獨立進行求和運算,得到每個 key 所對應的求和值。
val dataStream = env.fromElements(("a", 3), ("d", 4), ("c", 2), ("c",5), ("a", 5)) //指定第一個字段為分區Key val keyedStream: KeyedStream[(String,Int), Tuple] =dataStream.keyBy(0) /滾動對第二個字段進行reduce相加求和 val reduceStream = keyedStream.reduce { (t1, t2) => (t1._1, t1._2 + t2._2) }
6) Aggregations[KeyedStream->DataStream]
Aggregations KeyedStream → DataStream |
Rolling aggregations on a keyed data stream. The difference between min and minBy is that min returns the minimum value, whereas minBy returns the element that has the minimum value in this field (same for max and maxBy). |
Aggregations 是 KeyedDataStream 接口提供的聚合算子,根據指定的字段進行聚合操
作,滾動地產生一系列數據聚合結果。其實是將 Reduce 算子中的函數進行了封裝,封裝的
聚合操作有sum,min,max 等,這樣就不需要用戶自己定義 Reduce 函數。
如下代碼所示,指定數據集中第一個字段作為 key,用第二個字段作為累加字段,然后滾動
地對第二個字段的數值進行累加並輸出。
7) Union[DataStream ->DataStream]
Union DataStream* → DataStream |
Union of two or more data streams creating a new stream containing all the elements from all the streams. Note: If you union a data stream with itself you will get each element twice in the resulting stream. |
Union 算子主要是將兩個或者多個輸入的數據集合並成一個數據集,需要保證兩個數據集的格式一致,輸出的數據集的格式和輸入的數據集格式保持一致,如圖所示,將灰色方塊數據集和黑色方塊數據集合並成一個大的數據集。
8) Connect,CoMap,CoFlatMap[DataStream ->ConnectedStream->DataStream]
Connect DataStream,DataStream → ConnectedStreams |
"Connects" two data streams retaining their types, allowing for shared state between the two streams. |
CoMap, CoFlatMap ConnectedStreams → DataStream |
Similar to map and flatMap on a connected data stream |
Connect 算子主要是為了合並兩種或者多種不同數據類型的數據集,合並后會保留原來數據集的數據類型。例如:dataStream1 數據集為(String, Int)元祖類型,dataStream2數據集為 Int 類型,通過 connect 連接算子將兩個不同數據類型的流結合在一起,形成格式為 ConnectedStreams 的數據集,其內部數據為[(String, Int), Int]的混合數據類型,保留了兩個原始數據集的數據類型。
- Union 之前兩個流的類型必須是一樣,Connect 可以不一樣,在之后的 coMap 中再去調整成為一樣的。
- Connect 只能操作兩個流,Union 可以操作多個。
9) Split 和 select [DataStream->SplitStream->DataStream]
Split DataStream → SplitStream |
Split the stream into two or more streams according to some criterion.
|
Select SplitStream → DataStream |
Select one or more streams from a split stream. |
Split 算子是將一個 DataStream 數據集按照條件進行拆分,形成兩個數據集的過程,也是 union 算子的逆向實現。每個接入的數據都會被路由到一個或者多個輸出數據集中。
在使用 split 函數中,需要定義 split 函數中的切分邏輯,通過調用 split 函數,然后指定條件判斷函數,如下面的代碼所示:將根據第二個字段的奇偶性將數據集標記出來,如果是偶數則標記為 even,如果是奇數則標記為 odd,然后通過集合將標記返回,最終生成格式 SplitStream 的數據集。
//創建數據集 val dataStream1: DataStream[(String, Int)] = env.fromElements(("a", 3), ("d", 4), ("c", 2), ("c", 5), ("a", 5)) //合並兩個DataStream數據集 val splitedStream: SplitStream[(String, Int)] = dataStream1.split(t => if (t._2 % 2 == 0) Seq("even") else Seq("odd"))
split 函數本身只是對輸入數據集進行標記,並沒有將數據集真正的實現切分,因此需要借助 Select 函數根據標記將數據切分成不同的數據集。如下代碼所示,通過調用SplitStream 數據集的 select()方法,傳入前面已經標記好的標簽信息,然后將符合條件的數據篩選出來,形成新的數據集。
//篩選出偶數數據集 val evenStream: DataStream[(String, Int)] = splitedStream.select("even") //篩選出奇數數據集 val oddStream: DataStream[(String, Int)] = splitedStream.select("odd") //篩選出奇數和偶數數據集 val allStream: DataStream[(String, Int)] = splitedStream.select("even", "odd")
更多算子請參考:https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/stream/operators/
5. 函數類和富函數類
前面的所有算子幾乎都可以自定義一個函數類、富函數類作為參數。因為 Flink暴露了者兩種函數類的接口,常見的函數接口有:
- MapFunction
- FlatMapFunction
- ReduceFunction
- 。。。。。
富函數接口它其他常規函數接口的不同在於:可以獲取運行環境的上下文,在上下文環境中可以管理狀態(State 在下一章節中提到),並擁有一些生命周期方法,所以可以實現更復雜的功能。富函數的接口有:
- RichMapFunction
- RichFlatMapFunction
- RichFilterFunction
- 。。。。。
1)普通函數類舉例:按照指定的時間格式輸出每個通話的撥號時間和結束時間。數據如下:
station_4,18600003294,18900004149,busy,1606550754162,0 station_6,18600007904,18900004783,success,1606550754162,20 station_8,18600000183,18900000865,success,1606550754162,10 station_0,18600005712,18900000678,failed,1606550754162,0 station_6,18600002230,18900000053,busy,1606550754163,0 station_0,18600008917,18900008108,barring,1606550754163,0 station_0,18600002944,18900008155,busy,1606550754163,0 station_4,18600004526,18900006991,barring,1606550754163,0 station_5,18600003263,18900006649,barring,1606550754163,0 station_9,18600007853,18900004122,success,1606550754163,60
代碼示例:
package com.it.transformation import java.text.SimpleDateFormat import com.it.flink.source.StationLog import org.apache.flink.api.common.functions.MapFunction import org.apache.flink.api.scala.createTypeInformation import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment} object TestFunctionClass { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) val path: String = getClass.getResource("/station.log").getPath() val stream: DataStream[StationLog] = env.readTextFile(path) .map(line => { val arr: Array[String] = line.split(",") StationLog(arr(0), arr(1), arr(2), arr(3), arr(4).toLong, arr(5).toLong) }) val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss") stream.filter(_.callType.equals("success")) .map(new MyMapFunction(format)) .print() env.execute() } } /** * 繼承MapFunction實現自定義函數類 * * @param format */ class MyMapFunction(format: SimpleDateFormat) extends MapFunction[StationLog, String] { override def map(t: StationLog): String = { var startTime = t.callTime var endTime = t.callTime + t.duration * 1000 "主叫號碼:" + t.callIn + ",被叫號碼:" + t.callOut + ",起始時間:" + format.format(startTime) + ",結束時間:" + format.format(endTime) } }
結果顯示:
主叫號碼:18900004783,被叫號碼:18600007904,起始時間:2020-11-28 16:05:54,結束時間:2020-11-28 16:06:14 主叫號碼:18900000865,被叫號碼:18600000183,起始時間:2020-11-28 16:05:54,結束時間:2020-11-28 16:06:04 主叫號碼:18900004122,被叫號碼:18600007853,起始時間:2020-11-28 16:05:54,結束時間:2020-11-28 16:06:54
使用富函數類方法示例:
/** * 把通話成功的電話號碼轉化成真實的用戶姓名,用戶姓名保存在mysql數據庫 */ object TestRichFunctionClass { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) val path: String = getClass.getResource("/station.log").getPath() val stream: DataStream[StationLog] = env.readTextFile(path) .map(line => { val arr: Array[String] = line.split(",") StationLog(arr(0), arr(1), arr(2), arr(3), arr(4).toLong, arr(5).toLong) }) stream.filter(_.callType.equals("success")) .map(new MyRichMapFunction()) .print() env.execute() } } class MyRichMapFunction extends RichMapFunction[StationLog, StationLog] { var conn: Connection = _ var pst: PreparedStatement = _ override def open(parameters: Configuration): Unit = { conn = DriverManager.getConnection("jdbc://node1/test", "root", "12345678") pst = conn.prepareStatement("select name from t_phone where phone_number=?") } override def map(in: StationLog): StationLog = { //查詢主叫用戶的名字 pst.setString(1, in.callOut) val set1: ResultSet = pst.executeQuery() if (set1.next()) { in.callOut = set1.getString(1) } //查詢被叫用戶的名字 pst.setString(1, in.callIn) val set2: ResultSet = pst.executeQuery() if (set2.next()) { in.callIn = set2.getString(1) } in } override def close(): Unit = { pst.close() conn.close() } }
6. 底層 ProcessFunctionAPI
ProcessFunction 是一個低層次的流處理操作,允許返回所有 Stream 的基礎構建模塊:
- 訪問 Event 本身數據(比如:Event 的時間,Event 的當前 Key 等)
- 管理狀態 State(僅在 Keyed Stream 中)
- 管理定時器 Timer(包括:注冊定時器,刪除定時器等)
總而言之,ProcessFunction 是 Flink 最底層的 API,也是功能最強大的。
例如:監控每一個手機,如果在 5 秒內呼叫它的通話都是失敗的,發出警告信息。注意:這個案例中會用到狀態編程,只要知道狀態的意思,不需要掌握。后面的章節中會詳細講解 State 編程。
案例代碼:
package com.it.transformation import com.it.flink.source.StationLog import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor} import org.apache.flink.api.scala.createTypeInformation import org.apache.flink.streaming.api.functions.KeyedProcessFunction import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.util.Collector /** * 監控每一個手機號,如果在5秒內呼叫它的通話都是失敗的,發出警告信息 * 在5秒中內只要有一個呼叫不是fail則不用警告 */ object TestProcessFunction { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment //讀取文件數據 val data = env.socketTextStream("node1", 8888) .map(line => { var arr = line.split(",") StationLog(arr(0).trim, arr(1).trim, arr(2).trim, arr(3).trim, arr(4).trim.toLong, arr(5).trim.toLong) }) //處理數據 data.keyBy(_.callOut) .process(new MonitorCallFail()) .print() env.execute() } } /** * 自定義底層類 */ class MonitorCallFail extends KeyedProcessFunction[String, StationLog, String] { // 使用狀態對象記錄時間 lazy val timeState: ValueState[Long] = getRuntimeContext .getState(new ValueStateDescriptor[Long]("time", classOf[Long])) override def processElement(value: StationLog, ctx: KeyedProcessFunction[String, StationLog, String]#Context, out: Collector[String]): Unit = { //從狀態中取得時間 var time = timeState.value() if (value.callType.equals("fail") && time == 0) { //表示第一次發現呼叫當前手機號是失敗的 //獲取當前時間,並注冊定時器 var nowTime = ctx.timerService().currentProcessingTime() var onTime = nowTime + 5000L //5秒后觸發 ctx.timerService().registerProcessingTimeTimer(onTime) timeState.update(onTime) } if (!value.callType.equals("fail") && time != 0) { //表示有呼叫成功了,可以取消觸發器 ctx.timerService().deleteProcessingTimeTimer(time) timeState.clear() } } //時間到了,執行觸發器,發出告警 override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[String, StationLog, String]#OnTimerContext, out: Collector[String]): Unit = { var warnStr = "觸發時間:" + timestamp + " 手機號:" + ctx.getCurrentKey out.collect(warnStr) timeState.clear() } }
7. 側輸出流 Side Output
在 flink 處理數據流時,我們經常會遇到這樣的情況:在處理一個數據源時,往往需要將該源中的不同類型的數據做分割處理,如果使用 filter 算子對數據源進行篩選分割的話,勢必會造成數據流的多次復制,造成不必要的性能浪費;flink 中的側輸出就是將數據流進行分割,而不對流進行復制的一種分流機制。flink 的側輸出的另一個作用就是對延時遲到的數據進行處理,這樣就可以不必丟棄遲到的數據。在后面的章節中會講到!
案例:根據基站的日志,請把呼叫成功的 Stream(主流)和不成功的 Stream(側流)分別輸出。
sum