@
還有視頻講解在我的B站-寶哥chbxw, 希望大家可以支持一下,謝謝。
前言之分層 API
Flink 根據抽象程度分層,提供了三種不同的 API。每一種 API 在簡潔性和表達力上有着不同的側重,並且針對不同的應用場景。
- ProcessFunction 是 Flink 所提供最底層接口。ProcessFunction 可以處理一或兩條輸入數據流中的單個事件或者歸入一個特定窗口內的多個事件。它提供了對於時間和狀態的細粒度控制。開發者可以在其中任意地修改狀態,也能夠注冊定時器用以在未來的某一時刻觸發回調函數。因此,你可以利用 ProcessFunction 實現許多有狀態的事件驅動應用所需要的基於單個事件的復雜業務邏輯。
- DataStream API 為許多通用的流處理操作提供了處理原語。這些操作包括窗口、逐條記錄的轉換操作,在處理事件時進行外部數據庫查詢等。DataStream API 支持 Java 和Scala 語言,預先定義了例如 map()、reduce()、aggregate() 等函數。你可以通過擴展實現預定義接口或使用 Java、Scala 的 lambda 表達式實現自定義的函數。
- SQL & Table API:Flink 支持兩種關系型的 API,Table API 和 SQL。這兩個 API 都是批處理和流處理統一的 API,這意味着在無邊界的實時數據流和有邊界的歷史記錄數據流上,關系型 API 會以相同的語義執行查詢,並產生相同的結果。Table API 和 SQL借助了 Apache Calcite 來進行查詢的解析,校驗以及優化。它們可以與 DataStream 和DataSet API 無縫集成,並支持用戶自定義的標量函數,聚合函數以及表值函數。
- 擴展庫
- 復雜事件處理(CEP):模式檢測是事件流處理中的一個非常常見的用例。Flink 的 CEP庫提供了 API,使用戶能夠以例如正則表達式或狀態機的方式指定事件模式。CEP 庫與Flink 的 DataStream API 集成,以便在 DataStream 上評估模式。CEP 庫的應用包括網絡入侵檢測,業務流程監控和欺詐檢測。
- DataSet API:DataSet API 是 Flink 用於批處理應用程序的核心 API。DataSet API 所提供的基礎算子包括 map、reduce、(outer) join、co-group、iterate 等。所有算子都有相應的算法和數據結構支持,對內存中的序列化數據進行操作。如果數據大小超過預留內存,則過量數據將存儲到磁盤。Flink 的 DataSet API 的數據處理算法借鑒了傳統數據庫算法的實現,例如混合散列連接(hybrid hash-join)和外部歸並排序(external merge-sort)。
- Gelly: Gelly 是一個可擴展的圖形處理和分析庫。Gelly 是在 DataSet API 之上實現的,並與 DataSet API 集成。因此,它能夠受益於其可擴展且健壯的操作符。Gelly 提供了內置算法,如 label propagation、triangle enumeration 和 page rank 算法,也提供了一個簡化自定義圖算法實現的 Graph API。
一、DataStream 的編程模型
DataStream 的編程模型包括四個部分:Environment,DataSource,Transformation,Sink。
二、Flink 的 DataSource 數據源
2.1、基於文件,此處是HDFS
package com.chb.flink.source
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
object FileSource {
def main(args: Array[String]): Unit = {
//初始化Flink的Streaming(流計算)上下文執行環境
val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
streamEnv.setParallelism(1)
//導入隱式轉換,建議寫在這里,可以防止IDEA代碼提示出錯的問題
import org.apache.flink.streaming.api.scala._
//讀取數據
val stream = streamEnv.readTextFile("hdfs://10.0.0.201:9000/README.txt")
//轉換計算
val result: DataStream[(String, Int)] = stream.flatMap(_.split(","))
.map((_, 1))
.keyBy(0)
.sum(1)
//打印結果到控制台
result.print()
//啟動流式處理,如果沒有該行代碼上面的程序不會運行
streamEnv.execute("wordcount")
}
}
2.2、基於集合的源
有點像Spark的序列化
package com.chb.flink.source
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
object CollectionSource {
def main(args: Array[String]): Unit = {
//初始化Flink的Streaming(流計算)上下文執行環境
val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
streamEnv.setParallelism(1)
//導入隱式轉換,建議寫在這里,可以防止IDEA代碼提示出錯的問題
import org.apache.flink.streaming.api.scala._
//讀取數據
var dataStream = streamEnv.fromCollection(Array(
new StationLog("001", "186", "189", "busy", 1577071519462L, 0),
new StationLog("002", "186", "188", "busy", 1577071520462L, 0),
new StationLog("003", "183", "188", "busy", 1577071521462L, 0),
new StationLog("004", "186", "188", "success", 1577071522462L, 32)
))
dataStream.print()
streamEnv.execute()
}
}
/*
* 通信基站日志數據
* @param sid 基站ID
* @param callOut 主叫號碼
* @param callIn 被叫號碼
* @param callType 通話類型eg:呼叫失敗(fail),占線(busy),拒接(barring),接通(success)
* @param callTime 呼叫時間戳,精確到毫秒
* @Param duration 通話時長 單位:秒
*/
class StationLog(sid: String, callOut: String, callIn: String, callType: String, callTime: Long, duration: Long)
2.3、Kafka
首 先 需 要 配 置 Kafka 連 接 器 的 依 賴 , 另 外 更 多 的 連 接 器 可 以 查 看 官 網
2.3.1、引入依賴
<!-- Kafka connector-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>1.10.1</version>
<exclusions>
<exclusion>
<!-- 排除對Jackson 的引用 ; -->
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.4.1</version>
</dependency>
2.3.2、Kafka第一種Source
package com.chb.flink.source
import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.kafka.common.serialization.StringDeserializer
object KafkaSourceByString {
def main(args: Array[String]): Unit = {
//初始化Flink的Streaming(流計算)上下文執行環境
val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
streamEnv.setParallelism(1)
//導入隱式轉換
import org.apache.flink.streaming.api.scala._
// kafka配置
val props = new Properties()
props.setProperty("bootstrap.servers", "ShServer:9092")
props.setProperty("group.id", "chb01")
props.setProperty("key.deserializer", classOf[StringDeserializer].getName)
props.setProperty("value.deserializer", classOf[StringDeserializer].getName)
props.setProperty("auto.offset.reset", "latest")
//設置kafka為數據源
val flinkKafkaConSumer = new FlinkKafkaConsumer[String]("test", new SimpleStringSchema(), props)
val stream = streamEnv.addSource(flinkKafkaConSumer)
stream.print()
streamEnv.execute()
}
}
2.3.3、Kafka第二種Source
package com.chb.flink.source
import java.util.Properties
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, KafkaDeserializationSchema}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
object KafkaSourceByKeyValue {
def main(args: Array[String]): Unit = {
//初始化Flink的Streaming(流計算)上下文執行環境
val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
streamEnv.setParallelism(1)
//導入隱式轉換
import org.apache.flink.streaming.api.scala._
val props = new Properties()
props.setProperty("bootstrap.servers", "ShServer:9092")
props.setProperty("group.id", "fink02")
props.setProperty("key.deserializer", classOf[StringDeserializer].getName)
props.setProperty("value.deserializer", classOf[StringDeserializer].getName)
props.setProperty("auto.offset.reset", "latest")
//設置kafka為數據源
val stream = streamEnv.addSource(new
FlinkKafkaConsumer[(String, String)]("test", new KafkaDeserializationSchema[(String, String)] {
//流是否結束
override def isEndOfStream(t: (String, String)) = false
override def deserialize(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]]) = {
if (consumerRecord != null) {
var key = "null"
var value = "null"
if (consumerRecord.key() != null)
key = new String(consumerRecord.key(), "UTF-8")
if (consumerRecord.value() != null)
value = new String(consumerRecord.value(), "UTF-8")
(key, value)
} else { //如果kafka中的數據為空返回一個固定的二元組
("null", "null")
}
}
//設置返回類型為二元組
override def getProducedType =
createTuple2TypeInformation(createTypeInformation[String], createTypeInformation[
String])
}
, props).setStartFromEarliest())
stream.print()
streamEnv.execute()
}
}
2.3.3.1、Kafka生產測試
package com.chb.flink.source
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer
import scala.util.Random
object MyKafkaProducer {
def main(args: Array[String]): Unit = {
val props = new Properties()
props.setProperty("bootstrap.servers", "ShServer:9092")
// 注意此處是序列化
props.setProperty("key.serializer", classOf[StringSerializer].getName)
props.setProperty("value.serializer", classOf[StringSerializer].getName)
val producer = new KafkaProducer[String, String](props)
val random = new Random()
while(true) {
producer.send(new ProducerRecord[String, String]("test", "key" + random.nextInt(), "value" + random.nextInt()))
Thread.sleep(1000)
}
}
}
2.4、自定義Source
自定義數據源,有兩種方式實現:
通過實現 SourceFunction 接口來自定義無並行度(也就是並行度只能為 1)的 Source。
通過實現 ParallelSourceFunction 接口或者繼承 RichParallelSourceFunction 來自定義有並行度的數據源。
2.4.1、實現SourceFunction的自定義Source
package com.chb.flink.source
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import scala.util.Random
/**
* 當然也可以自定義數據源,有兩種方式實現:
* 通過實現 SourceFunction 接口來自定義無並行度(也就是並行度只能為 1)的 Source。
* 通過實現 ParallelSourceFunction 接口或者繼承 RichParallelSourceFunction 來自
* 定義有並行度的數據源。
* *
* 寫一個實現SourceFunction接口
*/
class MyCustomerSource extends SourceFunction[StationLog] {
//是否終止數據流的標記
var flag = true;
/**
* 主要的方法
* 啟動一個Source
* 大部分情況下,都需要在這個run方法中實現一個循環,這樣就可以循環產生數據了
*
* @param sourceContext * @throws Exception
*/
override def run(sourceContext: SourceFunction.SourceContext[StationLog]):
Unit = {
val random = new Random()
var types = Array("fail", "busy", "barring", "success")
while (flag) { //如果流沒有終止,繼續獲取數據
1.to(5).map(i => {
var callOut = "1860000%04d".format(random.nextInt(10000))
var callIn = "1890000%04d".format(random.nextInt(10000))
new StationLog("station_" + random.nextInt(10), callOut, callIn, types(random.nextInt(4)), System.currentTimeMillis(), 0)
}).foreach(sourceContext.collect(_)) //發數據
Thread.sleep(2000) //每發送一次數據休眠2秒
}
}
//終止數據流
override def cancel(): Unit = flag = false
}
object CustomerSource {
def main(args: Array[String]): Unit = {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
import org.apache.flink.streaming.api.scala._
val stream: DataStream[StationLog] = env.addSource(new MyCustomerSource)
stream.print()
env.execute()
}
}
三、 Flink 的 Sink 數據目標
Flink 針對 DataStream 提供了大量的已經實現的數據目標(Sink),包括文件、Kafka、Redis、HDFS、Elasticsearch 等等。
3.1、HDFS Sink
3.1.1、配置支持 Hadoop FileSystem 的連接器依賴
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-filesystem_2.11</artifactId>
<version>1.10.1</version>
</dependency>
3.1.2、Streaming File Sink
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/streamfile_sink.html
Streaming File Sink 能把數據寫入 HDFS 中,還可以支持分桶寫入,每一個分桶就對應 HDFS 中的一個目錄。默認按照小時來分桶,在一個桶內部,會進一步將輸出基於滾動策略切分成更小的文件。這有助於防止桶文件變得過大。滾動策略也是可以配置的,默認策略會根據文件大小和超時時間來滾動文件,超時時間是指沒有新數據寫入部分文件(part file)的時間。
3.1.2.1、滾動策略
- DefaultRollingPolicy
- CheckpointRollingPolicy
3.1.2.2、分桶策略
- DateTimeBucketAssigner : Default time based assigner
- BasePathBucketAssigner : Assigner that stores all part files in the base path (single global bucket)
注意必須開啟checkpoint, 否則生成的文件都是inprocess狀態
3.1.2.3、代碼實現
package com.chb.flink.sink
import com.chb.flink.source.{MyCustomerSource, StationLog}
import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
object HDFSFileSink {
def main(args: Array[String]): Unit = {
//初始化Flink的Streaming(流計算)上下文執行環境
val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
streamEnv.setParallelism(1)
//導入隱式轉換
import org.apache.flink.streaming.api.scala._
// 啟動checkPoint, 否則,生成的文件都是inprocess狀態的
streamEnv.enableCheckpointing(1000)
// 數據源
val data: DataStream[StationLog] = streamEnv.addSource(new MyCustomerSource)
//創建一個文件滾動規則
val rolling: DefaultRollingPolicy[StationLog, String] = DefaultRollingPolicy.create()
.withInactivityInterval(2000) //不活動的間隔時間。
.withRolloverInterval(2000) //每隔兩秒生成一個文件 ,重要
.build()
//創建一個HDFS Sink
var hdfsSink = StreamingFileSink.forRowFormat[StationLog](
// 注意此處是flink的Path
new Path("hdfs://ShServer:9000/sink001/"), new SimpleStringEncoder[StationLog]("UTF-8"))
.withBucketCheckInterval(1000) //檢查分桶的間隔時間
// .withBucketAssigner(new MemberBucketAssigner)
.withRollingPolicy(rolling)
.build()
// 添加sink
data.addSink(hdfsSink)
streamEnv.execute()
}
import org.apache.flink.core.io.SimpleVersionedSerializer
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer
/**
* 自定義分桶策略
*/
class MemberBucketAssigner extends BucketAssigner[StationLog, String] {
// 指定桶名 yyyy-mm-dd
override def getBucketId(info: StationLog, context: BucketAssigner.Context): String = {
val date = new Date(info.callTime)
new SimpleDateFormat("yyyy-MM-dd/HH").format(date)
}
override def getSerializer: SimpleVersionedSerializer[String] = SimpleVersionedStringSerializer.INSTANCE
}
}
3.2、基於 Redis 的 Sink
Flink 除了內置的連接器外,還有一些額外的連接器通過 Apache Bahir 發布,包括:
Apache ActiveMQ (source/sink)
Apache Flume (sink)
Redis (sink)
Akka (sink)
Netty (source)
3.2.1、依賴
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>flink-connector-redis_2.11</artifactId>
<version>1.0</version>
</dependency>
3.2.2、將結果寫道redis
package com.chb.flink.sink
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}
object RedisSink {
def main(args: Array[String]): Unit = {
//初始化Flink的Streaming(流計算)上下文執行環境
val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
streamEnv.setParallelism(1)
//導入隱式轉換,建議寫在這里,可以防止IDEA代碼提示出錯的問題
import org.apache.flink.streaming.api.scala._
//讀取數據
val stream = streamEnv.socketTextStream("hadoop01", 8888)
//轉換計算
val result = stream.flatMap(_.split(","))
.map((_, 1))
.keyBy(0)
.sum(1)
//連接redis的配置
val config = new FlinkJedisPoolConfig.Builder().setDatabase(1).setHost("hadoop01").setPort(6379).build()
//寫入redis
result.addSink(new RedisSink[(String, Int)](config, new RedisMapper[(String, Int)] {
override def getCommandDescription = new
RedisCommandDescription(RedisCommand.HSET, "t_wc")
override def getKeyFromData(data: (String, Int)) = {
data._1 //單詞
}
override def getValueFromData(data: (String, Int)) = {
data._2 + "" //單詞出現的次數
}
}))
streamEnv.execute()
}
}
3.3、Kafka Sink
3.3.1、第一種
package com.chb.flink.sink
import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
/**
* Kafka Sink
*/
object KafkaSinkByString {
def main(args: Array[String]): Unit = {
val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
streamEnv.setParallelism(1) //默認情況下每個任務的並行度為1
import org.apache.flink.streaming.api.scala._
//讀取netcat流中數據 (實時流)
val stream1: DataStream[String] = streamEnv.socketTextStream("hadoop01", 8888)
//轉換計算
val result = stream1.flatMap(_.split(","))
//數據寫入Kafka,並且是KeyValue格式的數據
result.addSink(new FlinkKafkaProducer[String]("hadoop01:9092", "t_topic", new SimpleStringSchema()))
streamEnv.execute()
}
}
3.3.2、第二種
package com.chb.flink.sink
import java.lang
import java.util.Properties
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaProducer, KafkaSerializationSchema}
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringSerializer
/**
* Kafka Sink
*/
object KafkaSinkByKeyValue {
def main(args: Array[String]): Unit = {
val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
streamEnv.setParallelism(1) //默認情況下每個任務的並行度為1
import org.apache.flink.streaming.api.scala._
//讀取netcat流中數據 (實時流)
val stream1: DataStream[String] = streamEnv.socketTextStream("hadoop01", 8888)
//轉換計算
val result = stream1.flatMap(_.split(","))
.map((_, 1))
.keyBy(0)
.sum(1)
//Kafka生產者的配置
val props = new Properties()
props.setProperty("bootstrap.servers", "hadoop01:9092")
props.setProperty("key.serializer", classOf[StringSerializer].getName)
props.setProperty("value.serializer", classOf[StringSerializer].getName)
//數據寫入Kafka,並且是KeyValue格式的數據
result.addSink(new FlinkKafkaProducer[(String, Int)]("t_topic",
new KafkaSerializationSchema[(String, Int)] {
override def serialize(element: (String, Int), aLong: lang.Long): ProducerRecord[Array[Byte], Array[Byte]] = {
new ProducerRecord("t_topic", element._1.getBytes, (element._2 + "").getBytes())
}
}, props, FlinkKafkaProducer.Semantic.EXACTLY_ONCE)) //EXACTLY_ONCE 精確一次
streamEnv.execute()
}
}
3.4、自定義Sink
package com.chb.flink.sink
import java.sql.{Connection, DriverManager, PreparedStatement}
import com.chb.flink.source.{MyCustomerSource, StationLog}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
/*
*從自定義的Source中讀取StationLog數據,通過Flink寫入Mysql數據庫
*
* 當然你可以自己定義 Sink,有兩種實現方式:
* 1、實現 SinkFunction 接口。
* 2、實現RichSinkFunction 類。后者增加了生命周期的管理功能。
* 比如需要在 Sink 初始化的時候創建連接對象,則最好使用第二種。
* 案例需求:把 StationLog 對象寫入 Mysql 數據庫中。
*/
object CustomJdbcSink {
//自定義一個Sink寫入Mysql
class MyCustomSink extends RichSinkFunction[StationLog] {
var conn: Connection = _
var pst: PreparedStatement = _
//生命周期管理,在Sink初始化的時候調用
override def open(parameters: Configuration): Unit = {
conn = DriverManager.getConnection("jdbc:mysql://localhost/test", "root", "123123")
pst = conn.prepareStatement("insert into t_station_log(sid, call_out, call_in, call_type, call_time, duration) values(?, ?, ?, ?, ?, ?)")
}
//把StationLog 寫入到表t_station_log
override def invoke(value: StationLog, context: SinkFunction.Context[_]): Unit = {
pst.setString(1, value.sid)
pst.setString(2, value.callOut)
pst.setString(3, value.callIn)
pst.setString(4, value.callType)
pst.setLong(5, value.callTime)
pst.setLong(6, value.duration)
pst.executeUpdate()
}
override def close(): Unit = {
pst.close()
conn.close()
}
}
def main(args: Array[String]): Unit = {
//初始化Flink的Streaming(流計算)上下文執行環境
val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
streamEnv.setParallelism(1)
//導入隱式轉換,建議寫在這里,可以防止IDEA代碼提示出錯的問題
import org.apache.flink.streaming.api.scala._
val data: DataStream[StationLog] = streamEnv.addSource(new
MyCustomerSource)
//數據寫入msyql
data.addSink(new MyCustomSink)
streamEnv.execute()
}
}
四、DataStream 轉換算子
這個非常簡單,看api就知道
五、函數類和富函數類
上節的所有算子幾乎都可以自定義一個函數類、富函數類作為參數。因為Flink暴露了這兩種函數類的接口,常見的函數接口有:
- MapFunction
- FlatMapFunction
- ReduceFunction
- 。。。。。
富函數接口同其他常規函數接口的不同在於:可以獲取運行環境的上下文,在上下文環境中可以管理狀態(State),並擁有一些生命周期方法,所以可以實現更復雜的功能。富函數的接口有:
- RichMapFunction
- RichFlatMapFunction
- RichFilterFunction
- 。。。。。
5.1、普通函數類舉例:按照指定的時間格式輸出每個通話的撥號時間和結束時間
5.2、富函數類舉例:把呼叫成功的通話信息轉化成真實的用戶姓名
通話用戶對應的用戶表(在 Mysql 數據中)為:
由於需要從數據庫中查詢數據,就需要創建連接,創建連接的代碼必須寫在生命周期的open方法中。所以需要使用富函數類。
Rich Function 有一個生命周期的概念。典型的生命周期方法有:
- open()方法是 rich function 的初始化方法,當一個算子例如 map 或者 filter 被調用之前 open()會被調用。
- close()方法是生命周期中的最后一個調用的方法,做一些清理工作。
- getRuntimeContext()方法提供了函數的 RuntimeContext 的一些信息,例如函數執行的並行度,任務的名字,以及 state 狀態
package com.chb.flink.func
import java.sql.{Connection, DriverManager, PreparedStatement, ResultSet}
import java.text.SimpleDateFormat
import com.chb.flink.source.StationLog
import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
/**
* 富函數類舉例:把呼叫成功的通話信息轉化成真實的用戶姓名
*/
object TestFunction {
def main(args: Array[String]): Unit = {
val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
// 隱式轉換
import org.apache.flink.streaming.api.scala._
val data: DataStream[StationLog] = streamEnv.readTextFile(getClass.getResource("/station.log").getPath)
.map(line => {
val arr = line.split(",")
new StationLog(arr(0).trim, arr(1).trim, arr(2).trim, arr(3).trim, arr(4).trim.toLong, arr(5).trim.toLong)
})
//定義時間輸出格式
val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
//過濾那些通話成功的
data.filter(_.callType.equals("success"))
.map(new CallMapFunction(format))
.print()
streamEnv.execute()
}
}
//自定義的富函數類
class CallRichMapFunction() extends RichMapFunction[StationLog, StationLog] {
var conn: Connection = _
var pst: PreparedStatement
= _
//生命周期管理,初始化的時候創建數據連接
override def open(parameters: Configuration): Unit = {
conn = DriverManager.getConnection("jdbc:mysql://localhost/test", "root", "123456")
pst = conn.prepareStatement("select name from t_phone where phone_number =?")
}
override def map(in: StationLog): StationLog = {
//查詢主叫用戶的名字
pst.setString(1, in.callOut)
val set1: ResultSet = pst.executeQuery()
if (set1.next()) {
in.callOut = set1.getString(1)
}
//查詢被叫用戶的名字
pst.setString(1, in.callIn)
val set2: ResultSet = pst.executeQuery()
if (set2.next()) {
in.callIn = set2.getString(1)
}
in
}
//關閉連接
override def close(): Unit = {
pst.close()
conn.close()
}
}
六、底層 ProcessFunctionAPI
ProcessFunction 是一個低層次的流處理操作,允許返回所有 Stream 的基礎構建模塊:
- 訪問 Event 本身數據(比如:Event 的時間,Event 的當前 Key 等)
- 管理狀態 State(僅在 Keyed Stream 中)
- 管理定時器 Timer(包括:注冊定時器,刪除定時器等)
總而言之,ProcessFunction 是 Flink 最底層的 API,也是功能最強大的。
例如:監控每一個手機,如果在 5 秒內呼叫它的通話都是失敗的,發出警告信息。
package com.chb.flink.func
import java.text.SimpleDateFormat
import java.util.Date
import com.chb.flink.source.StationLog
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.util.Collector
/**
* 監控每一個手機號,如果在5秒內呼叫它的通話都是失敗的,發出警告信息
* 在5秒中內只要有一個呼叫不是fail則不用警告
*/
object TestProcessFunction {
def main(args: Array[String]): Unit = {
//初始化Flink的Streaming(流計算)上下文執行環境
val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
streamEnv.setParallelism(1)
//導入隱式轉換
import org.apache.flink.streaming.api.scala._
//讀取socket數據
val data = streamEnv.socketTextStream("10.0.0.201", 8888)
.map(line => {
var arr = line.split(",")
new StationLog(arr(0).trim, arr(1).trim, arr(2).trim, arr(3).trim, arr(4).trim.toLong, arr(5).trim.toLong)
})
//處理數據
data.keyBy(_.callOut)
.process(new MonitorCallFail())
.print()
streamEnv.execute()
}
class MonitorCallFail() extends KeyedProcessFunction[String, StationLog, String] {
// 定義一個狀態記錄時間
lazy val timeState: ValueState[Long] = getRuntimeContext.getState(new ValueStateDescriptor[Long]("time", classOf[Long]))
// 處理數據
override def processElement(value: StationLog,
context: KeyedProcessFunction[String, StationLog, String]#Context,
collector: Collector[String]): Unit = {
val time = timeState.value() // 從狀態中取出時間
if (value.callType.equals("fail") && time == 0) { // 第一次失敗
// 獲取當前時間, 注冊定時器
val now = context.timerService().currentProcessingTime()
var onTime = now + 5000L // 5秒后觸發
context.timerService().registerProcessingTimeTimer(onTime);
println("first time: " + new Date())
timeState.update(onTime)
}
// 有呼叫成功, 取消觸發器
if (!value.callType.equals("fail") && time != 0) {
context.timerService().deleteProcessingTimeTimer(time)
timeState.clear()
}
}
// 時間到, 執行觸發器,發出告警
override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[String, StationLog, String]#OnTimerContext,
out: Collector[String]): Unit = {
val df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
var warnStr = "觸發時間:" + df.format(new Date(timestamp)) + " 手機號:" + ctx.getCurrentKey
out.collect(warnStr)
timeState.clear()
}
}
}
七、側輸出流 Side Output
在 flink 處理數據流時,我們經常會遇到這樣的情況:在處理一個數據源時,往往需要將該源中的不同類型的數據做分割處理
- 如果使用 filter 算子對數據源進行篩選分割的話,勢必會造成數據流的多次復制,造成不必要的性能浪費;
- 側輸出就是將數據流進行分割,而不對流進行復制的一種分流機制。
- flink 的側輸出的另一個作用就是對延時遲到的數據進行處理,這樣就可以不必丟棄遲到的數據。
案例:根據基站的日志,請把呼叫成功的 Stream(主流)和不成功的 Stream(側流)分別輸出。
package com.chb.flink.func
import com.chb.flink.source.StationLog
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector
/**
* 把呼叫成功的Stream(主流)和不成功的Stream(側流)分別輸出。
*/
object TestSideOutputStream {
//側輸出流首先需要定義一個流的標簽 , 此處需要將隱式轉換放在前面
import org.apache.flink.streaming.api.scala._
var notSuccessTag = new OutputTag[StationLog]("not_success")
def main(args: Array[String]): Unit = {
//初始化Flink的Streaming(流計算)上下文執行環境
val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
streamEnv.setParallelism(1)
//讀取文件數據
val data = streamEnv.readTextFile(getClass.getResource("/station.log").getPath)
.map(line => {
var arr = line.split(",")
new StationLog(arr(0).trim, arr(1).trim, arr(2).trim, arr(3).trim, arr(4).trim.toLong, arr(5).trim.toLong)
})
val mainStream: DataStream[StationLog] = data.process(new CreateSideOutputStream(notSuccessTag))
//得到側流
val sideOutput: DataStream[StationLog] = mainStream.getSideOutput(notSuccessTag)
mainStream.print("main")
sideOutput.print("sideoutput")
streamEnv.execute()
}
class CreateSideOutputStream(tag: OutputTag[StationLog]) extends ProcessFunction[StationLog, StationLog] {
override def processElement(value: StationLog, ctx: ProcessFunction[StationLog, StationLog]#Context, out: Collector[StationLog]): Unit = {
if (value.callType.equals("success")) { //輸出主流
out.collect(value)
} else { //輸出側流
ctx.output(tag, value)
}
}
}
}