Flink的Source----數據源
目錄
flink代碼分為三部分:
1、Source----數據源,讀取數據
2、Transformation----轉換,對數據進行處理,也就是算子
3、Sink----將數據發出去
Flink的Source分為是四大類
1、基於本地集合的 source---得出的是有界流
2、基於文件的 source---得出的是有界流
3、基於網絡套接字的 source---得出的是無界流
4、自定義的 source(較難,用得比較多)
自定義的 source 常見的有 Apache kafka、Amazon Kinesis Streams、RabbitMQ、Twitter Streaming API、Apache NiFi 等,當然你也可以定義自己的 source。
前面三類原理一樣,底層都是Java代碼實現的
1、基於本地集合的 source---有界流
基於本地集合創建DS----
env.fromCollection(List())
package com.shujia.flink.source
import org.apache.flink.streaming.api.scala._
object Demo1ListSource {
def main(args: Array[String]): Unit = {
//創建flink環境
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
//基於本地集合創建DS---有界流
val listDS1: DataStream[Int] = env.fromCollection(List(1, 2, 3, 4, 5, 6, 7, 8))
//打印
listDS1.print()
val listDS2: DataStream[String] = env.fromCollection(List("java,java,hadoop", "java,flink", "hadoop"))
listDS2
.flatMap(line => line.split(","))
.map((_,1))
.keyBy(_._1)
.sum(1)
.print()
//啟動程序(有界流的啟動程序,可寫可不寫)
env.execute()
}
}
執行結果
1> 2
1> 6
2> 3
2> 7
3> 4
3> 8
4> 1
4> 5
4> (hadoop,1)
4> (hadoop,2)
4> (flink,1)
1> (java,1)
1> (java,2)
1> (java,3)
Process finished with exit code 0
2、基於文件的 source----有界流
讀取文件創建DS----
env.readTextFile()
package com.shujia.flink.source
import org.apache.flink.streaming.api.scala._
object Demo2FileSource {
def main(args: Array[String]): Unit = {
//創建flink環境
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
//讀取文件創建DS----有界流
val fileDS: DataStream[String] = env.readTextFile("data/students.txt")
//統計班級人數
fileDS
.map(stu=>{
val clazz: String = stu.split(",")(4)
(clazz,1)
})
.keyBy(_._1)
.sum(1)
.print()
env.execute()
}
}
執行結果
2> (文科六班,1)
2> (文科六班,2)
2> (文科六班,3)
4> (理科六班,1)
1> (文科二班,1)
1> (文科二班,2)
2> (文科一班,1)
...
...
3> (理科四班,88)
3> (理科五班,69)
3> (理科三班,67)
3> (理科一班,77)
3> (理科四班,89)
3> (理科三班,68)
3> (理科一班,78)
3> (理科四班,90)
3> (理科五班,70)
3> (理科四班,91)
Process finished with exit code 0
3、基於網絡套接字的 source---無界流
讀取socket構建DS----
env.socketTextStream("master", 8888)
package com.shujia.flink.source
import org.apache.flink.streaming.api.scala._
object Demo3SocketSource {
def main(args: Array[String]): Unit = {
//創建flink環境
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
//讀取socket構建DS----無界流
val socketDS: DataStream[String] = env.socketTextStream("master", 8888)
socketDS.print()
//在虛擬機中輸入 nc -lk 8888
//啟動flink程序
env.execute()
}
}
4、自定義的source
讀取自定義的source創建DS----
env.addSource()
參數需要傳入一個SourceFunction
,但SourceFunction
是一個接口,所以需要傳入這個接口的子類,因此需要創建一個子類來實現這個接口。
子類中需要重寫這個接口的兩個方法
run()
、cancle()
- 自定義一個Source----有界流
package com.shujia.flink.source
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._
object Demo4SourceFunction {
def main(args: Array[String]): Unit = {
//創建flink環境
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
//讀取自定義的source創建DS(參數傳入SourceFunction的子類對象)
val myDS: DataStream[Int] = env.addSource(new MySource)
myDS.print()
env.execute()
}
}
//創建一個類實現接口
class MySource extends SourceFunction[Int]{ //手動指定讀取數據的類型
/**
* run方法:用於生成數據的方法,只執行一次
* sourceContext:用於將數據發送到下游的對象
*/
override def run(sourceContext: SourceFunction.SourceContext[Int]): Unit = {
sourceContext.collect(6666)
}
//任務被取消的時候執行,一般用於回收資源(本次暫且用不到)
override def cancel(): Unit = {
}
}
執行結果
3> 6666
Process finished with exit code 0
- 自定義一個Source----無界流
在子類實現的run()方法中加入一個循環即可
package com.shujia.flink.source
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._
object Demo4SourceFunction {
def main(args: Array[String]): Unit = {
//創建flink環境
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
//讀取自定義的source創建DS
val myDS: DataStream[Int] = env.addSource(new MySource)
myDS.print()
env.execute()
}
}
//創建一個類實現接口
class MySource extends SourceFunction[Int]{ //手動指定讀取數據的類型
/**
* run方法:用於生成數據的方法,只執行一次
* sourceContext:用於將數據發送到下游的對象
*/
override def run(sourceContext: SourceFunction.SourceContext[Int]): Unit = {
var i = 0
while (true){
i += 1
sourceContext.collect(i)//將數據發送到下游
Thread.sleep(100) //設置一個100ms的循環時間,讓其打印的慢一點
}
}
override def cancel(): Unit = {
}
}
- 自定義Source----讀取Mysql----有界流
package com.shujia.flink.source
import com.mysql.jdbc.Driver
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._
import java.sql.{Connection, DriverManager, PreparedStatement, ResultSet}
object Demo5MysqlSource {
def main(args: Array[String]): Unit = {
//創建flink環境
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
//讀取自定義的source創建DS
val mysqlDS: DataStream[(String, String, Int, String, String)] = env.addSource(new MysqlSource)
mysqlDS.print()
//啟動flink程序
env.execute()
}
}
//創建一個類來實現SourceFunction接口,需要指定泛型
//因為本次是讀取學生數據,類型有5個字段
class MysqlSource extends SourceFunction[(String,String,Int,String,String)]{
override def run(sourceContext: SourceFunction.SourceContext[(String,String,Int,String,String)]): Unit = {
//使用JDBC讀取Mysql
Class.forName("com.mysql.jdbc.Driver")
//建立連接
val con: Connection = DriverManager.getConnection("jdbc:mysql://master:3306/bigdata", "root", "123456")
//查詢數據
val sta: PreparedStatement = con.prepareStatement("select * from student")
//執行查詢
val resultSet: ResultSet = sta.executeQuery()
//循環解析數據
while (resultSet.next()){
val id: String = resultSet.getString("id")
val name: String = resultSet.getString("name")
val age: Int = resultSet.getInt("age")
val gender: String = resultSet.getString("gender")
val clazz: String = resultSet.getString("clazz")
//將數據發送到下游
sourceContext.collect((id,name,age,gender,clazz))
}
//關閉連接
con.close()
}
override def cancel(): Unit = {
}
}
自定義的source的其他接口
在自定義的Source除了有SourceFunction接口,還有
RichSourceFunction接口、
ParallelSourceFunction接口、
RichParallelSourceFunction接口
SourceFunction是自定義source中最基礎的,是一個單線程的source
RichSourceFunction比SourceFunction多了open()和close()方法,也是一個單線程的source
ParallelSourceFunction接口,多並行的source
RichParallelSourceFunction接口,多並行的source