Transformation


  Flink 的 Transformations 操作主要用于将一个和多个 DataStream 按需转换成新的 DataStream。主要分为以下三类:

  • DataStream Transformations:进行数据流相关转换操作;
  • Physical partitioning:物理分区。Flink 提供的底层 API ,允许用户定义数据的分区规则;
  • Task chaining and resource groups:任务链和资源组。允许用户进行任务链和资源组的细粒度的控制。

Map [DataStream->DataStream]

  对一个 DataStream 中的每个元素都执行特定的转换操作:

DataStream<Integer> integerDataStream = env.fromElements(1, 2, 3, 4, 5);
integerDataStream.map((MapFunction<Integer, Object>) value -> value * 2).print();
// 输出 2,4,6,8,10
    val data = 1 to 10
    import org.apache.flink.api.scala._

    env.fromCollection(data).map(x => x + 1).print()
    env.fromCollection(data).map(_ + 1).print()

       FlatMap 中的一个输入元素可以被映射成一个或者多个输出元素,示例如下:

        String str = "one one one two two";
        String str1 = "third third third four";
        DataSource<String> stringDataSource = env.fromElements(str, str1);
        stringDataSource.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String s, Collector<String> collector) throws Exception {
                for (String str : s.split(" ")) {
                    collector.collect(str);
                }
            }
        }).print();
 public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        List<String> list = new ArrayList<String>();
        list.add("I love Beijing");
        list.add("I love China");
        list.add("Beijing is the capital of China");
        DataSet<String> source = env.fromCollection(list);
        source.map(new MapFunction<String, List<String>>() {

            @Override
            public List<String> map(String line) throws Exception {
                String[] words = line.split(" ");
                List<String> wds = new ArrayList<String>();
                for (String word : words) {
                    wds.add(word);
                }
                return wds;
            }
        }).print();
        System.out.println("*************************************");
        source.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String line, Collector<String> collector) throws Exception {
                String[] words = line.split(" ");
                for (String word : words) {
                    collector.collect(word);
                }
            }
        }).print();
        System.out.println("*************************************");
        source.mapPartition(new MapPartitionFunction<String, String>() {
            Integer index = 0;

            @Override
            public void mapPartition(Iterable<String> iterable, Collector<String> collector) throws Exception {
                Iterator<String> iterator = iterable.iterator();
                while (iterator.hasNext()) {
                    String line = iterator.next();
                    String[] words = line.split(" ");
                    for (String word : words) {
                        collector.collect("分区" + index + ",单词为:" + word);
                    }
                }
                index++;
            }
        }).print();
    }
[I, love, Beijing] [I, love, China] [Beijing, is, the, capital, of, China]
************************************* I love Beijing I love China Beijing is the capital of China ************************************* 分区0,单词为:I 分区0,单词为:love 分区0,单词为:Beijing 分区0,单词为:I 分区0,单词为:love 分区0,单词为:China 分区0,单词为:Beijing 分区0,单词为:is 分区0,单词为:the 分区0,单词为:capital 分区0,单词为:of 分区0,单词为:China
        source.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String line, Collector<String> collector) throws Exception {
                String[] words = line.split(" ");
                for (String word : words) {
                    collector.collect(word);
                }
            }
        }).map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String s) throws Exception {
                return new Tuple2<>(s, 1);
            }
        }).groupBy(0).sum(1).print();
        System.out.println("*************************************");

(China,2)
(is,1)
(love,2)
(the,1)
(of,1)
(Beijing,2)
(I,2)
(capital,1)

 scala实现

  def flapMapFunction(env: ExecutionEnvironment): Unit = {
    val info = new ListBuffer[String]();
    info.append("hadoop,flink")
    info.append("java,hadoop")
    info.append("flink,java")

    import org.apache.flink.api.scala._

    val data = env.fromCollection(info)
    data.flatMap(_.split(",")).map((_, 1)).groupBy(0).sum(1).print()
  }

    Filter: 过滤出需要的数据

        StreamExecutionEnvironment envStream = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<UserAction> source = envStream.fromCollection(Arrays.asList(new UserAction("user1", 10001, "click", "iphone", BigDecimal.valueOf(10)),
                new UserAction("user2", 10002, "browse", "pc", BigDecimal.valueOf(12)),
                new UserAction("user1", 10001, "click", "mac", BigDecimal.valueOf(10))));
        SingleOutputStreamOperator<UserAction> result = source.filter(new FilterFunction<UserAction>() {
            @Override
            public boolean filter(UserAction userAction) throws Exception {
                return userAction.getUserId().equalsIgnoreCase("user1");
            }
        });
        result.print();
        envStream.execute();
  • KeyBy [DataStream → KeyedStream] :用于将相同 Key 值的数据分到相同的分区中;

   分区结果和KeyBy下游算子的并行度强相关。如下游算子只有一个并行度,不管怎么分,都会分到一起。

     对于POJO类型,KeyBy可以通过keyBy(fieldName)指定字段进行分区。
     对于Tuple类型,KeyBy可以通过keyBy(fieldPosition)指定字段进行分区。
     对于一般类型,如上, KeyBy可以通过keyBy(new KeySelector {...})指定字段进行分区。

        StreamExecutionEnvironment envStream = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<UserAction> source = envStream.fromCollection(Arrays.asList(new UserAction("user1", 10001, "click", "iphone", BigDecimal.valueOf(10)),
                new UserAction("user2", 10002, "browse", "pc", BigDecimal.valueOf(12)),
                new UserAction("user1", 10001, "click", "mac", BigDecimal.valueOf(10))));
        KeyedStream<UserAction, String> result = source.keyBy(new KeySelector<UserAction, String>() {
            @Override
            public String getKey(UserAction userAction) throws Exception {
                return userAction.getUserId();
            }
        });
        result.print().setParallelism(3);
        envStream.execute();
  • Reduce [KeyedStream → DataStream] :用于对数据执行归约计算。

  Reduce: 基于ReduceFunction进行滚动聚合,并向下游算子输出每次滚动聚合后的结果。注意: Reduce会输出每一次滚动聚合的结果。

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 输入: 用户行为。某个用户在某个时刻点击或浏览了某个商品,以及商品的价格。
        DataStreamSource<UserAction> source = env.fromCollection(Arrays.asList(
                new UserAction("user1", 1293984000, "click", "productID1", BigDecimal.valueOf(12)),
                new UserAction("user2", 1293984001, "browse", "productID2", BigDecimal.valueOf(12)),
                new UserAction("user2", 1293984002, "browse", "productID2", BigDecimal.valueOf(12)),
                new UserAction("user2", 1293984003, "browse", "productID2", BigDecimal.valueOf(12)),
                new UserAction("user1", 1293984002, "click", "productID1", BigDecimal.valueOf(12)),
                new UserAction("user1", 1293984003, "click", "productID3", BigDecimal.valueOf(12)),
                new UserAction("user1", 1293984004, "click", "productID1", BigDecimal.valueOf(12))
        ));

        KeyedStream<UserAction, String> keyedStream = source.keyBy(new KeySelector<UserAction, String>() {
            @Override
            public String getKey(UserAction value) throws Exception {
                return value.getUserId();
            }
        });

        SingleOutputStreamOperator<UserAction> result = keyedStream.reduce(new ReduceFunction<UserAction>() {
            @Override
            public UserAction reduce(UserAction userAction, UserAction t1) throws Exception {
                BigDecimal price = userAction.getPrice().add(t1.getPrice());
                return new UserAction(userAction.getUserId(), userAction.getUserNo()
                        , userAction.getAction(), userAction.getProduct(), price);
            }
        });

        result.print();
        env.execute();

  Aggregate 对KeyedStream按指定字段滚动聚合并输出每一次滚动聚合后的结果。默认的聚合函数有:summinminBymaxmabBy

  1. max(field)maxBy(field)的区别: maxBy返回field最大的那条数据;而max则是将最大的field的值赋值给第一条数据并返回第一条数据。同理,minminBy
  2. Aggregate聚合算子会滚动输出每一次聚合后的结果。
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 输入: 用户行为。某个用户在某个时刻点击或浏览了某个商品,以及商品的价格。
        DataStreamSource<UserAction> source = env.fromCollection(Arrays.asList(
                new UserAction("user1", 1293984000, "click", "productID1", BigDecimal.valueOf(30)),
                new UserAction("user2", 1293984001, "browse", "productID2", BigDecimal.valueOf(12)),
                new UserAction("user2", 1293984002, "browse11", "productID2", BigDecimal.valueOf(25)),
                new UserAction("user2", 1293984003, "browse", "productID2", BigDecimal.valueOf(12)),
                new UserAction("user1", 1293984002, "click11", "productID1", BigDecimal.valueOf(40)),
                new UserAction("user1", 1293984003, "click", "productID3", BigDecimal.valueOf(12)),
                new UserAction("user1", 1293984004, "click", "productID1", BigDecimal.valueOf(12))
        ));

        KeyedStream<UserAction, String> keyedStream = source.keyBy(new KeySelector<UserAction, String>() {
            @Override
            public String getKey(UserAction value) throws Exception {
                return value.getUserId();
            }
        });

        keyedStream.maxBy("price").print();

        keyedStream.max("price").print();

        env.execute();

   Connect 操作用于连接两个或者多个类型不同的 DataStream ,其返回的类型是 ConnectedStreams ,此时被连接的多个 DataStreams 可以共享彼此之间的数据状态。但是需要注意的是由于不同 DataStream 之间的数据类型是不同的,如果想要进行后续的计算操作,还需要通过 CoMap 或 CoFlatMap 将 ConnectedStreams 转换回 DataStream:

        DataStreamSource<Tuple2<String, Integer>> tuple2DataStreamSource = env.fromElements(new Tuple2<>("a", 2), new Tuple2<>("b", 5));

        DataStreamSource<Integer> integerDataStreamSource = env.fromElements(2, 3, 6);
        ConnectedStreams<Tuple2<String, Integer>, Integer> connect = tuple2DataStreamSource.connect(integerDataStreamSource);
        connect.map(new CoMapFunction<Tuple2<String, Integer>, Integer, Integer>() {
            @Override
            public Integer map1(Tuple2<String, Integer> value) throws Exception {
                return value.f1;
            }

            @Override
            public Integer map2(Integer value) throws Exception {
                return value;
            }
        }).map(x -> x * 100).print();

        env.execute();
  • Split [DataStream → SplitStream]:用于将一个 DataStream 按照指定规则进行拆分为多个 DataStream,需要注意的是这里进行的是逻辑拆分,即 Split 只是将数据贴上不同的类型标签,但最终返回的仍然只是一个 SplitStream;
  • Select [SplitStream → DataStream]:想要从逻辑拆分的 SplitStream 中获取真实的不同类型的 DataStream,需要使用 Select 算子,示例如下:
DataStreamSource<Integer> streamSource = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8);
// 标记
SplitStream<Integer> split = streamSource.split(new OutputSelector<Integer>() {
    @Override
    public Iterable<String> select(Integer value) {
        List<String> output = new ArrayList<String>();
        output.add(value % 2 == 0 ? "even" : "odd");
        return output;
    }
});
// 获取偶数数据集
split.select("even").print();
// 输出 2,4,6,8

  物理分区 (Physical partitioning) 是 Flink 提供的底层的 API,允许用户采用内置的分区规则或者自定义的分区规则来对数据进行分区,从而避免数据在某些分区上过于倾斜,常用的分区规则如下:

Random partitioning [DataStream → DataStream]

  随机分区 (Random partitioning) 用于随机的将数据分布到所有下游分区中,通过 shuffle 方法来进行实现:

dataStream.shuffle();

Rebalancing [DataStream → DataStream]

  Rebalancing 采用轮询的方式将数据进行分区,其适合于存在数据倾斜的场景下,通过 rebalance 方法进行实现:

dataStream.rebalance();

  当采用 Rebalancing 进行分区平衡时,其实现的是全局性的负载均衡,数据会通过网络传输到其他节点上并完成分区数据的均衡。 而 Rescaling 则是低配版本的 rebalance,它不需要额外的网络开销,它只会对上下游的算子之间进行重新均衡,通过 rescale 方法进行实现:

dataStream.rescale();

  ReScale 这个单词具有重新缩放的意义,其对应的操作也是如此,具体如下:如果上游 operation 并行度为 2,而下游的 operation 并行度为 6,则其中 1 个上游的 operation 会将元素分发到 3 个下游 operation,另 1 个上游 operation 则会将元素分发到另外 3 个下游 operation。反之亦然,如果上游的 operation 并行度为 6,而下游 operation 并行度为 2,则其中 3 个上游 operation 会将元素分发到 1 个下游 operation,另 3 个上游 operation 会将元素分发到另外 1 个下游operation:

Broadcasting [DataStream → DataStream]

  将数据分发到所有分区上。通常用于小数据集与大数据集进行关联的情况下,此时可以将小数据集广播到所有分区上,避免频繁的跨分区关联,通过 broadcast 方法进行实现:

dataStream.broadcast();

 Custom partitioning [DataStream → DataStream]

  Flink 运行用户采用自定义的分区规则来实现分区,此时需要通过实现 Partitioner 接口来自定义分区规则,并指定对应的分区键,示例如下:

 DataStreamSource<Tuple2<String, Integer>> streamSource = env.fromElements(new Tuple2<>("Hadoop", 1),
                new Tuple2<>("Spark", 1),
                new Tuple2<>("Flink-streaming", 2),
                new Tuple2<>("Flink-batch", 4),
                new Tuple2<>("Storm", 4),
                new Tuple2<>("HBase", 3));
streamSource.partitionCustom(new Partitioner<String>() {
    @Override
    public int partition(String key, int numPartitions) {
        // 将第一个字段包含flink的Tuple2分配到同一个分区
        return key.toLowerCase().contains("flink") ? 0 : 1;
    }
}, 0).print();


// 输出如下:
1> (Flink-streaming,2)
1> (Flink-batch,4)
2> (Hadoop,1)
2> (Spark,1)
2> (Storm,4)
2> (HBase,3)

任务链和资源组

  任务链和资源组 ( Task chaining and resource groups ) 也是 Flink 提供的底层 API,用于控制任务链和资源分配。默认情况下,如果操作允许 (例如相邻的两次 map 操作) ,则 Flink 会尝试将它们在同一个线程内进行,从而可以获取更好的性能。但是 Flink 也允许用户自己来控制这些行为,这就是任务链和资源组 API:

startNewChain

  startNewChain 用于基于当前 operation 开启一个新的任务链。如下所示,基于第一个 map 开启一个新的任务链,此时前一个 map 和 后一个 map 将处于同一个新的任务链中,但它们与 filter 操作则分别处于不同的任务链中:

someStream.filter(...).map(...).startNewChain().map(...);

disableChaining

  disableChaining 操作用于禁止将其他操作与当前操作放置于同一个任务链中,示例如下

someStream.map(...).disableChaining();

slotSharingGroup

  slot 是任务管理器 (TaskManager) 所拥有资源的固定子集,每个操作 (operation) 的子任务 (sub task) 都需要获取 slot 来执行计算,但每个操作所需要资源的大小都是不相同的,为了更好地利用资源,Flink 允许不同操作的子任务被部署到同一 slot 中。slotSharingGroup 用于设置操作的 slot 共享组 (slot sharing group) ,Flink 会将具有相同 slot 共享组的操作放到同一个 slot 中 。示例如下:

someStream.filter(...).slotSharingGroup("slotSharingGroupName");

 First函数

  def firstFunction(env: ExecutionEnvironment): Unit = {
    val info = ListBuffer[(Int, String)]();
    info.append((1, "hadoop"))
    info.append((1, "spark"))
    info.append((1, "flink"))
    info.append((2, "java"))
    info.append((2, "httpclient"))
    info.append((3, "linux"))

    import org.apache.flink.api.scala._
    val data = env.fromCollection(info)
    data.first(3).print()

    data.groupBy(0).first(2).print()

    data.groupBy(0).sortGroup(1, Order.ASCENDING).first(2).print()
  }

-------------------------------结果--------------------
(1,hadoop)
(1,spark)
(1,flink)

(3,linux)
(1,hadoop)
(1,spark)
(2,java)
(2,httpclient)

(3,linux)
(1,flink)
(1,hadoop)
(2,httpclient)
(2,java)

 JOIN

  def joinFunction(env: ExecutionEnvironment): Unit = {
    val groupO = ListBuffer[(Int, String)]()
    groupO.append((1, "小明"))
    groupO.append((2, "王二"))
    groupO.append((3, "赵三"))
    groupO.append((4, "李四"))

    val groupT = ListBuffer[(Int, String)]()
    groupT.append((1, "上海"))
    groupT.append((2, "北京"))
    groupT.append((3, "武汉"))
    groupT.append((5, "厦门"))

    val data1 = env.fromCollection(groupO)
    val data2 = env.fromCollection(groupT)
    data1.join(data2).where(0).equalTo(0).apply((first, second) => {
      (first._1, first._2, second._2)
    }).print()
  }
------------------结果-------------------

(3,赵三,武汉)
(1,小明,上海)
(2,王二,北京)

java实现

 public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        List<Tuple2<Integer, String>> groupOne = new ArrayList<>();
        groupOne.add(new Tuple2<>(1, "小明"));
        groupOne.add(new Tuple2<>(2, "小王"));
        groupOne.add(new Tuple2<>(3, "小张"));
        groupOne.add(new Tuple2<>(4, "小赵"));

        List<Tuple2<Integer, String>> groupTwo = new ArrayList<>();
        groupTwo.add(new Tuple2<>(1, "北京"));
        groupTwo.add(new Tuple2<>(2, "上海"));
        groupTwo.add(new Tuple2<>(3, "武汉"));
        groupTwo.add(new Tuple2<>(5, "华盛顿"));

        DataSource<Tuple2<Integer, String>> data1 = env.fromCollection(groupOne);
        DataSource<Tuple2<Integer, String>> data2 = env.fromCollection(groupTwo);
        data1.join(data2).where(0).equalTo(0).with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple3<Integer, String, String>>() {
            @Override
            public Tuple3<Integer, String, String> join(Tuple2<Integer, String> first, Tuple2<Integer, String> second) throws Exception {
                return new Tuple3<>(first.f0, first.f1, second.f1);
            }
        }).print();
    }

leftOutJoin()

  def leftOuterjoinFunction(env: ExecutionEnvironment): Unit = {
    val groupO = ListBuffer[(Int, String)]()
    groupO.append((1, "小明"))
    groupO.append((2, "王二"))
    groupO.append((3, "赵三"))
    groupO.append((4, "李四"))

    val groupT = ListBuffer[(Int, String)]()
    groupT.append((1, "上海"))
    groupT.append((2, "北京"))
    groupT.append((3, "武汉"))
    groupT.append((5, "厦门"))

    val data1 = env.fromCollection(groupO)
    val data2 = env.fromCollection(groupT)
    data1.leftOuterJoin(data2).where(0).equalTo(0).apply((first, second) => {
      if (second == null) {
        (first._1, first._2, "-")
      } else {
        (first._1, first._2, second._2)
      }
    }).print()
  }

 Split和Select

 

object SplitAndSelectApp {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    var counter = 0L
    val data = env.addSource(new SourceFunction[Long] {
      override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {
        while (counter < 100) {
          ctx.collect(counter)
          counter += 1
          Thread.sleep(1000)
        }
      }

      override def cancel(): Unit = {
        return false
      }
    })

    val splits = data.split(new OutputSelector[Long] {
      override def select(value: Long): lang.Iterable[String] = {
        val list = new util.ArrayList[String]();
        if (value % 2 == 0) {
          list.add("even")
        } else {
          list.add("odd")
        }
        list
      }
    })

    splits.select("even").print().setParallelism(1)
    env.execute("splitAndSelect")
  }

}

 


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM