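1. DataSet joins: a join combines two DataSets by matching their elements on a key. Unless stated otherwise, a join is an inner join, like INNER JOIN in SQL.
Keys can be specified in any of the following ways: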
a key expression
a key-selector function
one or more field position keys (Tuple DataSet only).
Case Class Fields
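To make the key-selector idea concrete, here is a minimal sketch in plain Java (not Flink code) of an inner join in which each side supplies a function that extracts its key. The class name KeySelectorJoinSketch, the method innerJoin, and the sample data are hypothetical and exist only for illustration. Flink executes joins in a distributed fashion with a strategy picked by its optimizer, so this nested loop only shows the semantics.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical illustration class; not part of Flink.
class KeySelectorJoinSketch {

    // Pairs every left element with every right element whose extracted key is equal.
    static <L, R, K> List<Map.Entry<L, R>> innerJoin(
            List<L> left, List<R> right,
            Function<L, K> leftKey, Function<R, K> rightKey) {
        List<Map.Entry<L, R>> result = new ArrayList<>();
        for (L l : left) {
            K key = leftKey.apply(l);
            for (R r : right) {
                if (key.equals(rightKey.apply(r))) {
                    result.add(new SimpleEntry<L, R>(l, r));
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Users are "name:zip" strings, stores are "shop@zip" strings (made-up data).
        List<String> users = Arrays.asList("ann:10115", "bob:80331");
        List<String> stores = Arrays.asList("bakery@10115", "kiosk@10115", "cafe@20095");
        List<Map.Entry<String, String>> joined = innerJoin(
                users, stores,
                u -> u.split(":")[1],   // key selector for the first input
                s -> s.split("@")[1]);  // key selector for the second input
        System.out.println(joined);
    }
}
```

Only "ann" finds partners here, because no store shares the zip of "bob". An inner join drops elements that have no match on the other side.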
2. Variants of the inner join
2.1 Default join: each pair of matching elements is emitted as a Tuple2
public static class User { public String name; public int zip; }
public static class Store { public Manager mgr; public int zip; }

DataSet<User> input1 = // [...]
DataSet<Store> input2 = // [...]

// result dataset is typed as Tuple2
DataSet<Tuple2<User, Store>> result =
    input1.join(input2)
          .where("zip")    // key of the first input (users)
          .equalTo("zip"); // key of the second input (stores)
2.2 User-defined JoinFunction, applied through with()
// some POJO
public class Rating {
    public String name;
    public String category;
    public int points;
}

// Join function that joins a custom POJO with a Tuple
public class PointWeighter
        implements JoinFunction<Rating, Tuple2<String, Double>, Tuple2<String, Double>> {

    @Override
    public Tuple2<String, Double> join(Rating rating, Tuple2<String, Double> weight) {
        // multiply the points by the weight and construct a new output tuple
        return new Tuple2<String, Double>(rating.name, rating.points * weight.f1);
    }
}

DataSet<Rating> ratings = // [...]
DataSet<Tuple2<String, Double>> weights = // [...]
DataSet<Tuple2<String, Double>> weightedRatings =
    ratings.join(weights)
           // key of the first input
           .where("category")
           // key of the second input
           .equalTo("f0")
           // applying the JoinFunction on joining pairs
           .with(new PointWeighter());
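2.3 Using a FlatJoinFunction. JoinFunction relates to FlatJoinFunction the same way MapFunction relates to FlatMapFunction: the flat variant can emit zero, one, or more results per joined pair.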
public class PointWeighter
        implements FlatJoinFunction<Rating, Tuple2<String, Double>, Tuple2<String, Double>> {

    @Override
    public void join(Rating rating, Tuple2<String, Double> weight,
                     Collector<Tuple2<String, Double>> out) {
        // emit a result only for weights above 0.1
        if (weight.f1 > 0.1) {
            out.collect(new Tuple2<String, Double>(rating.name, rating.points * weight.f1));
        }
    }
}

DataSet<Tuple2<String, Double>> weightedRatings =
    ratings.join(weights) // [...]
2.4 Join with projection: assemble a custom result tuple
DataSet<Tuple3<Integer, Byte, String>> input1 = // [...]
DataSet<Tuple2<Integer, Double>> input2 = // [...]
DataSet<Tuple4<Integer, String, Double, Byte>> result =
    input1.join(input2)
          // key definition on first DataSet using a field position key
          .where(0)
          // key definition of second DataSet using a field position key
          .equalTo(0)
          // select and reorder fields of matching tuples
          .projectFirst(0,2).projectSecond(1).projectFirst(1);
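projectFirst(int...) and projectSecond(int...) select the fields of the first and second join input that are assembled into the output tuple. The order of the indexes defines the order of the fields in the output tuple. Join projection also works for non-Tuple DataSets; in that case projectFirst() or projectSecond() must be called without arguments to add the joined element as a whole to the output tuple.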
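The ordering rule can be sketched in plain Java. The class JoinProjectionSketch below is a hypothetical stand-in, not Flink's implementation: it records each selected (side, field index) in call order and assembles the output for one matching pair in exactly that order. Tuples are modeled as Object arrays, which is an assumption made only to keep the sketch self-contained.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration of field projection order; not Flink's implementation.
class JoinProjectionSketch {
    // Each entry is {side, fieldIndex}, where side 0 is the first input and 1 the second.
    private final List<int[]> selectors = new ArrayList<>();

    JoinProjectionSketch projectFirst(int... fields) {
        for (int f : fields) selectors.add(new int[] {0, f});
        return this;
    }

    JoinProjectionSketch projectSecond(int... fields) {
        for (int f : fields) selectors.add(new int[] {1, f});
        return this;
    }

    // Builds the output tuple for one matching pair, in the order the fields were selected.
    Object[] apply(Object[] first, Object[] second) {
        Object[] out = new Object[selectors.size()];
        for (int i = 0; i < out.length; i++) {
            int[] s = selectors.get(i);
            out[i] = (s[0] == 0 ? first : second)[s[1]];
        }
        return out;
    }

    public static void main(String[] args) {
        Object[] first = {7, (byte) 1, "seven"}; // shaped like Tuple3<Integer, Byte, String>
        Object[] second = {7, 0.5};              // shaped like Tuple2<Integer, Double>
        Object[] out = new JoinProjectionSketch()
                .projectFirst(0, 2).projectSecond(1).projectFirst(1)
                .apply(first, second);
        System.out.println(Arrays.toString(out)); // prints [7, seven, 0.5, 1]
    }
}
```

The printed order Integer, String, Double, Byte matches the Tuple4<Integer, String, Double, Byte> result type of the projection example.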
2.5 Join with DataSet size hints. A size hint guides the optimizer toward the right execution strategy and thereby makes the join more efficient.
DataSet<Tuple2<Integer, String>> input1 = // [...]
DataSet<Tuple2<Integer, String>> input2 = // [...]

DataSet<Tuple2<Tuple2<Integer, String>, Tuple2<Integer, String>>> result1 =
    // hint that the second DataSet is very small
    input1.joinWithTiny(input2)
          .where(0)
          .equalTo(0);

DataSet<Tuple2<Tuple2<Integer, String>, Tuple2<Integer, String>>> result2 =
    // hint that the second DataSet is very large
    input1.joinWithHuge(input2)
          .where(0)
          .equalTo(0);
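2.6 Join algorithm hints. The Flink runtime can execute a join in several ways, and each of them outperforms the others under certain circumstances. The system tries to pick a reasonable strategy automatically, but you can choose one manually if you want to force a particular way of executing the join.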
DataSet<SomeType> input1 = // [...]
DataSet<AnotherType> input2 = // [...]

DataSet<Tuple2<SomeType, AnotherType>> result =
    input1.join(input2, JoinHint.BROADCAST_HASH_FIRST)
          .where("id").equalTo("key");
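OPTIMIZER_CHOOSES: Equivalent to giving no hint at all; the choice is left to the system.
BROADCAST_HASH_FIRST: Broadcasts the first input and builds a hash table from it, which is probed by the second input. A good strategy if the first input is very small.
BROADCAST_HASH_SECOND: Broadcasts the second input and builds a hash table from it, which is probed by the first input. A good strategy if the second input is very small.
REPARTITION_HASH_FIRST: The system partitions (shuffles) each input (unless the input is already partitioned) and builds a hash table from the first input. A good strategy if the first input is smaller than the second, but both inputs are still large. Note: this is the default fallback strategy the system uses if no size estimates can be made and no pre-existing partitioning or sort order can be reused.
REPARTITION_HASH_SECOND: The system partitions (shuffles) each input (unless the input is already partitioned) and builds a hash table from the second input. A good strategy if the second input is smaller than the first, but both inputs are still large.
REPARTITION_SORT_MERGE: The system partitions (shuffles) each input (unless the input is already partitioned) and sorts each input (unless it is already sorted). The inputs are joined by a streamed merge of the sorted inputs. A good strategy if one or both of the inputs are already sorted.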
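The two local techniques behind these hints, hash build/probe and sort-merge, can be sketched on a single machine in plain Java. This is an illustration under simplifying assumptions, not Flink's runtime: rows are int arrays of the form {key, payload}, the class and method names are hypothetical, and the network side of the hints (broadcasting or repartitioning an input) is not modeled at all.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical single-machine illustration; rows are {key, payload}.
class JoinStrategySketch {

    // Hash join: build a hash table from one side, then probe it with the other side.
    // Output rows are {key, buildPayload, probePayload}.
    static List<int[]> hashJoin(int[][] build, int[][] probe) {
        Map<Integer, List<int[]>> table = new HashMap<>();
        for (int[] row : build) {
            table.computeIfAbsent(row[0], k -> new ArrayList<>()).add(row);
        }
        List<int[]> out = new ArrayList<>();
        for (int[] row : probe) {
            List<int[]> matches = table.getOrDefault(row[0], Collections.<int[]>emptyList());
            for (int[] match : matches) {
                out.add(new int[] {row[0], match[1], row[1]});
            }
        }
        return out;
    }

    // Sort-merge join: sort both sides by key, then advance two cursors in lockstep.
    // Output rows are {key, leftPayload, rightPayload}.
    static List<int[]> sortMergeJoin(int[][] left, int[][] right) {
        int[][] l = left.clone();
        int[][] r = right.clone();
        Arrays.sort(l, (a, b) -> Integer.compare(a[0], b[0]));
        Arrays.sort(r, (a, b) -> Integer.compare(a[0], b[0]));
        List<int[]> out = new ArrayList<>();
        int i = 0;
        int j = 0;
        while (i < l.length && j < r.length) {
            if (l[i][0] < r[j][0]) {
                i++;
            } else if (l[i][0] > r[j][0]) {
                j++;
            } else {
                int key = l[i][0];
                int runStart = j;
                // Emit the cross product of the two runs that share this key.
                while (i < l.length && l[i][0] == key) {
                    for (j = runStart; j < r.length && r[j][0] == key; j++) {
                        out.add(new int[] {key, l[i][1], r[j][1]});
                    }
                    i++;
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        int[][] left = {{1, 10}, {2, 20}, {2, 21}};
        int[][] right = {{2, 200}, {3, 300}, {2, 201}};
        for (int[] row : hashJoin(left, right)) System.out.println(Arrays.toString(row));
        for (int[] row : sortMergeJoin(left, right)) System.out.println(Arrays.toString(row));
    }
}
```

Both methods produce the same set of joined rows, possibly in a different order. The sketch also suggests why the hints matter: the hash variant needs the build side to fit in a table, while the merge variant costs little beyond the sort, which is skipped when an input is already sorted.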
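3. Differences between FlatJoinFunction and FlatMapFunction (the same applies to JoinFunction and MapFunction)
1. In practice the two can do the same job. 2. The difference in usage is that a FlatJoinFunction takes two inputs (one element from each of the joined DataSets) plus an output collector, whereas a FlatMapFunction takes a single input. That single input can itself wrap several structures, for example a Tuple2 holding one element from each joined DataSet, so the end result is the same.
3.1 Example: a FlatMapFunction applied to a join result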
DataSet<Long> pagesInput = // [...]
DataSet<Tuple2<Long, Long>> linksInput = // [...]
DataSet<Tuple2<Long, Double>> pagesWithRanks = // [...]
DataSet<Tuple2<Long, Long[]>> adjacencyListInput = // [...]
IterativeDataSet<Tuple2<Long, Double>> iteration = // [...]

DataSet<Tuple2<Long, Double>> newRanks = iteration
    .join(adjacencyListInput)
    .where(0).equalTo(0)
    .flatMap(new JoinVertexWithEdgesMatch())
    // the steps below are not relevant to the join
    .groupBy(0)
    .aggregate(Aggregations.SUM, 1)
    .map(new Dampener(PageRank.DAMPENING_FACTOR, numPages));
public static final class JoinVertexWithEdgesMatch
        implements FlatMapFunction<Tuple2<Tuple2<Long, Double>, Tuple2<Long, Long[]>>, Tuple2<Long, Double>> {

    @Override
    public void flatMap(Tuple2<Tuple2<Long, Double>, Tuple2<Long, Long[]>> value,
                        Collector<Tuple2<Long, Double>> out) {
        Long[] neighbors = value.f1.f1;
        double rank = value.f0.f1;
        // split the rank of the page evenly among its neighbors
        double rankToDistribute = rank / ((double) neighbors.length);

        for (Long neighbor : neighbors) {
            out.collect(new Tuple2<Long, Double>(neighbor, rankToDistribute));
        }
    }
}
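As the example shows, the FlatMapFunction has only one input, but that input is a Tuple2 wrapping two further Tuple2 values, one element from each of the two joined DataSets.
3.2 Example: FlatJoinFunction and JoinFunction, both applied through with()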
DataSet<Tuple2<Long, Long>> changes = iteration.getWorkset()
    .join(edges)
    .where(0).equalTo(0)
    .with(new NeighborWithComponentIDJoin())
    .groupBy(0).aggregate(Aggregations.MIN, 1)
    .join(iteration.getSolutionSet())
    .where(0).equalTo(0)
    .with(new ComponentIdFilter());

public static final class NeighborWithComponentIDJoin
        implements JoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {

    @Override
    public Tuple2<Long, Long> join(Tuple2<Long, Long> vertexWithComponent, Tuple2<Long, Long> edge) {
        return new Tuple2<>(edge.f1, vertexWithComponent.f1);
    }
}

public static final class ComponentIdFilter
        implements FlatJoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {

    @Override
    public void join(Tuple2<Long, Long> candidate, Tuple2<Long, Long> old,
                     Collector<Tuple2<Long, Long>> out) {
        // emit the candidate only if its component ID is smaller than the old one
        if (candidate.f1 < old.f1) {
            out.collect(candidate);
        }
    }
}
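As this example shows, a FlatJoinFunction or JoinFunction takes two input parameters, one element from each of the two joined DataSets.
3.3 In the source code, FlatJoinFunction and FlatMapFunction differ very little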
@Public
@FunctionalInterface
public interface FlatJoinFunction<IN1, IN2, OUT> extends Function, Serializable {

    /**
     * The join method, called once per joined pair of elements.
     *
     * @param first The element from first input.
     * @param second The element from second input.
     * @param out The collector used to return zero, one, or more elements.
     *
     * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
     *                   to fail and may trigger recovery.
     */
    void join(IN1 first, IN2 second, Collector<OUT> out) throws Exception;
}
@Public
@FunctionalInterface
public interface FlatMapFunction<T, O> extends Function, Serializable {

    /**
     * The core method of the FlatMapFunction. Takes an element from the input data set and transforms
     * it into zero, one, or more elements.
     *
     * @param value The input value.
     * @param out The collector for returning result values.
     *
     * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
     *                   to fail and may trigger recovery.
     */
    void flatMap(T value, Collector<O> out) throws Exception;
}
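How close the two shapes are can be shown with a small adapter in plain Java. The interfaces FlatJoin and FlatMap below are hypothetical stand-ins that only mirror the two signatures; they are not the Flink interfaces, java.util.function.Consumer stands in for Collector, and Map.Entry stands in for Tuple2.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical stand-ins that mirror the shape of the two interfaces; not the real ones.
class FlatFunctionAdapterSketch {

    interface FlatJoin<A, B, O> {
        void join(A first, B second, Consumer<O> out);
    }

    interface FlatMap<T, O> {
        void flatMap(T value, Consumer<O> out);
    }

    // Wraps a two-input flat join as a one-input flat map over (first, second) pairs.
    static <A, B, O> FlatMap<Map.Entry<A, B>, O> asFlatMap(FlatJoin<A, B, O> joiner) {
        return (pair, out) -> joiner.join(pair.getKey(), pair.getValue(), out);
    }

    public static void main(String[] args) {
        // Emit points * weight only when the weight exceeds 0.1.
        FlatJoin<Integer, Double, Double> weigh = (points, weight, out) -> {
            if (weight > 0.1) {
                out.accept(points * weight);
            }
        };
        FlatMap<Map.Entry<Integer, Double>, Double> mapped = asFlatMap(weigh);

        List<Double> results = new ArrayList<>();
        mapped.flatMap(new SimpleEntry<Integer, Double>(10, 0.5), results::add);
        mapped.flatMap(new SimpleEntry<Integer, Double>(10, 0.05), results::add);
        System.out.println(results); // prints [5.0]
    }
}
```

The adapter is a single line, which is the point: the only structural difference is whether the pair arrives as two parameters or as one wrapped value.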
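4. Outer joins, similar to LEFT JOIN, RIGHT JOIN, and FULL JOIN in SQL
OuterJoin performs a left, right, or full outer join on two DataSets. Outer joins are similar to regular (inner) joins and create all pairs of elements that are equal on their keys.
In addition, records of the "outer" side (left, right, or both) are preserved if no matching key is found on the other side.
Each matching pair of elements (or one element and a null value for the other input) is passed to a JoinFunction, which turns the pair into a single element,
or to a FlatJoinFunction, which turns the pair into arbitrarily many (including zero) elements.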
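The null-handling rule can be sketched in plain Java as follows. This is an illustration of left outer join semantics only, not Flink code: the class LeftOuterJoinSketch, the method leftOuterJoin, and the sample data are hypothetical, and java.util.function.BiFunction stands in for the JoinFunction.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;

// Hypothetical illustration of left outer join semantics; not Flink's implementation.
class LeftOuterJoinSketch {

    // Calls joiner once per matching pair. A left element without any match
    // is still passed to joiner once, with null as the right-hand element.
    static <L, R, K, O> List<O> leftOuterJoin(
            List<L> left, List<R> right,
            Function<L, K> leftKey, Function<R, K> rightKey,
            BiFunction<L, R, O> joiner) {
        List<O> out = new ArrayList<>();
        for (L l : left) {
            boolean matched = false;
            for (R r : right) {
                if (leftKey.apply(l).equals(rightKey.apply(r))) {
                    out.add(joiner.apply(l, r));
                    matched = true;
                }
            }
            if (!matched) {
                out.add(joiner.apply(l, null)); // preserve the outer-side record
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> movies = Arrays.asList("Alien", "Brazil");
        // Ratings map a movie name to points; "Brazil" has no rating (made-up data).
        List<Map.Entry<String, Integer>> ratings = new ArrayList<>();
        ratings.add(new SimpleEntry<String, Integer>("Alien", 9));

        List<String> result = leftOuterJoin(
                movies, ratings,
                m -> m, r -> r.getKey(),
                (m, r) -> m + "=" + (r == null ? -1 : r.getValue())); // r may be null
        System.out.println(result); // prints [Alien=9, Brazil=-1]
    }
}
```

A right outer join is the mirror image (null on the left side), and a full outer join passes null on whichever side has no partner.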
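4.1 OuterJoin with a JoinFunction
OuterJoin calls a user-defined join function to process the joined elements. The join function receives one element of the first input DataSet and one element of the second input DataSet and returns exactly one element. Depending on the type of the outer join (left, right, full), one of the two input elements of the join function can be null. The following code performs a left outer join of a Tuple DataSet with a DataSet of custom Java objects, using key expressions, and shows how to use a user-defined join function: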
// some POJO
public class Rating {
    public String name;
    public String category;
    public int points;
}

// Join function that joins a Tuple with a custom POJO
public class PointAssigner
        implements JoinFunction<Tuple2<String, String>, Rating, Tuple2<String, Integer>> {

    @Override
    public Tuple2<String, Integer> join(Tuple2<String, String> movie, Rating rating) {
        // Assigns the rating points to the movie.
        // NOTE: rating might be null
        return new Tuple2<String, Integer>(movie.f0, rating == null ? -1 : rating.points);
    }
}

DataSet<Tuple2<String, String>> movies = // [...]
DataSet<Rating> ratings = // [...]
DataSet<Tuple2<String, Integer>> moviesWithPoints =
    movies.leftOuterJoin(ratings)
          // key of the first input
          .where("f0")
          // key of the second input
          .equalTo("name")
          // applying the JoinFunction on joining pairs
          .with(new PointAssigner());
4.2 OuterJoin with a FlatJoinFunction
public class PointAssigner
        implements FlatJoinFunction<Tuple2<String, String>, Rating, Tuple2<String, Integer>> {

    @Override
    public void join(Tuple2<String, String> movie, Rating rating,
                     Collector<Tuple2<String, Integer>> out) {
        if (rating == null) {
            // no rating found for this movie
            out.collect(new Tuple2<String, Integer>(movie.f0, -1));
        } else if (rating.points < 10) {
            out.collect(new Tuple2<String, Integer>(movie.f0, rating.points));
        } else {
            // do not emit
        }
    }
}

DataSet<Tuple2<String, Integer>> moviesWithPoints =
    movies.leftOuterJoin(ratings) // [...]