Flink -- Data Sinks (Data Output)


Common sinks in Flink batch processing:

1. Collection-based sink

2. File-based sink

Collection-based sink

import org.apache.flink.api.scala._

//1. Set up the execution environment
val env = ExecutionEnvironment.getExecutionEnvironment
//2. Define the data: stu(age, name, height)
val stu: DataSet[(Int, String, Double)] = env.fromElements(
  (19, "zhangsan", 178.8),
  (17, "lisi", 168.8),
  (18, "wangwu", 184.8),
  (21, "zhaoliu", 164.8)
)
//3. TODO sink to standard output
stu.print

//4. TODO sink to standard error output
stu.printToErr()

//5. TODO sink to a local collection
print(stu.collect())
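
In the DataSet API, print(), printToErr(), and collect() all trigger job execution eagerly, so no env.execute() call is needed here; collect() returns the result to the client as a local Seq. A minimal sketch of post-processing the collected result (the maxBy step is illustrative, not part of the original example):

// collect() brings the distributed result back as a plain local Seq
val localStu: Seq[(Int, String, Double)] = stu.collect()
// e.g. find the tallest student on the client side (plain Scala, no Flink involved)
println(localStu.maxBy(_._3))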

File-based sink

Flink supports files on multiple storage systems, including local files, HDFS files, etc.

Flink supports multiple file storage formats, including text files, CSV files, etc.

writeAsText(): TextOutputFormat - writes elements line-wise as strings. The strings are obtained by calling the toString() method of each element.
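
The same pattern applies to CSV output via writeAsCsv(): CsvOutputFormat, which writes tuple elements with configurable row and field delimiters. A minimal, self-contained sketch (the path and delimiters are illustrative):

import org.apache.flink.api.scala._
import org.apache.flink.core.fs.FileSystem.WriteMode

val env = ExecutionEnvironment.getExecutionEnvironment
val words: DataSet[(Int, String)] = env.fromElements((1, "spark"), (2, "flink"))
// "\n" is the row delimiter, "," is the field delimiter
words.writeAsCsv("test/data1/words.csv", "\n", ",", WriteMode.OVERWRITE)
env.execute()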

1. Writing data to a local file
import org.apache.flink.api.scala._
import org.apache.flink.core.fs.FileSystem.WriteMode

//0. Note: whether local or HDFS, a sink with parallelism > 1 treats the path as a directory name; with parallelism = 1 it treats the path as a file name.
val env = ExecutionEnvironment.getExecutionEnvironment
val ds1: DataSet[Map[Int, String]] = env.fromElements(Map(1 -> "spark", 2 -> "flink"))
//1. TODO write to a local text file; in NO_OVERWRITE mode an existing file raises an error, in OVERWRITE mode it is overwritten
ds1.writeAsText("test/data1/aa", WriteMode.OVERWRITE).setParallelism(1)
env.execute()
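
Conversely, with a sink parallelism greater than 1 the same call produces a directory. A sketch of that case, continuing the snippet above (the path and the parallelism of 2 are illustrative):

// "test/data1/bb" becomes a directory containing one file per parallel subtask (named 1, 2, ...)
ds1.writeAsText("test/data1/bb", WriteMode.OVERWRITE).setParallelism(2)
env.execute()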
2. Writing data to HDFS
//TODO writeAsText: write data to HDFS
import org.apache.flink.api.scala._
import org.apache.flink.core.fs.FileSystem.WriteMode

val env = ExecutionEnvironment.getExecutionEnvironment
val ds1: DataSet[Map[Int, String]] = env.fromElements(Map(1 -> "spark", 2 -> "flink"))
ds1.writeAsText("hdfs://hadoop01:9000/a", WriteMode.OVERWRITE).setParallelism(1)
env.execute()
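
Note that writing to HDFS assumes the Hadoop filesystem dependencies are on Flink's classpath and that the NameNode address in the path (hdfs://hadoop01:9000 here) is reachable; otherwise the job fails when the sink tries to open the output path.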

You can use sortPartition to sort the data before sinking it to an external system.

//TODO sort the data with sortPartition before sinking it to an external system
import org.apache.flink.api.scala._
import org.apache.flink.api.common.operators.Order
import org.apache.flink.core.fs.FileSystem.WriteMode

val env = ExecutionEnvironment.getExecutionEnvironment
//stu(age, name, height)
val stu: DataSet[(Int, String, Double)] = env.fromElements(
  (19, "zhangsan", 178.8),
  (17, "lisi", 168.8),
  (18, "wangwu", 184.8),
  (21, "zhaoliu", 164.8)
)
//1. Sort by age in ascending order (0 -> 9)
stu.sortPartition(0, Order.ASCENDING).print
//2. Sort by name in descending order (z -> a)
stu.sortPartition(1, Order.DESCENDING).print
//3. Sort by age ascending, then by height descending
stu.sortPartition(0, Order.ASCENDING).sortPartition(2, Order.DESCENDING).print
//4. Sort by all fields in ascending order
stu.sortPartition("_", Order.ASCENDING).print
//5. Sort by the nested field Student.age in ascending order
//5.1 Prepare the data
case class Student(name: String, age: Int)
val ds1: DataSet[(Student, Double)] = env.fromElements(
  (Student("zhangsan", 18), 178.5),
  (Student("lisi", 19), 176.5),
  (Student("wangwu", 17), 168.5)
)
val ds2 = ds1.sortPartition("_1.age", Order.ASCENDING).setParallelism(1)
//5.2 Write to HDFS as a text file
val outPath1 = "hdfs://hadoop01:9000/Student001.txt"
ds2.writeAsText(outPath1, WriteMode.OVERWRITE)
env.execute()
//5.3 Write to HDFS as a CSV file, with "\n" as the row delimiter and "|||" as the field delimiter
val outPath2 = "hdfs://hadoop01:9000/Student002.csv"
ds2.writeAsCsv(outPath2, "\n", "|||", WriteMode.OVERWRITE)
env.execute()
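
As the name suggests, sortPartition sorts each partition independently: with parallelism greater than 1 the output is not globally ordered, which is why step 5 above pins the parallelism to 1 before writing. A minimal sketch of the difference, continuing the snippet above (the parallelism values are illustrative):

// each of the 2 partitions is sorted on its own, so there is no global order
stu.sortPartition(0, Order.ASCENDING).setParallelism(2).print
// a single partition holds all the data, so the output is globally sorted
stu.sortPartition(0, Order.ASCENDING).setParallelism(1).print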

 

