1 Introduction
One of our projects consumes Kafka data and writes it into a database. We previously built this with Spark Streaming, but since Flink has become quite popular recently, I wanted to try it and see how consuming data with Flink compares with Spark Streaming. Briefly, Flink offers both stream and batch processing. It can handle bounded as well as unbounded data, i.e. data that is produced indefinitely. We will skip the deeper details and build a working Flink job directly. The overall idea is source -> transform -> sink: read records from a source, transform them from their raw, messy format into the format we need, and then sink them into a database or file for storage and presentation.
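Before diving into the full program, here is a minimal sketch of that source -> transform -> sink shape. The broker address, topic, and the trivial map/print stages are placeholders for illustration only; the real pipeline is shown in section 3.

import java.util.Properties
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

object PipelineSkeleton {
  def main(args: Array[String]): Unit = {
    // Placeholder Kafka settings, for illustration only
    val props = new Properties()
    props.setProperty("bootstrap.servers", "localhost:9092")
    props.setProperty("group.id", "demo-group")

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.addSource(new FlinkKafkaConsumer[String]("demo-topic", new SimpleStringSchema, props)) // source
      .map(new MapFunction[String, String] {
        override def map(value: String): String = value.trim // transform: turn raw records into the target format
      })
      .print() // sink: stdout stands in for the database sink used later
    env.execute("source-transform-sink-skeleton")
  }
}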
2 Environment
JDK 1.8
Scala 2.11.8
Flink 1.10.1
3 Code
3.1 Add dependencies
<dependencies>
  <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.2.1</version>
    <scope>provided</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
    <version>1.10.1</version>
    <scope>provided</scope>
  </dependency>
  <!-- All Flink artifacts must share the same Scala binary suffix: _2.11 here, matching Scala 2.11.8 -->
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-filesystem_2.11</artifactId>
    <version>1.10.1</version>
    <scope>provided</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.10.1</version>
    <scope>provided</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>1.10.1</version>
    <scope>provided</scope>
  </dependency>
  <dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>2.11.8</version>
  </dependency>
  <dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-compiler</artifactId>
    <version>2.11.8</version>
  </dependency>
  <dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-reflect</artifactId>
    <version>2.11.8</version>
  </dependency>
</dependencies>
3.2 Program entry point
/**
 * Consume Kafka data with Flink and write the processed records into PostgreSQL.
 */
object FlinkTest {
  def main(args: Array[String]): Unit = {
    // Create the state descriptor for the broadcast filter rules
    val filterRules: MapStateDescriptor[String, String] = new MapStateDescriptor("filter_rule", BasicTypeInfo.STRING_TYPE_INFO, Types.STRING)
    // Read command-line arguments
    val fromArgs = ParameterTool.fromArgs(args)
    // Read the configuration file
    val parameterTool = ParameterTool.fromPropertiesFile(fromArgs.get("properties"))
    // Checkpoint configuration
    val checkpointDirectory = parameterTool.getRequired("checkpointDirectory.test")
    val checkpointSecondInterval = parameterTool.getLong("checkpointSecondInterval")
    // Kafka configuration
    val kafkaBootStrapServers = parameterTool.getRequired("kafka.bootstrap.servers.test")
    val kafkaGroupID = parameterTool.getRequired("kafka.group.id.test")
    val kafkaSourceTopic = parameterTool.getRequired("kafka.topic.test")
    // PostgreSQL configuration
    val postgresqlHost = parameterTool.getRequired("postgresql.host")
    val postgresqlPort = parameterTool.getInt("postgresql.port")
    val postgresqlDB = parameterTool.getRequired("postgresql.db")
    val postgresqlUser = parameterTool.getRequired("postgresql.user")
    val postgresqlPassword = parameterTool.getRequired("postgresql.password")
    val postgresqlInterval = parameterTool.getInt("postgre.secondInterval")

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Enable checkpointing: trigger a checkpoint every 60000 ms
    env.enableCheckpointing(60000)
    // Use EXACTLY_ONCE checkpointing semantics
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    // Checkpoint timeout
    env.getCheckpointConfig.setCheckpointTimeout(120000)
    // Allow only one checkpoint to be in progress at a time
    env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
    // Minimum pause between two checkpoints: checkpointSecondInterval * 1000 ms
    env.getCheckpointConfig.setMinPauseBetweenCheckpoints(checkpointSecondInterval * 1000)
    // Retain externalized checkpoints when the Flink job is cancelled
    env.getCheckpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
    // Tolerate at most 10 checkpoint failures before the job fails
    env.getCheckpointConfig.setTolerableCheckpointFailureNumber(10)
    // Configure the state backend
    val backend = new RocksDBStateBackend(checkpointDirectory, true)
    env.setStateBackend(backend.asInstanceOf[StateBackend])

    // Kafka consumer properties
    val kafkaSourceProps = new Properties()
    kafkaSourceProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootStrapServers)
    kafkaSourceProps.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaGroupID)
    // Add the Kafka source, i.e. the Kafka consumer
    val kafkaSource = env.addSource(new FlinkKafkaConsumer(kafkaSourceTopic, new SimpleStringSchema, kafkaSourceProps))
      .uid("kafkaSource").name("kafkaSource").setParallelism(3)

    import scala.collection.mutable.Map
    // Extract and parse the raw records
    val alarmEventStream = kafkaSource.process(new ProcessFunction[String, Map[String, String]] {
      override def processElement(value: String, ctx: ProcessFunction[String, Map[String, String]]#Context, out: Collector[Map[String, String]]): Unit = {
        // Placeholder condition: only records containing the business-relevant marker are parsed
        if (value.contains("business marker")) {
          val arrs = value.split("\n")
          var map: Map[String, String] = Map()
          var key = ""
          for (a <- arrs) {
            key = a.split(":")(0)
            map.put(key, a.split(key + ":")(1))
          }
          out.collect(map)
        }
      }
    }).uid("test").name("test").setParallelism(2)

    // Custom PostgreSQL source: periodically load the filter data from PostgreSQL and broadcast it
    val pgsource = new PostgreSQLSource(postgresqlHost, postgresqlPort, postgresqlDB, postgresqlUser, postgresqlPassword, postgresqlInterval)
    val configStream: SingleOutputStreamOperator[ListBuffer[String]] = env.addSource(pgsource)
      .uid("DataListFromPostgreSQL")
      .name("DataListFromPostgreSQL")
      .setParallelism(1) // a non-parallel RichSourceFunction must run with parallelism 1
    // Broadcast the filter stream to obtain a BroadcastStream
    val filterRulesStream: BroadcastStream[ListBuffer[String]] = configStream.broadcast(filterRules)
    // Connect the real-time stream with the broadcast filter stream
    val connectedStream: BroadcastConnectedStream[Map[String, String], ListBuffer[String]] = alarmEventStream.connect(filterRulesStream)
    val filter = new FilterSpecificProblemIDProcessFunction()
    val filterStream: SingleOutputStreamOperator[Map[String, String]] = connectedStream.process(filter)
    // Write the filtered data into PostgreSQL
    filterStream.addSink(
      new ESBPostgreSQLSink(postgresqlHost, postgresqlPort, postgresqlDB, postgresqlUser, postgresqlPassword))
      .uid("PostgresSQLSink")
      .name("PostgresSQLSink")
      .setParallelism(1)
    // Run the job
    env.execute("test")
  }
}
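For reference, the job is started with --properties <path-to-file>, and the keys read above would live in a properties file roughly like the one below. Every value here is a made-up placeholder; only the key names come from the code.

# Example configuration; all values are placeholders
checkpointDirectory.test=hdfs:///flink/checkpoints/flink-test
checkpointSecondInterval=10

kafka.bootstrap.servers.test=kafka1:9092,kafka2:9092,kafka3:9092
kafka.group.id.test=flink-test-group
kafka.topic.test=test-topic

postgresql.host=127.0.0.1
postgresql.port=5432
postgresql.db=testdb
postgresql.user=postgres
postgresql.password=postgres
postgre.secondInterval=60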
3.3 Custom source
Flink does not ship a PostgreSQL source for pulling the filter data out of the PostgreSQL database, so we have to define our own source.
class PostgreSQLSource(host: String, port: Int, db: String, username: String, pass: String, secondInterval: Int) extends RichSourceFunction[ListBuffer[String]] {
  private var connection: Connection = null
  private var ps: PreparedStatement = null
  // Whether the source should keep running; volatile because cancel() is called from another thread
  @volatile var running: Boolean = true

  override def open(parameters: Configuration): Unit = {
    super.open(parameters)
    Class.forName("org.postgresql.Driver")
    connection = DriverManager.getConnection("jdbc:postgresql://" + host + ":" + port + "/" + db, username, pass)
    val sql = "select testData from testTable"
    ps = connection.prepareStatement(sql)
  }

  override def close(): Unit = {
    if (ps != null) {
      ps.close()
    }
    if (connection != null) {
      connection.close()
    }
  }

  override def cancel(): Unit = {
    running = false
  }

  override def run(ctx: SourceFunction.SourceContext[ListBuffer[String]]): Unit = {
    try {
      while (running) {
        val listBuffer: ListBuffer[String] = ListBuffer()
        val res: ResultSet = ps.executeQuery()
        while (res.next()) {
          val testData = res.getString("testData")
          listBuffer += testData
        }
        res.close()
        ctx.collect(listBuffer)
        // Poll the database periodically instead of in a tight loop
        Thread.sleep(secondInterval * 1000L)
      }
    } catch {
      case e: Exception =>
        println("PostgreSQLSource query failed: " + e.getMessage)
    }
  }
}
3.4 Custom BroadcastProcessFunction
To process the connected stream formed by joining the real-time stream with the broadcast filter stream, we need to pass in a BroadcastProcessFunction. Here we define our own class that extends BroadcastProcessFunction and implements the business logic.
class FilterSpecificProblemIDProcessFunction extends BroadcastProcessFunction[Map[String, String], ListBuffer[String], Map[String, String]] {
  /**
   * MapStateDescriptor describing the broadcast state
   */
  val filterRules: MapStateDescriptor[String, String] = new MapStateDescriptor("filter_rule", BasicTypeInfo.STRING_TYPE_INFO, Types.STRING)

  override def processElement(value: Map[String, String], readOnlyContext: BroadcastProcessFunction[Map[String, String], ListBuffer[String], Map[String, String]]#ReadOnlyContext, collector: Collector[Map[String, String]]): Unit = {
    // ID of the real-time record
    val id: String = value.getOrElse("id", "NIL")
    // Read the broadcast state
    val broadcastState: ReadOnlyBroadcastState[String, String] = readOnlyContext.getBroadcastState(filterRules)
    if (broadcastState.contains(id)) {
      collector.collect(value)
    }
  }

  override def processBroadcastElement(value: ListBuffer[String], context: BroadcastProcessFunction[Map[String, String], ListBuffer[String], Map[String, String]]#Context, collector: Collector[Map[String, String]]): Unit = {
    // Check for null before touching the collection
    if (value == null || value.isEmpty) {
      return
    }
    val broadcastState: BroadcastState[String, String] = context.getBroadcastState(filterRules)
    // Clear the old state
    broadcastState.clear()
    // Update the state with the latest filter rules
    for (i <- 0 until value.size) {
      broadcastState.put(value.apply(i), value.apply(i))
    }
  }
}
3.5 Custom sink
Likewise, Flink does not provide a PostgreSQL sink, so to write the data into the PostgreSQL database we define our own sink.
class ESBPostgreSQLSink(postgresqlHost: String, postgresqlPort: Int, postgresqlDB: String, postgresqlUser: String, postgresqlPassword: String) extends RichSinkFunction[Map[String, String]] {
  private var connection: Connection = null
  private var psInsert: PreparedStatement = null
  private var psUpdate: PreparedStatement = null
  private var psInsertUpdate: PreparedStatement = null

  override def open(parameters: Configuration): Unit = {
    super.open(parameters)
    Class.forName("org.postgresql.Driver")
    connection = DriverManager.getConnection("jdbc:postgresql://" + postgresqlHost + ":" + postgresqlPort + "/" + postgresqlDB, postgresqlUser, postgresqlPassword)
    // Placeholders for the actual SQL statements
    val insert = "insert statement"
    val update = "update statement"
    val insertUpdate = "upsert statement (update if the row exists, otherwise insert)"
    psInsert = this.connection.prepareStatement(insert)
    psUpdate = this.connection.prepareStatement(update)
    psInsertUpdate = this.connection.prepareStatement(insertUpdate)
  }

  override def close(): Unit = {
    super.close()
    // Release statements and the connection
    if (psInsert != null) psInsert.close()
    if (psUpdate != null) psUpdate.close()
    if (psInsertUpdate != null) psInsertUpdate.close()
    if (connection != null) connection.close()
  }

  override def invoke(value: Map[String, String], context: SinkFunction.Context[_]): Unit = {
    // The actual insert/update logic against the database goes here
  }
}
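The SQL statements above are left as placeholders in the original code. As a rough illustration only, a PostgreSQL upsert could be built with insert ... on conflict ... do update; in the sketch below the table and column names (test_table, id, test_data) and the helper itself are hypothetical, and a real sink would reuse the statement prepared in open() instead of re-preparing it per record.

import java.sql.{Connection, PreparedStatement}
import scala.collection.mutable.Map

object UpsertSketch {
  // Hypothetical table and column names; requires a unique constraint on the id column
  val upsertSql: String =
    """insert into test_table (id, test_data)
      |values (?, ?)
      |on conflict (id) do update set test_data = excluded.test_data""".stripMargin

  def upsert(connection: Connection, value: Map[String, String]): Unit = {
    val ps: PreparedStatement = connection.prepareStatement(upsertSql)
    try {
      ps.setString(1, value.getOrElse("id", "NIL"))
      ps.setString(2, value.getOrElse("testData", ""))
      ps.executeUpdate()
    } finally {
      ps.close()
    }
  }
}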
Notes
1 Each time Flink takes a checkpoint, it flushes the buffered data and records the files ending with the pending suffix (files that are finished but not yet recorded by a checkpoint; the suffix can be set with sink.setPendingSuffix("xxx")).
2 Every 60 seconds (configurable with sink.setInactiveBucketCheckInterval(60 * 1000)) Flink checks the open files; if a file's FSDataOutputStream has received no data for 60 seconds (configurable with sink.setInactiveBucketThreshold(60 * 1000)), Flink treats that bucket as inactive, flushes it and closes the file (see the configuration sketch after this list).
3 Internally Flink maintains a map, Map<String, BucketState<T>> bucketStates = new HashMap<>(), that tracks the files currently in use. The key is the file path, and BucketState wraps everything about the file: its creation time, the last write time (write meaning a write into the buffer, not a flush), whether the file is open or closed, and the buffer-writing logic. Whenever Flink needs to operate on a file, it looks up the file's wrapper object here.
4 Choosing a sensible parallelism in the code is a matter of long-term observation and tuning, and is something I still need to study more deeply. Higher is not always better: setting it too high wastes resources without much gain in efficiency, while setting it too low hurts throughput. It should be chosen according to the concrete business logic.
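Notes 1 to 3 refer to the bucketing file sink from the flink-connector-filesystem dependency, which is not used in the pipeline above. A minimal sketch of those settings, assuming a hypothetical HDFS output path, might look like this:

import org.apache.flink.streaming.connectors.fs.StringWriter
import org.apache.flink.streaming.connectors.fs.bucketing.{BucketingSink, DateTimeBucketer}

object BucketingSinkSketch {
  // Hypothetical HDFS base path; adjust to your environment
  def build(): BucketingSink[String] = {
    val fileSink = new BucketingSink[String]("hdfs:///tmp/flink/output")
    fileSink.setBucketer(new DateTimeBucketer[String]("yyyy-MM-dd--HH"))
    fileSink.setWriter(new StringWriter[String]())
    fileSink.setPendingSuffix(".pending")              // suffix for completed files not yet recorded by a checkpoint
    fileSink.setInactiveBucketCheckInterval(60 * 1000) // how often inactive buckets are checked for
    fileSink.setInactiveBucketThreshold(60 * 1000)     // a bucket idle this long is flushed and closed
    fileSink
  }
}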
Summary
Overall, compared with Spark Streaming the Flink code is a bit more involved. Flink is, after all, still in a phase of rapid growth, and several areas remain to be polished. But for workloads with strict real-time requirements, Flink is definitely a good choice; its processing speed is very fast. That is all for this post. If you run into problems while studying this, feel free to send me an email and I will do my best to help. Let's keep learning together!
