Flink: Ways to Read Files (Part 3)


Reading files from HDFS

// During operator transformations, data is converted into Flink's built-in data types,
// so the implicit conversions must be imported for the type conversion to happen automatically
import org.apache.flink.streaming.api.scala._

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // readTextFile reads the file only once
    val textStream = env.readTextFile("hdfs://ke01:8020/xiaoke002/two")
    textStream.print()
    env.execute()

Result:
6> 1,小明
8> 2,小紅
11> 3,小柯
2> 4,wang

Under the hood, readTextFile calls readFile and passes FileProcessingMode.PROCESS_ONCE (the path is read only once).
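For reference, this is roughly equivalent to the following (a simplified sketch, assuming Flink 1.9 as in the dependency used later; the actual implementation also sets a default file-path filter):

import org.apache.flink.api.java.io.TextInputFormat
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.source.FileProcessingMode
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
val filePath = "hdfs://ke01:8020/xiaoke002/two"
val format = new TextInputFormat(new Path(filePath))
format.setCharsetName("UTF-8")
// PROCESS_ONCE: scan the path a single time, emit its contents, then finish (the -1 interval is ignored)
val textStream = env.readFile(format, filePath, FileProcessingMode.PROCESS_ONCE, -1)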

So, to keep reading data continuously, you have to call readFile() yourself and pass FileProcessingMode.PROCESS_CONTINUOUSLY:
 
val env = StreamExecutionEnvironment.getExecutionEnvironment
val filePath = "hdfs://ke01:8020/xiaoke002/two"
val textInputFormat = new TextInputFormat(new Path(filePath))
// Re-scan the path for newly added HDFS content every 10 ms (the interval is in milliseconds; use 10000 for every 10 s)
val textStream = env.readFile(textInputFormat, filePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 10)
textStream.print()
env.execute()

Result: the job keeps running; every time new data arrives, the entire file is read again
12> 1,小明
5> 3,小柯
8> 4,wang
2> 2,小紅


Append more data to the file; the running job then re-reads and prints the whole file:

hdfs dfs -appendToFile test /xiaoke002/two

3> 3,小柯
6> aa bb
7> cc dd
5> 4,wang
1> 2,小紅
12> 1,小明

Reading data from Kafka

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka_2.11</artifactId>
  <version>1.9.2</version>
</dependency>

flink-kafka: producing messages

package com.text
import java.util.Properties
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.{StringSerializer}
import scala.io._
object FlinkKafkaProduct {
  def main(args: Array[String]): Unit = {
    val prop = new Properties()
    prop.setProperty("bootstrap.servers", "ke02:9092,ke03:9092,ke04:9092")
    prop.setProperty("key.serializer", classOf[StringSerializer].getName)
    prop.setProperty("value.serializer", classOf[StringSerializer].getName)

    // Create a Kafka producer
    val producer = new KafkaProducer[String, String](prop)

    // Materialize the lines into a List so they can be iterated on every pass of the outer loop
    val lines = Source.fromFile("D:\\code\\scala\\test\\test07\\data\\carFlow_all_column_test.txt").getLines().toList

    for (i <- 1 to 100) {
      for (elem <- lines) {
        val splits = elem.split(",")
        val monitorId = splits(0).replace("'", "")
        val carId = splits(2).replace("'", "")
        val timestamp = splits(4).replace("'", "")
        val speed = splits(6)
        val stringBuilder = new StringBuilder
        val info = stringBuilder.append(monitorId + "\t").append(carId + "\t").append(timestamp + "\t").append(speed)
        // Kafka records are key-value pairs; the key is null by default when producing.
        // Topic: flink-kafka, key: i+"", value: info
        producer.send(new ProducerRecord[String, String]("flink-kafka", i + "", info.toString()))
        Thread.sleep(500)
      }
    }
  }
}

Result

--Start Kafka on ke02, ke03 and ke04:
kafka-server-start.sh -daemon ./server.properties

--Start a console consumer
kafka-console-consumer.sh --new-consumer --bootstrap-server ke02:9092 --topic flink-kafka --property print.key=true

flink-kafka: consuming messages (key and value)

package com.text
import java.util.Properties
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, KafkaDeserializationSchema}
import org.apache.flink.streaming.api.scala._
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
object FlinkKafkaConsumerKV {
  def main(args: Array[String]): Unit = {
    // Flink Kafka consumer that reads both the key and the value
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "ke02:9092,ke03:9092,ke04:9092")
    properties.setProperty("group.id", "flink-kafka-001")
    properties.setProperty("key.deserializer", classOf[StringSerializer].getName)
    properties.setProperty("value.deserializer", classOf[StringSerializer].getName)
    val stream = env.addSource(new FlinkKafkaConsumer[(String, String)]("flink-kafka", new KafkaDeserializationSchema[(String, String)] {
      // Stop condition for the stream; returning false means it never ends
      override def isEndOfStream(nextElement: (String, String)): Boolean = false
      // Deserialize the raw key/value byte arrays coming from Kafka
      override def deserialize(record: ConsumerRecord[Array[Byte], Array[Byte]]): (String, String) = {
        val key = new String(record.key(), "UTF-8")
        val value = new String(record.value(), "UTF-8")
        (key, value)
      }
      // Declare the type of the produced records, using Flink's TypeInformation
      override def getProducedType: TypeInformation[(String, String)] = {
        createTuple2TypeInformation(createTypeInformation[String], createTypeInformation[String])
      }
    }, properties))
    stream.print()
    env.execute()
  }
}

flink-kafka: consuming messages (value only)

package com.text
import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.flink.streaming.api.scala._
object FlinkKafkaConsumerV {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "ke02:9092,ke03:9092,ke04:9092")
    properties.setProperty("group.id", "flink-kafka-001")
    properties.setProperty("key.deserializer", classOf[StringDeserializer].getName)
    properties.setProperty("value.deserializer", classOf[StringDeserializer].getName)
    // SimpleStringSchema deserializes only the record value as a String; the key is ignored
    val stream = env.addSource(new FlinkKafkaConsumer[String]("flink-kafka", new SimpleStringSchema(), properties))
    stream.print()
    env.execute()
  }
}

Custom data source

package com.text
import java.util.Random
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._
object CustomSource {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    var flag = true
    val stream = env.addSource(new SourceFunction[String] {
      override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
        // run() can read data from any source (e.g. Redis) and then emit it downstream
        val random = new Random()
        while (flag) {
          ctx.collect("send" + random.nextInt(100))
          Thread.sleep(500)
        }
      }
      override def cancel(): Unit = {
        flag = false
      }
    })
    stream.print()
    env.execute()
  }
}


Result:

11> send16
12> send34
1> send9
2> send69
3> send59
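The comment in run() notes that a custom source can read from any external system, for example Redis. For illustration only, here is a minimal sketch of a Redis-backed source; the Jedis client, the host ke02:6379 and the list key flink-demo are assumptions and do not come from the original post:

package com.text
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._
import redis.clients.jedis.Jedis

object RedisCustomSource {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val stream = env.addSource(new SourceFunction[String] {
      @volatile private var running = true
      override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
        // Hypothetical Redis connection and list key, for illustration only
        val jedis = new Jedis("ke02", 6379)
        while (running) {
          val value = jedis.lpop("flink-demo")
          if (value != null) ctx.collect(value) else Thread.sleep(500)
        }
        jedis.close()
      }
      override def cancel(): Unit = { running = false }
    })
    stream.print()
    env.execute()
  }
}

Here the stop flag is a field of the source itself rather than a variable captured from main, as in the example above.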

Custom data source with multiple parallelism

package com.text
import java.util.Random
import org.apache.flink.streaming.api.functions.source.{ParallelSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._

object CustomSourceParallelism {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    var flag = true
    val stream = env.addSource(new ParallelSourceFunction[String] {
      override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
        // run() can read data from any source (e.g. Redis) and then emit it downstream
        val random = new Random()
        while (flag) {
          ctx.collect("send" + random.nextInt(100))
          Thread.sleep(500)
        }
      }
      override def cancel(): Unit = {
        flag = false
      }
    }).setParallelism(3)
    stream.print()
    env.execute()
  }
}

Result:

2> send89
12> send89
12> send15
1> send93
1> send55



Q: The source parallelism is 3, so why do so many different subtask IDs appear in front of the output? Because stream.print() has no parallelism set, it defaults to the number of cores of the current machine.
Fix: stream.print().setParallelism(3)
Result:

3> send27
2> send60
1> send7
3> send24
2> send1
1> send54
2> send2
3> send59
1> send51
3> send49
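As an alternative to setting the parallelism on print() alone, a default parallelism can be set on the execution environment; a minimal sketch, not from the original post:

import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
// Default parallelism for every operator (source, print, ...) that does not set its own
env.setParallelism(3)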
