1 Introduction
Our project's overall architecture consumes Kafka data and stores it in a database. We previously used Spark Streaming for this, but since Flink has become quite popular lately, I wanted to try it out and see how consuming data with Flink differs from Spark Streaming. First, a quick look at Flink: it provides both stream processing and batch processing, and it can handle bounded as well as unbounded data, i.e. data that is produced continuously and never ends. Without going into the internals, let's build a working Flink pipeline. The overall flow is source -> transform -> sink: read the raw data from a source, transform it from its messy original format into the format we need, and then sink the result into a database or file for storage and presentation.
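As a preview of that flow, here is a minimal, self-contained sketch of the source -> transform -> sink shape (the elements "a:1" and "b:2" and the job name are made-up placeholders; the real Kafka source and PostgreSQL sink are built in the sections below):

import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment

object PipelineSkeleton {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.fromElements("a:1", "b:2")                  // source: a fixed collection here, a Kafka consumer later
      .map(new MapFunction[String, String] {        // transform: parse the raw record into the shape we need
        override def map(line: String): String = line.split(":")(1)
      })
      .print()                                      // sink: stdout here, a PostgreSQL sink later
    env.execute("skeleton")
  }
}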
2 Environment
JDK 1.8
Scala 2.11.8
Flink 1.10.1
3 Code
3.1 Add the dependencies
<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.2.1</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
        <version>1.10.1</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-filesystem_2.11</artifactId>
        <version>1.10.1</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_2.11</artifactId>
        <version>1.10.1</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>1.10.1</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>2.11.8</version>
    </dependency>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-compiler</artifactId>
        <version>2.11.8</version>
    </dependency>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-reflect</artifactId>
        <version>2.11.8</version>
    </dependency>
</dependencies>
3.2 Program entry point
import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, Types}
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.runtime.state.StateBackend
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.datastream.{BroadcastConnectedStream, BroadcastStream, SingleOutputStreamOperator}
import org.apache.flink.streaming.api.environment.{CheckpointConfig, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.util.Collector
import org.apache.kafka.clients.consumer.ConsumerConfig

import scala.collection.mutable.ListBuffer

/**
 * Consume Kafka data with Flink and write the processed records into PostgreSQL.
 */
object FlinkTest {

  def main(args: Array[String]): Unit = {
    // State descriptor for the broadcast filter rules
    val filterRules: MapStateDescriptor[String, String] =
      new MapStateDescriptor("filter_rule", BasicTypeInfo.STRING_TYPE_INFO, Types.STRING)

    // Read the command-line arguments
    val fromArgs = ParameterTool.fromArgs(args)
    // Read the properties file
    val parameterTool = ParameterTool.fromPropertiesFile(fromArgs.get("properties"))

    // Checkpoint settings
    val checkpointDirectory = parameterTool.getRequired("checkpointDirectory.test")
    val checkpointSecondInterval = parameterTool.getLong("checkpointSecondInterval")

    // Kafka settings
    val kafkaBootStrapServers = parameterTool.getRequired("kafka.bootstrap.servers.test")
    val kafkaGroupID = parameterTool.getRequired("kafka.group.id.test")
    val kafkaSourceTopic = parameterTool.getRequired("kafka.topic.test")

    // PostgreSQL settings
    val postgresqlHost = parameterTool.getRequired("postgresql.host")
    val postgresqlPort = parameterTool.getInt("postgresql.port")
    val postgresqlDB = parameterTool.getRequired("postgresql.db")
    val postgresqlUser = parameterTool.getRequired("postgresql.user")
    val postgresqlPassword = parameterTool.getRequired("postgresql.password")
    val postgresqlInterval = parameterTool.getInt("postgre.secondInterval")

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Enable checkpointing: one checkpoint every 60000 ms
    env.enableCheckpointing(60000)
    // Use EXACTLY_ONCE checkpoint semantics
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    // Checkpoint timeout
    env.getCheckpointConfig.setCheckpointTimeout(120000)
    // Allow only one checkpoint in flight at a time
    env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
    // Minimum pause between two checkpoints: checkpointSecondInterval * 1000 ms
    env.getCheckpointConfig.setMinPauseBetweenCheckpoints(checkpointSecondInterval * 1000)
    // Keep externalized checkpoints when the job is cancelled
    env.getCheckpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
    // Tolerate at most 10 checkpoint failures
    env.getCheckpointConfig.setTolerableCheckpointFailureNumber(10)

    // State backend: RocksDB with incremental checkpoints
    val backend = new RocksDBStateBackend(checkpointDirectory, true)
    env.setStateBackend(backend.asInstanceOf[StateBackend])

    // Kafka consumer properties
    val kafkaSourceProps = new Properties()
    kafkaSourceProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootStrapServers)
    kafkaSourceProps.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaGroupID)

    // Kafka source: the Kafka consumer
    val kafkaSource = env.addSource(new FlinkKafkaConsumer(kafkaSourceTopic, new SimpleStringSchema, kafkaSourceProps))
      .uid("kafkaSource").name("kafkaSource").setParallelism(3)

    import scala.collection.mutable.Map

    // Extract and parse the data: each relevant record is a set of "key:value" lines
    val alarmEventStream = kafkaSource.process(new ProcessFunction[String, Map[String, String]] {
      override def processElement(value: String,
                                  ctx: ProcessFunction[String, Map[String, String]]#Context,
                                  out: Collector[Map[String, String]]): Unit = {
        // "业务所需信息" is a marker string identifying the records the business cares about
        if (value.contains("业务所需信息")) {
          val arrs = value.split("\n")
          var map: Map[String, String] = Map()
          var key = ""
          for (a <- arrs) {
            key = a.split(":")(0)
            map.put(key, a.split(key + ":")(1))
          }
          out.collect(map)
        }
      }
    }).uid("test").name("test").setParallelism(2)

    // Custom PostgreSQL source: periodically reads the filter data from PostgreSQL so it can be broadcast
    val pgsource = new PostgreSQLSource(postgresqlHost, postgresqlPort, postgresqlDB, postgresqlUser, postgresqlPassword, postgresqlInterval)
    val configStream: SingleOutputStreamOperator[ListBuffer[String]] = env.addSource(pgsource)
      .uid("DataListFromPostgreSQL")
      .name("DataListFromPostgreSQL")
      .setParallelism(1) // a plain (non-parallel) RichSourceFunction must keep parallelism 1

    // Broadcast the filter stream
    val filterRulesStream: BroadcastStream[ListBuffer[String]] = configStream.broadcast(filterRules)

    // Connect the event stream with the broadcast filter stream
    val connectedStream: BroadcastConnectedStream[Map[String, String], ListBuffer[String]] =
      alarmEventStream.connect(filterRulesStream)
    val filter = new FilterSpecificProblemIDProcessFunction()
    val filterStream: SingleOutputStreamOperator[Map[String, String]] = connectedStream.process(filter)

    // Write the filtered data into PostgreSQL
    filterStream.addSink(new ESBPostgreSQLSink(postgresqlHost, postgresqlPort, postgresqlDB, postgresqlUser, postgresqlPassword))
      .uid("PostgresSQLSink")
      .name("PostgresSQLSink")
      .setParallelism(1)

    // Run the job
    env.execute("test")
  }
}
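The job reads all of its settings from a properties file whose path is passed on the command line as --properties. For reference, a minimal sketch of what that file might look like; only the key names come from the code above, all values are made-up placeholders:

# checkpoint settings
checkpointDirectory.test=hdfs:///flink/checkpoints/flink-test
checkpointSecondInterval=60

# kafka settings
kafka.bootstrap.servers.test=host1:9092,host2:9092
kafka.group.id.test=flink_test_group
kafka.topic.test=test_topic

# postgresql settings
postgresql.host=127.0.0.1
postgresql.port=5432
postgresql.db=testdb
postgresql.user=test
postgresql.password=test
postgre.secondInterval=60

The job would then be submitted with something like flink run -c FlinkTest flink-test.jar --properties /path/to/test.properties (the jar name and path here are assumptions).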
3.3 Custom source
To pull the filter data out of PostgreSQL we need a source, but Flink does not ship with a PostgreSQL source, so we have to define one ourselves.
import java.sql.{Connection, DriverManager, PreparedStatement, ResultSet}

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}

import scala.collection.mutable.ListBuffer

class PostgreSQLSource(host: String, port: Int, db: String, username: String, pass: String, secondInterval: Int)
  extends RichSourceFunction[ListBuffer[String]] {

  private var connection: Connection = null
  private var ps: PreparedStatement = null
  // whether the source is still running
  var running: Boolean = true

  override def open(parameters: Configuration): Unit = {
    super.open(parameters)
    Class.forName("org.postgresql.Driver")
    connection = DriverManager.getConnection("jdbc:postgresql://" + host + ":" + port + "/" + db, username, pass)
    val sql = "select testData from testTable"
    ps = connection.prepareStatement(sql)
  }

  override def close(): Unit = {
    // release the statement before the connection
    if (ps != null) {
      ps.close()
    }
    if (connection != null) {
      connection.close()
    }
  }

  override def cancel(): Unit = {
    running = false
  }

  override def run(ctx: SourceFunction.SourceContext[ListBuffer[String]]): Unit = {
    try {
      while (running) {
        val listBuffer: ListBuffer[String] = ListBuffer()
        val res: ResultSet = ps.executeQuery
        while (res.next()) {
          val testData = res.getString("testData")
          listBuffer += testData
        }
        ctx.collect(listBuffer)
        // wait before polling PostgreSQL again
        Thread.sleep(secondInterval * 1000L)
      }
    } catch {
      case e: Exception => println("PostgreSQLSource failed: " + e.getMessage)
    }
  }
}
3.4 Custom BroadcastProcessFunction
To process the stream obtained by connecting the real-time stream with the broadcast filter stream, we need to pass in a BroadcastProcessFunction. Here we define our own subclass of BroadcastProcessFunction and put the business logic inside it.
import org.apache.flink.api.common.state.{BroadcastState, MapStateDescriptor, ReadOnlyBroadcastState}
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, Types}
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction
import org.apache.flink.util.Collector

import scala.collection.mutable.{ListBuffer, Map}

class FilterSpecificProblemIDProcessFunction
  extends BroadcastProcessFunction[Map[String, String], ListBuffer[String], Map[String, String]] {

  /**
   * MapStateDescriptor for the broadcast state (must match the descriptor used when calling broadcast()).
   */
  val filterRules: MapStateDescriptor[String, String] =
    new MapStateDescriptor("filter_rule", BasicTypeInfo.STRING_TYPE_INFO, Types.STRING)

  override def processElement(value: Map[String, String],
                              readOnlyContext: BroadcastProcessFunction[Map[String, String], ListBuffer[String], Map[String, String]]#ReadOnlyContext,
                              collector: Collector[Map[String, String]]): Unit = {
    // id of the incoming event
    val id: String = value.getOrElse("id", "NIL")
    // read-only view of the broadcast state
    val broadcastState: ReadOnlyBroadcastState[String, String] = readOnlyContext.getBroadcastState(filterRules)
    // only emit events whose id is present in the filter rules
    if (broadcastState.contains(id)) {
      collector.collect(value)
    }
  }

  override def processBroadcastElement(value: ListBuffer[String],
                                       context: BroadcastProcessFunction[Map[String, String], ListBuffer[String], Map[String, String]]#Context,
                                       collector: Collector[Map[String, String]]): Unit = {
    // check for null before touching the list
    if (value == null || value.isEmpty) {
      return
    }
    val broadcastState: BroadcastState[String, String] = context.getBroadcastState(filterRules)
    // clear the old state
    broadcastState.clear()
    // update the state with the latest filter rules
    for (rule <- value) {
      broadcastState.put(rule, rule)
    }
  }
}
3.5 Custom sink
Likewise, when writing the data into PostgreSQL, Flink does not ship with a PostgreSQL sink, so we define our own.
import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}

import scala.collection.mutable.Map

class ESBPostgreSQLSink(postgresqlHost: String, postgresqlPort: Int, postgresqlDB: String,
                        postgresqlUser: String, postgresqlPassword: String)
  extends RichSinkFunction[Map[String, String]] {

  private var connection: Connection = null
  private var psInsert: PreparedStatement = null
  private var psUpdate: PreparedStatement = null
  private var psInsertUpdate: PreparedStatement = null

  override def open(parameters: Configuration): Unit = {
    super.open(parameters)
    Class.forName("org.postgresql.Driver")
    connection = DriverManager.getConnection("jdbc:postgresql://" + postgresqlHost + ":" + postgresqlPort + "/" + postgresqlDB,
      postgresqlUser, postgresqlPassword)
    // the concrete SQL depends on the business tables; placeholders here
    val insert = "insert SQL"
    val update = "update SQL"
    val insertUpdate = "upsert SQL (update if the row exists, otherwise insert)"
    psInsert = this.connection.prepareStatement(insert)
    psUpdate = this.connection.prepareStatement(update)
    psInsertUpdate = this.connection.prepareStatement(insertUpdate)
  }

  override def close(): Unit = {
    super.close()
    // release the statements and the connection
    if (psInsert != null) psInsert.close()
    if (psUpdate != null) psUpdate.close()
    if (psInsertUpdate != null) psInsertUpdate.close()
    if (connection != null) connection.close()
  }

  override def invoke(value: Map[String, String], context: SinkFunction.Context[_]): Unit = {
    // the actual database-writing business logic goes here
  }
}
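The invoke body above is left empty because it is pure business logic. Purely as an illustration, here is a minimal sketch of what an upsert-style invoke could look like if it replaced the empty method in ESBPostgreSQLSink, assuming a hypothetical table test_table(id primary key, content) and assuming psInsertUpdate was prepared in open() with the SQL shown in the comment:

// Hypothetical sketch only: upsert one record per invocation.
// Assumes: create table test_table(id varchar primary key, content varchar)
// and that psInsertUpdate was prepared with:
//   insert into test_table(id, content) values (?, ?)
//   on conflict (id) do update set content = excluded.content
override def invoke(value: Map[String, String], context: SinkFunction.Context[_]): Unit = {
  psInsertUpdate.setString(1, value.getOrElse("id", "NIL"))
  psInsertUpdate.setString(2, value.getOrElse("content", ""))
  psInsertUpdate.executeUpdate()
}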
Notes
1 Every time Flink takes a checkpoint, it flushes the buffered data and records the files carrying the pending suffix (files that are already complete but not yet recorded by a checkpoint; the suffix can be configured via sink.setPendingSuffix("xxx")).
2 Flink checks every 60 seconds (configurable via sink.setInactiveBucketCheckInterval(60 * 1000)) whether a file's FSDataOutputStream has received any data within the last 60 seconds (configurable via sink.setInactiveBucketThreshold(60 * 1000)); if not, Flink considers that bucket inactive, flushes it and closes the file (see the sketch after this list).
3 Internally Flink keeps a map, Map<String, BucketState<T>> bucketStates = new HashMap<>(), that records the files currently in use. The key is the file path; BucketState wraps all the information about that file, including its creation time, the time of the last write (write here means writing into the buffer, not the flush time), whether the file is currently open or closed, and the method used to write the buffer. Whenever Flink needs to operate on a file, it fetches that file's wrapper object from this map.
4 Choosing a sensible parallelism in the code takes long-term observation and tuning, and it is probably an area where I still need to dig deeper into Flink. Higher parallelism is not automatically better: set it too high and you waste resources without gaining much throughput; set it too low and the job runs inefficiently. It has to be chosen according to the actual business logic.
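Notes 1–3 refer to the BucketingSink from flink-connector-filesystem (already listed in the dependencies). As a minimal sketch of how those settings are wired together, assuming a DataStream[String] named stream and a made-up output path:

import org.apache.flink.streaming.connectors.fs.StringWriter
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink

// hypothetical output path; replace with a real HDFS or local directory
val bucketingSink = new BucketingSink[String]("hdfs:///tmp/flink/output")
bucketingSink.setWriter(new StringWriter[String]())
bucketingSink.setPendingSuffix(".pending")              // suffix of finished-but-uncheckpointed files (note 1)
bucketingSink.setInactiveBucketCheckInterval(60 * 1000) // how often inactive buckets are checked (note 2)
bucketingSink.setInactiveBucketThreshold(60 * 1000)     // idle time after which a bucket is flushed and closed (note 2)
stream.addSink(bucketingSink)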
Summary
Overall, compared with Spark Streaming, the Flink code is somewhat more involved. Flink is still on the rise, and there are areas that have yet to mature. But if your business has strict real-time requirements, Flink is definitely a good choice: its processing is very fast. That's all for this post. If you run into problems while studying this, feel free to email me and I'll do my best to help.