Flink from Beginner to Practice (5): Flink Stream Processing from 0 to 1


I. DataStream API: Data Sources

Introduction:

A source is the data input of a program. You add a source to your program via StreamExecutionEnvironment.addSource(sourceFunction). Flink ships with a large number of ready-made source implementations, and you can also write your own: implement the SourceFunction interface for a non-parallel source, or implement the ParallelSourceFunction interface or extend RichParallelSourceFunction for a parallel source.

Types:
File-based

readTextFile(path) reads a text file according to the TextInputFormat rules, line by line, and returns each line.

Socket-based

socketTextStream reads data from a socket; elements can be split by a delimiter (a minimal sketch of the file- and socket-based sources follows this list).

Collection-based

fromCollection(Collection) creates a data stream from a Java Collection; all elements in the collection must be of the same type.

Custom input

addSource lets you read data from third-party systems. Flink ships with a set of connectors that provide the corresponding source support, e.g. Kafka.
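
Before the custom-source examples below, here is a minimal sketch of the file- and socket-based sources listed above; the file path, host, and port are placeholders chosen only for illustration:

package xuwei.tech.streaming;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Hypothetical example: the built-in file- and socket-based sources.
 */
public class StreamingBuiltInSources {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // File-based: read a text file line by line (the path is a placeholder)
        DataStream<String> fileLines = env.readTextFile("file:///tmp/flink-input.txt");

        // Socket-based: read from a socket and split the stream on newlines
        // (host and port are placeholders, e.g. fed by `nc -l 9000`)
        DataStream<String> socketLines = env.socketTextStream("localhost", 9000, "\n");

        fileLines.print();
        socketLines.print();

        env.execute("StreamingBuiltInSources");
    }
}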

Code examples:
1. fromCollection
package xuwei.tech.streaming;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.ArrayList;

/**
 * Use a collection as the data source
 *
 * Created by xuwei.tech on 2018/10/23.
 */
public class StreamingFromCollection {

    public static void main(String[] args) throws Exception {
        // Get the Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        ArrayList<Integer> data = new ArrayList<>();
        data.add(10);
        data.add(15);
        data.add(20);

        // Specify the data source
        DataStreamSource<Integer> collectionData = env.fromCollection(data);

        // Process the data with map
        DataStream<Integer> num = collectionData.map(new MapFunction<Integer, Integer>() {
            @Override
            public Integer map(Integer value) throws Exception {
                return value + 1;
            }
        });

        // Print directly
        num.print().setParallelism(1);

        env.execute("StreamingFromCollection");
    }
}
2. Create a custom source with parallelism 1 via SourceFunction (addSource)

① Define the SourceFunction with parallelism 1

package xuwei.tech.streaming.custormSource;

import org.apache.flink.streaming.api.functions.source.SourceFunction;

/**
 * Custom source with parallelism 1
 *
 * Simulates producing incrementing numbers starting from 1
 *
 * Note:
 * SourceFunction and SourceContext both need an explicit data type; otherwise the job fails at runtime with:
 * Caused by: org.apache.flink.api.common.functions.InvalidTypesException:
 * The types of the interface org.apache.flink.streaming.api.functions.source.SourceFunction could not be inferred.
 * Support for synthetic interfaces, lambdas, and generic or raw types is limited at this point
 *
 * Created by xuwei.tech on 2018/10/23.
 */
public class MyNoParalleSource implements SourceFunction<Long> {

    private long count = 1L;
    private boolean isRunning = true;

    /**
     * The main method: starts the source.
     * In most cases you implement a loop here so the source keeps producing data.
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        while (isRunning) {
            ctx.collect(count);
            count++;
            // Produce one element per second
            Thread.sleep(1000);
        }
    }

    /**
     * Called when the source is cancelled
     */
    @Override
    public void cancel() {
        isRunning = false;
    }
}

② Use the SourceFunction with parallelism 1

package xuwei.tech.streaming.custormSource;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;

/**
 * Use the source with parallelism 1
 *
 * Created by xuwei.tech on 2018/10/23.
 */
public class StreamingDemoWithMyNoPralalleSource {

    public static void main(String[] args) throws Exception {
        // Get the Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Get the data source; note: for this source the parallelism can only be 1
        DataStreamSource<Long> text = env.addSource(new MyNoParalleSource()).setParallelism(1);

        DataStream<Long> num = text.map(new MapFunction<Long, Long>() {
            @Override
            public Long map(Long value) throws Exception {
                System.out.println("Received: " + value);
                return value;
            }
        });

        // Process the data every 2 seconds
        DataStream<Long> sum = num.timeWindowAll(Time.seconds(2)).sum(0);

        // Print the result
        sum.print().setParallelism(1);

        String jobName = StreamingDemoWithMyNoPralalleSource.class.getSimpleName();
        env.execute(jobName);
    }
}
3. Create a custom parallel source via ParallelSourceFunction (addSource)

① Define the ParallelSourceFunction

package xuwei.tech.streaming.custormSource;

import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;

/**
 * Custom source that supports a parallelism greater than 1
 *
 * Created by xuwei.tech on 2018/10/23.
 */
public class MyParalleSource implements ParallelSourceFunction<Long> {

    private long count = 1L;
    private boolean isRunning = true;

    /**
     * The main method: starts the source.
     * In most cases you implement a loop here so the source keeps producing data.
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        while (isRunning) {
            ctx.collect(count);
            count++;
            // Produce one element per second
            Thread.sleep(1000);
        }
    }

    /**
     * Called when the source is cancelled
     */
    @Override
    public void cancel() {
        isRunning = false;
    }
}

② Use the ParallelSourceFunction

package xuwei.tech.streaming.custormSource;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;

/**
 * Use the parallel source
 *
 * Created by xuwei.tech on 2018/10/23.
 */
public class StreamingDemoWithMyPralalleSource {

    public static void main(String[] args) throws Exception {
        // Get the Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Get the data source with a parallelism of 2
        DataStreamSource<Long> text = env.addSource(new MyParalleSource()).setParallelism(2);

        DataStream<Long> num = text.map(new MapFunction<Long, Long>() {
            @Override
            public Long map(Long value) throws Exception {
                System.out.println("Received: " + value);
                return value;
            }
        });

        // Process the data every 2 seconds
        DataStream<Long> sum = num.timeWindowAll(Time.seconds(2)).sum(0);

        // Print the result
        sum.print().setParallelism(1);

        String jobName = StreamingDemoWithMyPralalleSource.class.getSimpleName();
        env.execute(jobName);
    }
}
4. Create a custom parallel source via RichParallelSourceFunction (addSource)

① Define the RichParallelSourceFunction

package xuwei.tech.streaming.custormSource;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

/**
 * Custom source that supports a parallelism greater than 1
 *
 * RichParallelSourceFunction additionally provides open and close methods.
 * If the source needs external connections or other resources, acquire them in open
 * and release them in close.
 *
 * Created by xuwei.tech on 2018/10/23.
 */
public class MyRichParalleSource extends RichParallelSourceFunction<Long> {

    private long count = 1L;
    private boolean isRunning = true;

    /**
     * The main method: starts the source.
     * In most cases you implement a loop here so the source keeps producing data.
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        while (isRunning) {
            ctx.collect(count);
            count++;
            // Produce one element per second
            Thread.sleep(1000);
        }
    }

    /**
     * Called when the source is cancelled
     */
    @Override
    public void cancel() {
        isRunning = false;
    }

    /**
     * Called exactly once at the very beginning.
     * Put the code that acquires connections here.
     * @param parameters
     * @throws Exception
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        System.out.println("open.............");
        super.open(parameters);
    }

    /**
     * Put the code that releases connections here.
     * @throws Exception
     */
    @Override
    public void close() throws Exception {
        super.close();
    }
}

② Use the RichParallelSourceFunction

package xuwei.tech.streaming.custormSource;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;

/**
 * Use the parallel rich source
 *
 * Created by xuwei.tech on 2018/10/23.
 */
public class StreamingDemoWithMyRichPralalleSource {

    public static void main(String[] args) throws Exception {
        // Get the Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Get the data source with a parallelism of 2
        DataStreamSource<Long> text = env.addSource(new MyRichParalleSource()).setParallelism(2);

        DataStream<Long> num = text.map(new MapFunction<Long, Long>() {
            @Override
            public Long map(Long value) throws Exception {
                System.out.println("Received: " + value);
                return value;
            }
        });

        // Process the data every 2 seconds
        DataStream<Long> sum = num.timeWindowAll(Time.seconds(2)).sum(0);

        // Print the result
        sum.print().setParallelism(1);

        String jobName = StreamingDemoWithMyRichPralalleSource.class.getSimpleName();
        env.execute(jobName);
    }
}

II. DataStream API: Transformations

Introduction:

  1. map: takes one element and returns one element; useful for cleaning and transforming data
  2. flatMap: takes one element and returns zero, one, or more elements
  3. filter: evaluates a predicate on each element; elements that satisfy the condition are kept
  4. keyBy: groups the stream by the given key; records with the same key go to the same partition [typical usage: see the notes]
  5. reduce: aggregates the data by combining the current element with the value returned by the previous reduce call and returning a new value (a minimal keyBy + reduce sketch follows this list)
  6. aggregations: sum(), min(), max(), etc.
  7. window: covered separately later
  8. union: merges multiple streams; the new stream contains the data of all of them, with the restriction that all merged streams must have the same type
  9. connect: similar to union, but it can only connect two streams; the two streams may have different types, and a different processing function is applied to the data of each stream
  10. CoMap, CoFlatMap: the functions used on ConnectedStreams, analogous to map and flatMap
  11. split: splits one stream into multiple streams according to rules
  12. select: used together with split to pick the split streams
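
A minimal keyBy + reduce sketch to complement the examples below (which cover filter, split, union, connect, and broadcast). It reuses the MyNoParalleSource from the source section; the class name and the even/odd keying are assumptions made only for illustration:

package xuwei.tech.streaming.streamAPI;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import xuwei.tech.streaming.custormSource.MyNoParalleSource;

/**
 * Hypothetical example: keyBy + reduce, keeping a running sum per key.
 */
public class StreamingDemoKeyByReduce {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Tuple2<String, Long>> pairs = env
                .addSource(new MyNoParalleSource()).setParallelism(1)
                .map(new MapFunction<Long, Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> map(Long value) throws Exception {
                        // Group even and odd numbers under two keys
                        return new Tuple2<>(value % 2 == 0 ? "even" : "odd", value);
                    }
                });

        DataStream<Tuple2<String, Long>> summed = pairs
                .keyBy(0) // records with the same key go to the same partition
                .reduce(new ReduceFunction<Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> reduce(Tuple2<String, Long> v1, Tuple2<String, Long> v2) throws Exception {
                        // Combine the previous aggregate with the new element
                        return new Tuple2<>(v1.f0, v1.f1 + v2.f1);
                    }
                });

        summed.print().setParallelism(1);
        env.execute("StreamingDemoKeyByReduce");
    }
}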
Code examples:
1. filter
package xuwei.tech.streaming.streamAPI;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import xuwei.tech.streaming.custormSource.MyNoParalleSource;

/**
 * Filter demo
 *
 * Created by xuwei.tech on 2018/10/23.
 */
public class StreamingDemoFilter {

    public static void main(String[] args) throws Exception {
        // Get the Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Get the data source; note: for this source the parallelism can only be 1
        DataStreamSource<Long> text = env.addSource(new MyNoParalleSource()).setParallelism(1);

        DataStream<Long> num = text.map(new MapFunction<Long, Long>() {
            @Override
            public Long map(Long value) throws Exception {
                System.out.println("Original value received: " + value);
                return value;
            }
        });

        // Apply the filter; elements that satisfy the condition are kept
        DataStream<Long> filterData = num.filter(new FilterFunction<Long>() {
            // Filter out all odd numbers
            @Override
            public boolean filter(Long value) throws Exception {
                return value % 2 == 0;
            }
        });

        DataStream<Long> resultData = filterData.map(new MapFunction<Long, Long>() {
            @Override
            public Long map(Long value) throws Exception {
                System.out.println("Value after filtering: " + value);
                return value;
            }
        });

        // Process the data every 2 seconds
        DataStream<Long> sum = resultData.timeWindowAll(Time.seconds(2)).sum(0);

        // Print the result
        sum.print().setParallelism(1);

        String jobName = StreamingDemoFilter.class.getSimpleName();
        env.execute(jobName);
    }
}
2. split
package xuwei.tech.streaming.streamAPI;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SplitStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import xuwei.tech.streaming.custormSource.MyNoParalleSource;

import java.util.ArrayList;

/**
 * split
 *
 * Splits one data stream into multiple streams according to rules.
 *
 * Use case:
 * In practice a source stream may mix several similar kinds of data that need different
 * processing rules, so the stream can be split by some rule into multiple streams,
 * and each stream can then use its own processing logic.
 *
 * Created by xuwei.tech on 2018/10/23.
 */
public class StreamingDemoSplit {

    public static void main(String[] args) throws Exception {
        // Get the Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Get the data source; note: for this source the parallelism can only be 1
        DataStreamSource<Long> text = env.addSource(new MyNoParalleSource()).setParallelism(1);

        // Split the stream by whether the number is even or odd
        SplitStream<Long> splitStream = text.split(new OutputSelector<Long>() {
            @Override
            public Iterable<String> select(Long value) {
                ArrayList<String> outPut = new ArrayList<>();
                if (value % 2 == 0) {
                    outPut.add("even"); // even numbers
                } else {
                    outPut.add("odd");  // odd numbers
                }
                return outPut;
            }
        });

        // Select one or more of the split streams
        DataStream<Long> evenStream = splitStream.select("even");
        DataStream<Long> oddStream = splitStream.select("odd");
        DataStream<Long> moreStream = splitStream.select("odd", "even");

        // Print the result
        moreStream.print().setParallelism(1);

        String jobName = StreamingDemoSplit.class.getSimpleName();
        env.execute(jobName);
    }
}
3. union (note: the two sources must have the same type)
package xuwei.tech.streaming.streamAPI;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import xuwei.tech.streaming.custormSource.MyNoParalleSource;

/**
 * union
 * Merges multiple streams; the new stream contains all their data.
 * The restriction is that all merged streams must have the same type.
 *
 * Created by xuwei.tech on 2018/10/23.
 */
public class StreamingDemoUnion {

    public static void main(String[] args) throws Exception {
        // Get the Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Get the data sources; note: for this source the parallelism can only be 1
        DataStreamSource<Long> text1 = env.addSource(new MyNoParalleSource()).setParallelism(1);
        DataStreamSource<Long> text2 = env.addSource(new MyNoParalleSource()).setParallelism(1);

        // Combine text1 and text2 into one stream
        DataStream<Long> text = text1.union(text2);

        DataStream<Long> num = text.map(new MapFunction<Long, Long>() {
            @Override
            public Long map(Long value) throws Exception {
                System.out.println("Original value received: " + value);
                return value;
            }
        });

        // Process the data every 2 seconds
        DataStream<Long> sum = num.timeWindowAll(Time.seconds(2)).sum(0);

        // Print the result
        sum.print().setParallelism(1);

        String jobName = StreamingDemoUnion.class.getSimpleName();
        env.execute(jobName);
    }
}
4. connect (can combine two streams with different types)
package xuwei.tech.streaming.streamAPI;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import xuwei.tech.streaming.custormSource.MyNoParalleSource;

/**
 * connect
 * Similar to union, but it can only connect two streams; the two streams may have different
 * types, and a different processing function is applied to the data of each stream.
 *
 * Created by xuwei.tech on 2018/10/23.
 */
public class StreamingDemoConnect {

    public static void main(String[] args) throws Exception {
        // Get the Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Get the data sources; note: for this source the parallelism can only be 1
        DataStreamSource<Long> text1 = env.addSource(new MyNoParalleSource()).setParallelism(1);
        DataStreamSource<Long> text2 = env.addSource(new MyNoParalleSource()).setParallelism(1);

        SingleOutputStreamOperator<String> text2_str = text2.map(new MapFunction<Long, String>() {
            @Override
            public String map(Long value) throws Exception {
                return "str_" + value;
            }
        });

        ConnectedStreams<Long, String> connectStream = text1.connect(text2_str);

        SingleOutputStreamOperator<Object> result = connectStream.map(new CoMapFunction<Long, String, Object>() {
            @Override
            public Object map1(Long value) throws Exception {
                return value;
            }

            @Override
            public Object map2(String value) throws Exception {
                return value;
            }
        });

        // Print the result
        result.print().setParallelism(1);

        String jobName = StreamingDemoConnect.class.getSimpleName();
        env.execute(jobName);
    }
}
5. broadcast (broadcast partitioning)
package xuwei.tech.streaming.streamAPI;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import xuwei.tech.streaming.custormSource.MyNoParalleSource;

/**
 * broadcast partitioning
 *
 * Created by xuwei.tech on 2018/10/23.
 */
public class StreamingDemoWithMyNoPralalleSourceBroadcast {

    public static void main(String[] args) throws Exception {
        // Get the Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);

        // Get the data source; note: for this source the parallelism can only be 1
        DataStreamSource<Long> text = env.addSource(new MyNoParalleSource()).setParallelism(1);

        DataStream<Long> num = text.broadcast().map(new MapFunction<Long, Long>() {
            @Override
            public Long map(Long value) throws Exception {
                long id = Thread.currentThread().getId();
                System.out.println("Thread id: " + id + ", received: " + value);
                return value;
            }
        });

        // Process the data every 2 seconds
        DataStream<Long> sum = num.timeWindowAll(Time.seconds(2)).sum(0);

        // Print the result
        sum.print().setParallelism(1);

        String jobName = StreamingDemoWithMyNoPralalleSourceBroadcast.class.getSimpleName();
        env.execute(jobName);
    }
}
Summary: state, checkpointing, and recovery
1. State
Introduction:
  • The word count example we wrote earlier has no state management. If a task crashes while processing, its in-memory state is lost and all the data has to be recomputed. To provide fault tolerance and message-processing semantics (at least once, exactly once), Flink introduces state and checkpoints.
  • First, let's distinguish two concepts:
  • state generally refers to the state of a specific task/operator [state data is kept on the JVM heap by default]
  • a checkpoint [think of it as persisted state data] is a global snapshot of a Flink job at a specific point in time, i.e. it contains the state of all tasks/operators
  • Note: a task is the basic unit of execution in Flink; an operator is a transformation.
  • State can be recorded so that data can be recovered after a failure
  • Flink has two basic kinds of state:
  • Keyed State
  • Operator State
  • Keyed State and Operator State can each exist in two forms:
  • raw state
  • managed state
  • Managed state is state that is managed by the Flink framework
  • With raw state, the user manages the concrete data structure of the state; when checkpointing, the framework reads and writes the state as byte[] and knows nothing about its internal structure.
  • Managed state is recommended for DataStream programs; raw state is typically only needed when implementing a user-defined operator.
1. Keyed State
  • As the name suggests, this is state on a KeyedStream. The state is bound to a specific key: for every key of the KeyedStream there is a corresponding state.
  • stream.keyBy(…)
  • Data structures used to hold the state:
  • ValueState<T>: a single value of type T bound to the key; the simplest kind of state. The value is updated with update() and read with value()
  • ListState<T>: the state for the key is a list. Values are appended with add() and read back via get(), which returns an Iterable<T>
  • ReducingState<T>: state backed by a user-supplied reduceFunction; every call to add() applies the reduceFunction and folds the values into a single state value
  • MapState<UK, UV>: the state is a map; elements are added with put or putAll
  • Note that the State objects above are only handles used to interact with the state (update, delete, clear, etc.); the actual state value may live in memory, on disk, or in another distributed storage system. In other words, we only hold a handle to the state (a minimal ValueState sketch appears at the end of this state section)
2. Operator State
  • State that is not tied to a key but to an operator; the whole operator has a single state
  • Data structure used to hold the state:
  • ListState<T>
  • For example, Flink's Kafka connector uses operator state: each connector instance stores a map of all (partition, offset) pairs for the topics it consumes
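
A minimal keyed-state sketch: a RichFlatMapFunction that keeps a running count per key in a ValueState. The class name, field names, and the idea of counting are assumptions made only for illustration:

package xuwei.tech.streaming.state;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

/**
 * Hypothetical example: running count per key stored in managed keyed state (ValueState).
 */
public class CountWithKeyedState extends RichFlatMapFunction<Tuple2<String, Long>, Tuple2<String, Long>> {

    // Handle to the managed keyed state; the actual value is scoped to the current key
    private transient ValueState<Long> countState;

    @Override
    public void open(Configuration parameters) throws Exception {
        ValueStateDescriptor<Long> descriptor = new ValueStateDescriptor<>("count", Long.class);
        countState = getRuntimeContext().getState(descriptor);
    }

    @Override
    public void flatMap(Tuple2<String, Long> value, Collector<Tuple2<String, Long>> out) throws Exception {
        Long current = countState.value();
        if (current == null) {
            current = 0L;
        }
        current += value.f1;
        countState.update(current);
        out.collect(new Tuple2<>(value.f0, current));
    }
}

It would be used on a keyed stream, e.g. stream.keyBy(0).flatMap(new CountWithKeyedState()), so that the state is maintained independently for each key.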

2. Fault tolerance for state

1. Checkpoint

  • fault tolerance relies on the checkpoint mechanism
  • which guarantees exactly-once semantics
  • but only within the Flink system itself
  • end-to-end guarantees for sources and sinks additionally depend on the external components
Checkpoint introduction:
  • To make state fault tolerant, Flink needs to checkpoint it.
  • Checkpointing is the core of Flink's fault-tolerance mechanism. Based on the configuration, it periodically snapshots the state of every operator/task in the stream and persists this state data. If the Flink program crashes unexpectedly, the job can be rerun and restored from one of these snapshots, correcting any data inconsistency caused by the failure
  • Flink's checkpoint mechanism can interact with persistent storage for streams and state, provided that:
  • there is a persistent source that can replay events for a certain amount of time; typical examples are persistent message queues (e.g. Apache Kafka, RabbitMQ) or file systems (e.g. HDFS, S3, GFS)
  • there is persistent storage for state, typically a distributed file system (e.g. HDFS, S3, GFS)
Checkpoint configuration:
  • Checkpointing is disabled by default and must be enabled explicitly
  • Once checkpointing is enabled, the default checkpoint mode is exactly-once
  • There are two checkpoint modes: exactly-once and at-least-once
  • Exactly-once is the right choice for most applications; at-least-once may be used by certain applications that require ultra-low latency (consistently a few milliseconds)
Checkpoint configuration code:
  • Checkpointing is disabled by default and must be enabled explicitly
  • StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  • // start a checkpoint every 1000 ms [checkpoint interval]
  • env.enableCheckpointing(1000);
  • // advanced options:
  • // set the mode to exactly-once (this is the default)
  • env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
  • // make sure at least 500 ms pass between checkpoints [minimum pause between checkpoints]
  • env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
  • // checkpoints must complete within one minute or are discarded [checkpoint timeout]
  • env.getCheckpointConfig().setCheckpointTimeout(60000);
  • // allow only one checkpoint at a time
  • env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
  • // retain checkpoint data after the job is cancelled, so the job can later be restored from a specific checkpoint [see the notes for details]
  • env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

2. State Backend (where state is stored)

Introduction:
  • By default, state is kept in the TaskManager's memory and checkpoints are stored in the JobManager's memory.
  • Where state is stored and where checkpoints go depends on the configured State Backend
  • env.setStateBackend(…)
  • There are three State Backends:
  • MemoryStateBackend
  • FsStateBackend
  • RocksDBStateBackend
Types:
1. MemoryStateBackend
  • state data is kept on the JVM heap; when a checkpoint runs, the state snapshot is stored in the JobManager's memory
  • the memory-based state backend is not recommended for production
2. FsStateBackend
  • state data is kept in the TaskManager's memory; when a checkpoint runs, the state snapshot is written to the configured file system
  • a distributed file system such as HDFS can be used
3. RocksDBStateBackend
  • RocksDB works slightly differently from the two above: state is maintained on the local file system and written directly into a local RocksDB instance. It also requires a remote filesystem URI (usually HDFS); during a checkpoint the local data is copied to that filesystem, and on failover the state is restored from the filesystem back to the local machine
  • RocksDB removes the limitation of state having to fit in memory while still persisting to a remote file system, which makes it a good fit for production
State backend configuration:

Two ways to change the State Backend

  • Option 1: per-job configuration
  • change the job code:
  • env.setStateBackend(new FsStateBackend("hdfs://namenode:9000/flink/checkpoints"));
  • or new MemoryStateBackend()
  • or new RocksDBStateBackend(filebackend, true); [requires an additional third-party dependency]
  • Option 2: cluster-wide configuration
  • edit flink-conf.yaml:
  • state.backend: filesystem
  • state.checkpoints.dir: hdfs://namenode:9000/flink/checkpoints
  • Note: state.backend can be one of jobmanager (MemoryStateBackend), filesystem (FsStateBackend), or rocksdb (RocksDBStateBackend)

3. Restart Strategies

Introduction:
  • Flink supports different restart strategies to control how a job is restarted after a failure
  • The cluster starts with a default restart strategy, which is used whenever no job-specific strategy is defined. If a restart strategy is specified when the job is submitted, it overrides the cluster default
  • The default restart strategy is set in Flink's configuration file flink-conf.yaml; the restart-strategy parameter defines which strategy is used.
  • Common restart strategies:
  • fixed delay
  • failure rate
  • no restart
  • If checkpointing is not enabled, the no-restart strategy is used.
  • If checkpointing is enabled but no restart strategy is configured, the fixed-delay strategy is used with Integer.MAX_VALUE restart attempts
  • The restart strategy can be configured globally in flink-conf.yaml, or set dynamically in the application code, which overrides the global configuration
Types:
  1. Fixed delay
  • Option 1: global configuration in flink-conf.yaml
  • restart-strategy: fixed-delay
  • restart-strategy.fixed-delay.attempts: 3
  • restart-strategy.fixed-delay.delay: 10 s
  • Option 2: in the application code
  • env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
  • 3, // number of restart attempts
  • Time.of(10, TimeUnit.SECONDS) // delay between restart attempts ));
  2. Failure rate
  • Option 1: global configuration in flink-conf.yaml
  • restart-strategy: failure-rate
  • restart-strategy.failure-rate.max-failures-per-interval: 3
  • restart-strategy.failure-rate.failure-rate-interval: 5 min
  • restart-strategy.failure-rate.delay: 10 s
  • Option 2: in the application code
  • env.setRestartStrategy(RestartStrategies.failureRateRestart(
  • 3, // maximum number of failures per interval
  • Time.of(5, TimeUnit.MINUTES), // interval over which failures are measured
  • Time.of(10, TimeUnit.SECONDS) // delay between restart attempts ));
  3. No restart
  • Option 1: global configuration in flink-conf.yaml
  • restart-strategy: none
  • Option 2: in the application code
  • env.setRestartStrategy(RestartStrategies.noRestart());
4. Retaining multiple checkpoints
  • By default, if checkpointing is enabled, Flink keeps only the most recent successful checkpoint, and when the program fails it can be restored from that checkpoint. It is often more flexible, however, to keep several checkpoints and choose one of them to restore from; for example, if we discover that the data processed over the last four hours is wrong, we may want to roll the whole state back to four hours earlier
  • Flink can retain multiple checkpoints; add the following setting to conf/flink-conf.yaml to specify how many checkpoints to keep at most
  • state.checkpoints.num-retained: 20
  • With this set, you can list the checkpoint directories stored on HDFS:
  • hdfs dfs -ls hdfs://namenode:9000/flink/checkpoints
  • To roll back to a particular checkpoint, simply point the job at the path of that checkpoint
5. Restoring from a checkpoint
  • If the Flink program fails abnormally, or data has been processed incorrectly over some recent period, the program can be restored from a specific checkpoint
  • bin/flink run -s hdfs://namenode:9000/flink/checkpoints/467e17d2cc343e6c56255d222bae3421/chk-56/_metadata flink-job.jar
  • After the program is running again, it continues to take checkpoints according to the checkpoint configuration
6. Savepoint
Introduction:
  • With savepoints, a program can be upgraded and then continue the computation from the point reached before the upgrade, so no data is lost
  • A savepoint is a global, consistent snapshot; it stores source offsets, operator state, and so on
  • The application can resume consumption from any point in the past at which a savepoint was taken
Configuration and usage:
  1. Configure the savepoint storage location in flink-conf.yaml
  • not mandatory, but once set you do not have to specify the savepoint location manually when later creating a savepoint for a job
  • state.savepoints.dir: hdfs://namenode:9000/flink/savepoints
  2. Trigger a savepoint [directly, or while cancelling the job]
  • bin/flink savepoint jobId [targetDirectory] [-yid yarnAppId] [the -yid parameter is required in YARN mode]
  • bin/flink cancel -s [targetDirectory] jobId [-yid yarnAppId] [the -yid parameter is required in YARN mode]
  3. Start a job from a given savepoint: bin/flink run -s savepointPath [runArgs]
Summary: checkpoint vs. savepoint
  1. Checkpoint
  • triggered periodically by the application to save state; checkpoints expire
  • used internally when the application fails and restarts
  2. Savepoint
  • triggered manually by the user; it is a pointer to a checkpoint and does not expire
  • used when upgrading a job
  • Note: to upgrade smoothly across different versions of a job and different versions of Flink, it is strongly recommended to assign IDs to operators manually via the uid(String) method; these IDs determine the state scope of each operator. If you do not assign IDs manually, Flink generates an ID for each operator automatically, and the program can be restored from a savepoint as long as these IDs do not change. However, the auto-generated IDs depend on the program structure and are very sensitive to code changes, so setting IDs manually is strongly recommended (see the sketch below).
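A minimal sketch of assigning stable operator IDs with uid(String), assuming a simple socket pipeline; the class name, host, port, and ID strings are placeholders:

package xuwei.tech.streaming;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Hypothetical example: assigning stable operator IDs so state can be matched
 * when the job is restored from a savepoint after an upgrade.
 */
public class SavepointUidDemo {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Host and port are placeholders
        DataStream<String> lines = env.socketTextStream("localhost", 9000, "\n");

        DataStream<String> upper = lines.map(new MapFunction<String, String>() {
            @Override
            public String map(String value) throws Exception {
                return value.toUpperCase();
            }
        }).uid("to-upper-map"); // stable ID for this operator's state

        upper.print().uid("print-sink"); // sinks can be given IDs as well

        env.execute("SavepointUidDemo");
    }
}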
7. Checkpoint code example
package xuwei.tech.streaming;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

/**
 * checkpoint
 *
 * Created by xuwei.tech on 2018/10/8.
 */
public class SocketWindowWordCountJavaCheckPoint {

    public static void main(String[] args) throws Exception {
        // Get the port to listen on
        int port;
        try {
            ParameterTool parameterTool = ParameterTool.fromArgs(args);
            port = parameterTool.getInt("port");
        } catch (Exception e) {
            System.err.println("No port set. use default port 9000--java");
            port = 9000;
        }

        // Get the Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Start a checkpoint every 1000 ms [checkpoint interval]
        env.enableCheckpointing(1000);
        // Advanced options:
        // Set the mode to exactly-once (this is the default)
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // Make sure at least 500 ms pass between checkpoints [minimum pause between checkpoints]
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
        // Checkpoints must complete within one minute or are discarded [checkpoint timeout]
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        // Allow only one checkpoint at a time
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        // Retain checkpoint data after the job is cancelled, so it can be restored from a specific checkpoint later
        // ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION: keep checkpoint data when the job is cancelled
        // ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION: delete checkpoint data when the job is cancelled;
        //   checkpoints are only kept when the job fails
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        // Configure the state backend
        //env.setStateBackend(new MemoryStateBackend());
        //env.setStateBackend(new FsStateBackend("hdfs://hadoop100:9000/flink/checkpoints"));
        //env.setStateBackend(new RocksDBStateBackend("hdfs://hadoop100:9000/flink/checkpoints",true));

        String hostname = "hadoop100";
        String delimiter = "\n";
        // Connect to the socket to get the input data
        DataStreamSource<String> text = env.socketTextStream(hostname, port, delimiter);

        // a a c
        // a 1
        // a 1
        // c 1
        DataStream<WordWithCount> windowCounts = text.flatMap(new FlatMapFunction<String, WordWithCount>() {
            public void flatMap(String value, Collector<WordWithCount> out) throws Exception {
                String[] splits = value.split("\\s");
                for (String word : splits) {
                    out.collect(new WordWithCount(word, 1L));
                }
            }
        }).keyBy("word")
                .timeWindow(Time.seconds(2), Time.seconds(1)) // window size 2 seconds, slide 1 second
                .sum("count"); // either sum or reduce works here
                /*.reduce(new ReduceFunction<WordWithCount>() {
                    public WordWithCount reduce(WordWithCount a, WordWithCount b) throws Exception {
                        return new WordWithCount(a.word, a.count + b.count);
                    }
                })*/

        // Print the data to the console and set the parallelism
        windowCounts.print().setParallelism(1);

        // This line is required; without it the program does not run
        env.execute("Socket window count");
    }

    public static class WordWithCount {
        public String word;
        public long count;

        public WordWithCount() {}

        public WordWithCount(String word, long count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return "WordWithCount{" +
                    "word='" + word + '\'' +
                    ", count=" + count +
                    '}';
        }
    }
}

III. DataStream API: Partitioning

Introduction:
  1. Random partitioning: randomly distributes the data — dataStream.shuffle()
  2. Rebalancing: repartitions the data evenly to remove data skew — dataStream.rebalance()
  3. Rescaling: redistributes only within a subset of downstream tasks (see the notes for details) — dataStream.rescale() (a minimal sketch of these repartitioning calls follows this list)
  4. Custom partitioning: implement the Partitioner interface, then call dataStream.partitionCustom(partitioner, "someKey") or dataStream.partitionCustom(partitioner, 0)
  5. Broadcasting: covered separately later
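
A minimal sketch of the built-in repartitioning operators, reusing the MyNoParalleSource from the source section; the class name and parallelism values are assumptions made only for illustration:

package xuwei.tech.streaming.custormPartition;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import xuwei.tech.streaming.custormSource.MyNoParalleSource;

/**
 * Hypothetical example: built-in repartitioning operators.
 */
public class RepartitionDemo {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);

        DataStreamSource<Long> text = env.addSource(new MyNoParalleSource()).setParallelism(1);

        // Random partitioning: elements are shuffled randomly across downstream tasks
        DataStream<Long> shuffled = text.shuffle();

        // Rebalancing: elements are distributed round-robin, evening out skew
        DataStream<Long> rebalanced = text.rebalance();

        // Rescaling: round-robin only within a subset of downstream tasks
        DataStream<Long> rescaled = text.rescale();

        shuffled.print();
        rebalanced.print();
        rescaled.print();

        env.execute("RepartitionDemo");
    }
}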
Code example:
1. Custom Partitioner

① Create the partitioner class

package xuwei.tech.streaming.custormPartition;

import org.apache.flink.api.common.functions.Partitioner;

/**
 * Created by xuwei.tech on 2018/10/23.
 */
public class MyPartition implements Partitioner<Long> {

    @Override
    public int partition(Long key, int numPartitions) {
        System.out.println("Total number of partitions: " + numPartitions);
        if (key % 2 == 0) {
            return 0;
        } else {
            return 1;
        }
    }
}

② Use the custom partitioner

package xuwei.tech.streaming.custormPartition;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import xuwei.tech.streaming.custormSource.MyNoParalleSource;

/**
 * Use a custom partitioner.
 * Partitions the numbers by whether they are even or odd.
 *
 * Created by xuwei.tech on 2018/10/23.
 */
public class SteamingDemoWithMyParitition {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        DataStreamSource<Long> text = env.addSource(new MyNoParalleSource());

        // Convert the data: wrap the Long values into Tuple1
        DataStream<Tuple1<Long>> tupleData = text.map(new MapFunction<Long, Tuple1<Long>>() {
            @Override
            public Tuple1<Long> map(Long value) throws Exception {
                return new Tuple1<>(value);
            }
        });

        // The data after partitioning
        DataStream<Tuple1<Long>> partitionData = tupleData.partitionCustom(new MyPartition(), 0);

        DataStream<Long> result = partitionData.map(new MapFunction<Tuple1<Long>, Long>() {
            @Override
            public Long map(Tuple1<Long> value) throws Exception {
                System.out.println("Current thread id: " + Thread.currentThread().getId() + ", value: " + value);
                return value.getField(0);
            }
        });

        result.print().setParallelism(1);

        env.execute("SteamingDemoWithMyParitition");
    }
}

IV. DataStream API: Data Sinks

Introduction:
  1. writeAsText(): writes the elements line by line as strings, obtained by calling toString() on each element
  2. print() / printToErr(): prints the toString() value of each element to standard output or standard error
  3. custom output via addSink [e.g. Kafka, Redis]
1. Built-in connectors
  1. Apache Kafka (source/sink)
  2. Apache Cassandra (sink)
  3. Elasticsearch (sink)
  4. Hadoop FileSystem (sink)
  5. RabbitMQ (source/sink)
  6. Apache ActiveMQ (source/sink)
  7. Redis (sink)
2. Custom sinks
  1. To implement a custom sink,
  2. implement the SinkFunction interface
  3. or extend RichSinkFunction
  4. see org.apache.flink.streaming.connectors.redis.RedisSink as a reference implementation (a minimal custom-sink sketch follows this list)
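
A minimal custom-sink sketch extending RichSinkFunction; the class name is made up, and "writing" is simulated by printing (a real sink would open a client connection in open() and release it in close()):

package xuwei.tech.streaming.sink;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

/**
 * Hypothetical custom sink: acquire resources in open(), write each record in invoke(),
 * release resources in close().
 */
public class MyPrintSink extends RichSinkFunction<String> {

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // Acquire external connections / clients here
        System.out.println("sink open.............");
    }

    @Override
    public void invoke(String value) throws Exception {
        // Write one record to the external system; here we just print it
        System.out.println("sink received: " + value);
    }

    @Override
    public void close() throws Exception {
        // Release external connections here
        super.close();
    }
}

It would be attached to a stream with stream.addSink(new MyPrintSink()).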
Code examples:
1. Writing to Redis
package xuwei.tech.streaming.sink;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;

/**
 * Receive socket data and save it to a Redis list
 *
 * lpush list_key value
 *
 * Created by xuwei.tech on 2018/10/23.
 */
public class StreamingDemoToRedis {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<String> text = env.socketTextStream("hadoop100", 9000, "\n");

        // lpush l_words word
        // Assemble the data: turn the String into Tuple2<String, String>
        DataStream<Tuple2<String, String>> l_wordsData = text.map(new MapFunction<String, Tuple2<String, String>>() {
            @Override
            public Tuple2<String, String> map(String value) throws Exception {
                return new Tuple2<>("l_words", value);
            }
        });

        // Create the Redis configuration
        FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("hadoop110").setPort(6379).build();

        // Create the Redis sink
        RedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(conf, new MyRedisMapper());

        l_wordsData.addSink(redisSink);

        env.execute("StreamingDemoToRedis");
    }

    public static class MyRedisMapper implements RedisMapper<Tuple2<String, String>> {
        // Extract the Redis key from the incoming record
        @Override
        public String getKeyFromData(Tuple2<String, String> data) {
            return data.f0;
        }

        // Extract the Redis value from the incoming record
        @Override
        public String getValueFromData(Tuple2<String, String> data) {
            return data.f1;
        }

        @Override
        public RedisCommandDescription getCommandDescription() {
            return new RedisCommandDescription(RedisCommand.LPUSH);
        }
    }
}
2. Writing to Kafka (producer)
package xuwei.tech.streaming;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;

import java.util.Properties;

/**
 * kafkaSink
 *
 * Created by xuwei.tech on 2018/10/23.
 */
public class StreamingKafkaSink {

    public static void main(String[] args) throws Exception {
        // Get the Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpoint configuration
        env.enableCheckpointing(5000);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        // Configure the state backend
        //env.setStateBackend(new RocksDBStateBackend("hdfs://hadoop100:9000/flink/checkpoints",true));

        DataStreamSource<String> text = env.socketTextStream("hadoop100", 9001, "\n");

        String brokerList = "hadoop110:9092";
        String topic = "t1";

        Properties prop = new Properties();
        prop.setProperty("bootstrap.servers", brokerList);

        // First option: set the transaction timeout on FlinkKafkaProducer011
        //prop.setProperty("transaction.timeout.ms",60000*15+"");

        // Second option: raise the maximum transaction timeout on the Kafka broker
        //FlinkKafkaProducer011<String> myProducer = new FlinkKafkaProducer011<>(brokerList, topic, new SimpleStringSchema());

        // Use the Kafka producer with exactly-once semantics
        FlinkKafkaProducer011<String> myProducer = new FlinkKafkaProducer011<>(topic,
                new KeyedSerializationSchemaWrapper<String>(new SimpleStringSchema()), prop,
                FlinkKafkaProducer011.Semantic.EXACTLY_ONCE);

        text.addSink(myProducer);

        env.execute("StreamingFromCollection");
    }
}
3. Consuming data from Kafka
package xuwei.tech.streaming;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

import java.util.ArrayList;
import java.util.Properties;

/**
 * kafkaSource
 *
 * Created by xuwei.tech on 2018/10/23.
 */
public class StreamingKafkaSource {

    public static void main(String[] args) throws Exception {
        // Get the Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpoint configuration
        env.enableCheckpointing(5000);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        // Configure the state backend
        //env.setStateBackend(new RocksDBStateBackend("hdfs://hadoop100:9000/flink/checkpoints",true));

        String topic = "t1";
        Properties prop = new Properties();
        prop.setProperty("bootstrap.servers", "hadoop110:9092");
        prop.setProperty("group.id", "con1");

        FlinkKafkaConsumer011<String> myConsumer = new FlinkKafkaConsumer011<>(topic, new SimpleStringSchema(), prop);

        myConsumer.setStartFromGroupOffsets(); // default consumption strategy

        DataStreamSource<String> text = env.addSource(myConsumer);

        text.print().setParallelism(1);

        env.execute("StreamingFromCollection");
    }
}

V. DataStream API: Watermarks

Introduction:
  • How do we handle out-of-order data when using event time?
  • From the moment an event is produced, through the source, to the operators, there is a process that takes time. In most cases data arrives at the operators in the order in which the events were produced, but out-of-order data cannot be ruled out, for example because of network delays; in particular, when Kafka is used, the data across multiple partitions cannot be guaranteed to be ordered. When computing windows we cannot wait indefinitely, so there has to be a mechanism that guarantees that after a specific amount of time the window is triggered and computed. That mechanism is the watermark; watermarks exist to handle out-of-order events.
  • A watermark can be thought of as a "water level line" that tracks the progress of event time
Code examples:
1. Watermark example 1
package xuwei.tech.streaming.watermark;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import javax.annotation.Nullable;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

/**
 * Watermark example
 *
 * Created by xuwei.tech.
 */
public class StreamingWindowWatermark {

    public static void main(String[] args) throws Exception {
        // Define the socket port
        int port = 9000;

        // Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Use event time; the default is processing time
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // Set the parallelism to 1; the default parallelism is the number of CPUs on the machine
        env.setParallelism(1);

        // Connect to the socket to get the input data
        DataStream<String> text = env.socketTextStream("hadoop100", port, "\n");

        // Parse the input data
        DataStream<Tuple2<String, Long>> inputMap = text.map(new MapFunction<String, Tuple2<String, Long>>() {
            @Override
            public Tuple2<String, Long> map(String value) throws Exception {
                String[] arr = value.split(",");
                return new Tuple2<>(arr[0], Long.parseLong(arr[1]));
            }
        });

        // Extract the timestamp and generate watermarks
        DataStream<Tuple2<String, Long>> waterMarkStream = inputMap.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Tuple2<String, Long>>() {

            Long currentMaxTimestamp = 0L;
            final Long maxOutOfOrderness = 10000L; // the maximum allowed out-of-orderness is 10 s

            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");

            /**
             * Defines how the watermark is generated.
             * Called every 100 ms by default.
             */
            @Nullable
            @Override
            public Watermark getCurrentWatermark() {
                return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
            }

            // Defines how to extract the timestamp
            @Override
            public long extractTimestamp(Tuple2<String, Long> element, long previousElementTimestamp) {
                long timestamp = element.f1;
                currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
                long id = Thread.currentThread().getId();
                System.out.println("currentThreadId:" + id + ",key:" + element.f0 + ",eventtime:[" + element.f1 + "|" + sdf.format(element.f1)
                        + "],currentMaxTimestamp:[" + currentMaxTimestamp + "|" + sdf.format(currentMaxTimestamp)
                        + "],watermark:[" + getCurrentWatermark().getTimestamp() + "|" + sdf.format(getCurrentWatermark().getTimestamp()) + "]");
                return timestamp;
            }
        });

        DataStream<String> window = waterMarkStream.keyBy(0)
                .window(TumblingEventTimeWindows.of(Time.seconds(3))) // assign windows by the event time of the messages, same effect as timeWindow
                .apply(new WindowFunction<Tuple2<String, Long>, String, Tuple, TimeWindow>() {
                    /**
                     * Sort the data inside the window to guarantee ordering
                     * @param tuple
                     * @param window
                     * @param input
                     * @param out
                     * @throws Exception
                     */
                    @Override
                    public void apply(Tuple tuple, TimeWindow window, Iterable<Tuple2<String, Long>> input, Collector<String> out) throws Exception {
                        String key = tuple.toString();
                        List<Long> arrarList = new ArrayList<Long>();
                        Iterator<Tuple2<String, Long>> it = input.iterator();
                        while (it.hasNext()) {
                            Tuple2<String, Long> next = it.next();
                            arrarList.add(next.f1);
                        }
                        Collections.sort(arrarList);
                        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
                        String result = key + "," + arrarList.size() + "," + sdf.format(arrarList.get(0)) + ","
                                + sdf.format(arrarList.get(arrarList.size() - 1)) + "," + sdf.format(window.getStart()) + "," + sdf.format(window.getEnd());
                        out.collect(result);
                    }
                });

        // For testing, just print the result to the console
        window.print();

        // Flink is lazily evaluated, so execute must be called for the code above to run
        env.execute("eventtime-watermark");
    }
}
2. Watermark example 2 (collecting late data with a side output)
package xuwei.tech.streaming.watermark;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import javax.annotation.Nullable;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

/**
 * Watermark example
 *
 * sideOutputLateData: collect the data that arrives late
 *
 * Created by xuwei.tech.
 */
public class StreamingWindowWatermark2 {

    public static void main(String[] args) throws Exception {
        // Define the socket port
        int port = 9000;

        // Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Use event time; the default is processing time
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // Set the parallelism to 1; the default parallelism is the number of CPUs on the machine
        env.setParallelism(1);

        // Connect to the socket to get the input data
        DataStream<String> text = env.socketTextStream("hadoop100", port, "\n");

        // Parse the input data
        DataStream<Tuple2<String, Long>> inputMap = text.map(new MapFunction<String, Tuple2<String, Long>>() {
            @Override
            public Tuple2<String, Long> map(String value) throws Exception {
                String[] arr = value.split(",");
                return new Tuple2<>(arr[0], Long.parseLong(arr[1]));
            }
        });

        // Extract the timestamp and generate watermarks
        DataStream<Tuple2<String, Long>> waterMarkStream = inputMap.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Tuple2<String, Long>>() {

            Long currentMaxTimestamp = 0L;
            final Long maxOutOfOrderness = 10000L; // the maximum allowed out-of-orderness is 10 s

            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");

            /**
             * Defines how the watermark is generated.
             * Called every 100 ms by default.
             */
            @Nullable
            @Override
            public Watermark getCurrentWatermark() {
                return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
            }

            // Defines how to extract the timestamp
            @Override
            public long extractTimestamp(Tuple2<String, Long> element, long previousElementTimestamp) {
                long timestamp = element.f1;
                currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
                System.out.println("key:" + element.f0 + ",eventtime:[" + element.f1 + "|" + sdf.format(element.f1)
                        + "],currentMaxTimestamp:[" + currentMaxTimestamp + "|" + sdf.format(currentMaxTimestamp)
                        + "],watermark:[" + getCurrentWatermark().getTimestamp() + "|" + sdf.format(getCurrentWatermark().getTimestamp()) + "]");
                return timestamp;
            }
        });

        // Collect the data that would otherwise be dropped
        OutputTag<Tuple2<String, Long>> outputTag = new OutputTag<Tuple2<String, Long>>("late-data") {};

        // Note: getSideOutput is only available on SingleOutputStreamOperator,
        // so the variable cannot be declared as its parent type DataStream.
        SingleOutputStreamOperator<String> window = waterMarkStream.keyBy(0)
                .window(TumblingEventTimeWindows.of(Time.seconds(3))) // assign windows by the event time of the messages, same effect as timeWindow
                //.allowedLateness(Time.seconds(2)) // allow data to be up to 2 seconds late
                .sideOutputLateData(outputTag)
                .apply(new WindowFunction<Tuple2<String, Long>, String, Tuple, TimeWindow>() {
                    /**
                     * Sort the data inside the window to guarantee ordering
                     * @param tuple
                     * @param window
                     * @param input
                     * @param out
                     * @throws Exception
                     */
                    @Override
                    public void apply(Tuple tuple, TimeWindow window, Iterable<Tuple2<String, Long>> input, Collector<String> out) throws Exception {
                        String key = tuple.toString();
                        List<Long> arrarList = new ArrayList<Long>();
                        Iterator<Tuple2<String, Long>> it = input.iterator();
                        while (it.hasNext()) {
                            Tuple2<String, Long> next = it.next();
                            arrarList.add(next.f1);
                        }
                        Collections.sort(arrarList);
                        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
                        String result = key + "," + arrarList.size() + "," + sdf.format(arrarList.get(0)) + ","
                                + sdf.format(arrarList.get(arrarList.size() - 1)) + "," + sdf.format(window.getStart()) + "," + sdf.format(window.getEnd());
                        out.collect(result);
                    }
                });

        // Print the late data to the console for now; in practice it could be saved to another store
        DataStream<Tuple2<String, Long>> sideOutput = window.getSideOutput(outputTag);
        sideOutput.print();

        // For testing, just print the result to the console
        window.print();

        // Flink is lazily evaluated, so execute must be called for the code above to run
        env.execute("eventtime-watermark");
    }
}

① Window operations

Introduction:
  1. About windows:
  • Aggregating events (e.g. counting, summing) works differently on streams than in batch processing.
  • For example, counting all elements in a stream is impossible, because streams are usually infinite (unbounded). Aggregations on streams therefore have to be scoped by a window, e.g. "the count over the last 5 minutes" or "the sum of the last 100 elements".
  • A window is a mechanism for cutting unbounded data into finite blocks.
  • Windows can be time-driven [Time Window] (e.g. every 30 seconds) or data-driven [Count Window] (e.g. every 100 elements).
  2. Window types
  • Windows usually fall into the following types:
  • tumbling windows [no overlap]
  • sliding windows [with overlap]
  • session windows (a minimal sketch of these window assigners follows this list)
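
A minimal sketch of the window types on a keyed stream, assuming processing time and a hypothetical socket source of words; host, port, and durations are placeholders:

package xuwei.tech.streaming;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

/**
 * Hypothetical example: tumbling, sliding, session, and count windows on a keyed stream.
 */
public class WindowTypesDemo {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        KeyedStream<Tuple2<String, Long>, Tuple> keyedStream = env
                .socketTextStream("localhost", 9000, "\n")
                .map(new MapFunction<String, Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> map(String value) throws Exception {
                        return new Tuple2<>(value, 1L);
                    }
                })
                .keyBy(0);

        // Tumbling window: fixed 10-second windows, no overlap
        keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(10))).sum(1).print();

        // Sliding window: 10-second windows evaluated every 5 seconds, so they overlap
        keyedStream.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5))).sum(1).print();

        // Session window: a window for a key closes after a 30-second gap with no data
        keyedStream.window(ProcessingTimeSessionWindows.withGap(Time.seconds(30))).sum(1).print();

        // Count window (data-driven): fires every 100 elements per key
        keyedStream.countWindow(100).sum(1).print();

        env.execute("WindowTypesDemo");
    }
}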
Code examples:
1. Full-window aggregation
Introduction:
  • Full-window aggregation
  • waits until all the data belonging to the window has arrived before computing [this makes it possible to, e.g., sort the data inside the window]
  • apply(windowFunction)
  • process(processWindowFunction)
  • processWindowFunction provides more context information than windowFunction.
package xuwei.tech.streaming;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

/**
 * Full-window aggregation
 */
public class SocketDemoFullCount {

    public static void main(String[] args) throws Exception {
        // Get the port to listen on
        int port;
        try {
            ParameterTool parameterTool = ParameterTool.fromArgs(args);
            port = parameterTool.getInt("port");
        } catch (Exception e) {
            System.err.println("No port set. use default port 9000--java");
            port = 9000;
        }

        // Get the Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        String hostname = "hadoop100";
        String delimiter = "\n";
        // Connect to the socket to get the input data
        DataStreamSource<String> text = env.socketTextStream(hostname, port, delimiter);

        DataStream<Tuple2<Integer, Integer>> intData = text.map(new MapFunction<String, Tuple2<Integer, Integer>>() {
            @Override
            public Tuple2<Integer, Integer> map(String value) throws Exception {
                return new Tuple2<>(1, Integer.parseInt(value));
            }
        });

        intData.keyBy(0)
                .timeWindow(Time.seconds(5))
                .process(new ProcessWindowFunction<Tuple2<Integer, Integer>, String, Tuple, TimeWindow>() {
                    @Override
                    public void process(Tuple key, Context context, Iterable<Tuple2<Integer, Integer>> elements, Collector<String> out) throws Exception {
                        System.out.println("Executing process...");
                        long count = 0;
                        for (Tuple2<Integer, Integer> element : elements) {
                            count++;
                        }
                        out.collect("window:" + context.window() + ",count:" + count);
                    }
                }).print();

        // This line is required; without it the program does not run
        env.execute("Socket window count");
    }
}
2. Incremental window aggregation
Introduction:
  • Incremental aggregation
  • a computation is performed each time a record enters the window
  • reduce(reduceFunction)
  • aggregate(aggregateFunction)
  • sum(), min(), max()
package xuwei.tech.streaming;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;

/**
 * Incremental window aggregation
 */
public class SocketDemoIncrAgg {

    public static void main(String[] args) throws Exception {
        // Get the port to listen on
        int port;
        try {
            ParameterTool parameterTool = ParameterTool.fromArgs(args);
            port = parameterTool.getInt("port");
        } catch (Exception e) {
            System.err.println("No port set. use default port 9000--java");
            port = 9000;
        }

        // Get the Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        String hostname = "hadoop100";
        String delimiter = "\n";
        // Connect to the socket to get the input data
        DataStreamSource<String> text = env.socketTextStream(hostname, port, delimiter);

        DataStream<Tuple2<Integer, Integer>> intData = text.map(new MapFunction<String, Tuple2<Integer, Integer>>() {
            @Override
            public Tuple2<Integer, Integer> map(String value) throws Exception {
                return new Tuple2<>(1, Integer.parseInt(value));
            }
        });

        intData.keyBy(0)
                .timeWindow(Time.seconds(5))
                .reduce(new ReduceFunction<Tuple2<Integer, Integer>>() {
                    @Override
                    public Tuple2<Integer, Integer> reduce(Tuple2<Integer, Integer> value1, Tuple2<Integer, Integer> value2) throws Exception {
                        System.out.println("Executing reduce: " + value1 + "," + value2);
                        return new Tuple2<>(value1.f0, value1.f1 + value2.f1);
                    }
                }).print();

        // This line is required; without it the program does not run
        env.execute("Socket window count");
    }
}
3. Sliding window computation
package xuwei.tech.streaming;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

/**
 * Sliding window computation
 *
 * Word data is produced via a socket and Flink computes the counts,
 * aggregating the data of the last 2 seconds every 1 second.
 *
 * Created by xuwei.tech on 2018/10/8.
 */
public class SocketWindowWordCountJava {

    public static void main(String[] args) throws Exception {
        // Get the port to listen on
        int port;
        try {
            ParameterTool parameterTool = ParameterTool.fromArgs(args);
            port = parameterTool.getInt("port");
        } catch (Exception e) {
            System.err.println("No port set. use default port 9000--java");
            port = 9000;
        }

        // Get the Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        String hostname = "hadoop100";
        String delimiter = "\n";
        // Connect to the socket to get the input data
        DataStreamSource<String> text = env.socketTextStream(hostname, port, delimiter);

        // a a c
        // a 1
        // a 1
        // c 1
        DataStream<WordWithCount> windowCounts = text.flatMap(new FlatMapFunction<String, WordWithCount>() {
            public void flatMap(String value, Collector<WordWithCount> out) throws Exception {
                String[] splits = value.split("\\s");
                for (String word : splits) {
                    out.collect(new WordWithCount(word, 1L));
                }
            }
        }).keyBy("word")
                .timeWindow(Time.seconds(2), Time.seconds(1)) // window size 2 seconds, slide 1 second
                .sum("count"); // either sum or reduce works here
                /*.reduce(new ReduceFunction<WordWithCount>() {
                    public WordWithCount reduce(WordWithCount a, WordWithCount b) throws Exception {
                        return new WordWithCount(a.word, a.count + b.count);
                    }
                })*/

        // Print the data to the console and set the parallelism
        windowCounts.print().setParallelism(1);

        // This line is required; without it the program does not run
        env.execute("Socket window count");
    }

    public static class WordWithCount {
        public String word;
        public long count;

        public WordWithCount() {}

        public WordWithCount(String word, long count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return "WordWithCount{" +
                    "word='" + word + '\'' +
                    ", count=" + count +
                    '}';
        }
    }
}
Window summary and takeaways:

② Time semantics

Introduction:
  • Time in stream data can be divided into three kinds:
  • Event Time: the time at which the event was produced, usually described by a timestamp inside the event.
  • Ingestion Time: the time at which the event enters Flink.
  • Processing Time: the current system time of the machine processing the event (a short snippet for selecting the time characteristic follows this list).
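
A short sketch of selecting the time characteristic on the execution environment; the class name is a placeholder, and processing time is the default if nothing is set:

import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TimeCharacteristicDemo {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Pick exactly one of the three; processing time is the default
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
        // env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
    }
}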
Code example:
1. Processing Time
package myflink.job;

import com.alibaba.fastjson.JSON;
import myflink.model.UrlInfo;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;

import java.util.Date;
import java.util.Properties;

public class WindowTest {

    public static void main(String[] args) throws Exception {
        // Read the data from Kafka
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("zookeeper.connect", "localhost:2181");
        properties.put("group.id", "metric-group");
        properties.put("auto.offset.reset", "latest");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        SingleOutputStreamOperator<UrlInfo> dataStreamSource = env.addSource(
                new FlinkKafkaConsumer010<String>(
                        "testjin", // topic
                        new SimpleStringSchema(),
                        properties
                )
        ).setParallelism(1)
                // map transformation: convert one stream into another, here from String to UrlInfo
                .map(string -> {
                    UrlInfo urlInfo = JSON.parseObject(string, UrlInfo.class);
                    urlInfo.setDomain(urlInfo.generateDomain());
                    return urlInfo;
                });

        // keyBy on the domain
        KeyedStream<UrlInfo, String> keyedStream = dataStreamSource.keyBy(new KeySelector<UrlInfo, String>() {
            @Override
            public String getKey(UrlInfo urlInfo) throws Exception {
                return urlInfo.getDomain();
            }
        });

        // Set the time characteristic to processing time
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

        // Use a time window
        SingleOutputStreamOperator<UrlInfo> windowReduceStream = keyedStream.timeWindow(Time.seconds(30))
                .reduce((ReduceFunction<UrlInfo>) (t1, t2) -> {
                    UrlInfo urlInfo = new UrlInfo();

                    // All records in this partition share the same domain
                    urlInfo.setDomain(t1.getDomain());
                    urlInfo.setUrl(urlInfo.getDomain() + "/reduce/" + DateFormatUtils.format(new Date(), "yyyy-MM-dd'T'HH:mm:ss"));
                    urlInfo.setHash(DigestUtils.md5Hex(urlInfo.getUrl()));

                    urlInfo.setCount(t1.getCount() + 1); // accumulate the count in reduce

                    return urlInfo;
                }).returns(UrlInfo.class);

        windowReduceStream.addSink(new PrintSinkFunction<>());

        env.execute("execute window reduce info");
    }
}

 

