Reading files from HDFS
// During operator transformations the data is converted into Flink's built-in data types,
// so the implicit conversions must be imported for the type conversion to happen automatically.
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
// readTextFile reads the file only once
val textStream = env.readTextFile("hdfs://ke01:8020/xiaoke002/two")
textStream.print()
env.execute()

Results:
6> 1,小明
8> 2,小紅
11> 3,小柯
2> 4,wang

Under the hood, readTextFile calls readFile and passes FileProcessingMode.PROCESS_ONCE (read only once).
So: to keep ingesting data continuously, call readFile() yourself and pass FileProcessingMode.PROCESS_CONTINUOUSLY.

import org.apache.flink.api.java.io.TextInputFormat
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.source.FileProcessingMode
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
val filePath = "hdfs://ke01:8020/xiaoke002/two"
val textInputFormat = new TextInputFormat(new Path(filePath))
// periodically re-scan the HDFS path for new file content
// (the interval argument is in milliseconds, so 10 means 10 ms; use 10000 for every 10 s)
val textStream = env.readFile(textInputFormat, filePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 10)
textStream.print()
env.execute()

Results: the job keeps running, and every time new data arrives the whole file is read again
12> 1,小明
5> 3,小柯
8> 4,wang
2> 2,小紅

hdfs dfs -appendToFile test /xiaoke002/two

3> 3,小柯
6> aa bb
7> cc dd
5> 4,wang
1> 2,小紅
12> 1,小明
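The records in the file are plain "id,name" strings, so a map right after the source can turn them into typed tuples. This is also where the implicit import above matters, because Flink derives the TypeInformation for the tuple type through it. A minimal sketch, not part of the original code (the numeric-id filter is an assumption to skip malformed lines such as the appended aa bb records):

import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
val textStream = env.readTextFile("hdfs://ke01:8020/xiaoke002/two")

// keep only lines of the form "<number>,<name>" and split them into an (id, name) tuple
val userStream: DataStream[(Int, String)] = textStream
  .filter { line =>
    val fields = line.split(",")
    fields.length == 2 && fields(0).nonEmpty && fields(0).forall(_.isDigit)
  }
  .map { line =>
    val fields = line.split(",")
    (fields(0).toInt, fields(1))
  }

userStream.print()
env.execute()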
Reading data from Kafka
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.9.2</version>
</dependency>
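If the project is built with sbt instead of Maven, the equivalent dependency line would look roughly like this (added for reference, not part of the original setup):

libraryDependencies += "org.apache.flink" % "flink-connector-kafka_2.11" % "1.9.2"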
Producing data into the flink-kafka topic
package com.text

import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer

import scala.io._

object FlinkKafkaProduct {
  def main(args: Array[String]): Unit = {
    val prop = new Properties()
    prop.setProperty("bootstrap.servers", "ke02:9092,ke03:9092,ke04:9092")
    prop.setProperty("key.serializer", classOf[StringSerializer].getName)
    prop.setProperty("value.serializer", classOf[StringSerializer].getName)

    // create a Kafka producer
    val producer = new KafkaProducer[String, String](prop)
    // getLines() returns a one-shot iterator, so materialize it to reuse it across the outer loop
    val lines = Source.fromFile("D:\\code\\scala\\test\\test07\\data\\carFlow_all_column_test.txt").getLines().toList

    for (i <- 1 to 100) {
      for (elem <- lines) {
        val splits = elem.split(",")
        val monitorId = splits(0).replace("'", "")
        val carId = splits(2).replace("'", "")
        val timestamp = splits(4).replace("'", "")
        val speed = splits(6)
        val stringBuilder = new StringBuilder
        val info = stringBuilder.append(monitorId + "\t").append(carId + "\t").append(timestamp + "\t").append(speed)

        // Kafka records are key-value pairs (the key defaults to null if not set)
        // topic: flink-kafka, key: i + "", value: info
        producer.send(new ProducerRecord[String, String]("flink-kafka", i + "", info.toString()))
        Thread.sleep(500)
      }
    }
  }
}
Results
-- start Kafka on ke02, ke03 and ke04:
kafka-server-start.sh -daemon ./server.properties

-- start a console consumer:
kafka-console-consumer.sh --new-consumer --bootstrap-server ke02:9092 --topic flink-kafka --property print.key=true
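The producer above uses the plain Kafka client API. The same flink-connector-kafka dependency also ships a FlinkKafkaProducer sink, so records can be written straight from a Flink job. A rough sketch under the assumption that an upstream DataStream[String] already holds the tab-separated records (here it is faked with fromElements and placeholder values):

package com.text

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer

object FlinkKafkaSinkSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // stand-in for the real car-flow stream; the values are placeholders
    val infoStream: DataStream[String] = env.fromElements("001\tcar_1\t1630000000\t60")

    val producerProps = new Properties()
    producerProps.setProperty("bootstrap.servers", "ke02:9092,ke03:9092,ke04:9092")

    // write every record's value to the flink-kafka topic (keys are left null)
    infoStream.addSink(new FlinkKafkaProducer[String]("flink-kafka", new SimpleStringSchema(), producerProps))

    env.execute()
  }
}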
flink-kafka consumer (key and value)
package com.text

import java.util.Properties

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, KafkaDeserializationSchema}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringSerializer

object FlinkKafkaConsumerKV {
  def main(args: Array[String]): Unit = {
    // consume both the key and the value from Kafka
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "ke02:9092,ke03:9092,ke04:9092")
    properties.setProperty("group.id", "flink-kafka-001")
    properties.setProperty("key.deserializer", classOf[StringSerializer].getName)
    properties.setProperty("value.deserializer", classOf[StringSerializer].getName)

    val stream = env.addSource(new FlinkKafkaConsumer[(String, String)]("flink-kafka",
      new KafkaDeserializationSchema[(String, String)] {
        // when to stop consuming; false means never stop
        override def isEndOfStream(nextElement: (String, String)): Boolean = false

        // deserialize the raw byte arrays coming from Kafka
        override def deserialize(record: ConsumerRecord[Array[Byte], Array[Byte]]): (String, String) = {
          val key = new String(record.key(), "UTF-8")
          val value = new String(record.value(), "UTF-8")
          (key, value)
        }

        // tell Flink the type of the produced elements
        override def getProducedType: TypeInformation[(String, String)] = {
          createTuple2TypeInformation(createTypeInformation[String], createTypeInformation[String])
        }
      }, properties))

    stream.print()
    env.execute()
  }
}
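The producer writes monitorId, carId, timestamp and speed separated by tabs, so the consumed (key, value) pairs can be unpacked right after the source. A small sketch that continues the main method above (CarFlow is a hypothetical case class and would be declared at the top level of the file, not inside main):

// hypothetical case class for the car-flow records, declared at the top level of the file
case class CarFlow(monitorId: String, carId: String, timestamp: String, speed: String)

// split the tab-separated value produced by FlinkKafkaProduct into named fields
val carFlowStream: DataStream[CarFlow] = stream.map { kv =>
  val fields = kv._2.split("\t")
  CarFlow(fields(0), fields(1), fields(2), fields(3))
}
carFlowStream.print()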
flink-kafka consumer (value only)
package com.text

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.kafka.common.serialization.StringSerializer

object FlinkKafkaConsumerV {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "ke02:9092,ke03:9092,ke04:9092")
    properties.setProperty("group.id", "flink-kafka-001")
    properties.setProperty("key.deserializer", classOf[StringSerializer].getName)
    properties.setProperty("value.deserializer", classOf[StringSerializer].getName)

    // SimpleStringSchema deserializes only the record value as a UTF-8 string
    val stream = env.addSource(new FlinkKafkaConsumer[String]("flink-kafka", new SimpleStringSchema(), properties))
    stream.print()
    env.execute()
  }
}
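By default FlinkKafkaConsumer starts reading from the committed group offsets. If the consumer is built first and assigned to a value, the start position can be chosen explicitly before addSource. A small sketch that would slot into the main method above, reusing its properties object (picking earliest here is just an example, not part of the original code):

// build the consumer first so its start position can be configured
val kafkaConsumer = new FlinkKafkaConsumer[String]("flink-kafka", new SimpleStringSchema(), properties)
// read the topic from the beginning; setStartFromLatest() and setStartFromGroupOffsets() are the other common options
kafkaConsumer.setStartFromEarliest()
val stream = env.addSource(kafkaConsumer)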
Custom data source
package com.text

import java.util.Random

import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._

object CustomSource {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    var flag = true

    val stream = env.addSource(new SourceFunction[String] {
      // run() can read data from anywhere (e.g. Redis) and then emit it downstream
      override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
        val random = new Random()
        while (flag) {
          ctx.collect("send" + random.nextInt(100))
          Thread.sleep(500)
        }
      }

      override def cancel(): Unit = {
        flag = false
      }
    })

    stream.print()
    env.execute()
  }
}
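The comment in run() mentions Redis as a typical external source. For that kind of source it is usually cleaner to extend RichSourceFunction, whose open()/close() lifecycle hooks give a place to create and release the connection. A minimal sketch with the client calls left as placeholders (connectToRedis and fetchNext are hypothetical names, not a real client API):

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}

class RedisLikeSource extends RichSourceFunction[String] {
  @volatile private var running = true

  // open() runs once per parallel instance, before run(); create the client/connection here
  override def open(parameters: Configuration): Unit = {
    // e.g. connection = connectToRedis("ke02", 6379)  // hypothetical helper
  }

  override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
    while (running) {
      // e.g. val record = fetchNext(connection)  // hypothetical helper
      ctx.collect("record-from-external-system")
      Thread.sleep(500)
    }
  }

  override def cancel(): Unit = {
    running = false
  }

  // close() runs when the source shuts down; release the connection here
  override def close(): Unit = {
    // e.g. connection.close()
  }
}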
Results:
11> send16
12> send34
1> send9
2> send69
3> send59
Custom data source with multiple parallel instances
package com.text

import java.util.Random

import org.apache.flink.streaming.api.functions.source.{ParallelSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.scala._

object CustomSourceParallelism {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    var flag = true

    val stream = env.addSource(new ParallelSourceFunction[String] {
      // run() can read data from anywhere (e.g. Redis) and then emit it downstream
      override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
        val random = new Random()
        while (flag) {
          ctx.collect("send" + random.nextInt(100))
          Thread.sleep(500)
        }
      }

      override def cancel(): Unit = {
        flag = false
      }
    }).setParallelism(3)

    stream.print()
    env.execute()
  }
}
Results:
2> send89
12> send89
12> send15
1> send93
1> send55
Q: The source parallelism is 3, so why do so many different subtask IDs appear in front of the output lines? Because stream.print() has no parallelism set, so it defaults to the number of CPU cores on the machine. (A sketch that tags each record with the source instance that produced it follows the results below.)
Setting stream.print().setParallelism(3) gives:
Results:
3> send27
2> send60
1> send7
3> send24
2> send1
1> send54
2> send2
3> send59
1> send51
3> send49
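As mentioned above, one way to confirm how the three source instances share the work is to switch to RichParallelSourceFunction and tag every record with the subtask index; a small sketch, not part of the original code (the output format is made up for illustration):

package com.text

import java.util.Random

import org.apache.flink.streaming.api.functions.source.{RichParallelSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.scala._

object CustomSourceSubtaskIndex {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val stream = env.addSource(new RichParallelSourceFunction[String] {
      @volatile private var running = true

      override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
        val random = new Random()
        // index of this parallel instance, in the range 0 to parallelism - 1
        val subtask = getRuntimeContext.getIndexOfThisSubtask
        while (running) {
          ctx.collect(s"subtask-$subtask send" + random.nextInt(100))
          Thread.sleep(500)
        }
      }

      override def cancel(): Unit = {
        running = false
      }
    }).setParallelism(3)

    stream.print().setParallelism(3)
    env.execute()
  }
}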