Flink program development workflow:
1、Set up the batch execution environment
2、Get data
3、Develop the business logic
4、Execute the program
Implementing the GitHub example with Flink:
Batch WordCount (Java version):
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class BatchWCjavaapp {

    public static void main(String[] args) {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSource<String> text = env.readTextFile("./data/hello.txt");
        try {
            text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                public void flatMap(String value, Collector<Tuple2<String, Integer>> collector) throws Exception {
                    String[] tokens = value.toLowerCase().split(" ");
                    for (String str : tokens) {
                        if (str.length() > 0) {
                            collector.collect(new Tuple2<String, Integer>(str, 1));
                        }
                    }
                }
            }).groupBy(0).sum(1).print();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
Batch WordCount in Scala:
import org.apache.flink.api.scala.ExecutionEnvironment

object flinktest {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val text = env.readTextFile("./data/hello.txt")
    import org.apache.flink.api.scala._
    text.flatMap(_.toLowerCase.split(" ").filter(_.nonEmpty).map((_, 1)))
      .groupBy(0)
      .sum(1)
      .print()
  }
}
Streaming word count:
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class StreamingWCJavaApp {

    public static void main(String[] args) {
        // step1: obtain the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // step2: read the data
        DataStreamSource<String> text = env.socketTextStream("mynode5", 9999);

        // step3: transform
        text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                String[] words = s.split(" ");
                for (String word : words) {
                    if (word.length() > 0) {
                        collector.collect(new Tuple2<String, Integer>(word, 1));
                    }
                }
            }
        }).keyBy(0).timeWindow(Time.seconds(2)).sum(1).print().setParallelism(1);

        try {
            env.execute("StreamingWCJavaApp");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
Anatomy of a Flink program:
1、Obtain an execution environment
2、Load/create the initial data
3、Specify transformations on this data
4、Specify where to put the results of your computation
The grouping key can be a field of an object, but that object must be a POJO: it needs a no-argument constructor and get/set methods, and it must be serializable (see the sketch below).
5、Trigger the program execution
Flink executes lazily, which allows it to apply many internal optimizations and improves overall performance. Lazy execution means the transformations only run once the whole program has been assembled and an execution method (such as execute(), or a sink like print()) is called, as the sketch below shows.
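A minimal sketch of grouping by a field of a (case) class and of lazy execution; the WordCount class and the sample data are made up for illustration:
import org.apache.flink.api.scala._

// hypothetical case class standing in for a POJO; in the Java API the class would
// additionally need a no-arg constructor and getters/setters
case class WordCount(word: String, count: Int)

object PojoKeyExample {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val data = env.fromElements(WordCount("hadoop", 1), WordCount("hive", 1), WordCount("hadoop", 1))

    // nothing runs yet: groupBy/reduce only build the execution plan (lazy execution)
    val result = data.groupBy("word").reduce((a, b) => WordCount(a.word, a.count + b.count))

    // print() triggers the actual execution of the whole plan
    result.print()
  }
}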
Learning Flink batch processing with the DataSet API:
DataSet operators:
Differences between map, mapPartition and flatMap:
map: processes each element of the dataset individually.
mapPartition: processes a whole partition (a batch of elements) per function call, which can improve efficiency when the data volume is large.
flatMap: takes one element as input and can emit zero or more output elements.
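A minimal sketch contrasting the three operators (the sample data is made up):
import org.apache.flink.api.scala._

object MapOperatorsExample {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val data = env.fromCollection(List("hadoop spark", "hive flink"))

    // map: one input element -> exactly one output element
    data.map(_.toUpperCase).print()

    // mapPartition: called once per partition with an iterator over all of its elements
    data.mapPartition(partition => partition.map(line => (line, line.length))).print()

    // flatMap: one input element -> zero or more output elements
    data.flatMap(_.split(" ")).print()
  }
}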
first operator: takes the first n elements of a dataset.
distinct operator: removes duplicate elements from a dataset.
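A minimal sketch of first and distinct (sample data made up):
import org.apache.flink.api.scala._

object FirstDistinctExample {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val data = env.fromCollection(List("hadoop", "hive", "spark", "hive", "hbase"))

    // first(n): take the first n elements of the dataset
    data.first(3).print()

    // distinct: remove duplicates ("hive" shows up only once in the output)
    data.distinct().print()
  }
}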
join operator: inner join of two datasets on a key; only records that have a match on both sides are emitted.
import org.apache.flink.api.scala._

import scala.collection.mutable.ListBuffer

object flinkjoin {

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    joinFunction(env)
  }

  def joinFunction(env: ExecutionEnvironment) = {
    val info1 = ListBuffer[(Int, String)]()
    info1.append((1, "小劉"))
    info1.append((2, "孫悟空"))
    info1.append((3, "豬八戒"))
    info1.append((4, "沙和尚"))

    val info2 = ListBuffer[(Int, String)]()
    info2.append((1, "北京"))
    info2.append((2, "上海"))
    info2.append((3, "杭州"))
    info2.append((5, "蘭州"))

    val data1 = env.fromCollection(info1)
    val data2 = env.fromCollection(info2)

    data1.join(data2).where(0).equalTo(0).apply((left, right) => {
      (left._1, left._2, right._2)
    }).print()
  }
}
Outer join operators (leftOuterJoin / rightOuterJoin / fullOuterJoin): all records of the outer side(s) are emitted; records without a match need a default value on the missing side.
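A minimal sketch of leftOuterJoin with the same kind of data as the join example above; the default value "-" for unmatched records is an arbitrary choice:
import org.apache.flink.api.scala._

object OuterJoinExample {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val data1 = env.fromCollection(List((1, "孫悟空"), (2, "豬八戒"), (4, "沙和尚")))
    val data2 = env.fromCollection(List((1, "北京"), (2, "上海"), (5, "蘭州")))

    // left outer join: every record of data1 is emitted; when there is no match,
    // the right side is null, so a default value has to be filled in
    data1.leftOuterJoin(data2).where(0).equalTo(0).apply((left, right) => {
      if (right == null) (left._1, left._2, "-") else (left._1, left._2, right._2)
    }).print()
  }
}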
cross operator (Cartesian product): combines every element of the first dataset with every element of the second.
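A minimal sketch of cross (sample data made up); 2 x 3 input elements produce 6 output pairs:
import org.apache.flink.api.scala._

object CrossExample {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val teams = env.fromCollection(List("team-a", "team-b"))
    val rounds = env.fromCollection(List(1, 2, 3))

    // cross: Cartesian product, every team is paired with every round
    teams.cross(rounds).print()
  }
}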
Flink counters (accumulators):
import org.apache.flink.api.common.accumulators.LongCounter
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
import org.apache.flink.core.fs.FileSystem.WriteMode

object flinkCounter {

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val data = env.fromCollection(List("hadoop", "hive", "spark", "java", "hive", "hbase"))

    // a plain variable only counts correctly when the job runs with a single parallel instance
    /*
    data.map(new RichMapFunction[String, Long] {
      var counter = 0L
      override def map(in: String): Long = {
        counter += 1
        println("counter :" + counter)
        counter
      }
    }).setParallelism(2).print()
    */

    val info = data.map(new RichMapFunction[String, String] {
      // step1: define a counter
      var counter = new LongCounter()

      override def open(parameters: Configuration): Unit = {
        // register the counter
        getRuntimeContext.addAccumulator("ele-counter", counter)
      }

      override def map(in: String): String = {
        counter.add(1)
        println("counter: " + counter)
        in
      }
    })

    // with the accumulator, the parallelism no longer affects the result
    info.writeAsText("./data/hello.txt", WriteMode.OVERWRITE).setParallelism(3)

    val jobResult = env.execute("flinkCounter")
    val num = jobResult.getAccumulatorResult[Long]("ele-counter")
    println("num :" + num)
  }
}
Flink distributed cache:
Flink offers a distributed cache, similar to Apache Hadoop, to make files locally accessible to parallel instances of user functions. This functionality can be used to share files that contain static external data such as dictionaries or machine-learned regression models.
The cache works as follows: a program registers a file or directory of a local or remote filesystem such as HDFS or S3 under a specific name in its ExecutionEnvironment as a cached file. When the program is executed, Flink automatically copies the file or directory to the local filesystems of all workers. A user function can look up the file or directory under the specified name and access it from the worker's local filesystem.
import org.apache.commons.io.FileUtils
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.configuration.Configuration

object Distributedcache {

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val filepath = "./data/hello.txt"

    // step1: register a local file (could also be an HDFS path) as a cached file
    env.registerCachedFile(filepath, "pk-scala-dc")

    import org.apache.flink.api.scala._
    val data = env.fromElements("hadoop", "hive", "scala", "java")

    data.map(new RichMapFunction[String, String] {
      override def open(parameters: Configuration): Unit = {
        // look up the cached file on the worker's local filesystem
        val filetext = getRuntimeContext.getDistributedCache().getFile("pk-scala-dc")
        val lines = FileUtils.readLines(filetext) // Java API, returns a java.util collection

        // the Java collection is not directly usable in a Scala for-comprehension,
        // so convert it with JavaConverters
        import scala.collection.JavaConverters._
        for (ele <- lines.asScala) { // Scala syntax
          println(ele)
        }
      }

      override def map(in: String): String = {
        in
      }
    }).print()
  }
}
Based on the DataStream API
Custom data sources via addSource: there are three ways, i.e. implement SourceFunction (non-parallel), implement ParallelSourceFunction, or extend RichParallelSourceFunction.
Custom sinks are created by extending a sink function class; see the sketch after the source examples below.
import org.apache.flink.streaming.api.functions.source.SourceFunction

class CustomNonParallel extends SourceFunction[Long] {

  var count = 1L
  var isRunning = true

  override def run(sourceContext: SourceFunction.SourceContext[Long]): Unit = {
    while (isRunning && count < 30) {
      sourceContext.collect(count)
      count += 1
      Thread.sleep(2000)
    }
  }

  override def cancel(): Unit = {
    isRunning = false
  }
}
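The app below also references CustomParallel and CustomRichParallel, which the original notes do not show; a minimal sketch of what they might look like, mirroring the counting logic of CustomNonParallel:
import org.apache.flink.streaming.api.functions.source.{ParallelSourceFunction, RichParallelSourceFunction, SourceFunction}

// ParallelSourceFunction: every parallel subtask runs its own copy of the source
class CustomParallel extends ParallelSourceFunction[Long] {
  var count = 1L
  var isRunning = true

  override def run(sourceContext: SourceFunction.SourceContext[Long]): Unit = {
    while (isRunning && count < 30) {
      sourceContext.collect(count)
      count += 1
      Thread.sleep(2000)
    }
  }

  override def cancel(): Unit = {
    isRunning = false
  }
}

// RichParallelSourceFunction: a parallel source that additionally has access to the
// runtime context and the open()/close() lifecycle methods
class CustomRichParallel extends RichParallelSourceFunction[Long] {
  var count = 1L
  var isRunning = true

  override def run(sourceContext: SourceFunction.SourceContext[Long]): Unit = {
    while (isRunning && count < 30) {
      sourceContext.collect(count)
      count += 1
      Thread.sleep(2000)
    }
  }

  override def cancel(): Unit = {
    isRunning = false
  }
}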
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment

object DataStreamSourceApp {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // socketFunction(env)
    NonParallelFunction(env)
    // parallelFunction(env)
    // RichParallelFunction(env)
    env.execute("DataStreamSourceApp")
  }

  def RichParallelFunction(env: StreamExecutionEnvironment) = {
    val data = env.addSource(new CustomRichParallel)
    data.print().setParallelism(2)
  }

  def parallelFunction(env: StreamExecutionEnvironment) = {
    val data = env.addSource(new CustomParallel)
    data.print().setParallelism(2)
  }

  def NonParallelFunction(env: StreamExecutionEnvironment) = {
    val data = env.addSource(new CustomNonParallel)
    data.print().setParallelism(2)
  }

  def socketFunction(env: StreamExecutionEnvironment) = {
    val data = env.socketTextStream("localhost", 9999)
    data.print().setParallelism(1)
  }
}
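For the custom sink mentioned above, a minimal sketch based on extending RichSinkFunction; the class name and the println logic are only for illustration, and the source reuses CustomNonParallel from above:
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.flink.streaming.api.scala._

// a custom sink is built by extending (Rich)SinkFunction and overriding invoke()
class CustomPrintSink extends RichSinkFunction[Long] {
  override def invoke(value: Long): Unit = {
    println("custom sink received: " + value)
  }
}

object DataStreamSinkApp {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.addSource(new CustomNonParallel).addSink(new CustomPrintSink)
    env.execute("DataStreamSinkApp")
  }
}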
Splitting a stream with the split and select methods:
import java.{lang, util}

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.collector.selector.OutputSelector

object DataStreamFilter {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // filterFunction(env)
    splitselectFunction(env)
    env.execute("DataStreamFilter")
  }

  def splitselectFunction(env: StreamExecutionEnvironment) = {
    val data = env.addSource(new CustomNonParallel)
    val splits = data.split(new OutputSelector[Long] {
      override def select(out: Long): lang.Iterable[String] = {
        val list = new util.ArrayList[String]()
        if (out % 2 == 0) {
          list.add("even")
        } else {
          list.add("odd")
        }
        list
      }
    })
    splits.select("odd").print().setParallelism(1)
  }

  def filterFunction(env: StreamExecutionEnvironment) = {
    val data = env.addSource(new CustomNonParallel)
    data.map(x => {
      println("received: " + x)
      x
    }).filter(_ % 2 == 0).print().setParallelism(2)
  }
}
Table API and SQL programming
Flink's API is organized in three layers; the higher the layer, the simpler it is to program with.
package com.flink.tablesql

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.types.Row

object tableSqlapi {

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableenv = TableEnvironment.getTableEnvironment(env)

    val filepath = "D:\\尚學堂學習資料\\網盤下載\\flink基礎入門篇\\flink-train\\data\\04\\people.csv"
    val data = env.readCsvFile[people](filepath, ignoreFirstLine = true)

    // convert the DataSet into a Table and register it under the name "people"
    val table = tableenv.fromDataSet(data)
    tableenv.registerTable("people", table)

    // run a SQL query against the registered table
    val resultTable = tableenv.sqlQuery("select age, count(1) as cnt from people group by age")

    tableenv.toDataSet[Row](resultTable).print()
    data.print()
  }

  case class people(
    name: String,
    age: Int,
    job: String
  )
}
Event Time / Processing Time / Ingestion Time
Ingestion time sits conceptually in between event time and processing time.
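A minimal sketch of choosing the time characteristic for a job in this Flink version (processing time is the default):
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._

object TimeCharacteristicExample {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // tell Flink which notion of time windows and timers should use;
    // alternatives: TimeCharacteristic.ProcessingTime, TimeCharacteristic.IngestionTime
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
  }
}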
Flink's four built-in windows:
tumbling windows: fixed-size windows that do not overlap.
sliding windows: fixed-size windows that may overlap.
session windows: windows of activity that close after a gap of inactivity.
global windows: all elements with the same key go into a single window; a custom trigger is needed to fire it.
Flink windows fall into two broad categories: 1、time-based 2、count-based.
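A minimal sketch of time-based and count-based windows on a socket word-count stream (hostname and port are placeholders):
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object WindowExample {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val counts = env.socketTextStream("localhost", 9999)
      .flatMap(_.split(" "))
      .map((_, 1))
      .keyBy(0)

    // time-based: tumbling processing-time window of 5 seconds
    counts.timeWindow(Time.seconds(5)).sum(1).print()

    // time-based: sliding window of 10 seconds, sliding every 5 seconds
    // counts.timeWindow(Time.seconds(10), Time.seconds(5)).sum(1).print()

    // count-based: fires every 5 elements per key
    // counts.countWindow(5).sum(1).print()

    env.execute("WindowExample")
  }
}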
Window functions: ReduceFunction aggregates (for example, sums up) the incoming elements incrementally as they arrive.
ProcessWindowFunction: first buffers the received elements and then processes them in a single pass when the window fires; this makes it possible, for example, to sort all elements received in the window.
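A minimal sketch contrasting the two: reduce aggregates incrementally, while ProcessWindowFunction gets the whole buffered window at once and can, for example, sort it. The input format (one integer per line) and all names are assumptions.
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

object WindowFunctionExample {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // one integer per line is expected on the socket
    val keyed = env.socketTextStream("localhost", 9999)
      .map(x => ("key", x.trim.toInt))
      .keyBy(_._1)

    // ReduceFunction: elements are aggregated incrementally as they arrive
    keyed.timeWindow(Time.seconds(5))
      .reduce((a, b) => (a._1, a._2 + b._2))
      .print()

    // ProcessWindowFunction: all elements of the window are buffered and handed over
    // in one go when the window fires, so they can be sorted before being emitted
    keyed.timeWindow(Time.seconds(5))
      .process(new ProcessWindowFunction[(String, Int), String, String, TimeWindow] {
        override def process(key: String,
                             context: Context,
                             elements: Iterable[(String, Int)],
                             out: Collector[String]): Unit = {
          val sorted = elements.map(_._2).toList.sorted
          out.collect("sorted window for key " + key + ": " + sorted)
        }
      })
      .print()

    env.execute("WindowFunctionExample")
  }
}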
https://blog.csdn.net/lmalds/article/details/52704170 --- a very thorough blog post explaining Flink watermarks
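A minimal sketch of assigning event-time timestamps and watermarks with a bounded-out-of-orderness extractor; the input format "word,timestampInMillis" and the 3-second lateness bound are assumptions:
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object WatermarkExample {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    env.socketTextStream("localhost", 9999)
      .map(line => {
        val parts = line.split(",")
        (parts(0), 1, parts(1).toLong) // (word, count, event timestamp in ms)
      })
      // watermark = max timestamp seen so far - 3 s, so events up to 3 s late are still handled
      .assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor[(String, Int, Long)](Time.seconds(3)) {
          override def extractTimestamp(element: (String, Int, Long)): Long = element._3
        })
      .keyBy(_._1)
      .timeWindow(Time.seconds(5))
      .sum(1)
      .print()

    env.execute("WatermarkExample")
  }
}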
Flink Connectors
Introduction to connectors:
Predefined sources and sinks: a few basic data sources and sinks are built into Flink and are always available. The predefined data sources include reading from files, directories, and sockets, and ingesting data from collections and iterators. The predefined data sinks support writing to files, to stdout and stderr, and to sockets.
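A minimal sketch that sticks to predefined sources and sinks only (paths and ports are placeholders):
import org.apache.flink.streaming.api.scala._

object PredefinedSourceSinkApp {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // predefined source: read from a socket (readTextFile, fromCollection and
    // fromElements are other predefined sources)
    val text = env.socketTextStream("localhost", 9999)

    // predefined sinks: print to stdout, write to a text file
    text.print()
    text.writeAsText("./data/output.txt")

    env.execute("PredefinedSourceSinkApp")
  }
}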
Flink consuming data from Kafka:
package com.flink.connectors

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

object KafkaConnectorConsumerApp {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val topic = "pktest"
    val pro = new Properties()
    pro.setProperty("bootstrap.servers", "192.168.126.115:9092")
    pro.setProperty("group.id", "test")

    // pass the consumer properties to the FlinkKafkaConsumer constructor
    val data = env.addSource(new FlinkKafkaConsumer[String](topic, new SimpleStringSchema(), pro))
    data.print()

    env.execute("KafkaConnectorConsumerApp")
  }
}
Sinking Flink data to Kafka:
package com.flink.connectors

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper

object KafkaConnectorProducerApp {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // read data from a socket and sink it to Kafka through Flink
    val data = env.socketTextStream("localhost", 9999)

    val topic = "topicTest"
    val pro = new Properties()
    pro.setProperty("bootstrap.servers", "192.168.126.115:9092")
    pro.setProperty("group.id", "test")

    val kafkaSink = new FlinkKafkaProducer[String](topic,
      new KeyedSerializationSchemaWrapper[String](new SimpleStringSchema()), pro)
    data.addSink(kafkaSink)

    env.execute("KafkaConnectorProducerApp")
  }
}