Flink数据转换方法使用案例

Apache Flink是一个分布式流处理框架,它提供了丰富的数据转换方法,可以帮助我们对数据进行各种各样的转换操作。本文将介绍Flink中常用的数据转换方法,并提供相应的使用案例。

Map

Map方法可以将输入的数据转换成另一种形式,常用于数据清洗、数据格式转换等场景。

DataStream<String> input = env.fromElements("hello world", "flink is awesome");
DataStream<String> output = input.map(new MapFunction<String, String>() {
    @Override
    public String map(String value) throws Exception {
        return value.toUpperCase();
    }
});

上述代码将输入的字符串转换成大写形式。

FlatMap

FlatMap方法可以将输入的数据转换成多个输出,常用于数据拆分、数据过滤等场景。

DataStream<String> input = env.fromElements("hello world", "flink is awesome");
DataStream<String> output = input.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public void flatMap(String value, Collector<String> out) throws Exception {
        String[] words = value.split(" ");
        for (String word : words) {
            out.collect(word);
        }
    }
});

上述代码将输入的字符串按照空格拆分成多个单词,并输出每个单词。

Filter

Filter方法可以根据条件过滤输入的数据,常用于数据筛选、数据清洗等场景。

DataStream<Integer> input = env.fromElements(1, 2, 3, 4, 5);
DataStream<Integer> output = input.filter(new FilterFunction<Integer>() {
    @Override
    public boolean filter(Integer value) throws Exception {
        return value % 2 == 0;
    }
});

上述代码将输入的整数中的偶数过滤出来。

KeyBy

KeyBy方法可以将输入的数据按照指定的Key进行分组,常用于数据聚合、数据统计等场景。

DataStream<Tuple2<String, Integer>> input = env.fromElements(
    new Tuple2<>("a", 1),
    new Tuple2<>("b", 2),
    new Tuple2<>("a", 3),
    new Tuple2<>("b", 4)
);
DataStream<Tuple2<String, Integer>> output = input.keyBy(0).sum(1);

上述代码将输入的Tuple按照第一个元素进行分组,并对每组的第二个元素进行求和。

Reduce

Reduce方法可以对输入的数据进行归约操作,常用于数据聚合、数据统计等场景。

DataStream<Integer> input = env.fromElements(1, 2, 3, 4, 5);
DataStream<Integer> output = input.reduce(new ReduceFunction<Integer>() {
    @Override
    public Integer reduce(Integer value1, Integer value2) throws Exception {
        return value1 + value2;
    }
});

上述代码将输入的整数进行求和操作。

Aggregations

Aggregations方法可以对输入的数据进行聚合操作,常用于数据统计、数据分析等场景。

DataStream<Tuple2<String, Integer>> input = env.fromElements(
    new Tuple2<>("a", 1),
    new Tuple2<>("b", 2),
    new Tuple2<>("a", 3),
    new Tuple2<>("b", 4)
);
DataStream<Tuple2<String, Integer>> output = input.keyBy(0).maxBy(1);

上述代码将输入的Tuple按照第一个元素进行分组,并对每组的第二个元素进行最大值计算。

Window

Window方法可以对输入的数据进行窗口操作,常用于数据统计、数据分析等场景。

DataStream<Tuple2<String, Integer>> input = env.fromElements(
    new Tuple2<>("a", 1),
    new Tuple2<>("b", 2),
    new Tuple2<>("a", 3),
    new Tuple2<>("b", 4)
);
DataStream<Tuple2<String, Integer>> output = input.keyBy(0)
    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    .sum(1);

上述代码将输入的Tuple按照第一个元素进行分组,并对每组的第二个元素进行5秒的滚动窗口求和操作。

Join

DataStream<Tuple2<String, Integer>> stream1 = ...;
DataStream<Tuple2<String, Integer>> stream2 = ...;

// 使用Key进行连接
stream1.join(stream2)
    .where(tuple -> tuple.f0)
    .equalTo(tuple -> tuple.f0)
    .window(TumblingEventTimeWindows.of(Time.seconds(10)))
    .apply(new JoinFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple3<String, Integer, Integer>>() {
        @Override
        public Tuple3<String, Integer, Integer> join(Tuple2<String, Integer> first, Tuple2<String, Integer> second) throws Exception {
            return new Tuple3<>(first.f0, first.f1, second.f1);
        }
    });

Union

DataStream<String> stream1 = ...;
DataStream<String> stream2 = ...;

// 将两个流合并成一个流
DataStream<String> unionStream = stream1.union(stream2);

Project

DataStream<Tuple2<String, Integer>> stream = ...;

// 只保留Tuple中的第一个元素
DataStream<String> projectedStream = stream.map(tuple -> tuple.f0);

Distinct

DataStream<String> stream = ...;

// 去重
DataStream<String> distinctStream = stream.distinct();

Sort

DataStream<Tuple2<String, Integer>> stream = ...;

// 按照Tuple中的第二个元素进行升序排序
DataStream<Tuple2<String, Integer>> sortedStream = stream
    .keyBy(tuple -> tuple.f0)
    .window(TumblingEventTimeWindows.of(Time.seconds(10)))
    .sortBy(tuple -> tuple.f1, true);

Partition

DataStream<String> stream = ...;

// 按照Hash值进行分区
DataStream<String> partitionedStream = stream
    .keyBy(str -> str.hashCode() % 10)
    .window(TumblingEventTimeWindows.of(Time.seconds(10)))
    .reduce((value1, value2) -> value1 + value2);

Iterate

DataStream<Long> initialStream = ...;

// 迭代计算
IterativeStream<Long> iterativeStream = initialStream.iterate();
DataStream<Long> iterationBody = iterativeStream
    .map(value -> value + 1)
    .filter(value -> value < 100);
iterativeStream.closeWith(iterationBody);

DataStream<Long> resultStream = iterativeStream
    .map(value -> value * 2);

Fold

DataStream<Integer> stream = ...;

// 对流中的元素进行累加
DataStream<Integer> foldedStream = stream
    .keyBy(value -> value % 2)
    .window(TumblingEventTimeWindows.of(Time.seconds(10)))
    .fold(0, (value1, value2) -> value1 + value2);

以下是一个使用 Flink 数据转换 Conclusion 的案例:

使用 Flink 数据转换 Conclusion 的案例

问题描述

我们有一个数据集,其中包含了用户的浏览记录,每条记录包含了用户 ID、浏览时间、浏览的网页 URL 等信息。我们希望对这个数据集进行分析,找出每个用户最近浏览的 3 个网页。

解决方案

我们可以使用 Flink 的数据转换 Conclusion 来解决这个问题。具体步骤如下:

  1. 读取数据集,并将每条记录转换成一个元组,其中包含了用户 ID、浏览时间和网页 URL。
  2. 按照用户 ID 进行分组,然后对每个用户的浏览记录按照浏览时间进行排序。
  3. 对于每个用户,使用滑动窗口来获取最近浏览的 3 个网页。具体来说,我们可以使用 window(SlidingEventTimeWindows.of(Time.minutes(10), Time.minutes(1))) 来定义一个大小为 10 分钟、滑动步长为 1 分钟的滑动窗口。
  4. 对于每个窗口,使用 reduceGroup() 函数来对窗口内的数据进行处理。具体来说,我们可以使用一个自定义的 TopNFunction 函数来实现对每个用户最近浏览的 3 个网页的查找和输出。

下面是完整的代码示例:

DataStream<Tuple3<String, Long, String>> data = env.readTextFile("input.txt")
        .map(line -> {
            String[] fields = line.split(",");
            return new Tuple3<>(fields[0], Long.parseLong(fields[1]), fields[2]);
        });

DataStream<Tuple3<String, Long, String>> result = data
        .keyBy(0)
        .window(SlidingEventTimeWindows.of(Time.minutes(10), Time.minutes(1)))
        .reduceGroup(new TopNFunction(3));

result.print();

public static class TopNFunction implements GroupReduceFunction<Tuple3<String, Long, String>, Tuple3<String, Long, String>> {
    private final int n;

    public TopNFunction(int n) {
        this.n = n;
    }

    @Override
    public void reduce(Iterable<Tuple3<String, Long, String>> iterable, Collector<Tuple3<String, Long, String>> collector) throws Exception {
        PriorityQueue<Tuple3<String, Long, String>> queue = new PriorityQueue<>(n, Comparator.comparingLong(t -> -t.f1));
        for (Tuple3<String, Long, String> t : iterable) {
            queue.offer(t);
            if (queue.size() > n) {
                queue.poll();
            }
        }
        for (Tuple3<String, Long, String> t : queue) {
            collector.collect(t);
        }
    }
}

结论

使用 Flink 的数据转换 Conclusion,我们可以方便地对数据集进行分析和处理,实现各种复杂的数据处理任务。在本例中,我们使用 Flink 的滑动窗口和自定义函数来查找每个用户最近浏览的 3 个网页,从而实现了对数据集的分析和处理。

Logo

大数据从业者之家,一起探索大数据的无限可能!

更多推荐