flink第一個應用


去年華為的大佬就開始在用 Flink。今天剛有空,就跟着寫了個 demo 玩起來(就不用 Java 了,Spark 和 Flink 還是用 Scala 來寫)。

 

package flink.test
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time

object StreamingWindowWordCount {

  /**
   * Streaming word count over a socket source with a sliding window.
   *
   * Test locally by starting a listener on the Linux host first:
   * `nc -l 9876`, then run with `--port 9876`.
   */
  def main(args: Array[String]): Unit = {
    // Read the "--port" argument; fall back to 9876 when it is missing
    // or not an integer. (The original catch branch returned Unit and a
    // stray prose line, so the `try` expression could not type-check as Int.)
    val port: Int = try {
      ParameterTool.fromArgs(args).getInt("port")
    } catch {
      case _: Exception =>
        System.err.println("no port")
        9876 // default port
    }

    // Obtain the streaming execution environment.
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    // Read raw text lines from the socket; '\n' delimits records.
    val text = env.socketTextStream("rhel071", port, '\n')

    // Implicit TypeInformation instances — required by the Scala API,
    // otherwise flatMap below fails to compile.
    import org.apache.flink.api.scala._

    // Split lines into words, key by the "word" field, apply a sliding
    // window (2 s length, 1 s slide), and merge counts per key.
    val windowCount = text.flatMap(line => line.split("\\s"))
      .map(word => WordWithCount(word, 1L))
      .keyBy("word")
      .timeWindow(Time.seconds(2), Time.seconds(1))
      .reduce((a, b) => WordWithCount(a.word, a.count + b.count)) // same key -> summed count
      //.sum("count")

    // Print with parallelism 1 so output lines are not interleaved.
    windowCount.print().setParallelism(1)
    env.execute("streaming word count")
  }

  // Immutable record for one word and its running count.
  case class WordWithCount(word: String, count: Long)
}





maven項目(這種東西不適合自己找jar包,本地測試需要的jar包量實在太多,特別項目牽扯上hadoop hbase的時候)
這里是小例子應用到的maven,記錄下
<?xml version="1.0" encoding="UTF-8"?>
<!-- Minimal Maven build for the Flink word-count demos in this post. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>finkDemo_20180918</groupId>
<artifactId>finkDemo</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<!-- Flink core Java API (needed by the streaming/scala modules). -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.4.2</version>
<!--<scope>provided</scope>-->
</dependency>
<!-- Java DataStream API; _2.11 suffix = built for Scala 2.11. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>1.4.2</version>
<!--<scope>provided</scope>-->
</dependency>

<!-- Scala batch (DataSet) API — used by BatchWordCountScala. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.11</artifactId>
<version>1.4.2</version>
<!--<scope>provided</scope>-->
</dependency>
<!-- Scala DataStream API — used by StreamingWindowWordCount. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.11</artifactId>
<version>1.4.2</version>
<!--<scope>provided</scope>-->
</dependency>
</dependencies>
</project>



 

 

flink的離線操作

package flink.test

import org.apache.flink.api.scala.ExecutionEnvironment

object BatchWordCountScala {

  /**
   * Batch word count: reads every text file under the input directory,
   * counts words, and writes the results as CSV to the output directory.
   */
  def main(args: Array[String]): Unit = {
    // Local filesystem paths for input and output.
    val inputPath = "D:\\flink\\batch\\file"
    val outputPath = "D:\\flink\\data\\result"

    // Batch (DataSet) execution environment.
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Implicit TypeInformation instances required by the Scala DataSet API.
    import org.apache.flink.api.scala._

    val lines = env.readTextFile(inputPath)

    // Tokenize on single spaces, pair each word with 1, then group on the
    // word field (position 0) and sum the count field (position 1).
    val counts = lines
      .flatMap(_.split(" "))
      .map(WordWithCount(_, 1L))
      .groupBy(0)
      .sum(1)

    // CSV output: rows separated by '\n', fields separated by ' '.
    counts.writeAsCsv(outputPath, "\n", " ")
    env.execute("batch word count")
  }

  // Immutable record for one word and its count.
  case class WordWithCount(word: String, count: Long)
}

把目錄下的text文件word解析統計后存入result目錄

 

 
       


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM