Flink Learning Notes: Data Sources (DataSource)


Note: these are study notes for the course "Flink大數據項目實戰". If you want to learn Flink, currently one of the hottest big-data computing frameworks, systematically through video, the course is recommended:

 Flink大數據項目實戰: http://t.cn/EJtKhaz

 

1.4 JobGraph -> ExecutionGraph

(figure: JobGraph to ExecutionGraph conversion)

1.5 ExecutionGraph

During the conversion from JobGraph to ExecutionGraph, the following internal transformations take place.

 

1. ExecutionJobVertex <- JobVertex: each JobVertex is converted into an ExecutionJobVertex.

 

2. ExecutionVertex: an operator (such as map) can run as multiple concurrent tasks, one ExecutionVertex per parallel subtask.

 

3. ExecutionEdge <- JobEdge: each JobEdge is converted into one or more ExecutionEdges.

 

4. ExecutionGraph is a two-dimensional structure (operators on one axis, parallel subtasks on the other).

 

5. Based on this two-dimensional structure, each vertex is dispatched to its designated slot.

 

 

2. DataStreamContext

 

Flink obtains an execution environment via StreamExecutionEnvironment.getExecutionEnvironment(). Whether the application runs locally or on a cluster is detected automatically: for local execution the method falls back to createLocalEnvironment(), while for cluster execution the environment of the surrounding cluster context is used (an explicit remote environment can also be created with createRemoteEnvironment()).
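A short sketch of the environment factory methods; the host, port, and jar path below are placeholder values:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Force a local environment (handy for IDE debugging):
final StreamExecutionEnvironment localEnv = StreamExecutionEnvironment.createLocalEnvironment();

// Explicitly target a remote cluster (placeholder host/port/jar):
final StreamExecutionEnvironment remoteEnv =
        StreamExecutionEnvironment.createRemoteEnvironment("jobmanager-host", 6123, "path/to/job.jar");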

 

 

3. Data Sources (DataSource)

Flink data sources can be implemented in two ways:

 

1. Built-in data sources

a) File-based

b) Socket-based

c) Collection-based

 

2. Custom data sources

a) Implement SourceFunction (non-parallel)

b) Implement ParallelSourceFunction

c) Extend RichParallelSourceFunction

public class SimpleSourceFunction implements ParallelSourceFunction<Long> {

    private long num = 0L;
    private volatile boolean isRunning = true;

    @Override
    public void run(SourceContext<Long> sourceContext) throws Exception {
        // Emit an ever-increasing number every 10 seconds until cancelled.
        while (isRunning) {
            sourceContext.collect(num);
            num++;
            Thread.sleep(10000);
        }
    }

    @Override
    public void cancel() {
        // Called by Flink when the job is cancelled; stops the emit loop.
        isRunning = false;
    }
}
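A usage sketch (the parallelism value is arbitrary): because the source implements ParallelSourceFunction, it may run with parallelism greater than 1.

DataStream<Long> numbers = env.addSource(new SimpleSourceFunction()).setParallelism(2);
numbers.print();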

 

4. Transformation

Transformation (operators): transforms one or more DataStreams into a new DataStream, as the chained sketch below shows.
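A minimal illustrative chain, written in the anonymous-class style used throughout these notes (the numbers are made up):

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.generateSequence(0, 10)
        .map(new MapFunction<Long, Long>() {
            @Override
            public Long map(Long value) { return value * 2; }      // 1-in-1-out
        })
        .filter(new FilterFunction<Long>() {
            @Override
            public boolean filter(Long value) { return value > 5; } // keep only matches
        })
        .print();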

 

 

5. DataSink

Flink likewise has two kinds of sinks:

1. Commonly used sinks, which will be introduced later with the connectors.

2. Custom sinks

A custom sink can implement the SinkFunction interface or extend RichSinkFunction, as sketched below.
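A minimal custom-sink sketch, assuming we simply write every element to standard error; the class name is made up, and a real sink would talk to an external system:

public class StderrSink implements SinkFunction<String> {
    @Override
    public void invoke(String value, Context context) throws Exception {
        // Called once per element.
        System.err.println(value);
    }
}

// Usage: stream.addSink(new StderrSink());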

 

 

6. Streaming Iteration (Iterations)

Iteration in a nutshell:

The output of the current computation becomes the input of the next one (that computation is the iteration step). Some operation is repeated until a condition is reached, and only then does execution leave the loop (remind you of recursion?).

Streaming iteration:

1. It has no maximum number of iterations.

2. It requires split/filter transformations to specify which part of the stream's data is fed back to the iteration operator and which part is forwarded to the downstream DataStream.

 

 

 

3. The basic recipe

1) Build an IterativeStream (the iteration head) from the input stream.

2) Define the iteration logic (e.g. a map function).

3) Define the feedback logic (filter out of the iterated stream the elements that should be fed back to the iteration head for another round of computation).

4) Call IterativeStream's closeWith method to close the iteration (equivalently, this defines the iteration tail).

5) Define the "terminate iteration" logic (elements that satisfy the condition are sent downstream instead of being used for the next iteration).

 

 

 

4. A streaming iteration example

Problem: given a set of numbers, subtract 1 from each repeatedly until the value reaches 0.

 

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.IterativeStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @Author: lifei
 * @Date: 2018/12/16 18:43
 */
public class IterativeStreamJob {

    public static void main(String[] args) throws Exception {

        // Given a set of numbers, subtract 1 from each repeatedly until the value reaches 0.
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Long> input = env.generateSequence(0, 100);

        // 1) Build an IterativeStream (the iteration head) from the input stream.
        IterativeStream<Long> itStream = input.iterate();

        // 2) Define the iteration logic (a map function).
        DataStream<Long> minusOne = itStream.map(new MapFunction<Long, Long>() {
            @Override
            public Long map(Long value) throws Exception {
                return value - 1;
            }
        });

        // 3) Define the feedback logic: elements still greater than 0 are fed back
        //    to the iteration head for another round.
        DataStream<Long> greaterThanZero = minusOne.filter(new FilterFunction<Long>() {
            @Override
            public boolean filter(Long value) throws Exception {
                return value > 0;
            }
        });

        // 4) Close the iteration with closeWith (this defines the iteration tail).
        itStream.closeWith(greaterThanZero);

        // 5) Define the "terminate iteration" logic: elements that reached 0 (or below)
        //    are forwarded downstream instead of entering the next iteration.
        DataStream<Long> lessThanZero = minusOne.filter(new FilterFunction<Long>() {
            @Override
            public boolean filter(Long value) throws Exception {
                return value <= 0;
            }
        });

        lessThanZero.print();

        env.execute("IterativeStreamJob");
    }
}

 

7. Execution Parameters

Controlling Latency

1. By default, elements of a stream are not transferred over the network one at a time (which would cause unnecessary network traffic); they are buffered first. The buffer timeout can be configured in the Flink configuration file, on the ExecutionEnvironment, or on an individual operator (the default is 100 ms).

1) Benefit: higher throughput.

2) Cost: added latency.

2. How to strike the balance (see the sketch below):

1) For maximum throughput, call setBufferTimeout(-1); this removes the timeout mechanism, so a buffer is sent only when it is full.

2) For minimum latency, set the timeout close to 0 (for example 5 or 10 ms).

3) Do not set the buffer timeout to 0, since that can cause severe performance degradation.
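A short sketch of where the timeout can be set; the values are illustrative, not recommendations:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setBufferTimeout(100);   // job-wide default: 100 ms

env.generateSequence(1, 10)
        .map(new MapFunction<Long, Long>() {
            @Override
            public Long map(Long value) { return value; }
        })
        .setBufferTimeout(5); // per-operator override: 5 ms for lower latency

// env.setBufferTimeout(-1) would flush only when buffers are full (max throughput).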

 

3. Other execution parameters will be covered in a dedicated later section.

 

 

8. Debugging

For real projects, Flink provides several debugging aids. Streaming programs are best debugged before release to verify they behave as expected. To reduce the difficulty of debugging distributed stream-processing programs, Flink offers a series of facilities:

1. Local execution environment

2. Collection data sources

3. Iterator data sink

 

 

 

 

 

Local execution environment:

A local environment needs no special setup and supports breakpoint debugging:

 

final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

 

DataStream<String> lines = env.addSource(/* some source */);

 

env.execute();

 

Collection Data Sources:

Flink provides special data sources backed by Java collections to make testing easier; once the program has been tested, the collection source and sink can be replaced by the real source and sink.

final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
env.fromElements(1, 2, 3, 4, 5);
env.fromCollection(Collection);
env.fromCollection(Iterator, Class);
env.generateSequence(0, 1000);

 

 

Iterator Data Sink:

Flink provides a special sink to collect DataStream results for testing and debugging:

DataStream<Tuple2<String, Integer>> myResult = ...
Iterator<Tuple2<String, Integer>> myOutput = DataStreamUtils.collect(myResult);
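A hedged usage sketch of the iterator sink; note that the import path of DataStreamUtils has moved between Flink versions (older releases used org.apache.flink.contrib.streaming.DataStreamUtils):

Iterator<Tuple2<String, Integer>> it = DataStreamUtils.collect(myResult);
while (it.hasNext()) {
    // hasNext() blocks until the next element arrives or the job finishes.
    System.out.println(it.next());
}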

 

 

8. Operators Medley

1. DataStream Transformation

1.1 DataStream conversion relationships

(figure: conversion relationships between the DataStream variants)

The figure above shows the conversion relationships between the different forms of DataStream; it also shows that DataStream mainly comprises the following categories:

1. keyBy groups by the specified key.

2. window is a special (time-based) kind of grouping.

3. coGroup

4. join: Join is a special case of coGroup.

5. Connect is a loose federation, a bit like the British Commonwealth.

 

 

1.2 DataStream

DataStream is the most central data structure in Flink's streaming API. It represents a parallel stream running in multiple partitions.

A DataStream can be obtained from a StreamExecutionEnvironment by calling env.addSource(SourceFunction).

 

1.3 map & flatMap

Meaning: element-wise mapping (map is 1-in-1-out, flatMap is 1-in-n-out).

Conversion: DataStream → DataStream

Typical use:

trimming away fields that the downstream computation does not need, e.g. during ETL.

 

 

 

Example 1:

 

 

 

public class TestMap {

    public static void main(String[] args) throws Exception {

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Long> input = env.generateSequence(0, 10);

        DataStream<Long> plusOne = input.map(new MapFunction<Long, Long>() {
            @Override
            public Long map(Long value) throws Exception {
                System.out.println("--------------------" + value);
                return value + 1;
            }
        });

        plusOne.print();

        env.execute();
    }
}

 

 

 

Example 2:

 

public class TestFlatmap {

    public static void main(String[] args) throws Exception {

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> input = env.fromElements(WORDS);

        DataStream<String> wordStream = input.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                // Split each line into words and emit every non-empty token.
                String[] tokens = value.toLowerCase().split("\\W+");
                for (String token : tokens) {
                    if (token.length() > 0) {
                        out.collect(token);
                    }
                }
            }
        });

        wordStream.print();

        env.execute();
    }

    public static final String[] WORDS = new String[] {
            "To be, or not to be,--that is the question:--",
            "Whether 'tis nobler in the mind to suffer",
            "The slings and arrows of outrageous fortune",
            "And by opposing end them?--To die,--to sleep,--",
            "Be all my sins remember'd."
    };
}

 

As the figure above showed, the operators of a DataStream run in parallel, and data between operators flows in stream partitions. For example, the first parallel instance of the Source (S1) and the first parallel instance of flatMap() (m1) form one stream partition. Between flatMap() and map(), because a rebalance() was added, the stream has 3 sub-partitions (m1's data flows to 3 map() instances). This is very similar to Apache Kafka: picture the stream as a Kafka topic; a stream partition then corresponds to a topic partition, and the parallel downstream operator instances correspond to the Kafka consumers.

 

1.4 filter

Meaning: filtering (events that satisfy the condition are selected for further processing). Whether an element is kept is decided by the boolean the FilterFunction returns: true keeps the element, false drops it.

Conversion: DataStream → DataStream

Typical use:

filtering out dirty data, data cleansing, and so on.

Example:

public class TestFilter {

    public static void main(String[] args) throws Exception {

        final StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();

 

        DataStream<Long> input=env.generateSequence(-5,5);

 

        input.filter(new FilterFunction<Long>() {

            @Override

            public boolean filter(Long value) throws Exception {

                return value>0;

            }

        }).print();

 

        env.execute();

    }

}

 

1.5 keyBy

Meaning:

groups the stream by the specified key (logically splits the DataStream into disjoint partitions; events with the same key land in the same partition; internally this is implemented with hash partitioning).

Conversion: DataStream → KeyedStream

Limitations:

1. Data skew may occur; depending on the actual situation, combine keyBy with physical partitioning to mitigate it (covered shortly).

2. Restrictions on the key type (see the sketch below):

1) it cannot be a POJO that does not override hashCode

2) it cannot be an array
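To make restriction 1) concrete, a hedged sketch (the class is invented for illustration): a POJO used as a key must override hashCode (and, in practice, equals); arrays are rejected because their hashCode is identity-based.

public class WordKey {
    public String word;                 // public field + no-arg constructor: a Flink POJO

    public WordKey() {}

    @Override
    public int hashCode() {             // without this override, keyBy on the POJO is invalid
        return word == null ? 0 : word.hashCode();
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof WordKey
                && java.util.Objects.equals(word, ((WordKey) o).word);
    }
}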

 

 

Typical use:

1. grouping (think GROUP BY in SQL)

Example:

 

public class TestKeyBy {

    public static void main(String[] args) throws Exception {

        // Find the student with the top Chinese (語文) score in each class.
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Tuple4<String, String, String, Integer>> input = env.fromElements(TRANSCRIPT);

        KeyedStream<Tuple4<String, String, String, Integer>, Tuple> keyedStream = input.keyBy("f0");

        keyedStream.maxBy("f3").print();

        env.execute();
    }

    public static final Tuple4[] TRANSCRIPT = new Tuple4[] {
            Tuple4.of("class1", "張三", "語文", 100),
            Tuple4.of("class1", "李四", "語文", 78),
            Tuple4.of("class1", "王五", "語文", 99),
            Tuple4.of("class2", "趙六", "語文", 81),
            Tuple4.of("class2", "錢七", "語文", 59),
            Tuple4.of("class2", "馬二", "語文", 97)
    };
}

1.6 KeyedStream

KeyedStream represents a data stream that has been grouped by a specified key.

A KeyedStream is obtained by calling DataStream.keyBy().

Any transformation applied to a KeyedStream turns it back into a DataStream.

Internally, KeyedStream is implemented by writing the key information into the transformation.

Each event can access only the state of its own key, so aggregation functions on a KeyedStream can conveniently read and update the state for the corresponding key.
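To make the per-key state point concrete, a minimal sketch (class and state names are made up): a RichFlatMapFunction that keeps a running sum per key in ValueState. Flink scopes the state to the key of the current element automatically.

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class RunningSum extends RichFlatMapFunction<Tuple2<String, Long>, Tuple2<String, Long>> {

    private transient ValueState<Long> sumState;   // scoped automatically to the current key

    @Override
    public void open(Configuration parameters) {
        sumState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("sum", Long.class));
    }

    @Override
    public void flatMap(Tuple2<String, Long> value, Collector<Tuple2<String, Long>> out) throws Exception {
        Long sum = sumState.value();               // reads the state of this element's key only
        sum = (sum == null ? 0L : sum) + value.f1;
        sumState.update(sum);
        out.collect(Tuple2.of(value.f0, sum));
    }
}

// Usage: input.keyBy(0).flatMap(new RunningSum());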

 

1.7 reduce & fold & aggregations

After grouping, you naturally want to run all kinds of aggregations on the grouped data, i.e. on the KeyedStream (think SQL).

Conversion: KeyedStream → DataStream

All aggregations on a KeyedStream are rolling (each result builds on the previous state). Do not confuse them with aggregation in batch processing (DataSet); that is in fact also a rolling aggregation internally, it just hands you only the final result.
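Besides reduce and fold, KeyedStream offers built-in rolling aggregations (sum, min, max, minBy, maxBy). A quick sketch, using the field indices of the Tuple4 from the examples below:

keyedStream.sum(3);       // rolling sum of field f3 (the score)
keyedStream.min(3);       // rolling minimum of f3
keyedStream.maxBy("f3");  // rolling maximum of f3, keeping the whole record that holds it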

 

Example 1:

public class TestReduce {

    public static void main(String[] args) throws Exception {

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Tuple4<String, String, String, Integer>> input = env.fromElements(TRANSCRIPT);

        KeyedStream<Tuple4<String, String, String, Integer>, Tuple> keyedStream = input.keyBy(0);

        // Rolling sum of the score field (f3); each incoming element produces
        // an updated partial result for its key.
        keyedStream.reduce(new ReduceFunction<Tuple4<String, String, String, Integer>>() {
            @Override
            public Tuple4<String, String, String, Integer> reduce(
                    Tuple4<String, String, String, Integer> value1,
                    Tuple4<String, String, String, Integer> value2) throws Exception {
                value1.f3 += value2.f3;
                return value1;
            }
        }).print();

        env.execute();
    }

    public static final Tuple4[] TRANSCRIPT = new Tuple4[] {
            Tuple4.of("class1", "張三", "語文", 100),
            Tuple4.of("class1", "李四", "語文", 78),
            Tuple4.of("class1", "王五", "語文", 99),
            Tuple4.of("class2", "趙六", "語文", 81),
            Tuple4.of("class2", "錢七", "語文", 59),
            Tuple4.of("class2", "馬二", "語文", 97)
    };
}

Example 2 (note: fold was deprecated in later Flink releases in favor of aggregate):

public class TestFold {

    public static void main(String[] args) throws Exception {

        final StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();

 

        DataStream<Tuple4<String,String,String,Integer>> input=env.fromElements(TRANSCRIPT);

 

        DataStream<String> result =input.keyBy(0).fold("Start", new FoldFunction<Tuple4<String,String,String,Integer>,String>() {

 

            @Override

            public String fold(String accumulator, Tuple4<String, String, String, Integer> value) throws Exception {

                return accumulator + "=" + value.f1;

            }

        });

 

        result.print();

 

        env.execute();

    }

 

    public static final Tuple4[] TRANSCRIPT = new Tuple4[] {

            Tuple4.of("class1","張三","語文",100),

            Tuple4.of("class1","李四","語文",78),

            Tuple4.of("class1","王五","語文",99),

            Tuple4.of("class2","趙六","語文",81),

            Tuple4.of("class2","錢七","語文",59),

            Tuple4.of("class2","馬二","語文",97)

    };

}

 

1.8 Interval join

Conversion: KeyedStream, KeyedStream → DataStream

Over a given time interval, joins two KeyedStreams on the specified key: each pair of events that satisfies the join condition is brought together, and what to do with the pair is defined by user code.

key1 == key2 && e1.timestamp + lowerBound <= e2.timestamp <= e1.timestamp + upperBound

Use case: pulling related, grouped data from a bounded time range together into one wide record.

Example:

public class TestIntervalJoin {

    public static void main(String[] args) throws Exception {

        final StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();

 

        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

 

        DataStream<Transcript> input1=env.fromElements(TRANSCRIPTS).assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Transcript>() {

            @Override

            public long extractAscendingTimestamp(Transcript element) {

                return element.time;

            }

        });

 

        DataStream<Student> input2=env.fromElements(STUDENTS).assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Student>() {

            @Override

            public long extractAscendingTimestamp(Student element) {

 

                return element.time;

            }

        });

 

        KeyedStream<Transcript,String>  keyedStream=input1.keyBy(new KeySelector<Transcript, String>() {

            @Override

            public String getKey(Transcript value) throws Exception {

                return value.id;

            }

        });

 

        KeyedStream<Student,String>  otherKeyedStream=input2.keyBy(new KeySelector<Student, String>() {

            @Override

            public String getKey(Student value) throws Exception {

                return value.id;

            }

        });

 

 

        //e1.timestamp + lowerBound <= e2.timestamp <= e1.timestamp + upperBound

 

        // key1 == key2 && leftTs - 2 < rightTs < leftTs + 2

 

        keyedStream.intervalJoin(otherKeyedStream)

                .between(Time.milliseconds(-2), Time.milliseconds(2))

                .upperBoundExclusive()

                .lowerBoundExclusive()

                .process(new ProcessJoinFunction<Transcript, Student, Tuple5<String,String,String,String,Integer>>() {

 

                    @Override

                    public void processElement(Transcript transcript, Student student, Context ctx, Collector<Tuple5<String, String, String, String, Integer>> out) throws Exception {

                        out.collect(Tuple5.of(transcript.id,transcript.name,student.class_,transcript.subject,transcript.score));

                    }

 

                }).print();

 

        env.execute();

 

    }

 

    public static final Transcript[] TRANSCRIPTS = new Transcript[] {

            new Transcript("1","張三","語文",100,System.currentTimeMillis()),

            new Transcript("2","李四","語文",78,System.currentTimeMillis()),

            new Transcript("3","王五","語文",99,System.currentTimeMillis()),

            new Transcript("4","趙六","語文",81,System.currentTimeMillis()),

            new Transcript("5","錢七","語文",59,System.currentTimeMillis()),

            new Transcript("6","馬二","語文",97,System.currentTimeMillis())

    };

    public static final Student[] STUDENTS = new Student[] {

            new Student("1","張三","class1",System.currentTimeMillis()),

            new Student("2","李四","class1",System.currentTimeMillis()),

            new Student("3","王五","class1",System.currentTimeMillis()),

            new Student("4","趙六","class2",System.currentTimeMillis()),

            new Student("5","錢七","class2",System.currentTimeMillis()),

            new Student("6","馬二","class2",System.currentTimeMillis())

    };

 

    private static class Transcript{

        private String id;

        private String name;

        private String subject;

        private int score;

        private long time;

 

        public Transcript(String id, String name, String subject, int score, long time) {

            this.id = id;

            this.name = name;

            this.subject = subject;

            this.score = score;

            this.time = time;

        }

 

        public String getId() {

            return id;

        }

 

        public void setId(String id) {

            this.id = id;

        }

 

        public String getName() {

            return name;

        }

 

        public void setName(String name) {

            this.name = name;

        }

 

        public String getSubject() {

            return subject;

        }

 

        public void setSubject(String subject) {

            this.subject = subject;

        }

 

        public int getScore() {

            return score;

        }

 

        public void setScore(int score) {

            this.score = score;

        }

 

        public long getTime() {

            return time;

        }

 

        public void setTime(long time) {

            this.time = time;

        }

    }

 

    private static class Student{

        private String id;

        private String name;

        private String class_;

        private long time;

 

        public Student(String id, String name, String class_, long time) {

            this.id = id;

            this.name = name;

            this.class_ = class_;

            this.time = time;

        }

 

        public String getId() {

            return id;

        }

 

        public void setId(String id) {

            this.id = id;

        }

 

        public String getName() {

            return name;

        }

 

        public void setName(String name) {

            this.name = name;

        }

 

        public String getClass_() {

            return class_;

        }

 

        public void setClass_(String class_) {

            this.class_ = class_;

        }

 

        public long getTime() {

            return time;

        }

 

        public void setTime(long time) {

            this.time = time;

        }

    }

}

1.9 connect & union (merging streams)

connect produces a ConnectedStreams, which lets you apply different processing to each of the two streams while sharing state between them (for example a counter). This is very useful when input on the first stream influences how the second stream is processed. union merges multiple streams; the new stream contains the data of all input streams.

union: DataStream* → DataStream

connect can only connect two streams, while union can merge more than two.

The two streams given to connect may differ in type, whereas all streams given to union must have the same type (a union sketch follows the connect example below).

Example:

public class TestConnect {

    public static void main(String[] args) throws Exception {

        final StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();

 

        DataStream<Long> someStream = env.generateSequence(0,10);

        DataStream<String> otherStream = env.fromElements(WORDS);

 

        ConnectedStreams<Long, String> connectedStreams = someStream.connect(otherStream);

 

        DataStream<String> result=connectedStreams.flatMap(new CoFlatMapFunction<Long, String, String>() {

 

            @Override

            public void flatMap1(Long value, Collector<String> out) throws Exception {

                out.collect(value.toString());

            }

 

            @Override

            public void flatMap2(String value, Collector<String> out) {

                for (String word: value.split("\\W+")) {

                    out.collect(word);

                }

            }

        });

 

        result.print();

 

        env.execute();

    }

 

    public static final String[] WORDS = new String[] {

            "And thus the native hue of resolution",

            "Is sicklied o'er with the pale cast of thought;",

            "And enterprises of great pith and moment,",

            "With this regard, their currents turn awry,",

            "And lose the name of action.--Soft you now!",

            "The fair Ophelia!--Nymph, in thy orisons",

            "Be all my sins remember'd."

    };

}
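The example above exercises connect only; for completeness, a minimal union sketch (the stream contents are arbitrary, and env is the usual StreamExecutionEnvironment):

DataStream<Long> first = env.generateSequence(0, 5);
DataStream<Long> second = env.generateSequence(6, 10);
DataStream<Long> third = env.generateSequence(11, 15);

// union takes any number of streams, but all must share the same element type.
first.union(second, third).print();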

 

 

