flink dataset join筆記


1、dataset的join連接,通過key進行關聯,一般情況下的join都是inner join,類似sql里的inner join

key包括以下幾種情況:

a key expression

a key-selector function

one or more field position keys (Tuple DataSet only).

Case Class Fields

2、inner join的幾種情況

2.1 缺省的join,jion到一個Tuple2元組里

public static class User { public String name; public int zip; }
public static class Store { public Manager mgr; public int zip; }
DataSet<User> input1 = // [...]
DataSet<Store> input2 = // [...]
// result dataset is typed as Tuple2
DataSet<Tuple2<User, Store>>
            result = input1.join(input2)
                           .where("zip")       // key of the first input (users)
                           .equalTo("zip");    // key of the second input (stores)

2.2 用戶自定義JoinFuncation,使用with語句

// some POJO
public class Rating {
  public String name;
  public String category;
  public int points;
}

// Join function that joins a custom POJO with a Tuple
public class PointWeighter
         implements JoinFunction<Rating, Tuple2<String, Double>, Tuple2<String, Double>> {

  @Override
  public Tuple2<String, Double> join(Rating rating, Tuple2<String, Double> weight) {
    // multiply the points and rating and construct a new output tuple
    return new Tuple2<String, Double>(rating.name, rating.points * weight.f1);
  }
}

DataSet<Rating> ratings = // [...]
DataSet<Tuple2<String, Double>> weights = // [...]
DataSet<Tuple2<String, Double>>
            weightedRatings =
            ratings.join(weights)

                   // key of the first input
                   .where("category")

                   // key of the second input
                   .equalTo("f0")

                   // applying the JoinFunction on joining pairs
                   .with(new PointWeighter());

2.3 使用Flat-Join Function,這種JoinFuncation和FlatJoinFuncation與MapFuncation和FlatMapFuncation的關系類似

public class PointWeighter
         implements FlatJoinFunction<Rating, Tuple2<String, Double>, Tuple2<String, Double>> {
  @Override
  public void join(Rating rating, Tuple2<String, Double> weight,
      Collector<Tuple2<String, Double>> out) {
    if (weight.f1 > 0.1) {
        out.collect(new Tuple2<String, Double>(rating.name, rating.points * weight.f1));
    }
  }
}

DataSet<Tuple2<String, Double>>
            weightedRatings =
            ratings.join(weights) // [...]

2.4 join的投影構造,生成自定義的結果集

DataSet<Tuple3<Integer, Byte, String>> input1 = // [...]
DataSet<Tuple2<Integer, Double>> input2 = // [...]
DataSet<Tuple4<Integer, String, Double, Byte>>
            result =
            input1.join(input2)
                  // key definition on first DataSet using a field position key
                  .where(0)
                  // key definition of second DataSet using a field position key
                  .equalTo(0)
                  // select and reorder fields of matching tuples
                  .projectFirst(0,2).projectSecond(1).projectFirst(1);
projectFirst(int...) and projectSecond(int...) 
選擇應組合成輸出元組的第一個和第二個連接輸入的字段。索引的順序定義了輸出元組中字段的順序。
連接投影也適用於非元組數據集,在這種情況下,必須在不帶參數的情況下調用projectFirst()或projectSecond(),以將連接元素添加到輸出元組。

2.5 加入join數據集大小提示,這是為了優化join的效率,引導優化器選擇正確的執行策略。

DataSet<Tuple2<Integer, String>> input1 = // [...]
DataSet<Tuple2<Integer, String>> input2 = // [...]

DataSet<Tuple2<Tuple2<Integer, String>, Tuple2<Integer, String>>>
            result1 =
            // hint that the second DataSet is very small
            input1.joinWithTiny(input2)
                  .where(0)
                  .equalTo(0);

DataSet<Tuple2<Tuple2<Integer, String>, Tuple2<Integer, String>>>
            result2 =
            // hint that the second DataSet is very large
            input1.joinWithHuge(input2)
                  .where(0)
                  .equalTo(0);

2.6 join的算法提示,Flink運行時可以以各種方式執行連接。在不同情況下,每種可能的方式都優於其他方式。系統會嘗試自動選擇合理的方式,但允許您手動選擇策略,以防您想要強制執行連接的特定方式。

DataSet<SomeType> input1 = // [...]
DataSet<AnotherType> input2 = // [...]

DataSet<Tuple2<SomeType, AnotherType> result =
      input1.join(input2, JoinHint.BROADCAST_HASH_FIRST)
            .where("id").equalTo("key");
    OPTIMIZER_CHOOSES:相當於不提供任何提示,將選擇留給系統。

    BROADCAST_HASH_FIRST:廣播第一個輸入並從中構建哈希表,由第二個輸入探測。如果第一個輸入非常小,這是一個很好的策略。

    BROADCAST_HASH_SECOND:廣播第二個輸入並從中構建一個哈希表,由第一個輸入探測。如果第二個輸入非常小,這是一個好策略。

    REPARTITION_HASH_FIRST:系統對每個輸入進行分區(shuffle)(除非輸入已經分區)並從第一個輸入構建哈希表。如果第一個輸入小於第二個輸入,則此策略很好,但兩個輸入仍然很大。
注意:如果不能進行大小估算,並且不能重新使用預先存在的分區和排序順序,則這是系統使用的默認回退策略。 REPARTITION_HASH_SECOND:系統對每個輸入進行分區(shuffle)(除非輸入已經被分區)並從第二個輸入構建哈希表。如果第二個輸入小於第一個輸入,則此策略很好,但兩個輸入仍然很大。 REPARTITION_SORT_MERGE:系統對每個輸入進行分區(shuffle)(除非輸入已經被分區)並對每個輸入進行排序(除非它已經排序)。輸入通過已排序輸入的流合並來連接。如果已經對一個或兩個輸入進行了排序,則此策略很好。

 

3、FlatJoinFunction與FlatMapFunction的區別(JoinFuncation和MapFuncation的情況類似)

1、實際上兩者可以干相同的事情
2、使用的區別是FlatJoinFunction有兩個輸入(就是join的兩個數據集)一個輸出,
    而FlatMapFunction只有一個輸入,但是這個輸入參數里可以直接包括多個輸入結構(即join的兩個數據集都可以放入到一個輸入參數里),
    所以最終實現的結果實際是一致的。

3.1 FlatMapFunction應用join的例子

      DataSet<Long> pagesInput = // [...]
      DataSet<Tuple2<Long, Long>> linksInput = // [...]

      DataSet<Tuple2<Long, Double>> pagesWithRanks = // [...]

      DataSet<Tuple2<Long, Long[]>> adjacencyListInput =// [...]

      IterativeDataSet<Tuple2<Long, Double>> iteration = // [...]

       DataSet<Tuple2<Long, Double>> newRanks = iteration.join(adjacencyListInput)
                .where(0).equalTo(0)
                .flatMap(new JoinVertexWithEdgesMatch())
                //下面的不用關注
                .groupBy(0)
                .aggregate(Aggregations.SUM, 1)
                
                .map(new Dampener(PageRank.DAMPENING_FACTOR, numPages));
    public static final class JoinVertexWithEdgesMatch implements FlatMapFunction<Tuple2<Tuple2<Long, Double>, Tuple2<Long, Long[]>>, Tuple2<Long, Double>> {

        @Override
        public void flatMap(Tuple2<Tuple2<Long, Double>, Tuple2<Long, Long[]>> value, Collector<Tuple2<Long, Double>> out) {
            Long[] neighbors = value.f1.f1;
            double rank = value.f0.f1;
            double rankToDistribute = rank / ((double) neighbors.length);

            for (Long neighbor : neighbors) {
                out.collect(new Tuple2<Long, Double>(neighbor, rankToDistribute));
            }
        }
    }
從上面的例子可以看到FlatMapFunction雖然只有一個輸入,但是輸入參數Tuple2里包含兩個Tuple2,這被包含的兩個Tuple2就是join的兩個數據集。

3.2 FlatJoinFunction和JoinFuncation例子,它們使用with語句來實現

DataSet<Tuple2<Long, Long>> changes =   iteration.getWorkset().join(edges)
                .where(0).equalTo(0)
                .with(new NeighborWithComponentIDJoin())
                .groupBy(0).aggregate(Aggregations.MIN, 1)
                .join(iteration.getSolutionSet()).where(0).equalTo(0)
                .with(new ComponentIdFilter());



    public static final class NeighborWithComponentIDJoin implements JoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {

        @Override
        public Tuple2<Long, Long> join(Tuple2<Long, Long> vertexWithComponent, Tuple2<Long, Long> edge) {
            return new Tuple2<>(edge.f1, vertexWithComponent.f1);
        }
    }


    public static final class ComponentIdFilter implements FlatJoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {

        @Override
        public void join(Tuple2<Long, Long> candidate, Tuple2<Long, Long> old, Collector<Tuple2<Long, Long>> out) {
            if (candidate.f1 < old.f1) {
                out.collect(candidate);
            }
        }
    }
從上述例子可以看到FlatJoinFunction或者JoinFunction是兩個輸入參數,也就是join的兩個數據集

 

3.3 從源碼上看,FlatJoinFunction與FlatMapFunction兩者實際沒太大區別

@Public
@FunctionalInterface
public interface FlatJoinFunction<IN1, IN2, OUT> extends Function, Serializable {

    /**
     * The join method, called once per joined pair of elements.
     *
     * @param first The element from first input.
     * @param second The element from second input.
     * @param out The collector used to return zero, one, or more elements.
     *
     * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
     *                   to fail and may trigger recovery.
     */
    void join (IN1 first, IN2 second, Collector<OUT> out) throws Exception;
}
@Public
@FunctionalInterface
public interface FlatMapFunction<T, O> extends Function, Serializable {

    /**
     * The core method of the FlatMapFunction. Takes an element from the input data set and transforms
     * it into zero, one, or more elements.
     *
     * @param value The input value.
     * @param out The collector for returning result values.
     *
     * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
     *                   to fail and may trigger recovery.
     */
    void flatMap(T value, Collector<O> out) throws Exception;
}

4、outer join,外連接,類似sql的left join,right join,full join的情況

OuterJoin在兩個數據集上執行左,右或全外連接。外連接類似於常規的(inner join)連接,並創建在其鍵上相等的所有元素對。
此外,如果在另一側沒有找到匹配的key,則保留“外部”側(左側,右側或兩者)的記錄。
匹配元素對(或一個元素和另一個輸入的空值)被賦予JoinFunction以將元素對轉換為單個元素,
或者給予FlatJoinFunction以將元素對轉換為任意多個(包括none)元素。

4.1 外連接OuterJoin

OuterJoin調用用戶定義的連接函數來處理連接元組。連接函數接收第一個輸入DataSet的一個元素和第二個輸入DataSet的一個元素,並返回一個元素。根據外連接的類型(left,right,full),join函數的兩個輸入元素之一可以為null。

以下代碼使用鍵選擇器函數執行DataSet與自定義java對象和Tuple DataSet的左外連接,並顯示如何使用用戶定義的連接函數:
// some POJO
public class Rating {
  public String name;
  public String category;
  public int points;
}

// Join function that joins a custom POJO with a Tuple
public class PointAssigner
         implements JoinFunction<Tuple2<String, String>, Rating, Tuple2<String, Integer>> {

  @Override
  public Tuple2<String, Integer> join(Tuple2<String, String> movie, Rating rating) {
    // Assigns the rating points to the movie.
    // NOTE: rating might be null
    return new Tuple2<String, Double>(movie.f0, rating == null ? -1 : rating.points;
  }
}

DataSet<Tuple2<String, String>> movies = // [...]
DataSet<Rating> ratings = // [...]
DataSet<Tuple2<String, Integer>>
            moviesWithPoints =
            movies.leftOuterJoin(ratings)

                   // key of the first input
                   .where("f0")

                   // key of the second input
                   .equalTo("name")

                   // applying the JoinFunction on joining pairs
                   .with(new PointAssigner());

4.2 FlatJoinFuncation實現OuterJoin

public class PointAssigner
         implements FlatJoinFunction<Tuple2<String, String>, Rating, Tuple2<String, Integer>> {
  @Override
  public void join(Tuple2<String, String> movie, Rating rating
    Collector<Tuple2<String, Integer>> out) {
  if (rating == null ) {
    out.collect(new Tuple2<String, Integer>(movie.f0, -1));
  } else if (rating.points < 10) {
    out.collect(new Tuple2<String, Integer>(movie.f0, rating.points));
  } else {
    // do not emit
  }
}

DataSet<Tuple2<String, Integer>>
            moviesWithPoints =
            movies.leftOuterJoin(ratings) // [...]

 

 

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM