Flink的Source----數據源


Flink的Source----數據源

flink代碼分為三部分:

1、Source----數據源,讀取數據

2、Transformation----轉換,對數據進行處理,也就是算子

3、Sink----將數據發出去

Flink的Source分為是四大類

1、基於本地集合的 source---得出的是有界流
2、基於文件的 source---得出的是有界流
3、基於網絡套接字的 source---得出的是無界流
4、自定義的 source(較難,用得比較多)

自定義的 source 常見的有 Apache kafka、Amazon Kinesis Streams、RabbitMQ、Twitter Streaming API、Apache NiFi 等,當然你也可以定義自己的 source。

前面三類原理一樣,底層都是Java代碼實現的
1、基於本地集合的 source---有界流

基於本地集合創建DS----env.fromCollection(List())

package com.shujia.flink.source

import org.apache.flink.streaming.api.scala._

object Demo1ListSource {
  def main(args: Array[String]): Unit = {
    //創建flink環境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    //基於本地集合創建DS---有界流
    val listDS1: DataStream[Int] = env.fromCollection(List(1, 2, 3, 4, 5, 6, 7, 8))
    //打印
    listDS1.print()

    val listDS2: DataStream[String] = env.fromCollection(List("java,java,hadoop", "java,flink", "hadoop"))
    listDS2
      .flatMap(line => line.split(","))
      .map((_,1))
      .keyBy(_._1)
      .sum(1)
      .print()


    //啟動程序(有界流的啟動程序,可寫可不寫)
    env.execute()

  }
}

執行結果

1> 2
1> 6
2> 3
2> 7
3> 4
3> 8
4> 1
4> 5
4> (hadoop,1)
4> (hadoop,2)
4> (flink,1)
1> (java,1)
1> (java,2)
1> (java,3)

Process finished with exit code 0
2、基於文件的 source----有界流

讀取文件創建DS----env.readTextFile()

package com.shujia.flink.source

import org.apache.flink.streaming.api.scala._

object Demo2FileSource {
  def main(args: Array[String]): Unit = {
    //創建flink環境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    //讀取文件創建DS----有界流
    val fileDS: DataStream[String] = env.readTextFile("data/students.txt")
    //統計班級人數
    fileDS
      .map(stu=>{
      val clazz: String = stu.split(",")(4)
      (clazz,1)
    })
      .keyBy(_._1)
      .sum(1)
      .print()

    env.execute()
  }
}

執行結果

2> (文科六班,1)
2> (文科六班,2)
2> (文科六班,3)
4> (理科六班,1)
1> (文科二班,1)
1> (文科二班,2)
2> (文科一班,1)
...
...
3> (理科四班,88)
3> (理科五班,69)
3> (理科三班,67)
3> (理科一班,77)
3> (理科四班,89)
3> (理科三班,68)
3> (理科一班,78)
3> (理科四班,90)
3> (理科五班,70)
3> (理科四班,91)

Process finished with exit code 0
3、基於網絡套接字的 source---無界流

讀取socket構建DS----env.socketTextStream("master", 8888)

package com.shujia.flink.source

import org.apache.flink.streaming.api.scala._

object Demo3SocketSource {
  def main(args: Array[String]): Unit = {
    //創建flink環境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    //讀取socket構建DS----無界流
    val socketDS: DataStream[String] = env.socketTextStream("master", 8888)

    socketDS.print()

    //在虛擬機中輸入  nc -lk 8888

    //啟動flink程序
    env.execute()
  }
}

image

4、自定義的source

讀取自定義的source創建DS----env.addSource()
參數需要傳入一個SourceFunction,但SourceFunction是一個接口,

所以需要傳入這個接口的子類,因此需要創建一個子類來實現這個接口。

子類中需要重寫這個接口的兩個方法run()cancle()

  • 自定義一個Source----有界流
package com.shujia.flink.source

import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._

object Demo4SourceFunction {
  def main(args: Array[String]): Unit = {
    //創建flink環境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    //讀取自定義的source創建DS(參數傳入SourceFunction的子類對象)
    val myDS: DataStream[Int] = env.addSource(new MySource)
    myDS.print()

    env.execute()
  }
}

//創建一個類實現接口
class MySource extends SourceFunction[Int]{ //手動指定讀取數據的類型

  /**
   * run方法:用於生成數據的方法,只執行一次
   * sourceContext:用於將數據發送到下游的對象
   */
  override def run(sourceContext: SourceFunction.SourceContext[Int]): Unit = {
    sourceContext.collect(6666)
  }
  //任務被取消的時候執行,一般用於回收資源(本次暫且用不到)
  override def cancel(): Unit = {
  }
}

執行結果

3> 6666

Process finished with exit code 0
  • 自定義一個Source----無界流

在子類實現的run()方法中加入一個循環即可

package com.shujia.flink.source

import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._

object Demo4SourceFunction {
  def main(args: Array[String]): Unit = {
    //創建flink環境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    //讀取自定義的source創建DS
    val myDS: DataStream[Int] = env.addSource(new MySource)
    myDS.print()

    env.execute()
  }
}

//創建一個類實現接口
class MySource extends SourceFunction[Int]{ //手動指定讀取數據的類型

  /**
   * run方法:用於生成數據的方法,只執行一次
   * sourceContext:用於將數據發送到下游的對象
   */
  override def run(sourceContext: SourceFunction.SourceContext[Int]): Unit = {
    var i = 0
    while (true){
      i += 1
      sourceContext.collect(i)//將數據發送到下游
      Thread.sleep(100) //設置一個100ms的循環時間,讓其打印的慢一點
    }
  }

  override def cancel(): Unit = {
  }
}
  • 自定義Source----讀取Mysql----有界流
package com.shujia.flink.source

import com.mysql.jdbc.Driver
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._

import java.sql.{Connection, DriverManager, PreparedStatement, ResultSet}

object Demo5MysqlSource {
  def main(args: Array[String]): Unit = {
    //創建flink環境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    //讀取自定義的source創建DS
    val mysqlDS: DataStream[(String, String, Int, String, String)] = env.addSource(new MysqlSource)
    mysqlDS.print()

    //啟動flink程序
    env.execute()

  }
}

//創建一個類來實現SourceFunction接口,需要指定泛型
//因為本次是讀取學生數據,類型有5個字段
class MysqlSource extends SourceFunction[(String,String,Int,String,String)]{

  override def run(sourceContext: SourceFunction.SourceContext[(String,String,Int,String,String)]): Unit = {
    //使用JDBC讀取Mysql
    Class.forName("com.mysql.jdbc.Driver")
    //建立連接
    val con: Connection = DriverManager.getConnection("jdbc:mysql://master:3306/bigdata", "root", "123456")
    //查詢數據
    val sta: PreparedStatement = con.prepareStatement("select * from student")
    //執行查詢
    val resultSet: ResultSet = sta.executeQuery()
    //循環解析數據
    while (resultSet.next()){
      val id: String = resultSet.getString("id")
      val name: String = resultSet.getString("name")
      val age: Int = resultSet.getInt("age")
      val gender: String = resultSet.getString("gender")
      val clazz: String = resultSet.getString("clazz")

      //將數據發送到下游
      sourceContext.collect((id,name,age,gender,clazz))
    }
    //關閉連接
    con.close()
  }

  override def cancel(): Unit = {
  }
    
}

自定義的source的其他接口

在自定義的Source除了有SourceFunction接口,還有
RichSourceFunction接口、
ParallelSourceFunction接口、
RichParallelSourceFunction接口

SourceFunction是自定義source中最基礎的,是一個單線程的source
RichSourceFunction比SourceFunction多了open()和close()方法,也是一個單線程的source

ParallelSourceFunction接口,多並行的source
RichParallelSourceFunction接口,多並行的source


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM