Concise Java 8

Stream

Tags: Java Basics


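Coming back to Alibaba, I was struck by how much had changed. First, service resources had been moved onto Docker: the impressive Alibaba DB team had even put databases into containers and run them in the production environment. Second, the whole group was on Java 8 (when I left it was still 1.6 and 1.5, with even some 1.4). While the startups I had seen outside were still on 1.7, Alibaba had taken the lead in adopting Java 8 and had even built a high-performance customized version, so people at Alibaba get to experience in production just how smooth Java 8 is. This article starts with Stream, the feature that has had the greatest impact on Java 8.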


Introduction

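If the Runnable interface separated execution logic from Thread, then Stream separates data-computation logic from Collection, so that Collection can focus solely on storing data without also having to handle computation.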

Open the Collection API and you will see a new default method, stream():

/**
 * Returns a sequential {@code Stream} with this collection as its source.
 *
 * <p>This method should be overridden when the {@link #spliterator()}
 * method cannot return a spliterator that is {@code IMMUTABLE},
 * {@code CONCURRENT}, or <em>late-binding</em>. (See {@link #spliterator()}
 * for details.)
 *
 * @implSpec
 * The default implementation creates a sequential {@code Stream} from the
 * collection's {@code Spliterator}.
 *
 * @return a sequential {@code Stream} over the elements in this collection
 * @since 1.8
 */
default Stream<E> stream() {
    return StreamSupport.stream(spliterator(), false);
}

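Stream lets you process, in a declarative way, collections and any other data that can be converted to a Stream<T>. It has a number of notable characteristics: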

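  • Internal iteration
    Unlike the existing Iterator, Stream bakes the iteration itself (the equivalent of for/for-each) into the API implementation. You only pass in lambda expressions that express the computation (think of them as implementations of @FunctionalInterface types such as Supplier and Function), and Stream iterates over the data, triggers the computation, and produces the result. Internal iteration addresses two problems: it removes the boilerplate and obscurity of hand-written collection processing, and it lets the library apply multi-core parallel optimizations internally.
  • Pipelining
    Many Stream operations return another Stream, so several operations can be chained into one large pipeline that reads like a database-style query over the data source. This is also what makes automatic optimizations, such as implicit parallelism, possible.
  • Implicit parallelism
    Replace .stream() with .parallelStream() and Stream will use the Fork/Join framework to process the pipeline in parallel and merge the partial results automatically.
  • Lazy evaluation
    Most Stream operations (such as filter(), generate(), map(), ...) take a lambda expression whose logic is like an interface implementation (it can be viewed as a callback), so the code is not executed immediately: intermediate operations do no processing at all until a terminal operation is triggered on the pipeline.
  • Short-circuit evaluation
    Some operations can produce their result without processing the whole stream. Operations such as anyMatch(), allMatch(), and limit() can finish as soon as the outcome is determined (for example, as soon as anyMatch() finds one matching element), so the remaining elements are never processed. If the later stages are expensive, this can save a great deal of work.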
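
Lazy evaluation and short-circuiting can be observed directly with a small sketch. The class name LazyDemo and the calls counter are illustrative assumptions, not part of the original article; the counter simply records how many elements the filter lambda inspects.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

public class LazyDemo {

    // Hypothetical counter: how many elements the filter lambda has inspected.
    static final AtomicInteger calls = new AtomicInteger();

    // Builds a pipeline with no terminal operation, so nothing runs yet.
    static Stream<Integer> buildOnly(List<Integer> source) {
        return source.stream().filter(n -> {
            calls.incrementAndGet();
            return n > 2;
        });
    }

    // findFirst() is a short-circuiting terminal operation:
    // on a sequential stream it stops at the first element that passes the filter.
    static Optional<Integer> firstGreaterThanTwo(List<Integer> source) {
        return buildOnly(source).findFirst();
    }

    public static void main(String[] args) {
        List<Integer> list = Arrays.asList(1, 2, 3, 4, 5);
        buildOnly(list);
        System.out.println(calls.get());               // 0: the filter has not run
        System.out.println(firstGreaterThanTwo(list)); // Optional[3]
        System.out.println(calls.get());               // 3: elements 4 and 5 were never inspected
    }
}
```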

The following example gives a direct feel for the convenience Stream brings:

public void joiningList() {
    // Generate the sequence [0, 20)
    List<Integer> list = IntStream.range(0, 20)
            .boxed()
            .collect(Collectors.toList());

    // Take the even numbers in list, sort them in descending order,
    // keep the first three, and join them into a single String
    String string = list.stream()
            .filter(n -> n % 2 == 0)
            .sorted(Comparator.reverseOrder())
            .limit(3)
            .peek((i) -> System.out.println("remained: " + i))
            .map(String::valueOf)
            .collect(Collectors.joining());

    System.out.println(string);
}

Anatomy of a Stream

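A stream pipeline usually consists of three parts: data source (Source) -> intermediate operations/transformations (Transforming) -> terminal operation/execution (Operations). A Stream is generated from a data source, passes through a pipeline of transformations chained together by intermediate operations, and is finally executed by a terminal operation that produces the result.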

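  1. Source - creating the Stream: how a Stream is generated;
  2. Transforming - transforming the Stream: operations such as the map(), filter(), and limit() seen earlier, which turn the original Stream into another form;
  3. Operations - executing the Stream: this is what actually triggers the preceding chain of transformations and produces a result (such as a List, an array, or an Optional<T>) or a side effect.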

Let's look at each of these parts in turn:


Data Source: Creating a Stream

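Besides the collection.stream() method introduced earlier, there are many ways to create a stream. They fall roughly into three categories: general-purpose streams, numeric streams, and others. General-purpose streams are the most commonly used; numeric streams are an optimization Java provides for the three numeric types int, long, and double to avoid the cost of boxing and unboxing: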


1. General-purpose streams

API | Description
--- | ---
Arrays.stream(T[] array) | Returns a sequential Stream with the specified array as its source.
Stream.empty() | Returns an empty sequential Stream.
Stream.generate(Supplier<T> s) | Returns an infinite sequential unordered stream where each element is generated by the provided Supplier<T>.
Stream.iterate(T seed, UnaryOperator<T> f) | Returns an infinite sequential ordered Stream produced by iterative application of a function f to an initial element seed, producing a Stream consisting of seed, f(seed), f(f(seed)), etc.
Stream.of(T... values) | Returns a sequential ordered stream whose elements are the specified values.
Stream.concat(Stream<? extends T> a, Stream<? extends T> b) | Creates a lazily concatenated stream whose elements are all the elements of the first stream followed by all the elements of the second stream.
StreamSupport.stream(Spliterator<T> spliterator, boolean parallel) | Creates a new sequential or parallel Stream from a Spliterator.
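
A minimal sketch of a few of these factory methods follows. The class and method names (SourceDemo, powersOfTwo, and so on) are made up for illustration; only the Stream calls themselves come from the JDK. Because iterate() and generate() are infinite, each is bounded with limit() here.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class SourceDemo {

    // Stream.iterate produces seed, f(seed), f(f(seed)), ...; limit() bounds it.
    static List<Integer> powersOfTwo(int count) {
        return Stream.iterate(1, n -> n * 2)
                .limit(count)
                .collect(Collectors.toList());
    }

    // Stream.generate calls the Supplier once for each requested element.
    static List<String> repeated(String value, int count) {
        return Stream.generate(() -> value)
                .limit(count)
                .collect(Collectors.toList());
    }

    // Stream.concat appends the elements of the second stream after the first.
    static List<String> concatenated() {
        return Stream.concat(Stream.of("a", "b"), Stream.of("c"))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(powersOfTwo(5));    // [1, 2, 4, 8, 16]
        System.out.println(repeated("x", 3));  // [x, x, x]
        System.out.println(concatenated());    // [a, b, c]
    }
}
```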

2. Numeric streams

API | Description
--- | ---
Arrays.stream(Xxx[] array) | Returns a sequential Int/Long/DoubleStream with the specified array as its source.
XxxStream.empty() | Returns an empty sequential Int/Long/DoubleStream.
XxxStream.generate(XxxSupplier s) | Returns an infinite sequential unordered stream where each element is generated by the provided Int/Long/DoubleSupplier.
XxxStream.iterate(Xxx seed, XxxUnaryOperator f) | Returns an infinite sequential ordered Int/Long/DoubleStream, analogous to Stream.iterate(T seed, UnaryOperator<T> f).
XxxStream.of(Xxx... values) | Returns a sequential ordered stream whose elements are the specified values.
XxxStream.concat(XxxStream a, XxxStream b) | Creates a lazily concatenated stream whose elements are all the elements of the first stream followed by all the elements of the second stream.
Int/LongStream.range(startInclusive, endExclusive) | Returns a sequential ordered Int/LongStream from startInclusive (inclusive) to endExclusive (exclusive) by an incremental step of 1.
Int/LongStream.rangeClosed(startInclusive, endInclusive) | Returns a sequential ordered Int/LongStream from startInclusive (inclusive) to endInclusive (inclusive) by an incremental step of 1.
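
The difference between range() and rangeClosed() is easy to get wrong, so here is a small sketch using IntStream. The class and method names are illustrative assumptions; the streams work on primitive int values throughout, with no boxing.

```java
import java.util.Arrays;
import java.util.stream.IntStream;

public class NumericDemo {

    // range() excludes the upper bound.
    static int sumHalfOpen(int start, int end) {
        return IntStream.range(start, end).sum();
    }

    // rangeClosed() includes the upper bound.
    static int sumClosed(int start, int end) {
        return IntStream.rangeClosed(start, end).sum();
    }

    // IntStream.iterate is infinite, so limit() is required before toArray().
    static int[] evens(int count) {
        return IntStream.iterate(0, n -> n + 2).limit(count).toArray();
    }

    public static void main(String[] args) {
        System.out.println(sumHalfOpen(1, 5));            // 10 (1 + 2 + 3 + 4)
        System.out.println(sumClosed(1, 5));              // 15 (1 + 2 + 3 + 4 + 5)
        System.out.println(Arrays.toString(evens(4)));    // [0, 2, 4, 6]
    }
}
```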

3. Others

  • I/O Stream
    • BufferedReader.lines()
  • File Stream
    • Files.lines(Path path)
    • Files.find(Path start, int maxDepth, BiPredicate<Path,BasicFileAttributes> matcher, FileVisitOption... options)
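    • Files.list(Path dir) (note that Files.newDirectoryStream(Path dir) returns a DirectoryStream<Path>, which is an Iterable rather than a java.util.stream.Stream)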
    • Files.walk(Path start, FileVisitOption... options)
  • Jar
    • JarFile.stream()
  • Random
    • Random.ints()
    • Random.longs()
    • Random.doubles()
  • Pattern
    • splitAsStream(CharSequence input)
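
As a rough illustration of two of these sources, the sketch below uses Pattern.splitAsStream() and BufferedReader.lines(). The class name, method names, and sample inputs are assumptions made for the example; a StringReader stands in for a real file or socket so the code runs without any external resource.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

public class OtherSourcesDemo {

    // Pattern.splitAsStream splits the input around matches of the pattern.
    static List<String> splitCsv(String line) {
        return Pattern.compile(",")
                .splitAsStream(line)
                .map(String::trim)
                .collect(Collectors.toList());
    }

    // BufferedReader.lines() exposes the reader's lines as a Stream<String>.
    static long countNonBlankLines(String text) {
        try (BufferedReader reader = new BufferedReader(new StringReader(text))) {
            return reader.lines()
                    .filter(line -> !line.trim().isEmpty())
                    .count();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(splitCsv("a, b ,c"));            // [a, b, c]
        System.out.println(countNonBlankLines("x\n\ny\n")); // 2
    }
}
```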

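In addition, the three kinds of numeric streams can be converted into one another, and numeric streams and general-purpose streams can be converted into each other:
1. Numeric to numeric: doubleStream.mapToInt(DoubleToIntFunction mapper), intStream.asLongStream()
2. Numeric to general-purpose: longStream.boxed(), intStream.mapToObj(IntFunction<? extends U> mapper)
3. General-purpose to numeric: stream.flatMapToInt(Function<? super T,? extends IntStream> mapper), stream.mapToDouble(ToDoubleFunction<? super T> mapper)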
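
The three directions of conversion can be sketched as follows. The class and method names are hypothetical, and mapToInt() is used for the general-to-numeric case as a sibling of the mapToDouble() listed above.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class ConversionDemo {

    // Numeric to numeric: IntStream -> LongStream.
    static long[] widen(int[] values) {
        return IntStream.of(values).asLongStream().toArray();
    }

    // Numeric to general-purpose: IntStream -> Stream<String>.
    static List<String> labels(int count) {
        return IntStream.range(0, count)
                .mapToObj(i -> "item-" + i)
                .collect(Collectors.toList());
    }

    // General-purpose to numeric: Stream<String> -> IntStream.
    static int totalLength(List<String> words) {
        return words.stream().mapToInt(String::length).sum();
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(widen(new int[]{1, 2, 3})));        // [1, 2, 3]
        System.out.println(labels(3));                                         // [item-0, item-1, item-2]
        System.out.println(totalLength(Arrays.asList("hello", "world")));      // 10
    }
}
```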


Intermediate Operations: Transforming a Stream

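Every intermediate operation returns another Stream, which lets multiple operations be chained together into a pipeline. Its defining characteristic is therefore the lazy evaluation mentioned earlier: unless a terminal operation is triggered on the pipeline, the intermediate operations perform no processing.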

API | Description
--- | ---
filter(Predicate<? super T> predicate) | Returns a stream consisting of the elements of this stream that match the given predicate.
distinct() | Returns a stream consisting of the distinct elements (according to Object.equals(Object)) of this stream.
limit(long maxSize) | Returns a stream consisting of the elements of this stream, truncated to be no longer than maxSize in length.
skip(long n) | Returns a stream consisting of the remaining elements of this stream after discarding the first n elements of the stream.
sorted(Comparator<? super T> comparator) | Returns a stream consisting of the elements of this stream, sorted according to the provided Comparator.
map(Function<? super T,? extends R> mapper) | Returns a stream consisting of the results of applying the given function to the elements of this stream.
flatMap(Function<? super T,? extends Stream<? extends R>> mapper) | Returns a stream consisting of the results of replacing each element of this stream with the contents of a mapped stream produced by applying the provided mapping function to each element.
peek(Consumer<? super T> action) | Returns a stream consisting of the elements of this stream, additionally performing the provided action on each element as elements are consumed from the resulting stream.
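
To see how several of these operations compose, here is a small sketch that chains filter(), distinct(), sorted(), skip(), limit(), and map(). The class name, method name, and sample data are illustrative assumptions; the comments trace the intermediate state for the sample input.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

public class IntermediateDemo {

    static List<Integer> pipeline(List<Integer> input) {
        return input.stream()
                .filter(n -> n % 2 == 1)            // keep odd numbers
                .distinct()                         // drop duplicates
                .sorted(Comparator.reverseOrder())  // largest first
                .skip(1)                            // discard the largest
                .limit(2)                           // keep the next two
                .map(n -> n * 10)                   // scale each value
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // odd: 5 3 5 1 9 3 7 -> distinct: 5 3 1 9 7 -> sorted: 9 7 5 3 1
        // -> skip(1): 7 5 3 1 -> limit(2): 7 5 -> map: 70 50
        System.out.println(pipeline(Arrays.asList(5, 3, 5, 8, 1, 9, 3, 7))); // [70, 50]
    }
}
```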

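flatMap() deserves a closer look, because when I first came across it I did not understand what it was for either.
Suppose we have this list of strings: List<String> strs = Arrays.asList("hello", "alibaba", "world"); How do we list the distinct characters it contains?
The first thing that comes to mind is that String has a split() method that breaks a string into substrings, so we write: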

Stream<Stream<String>> streamStream = strs.stream()
        .map(str -> Arrays.stream(str.split("")));

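We split each String into a String[], and Arrays.stream() then maps the String[] to a Stream<String>. But this is not the result we want: we wanted a Stream<String> and got a Stream<Stream<String>> instead, with the result we want wrapped inside another Stream. This is where flatMap() comes in: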

Stream<String> stringStream = strs.stream()
        .flatMap(str -> Arrays.stream(str.split("")));

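flatMap() flattens the nested structure of the Stream: it pulls the elements out of the inner Streams, so the resulting Stream has no inner Streams.

In short: flatMap() lets you replace each value in a stream with another Stream, and then concatenates all of those Streams into a single Stream.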
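
To actually answer the question of which distinct characters appear in the list, the flattened stream still needs a distinct() step. The following is one possible sketch; the class and method names are assumptions, and it relies on the Java 8 behavior of split(""), which yields one single-character string per character.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class FlatMapDemo {

    // Flattens every word into its characters, then removes duplicates.
    // On an ordered stream, distinct() keeps the first occurrence of each element.
    static List<String> distinctLetters(List<String> words) {
        return words.stream()
                .flatMap(word -> Arrays.stream(word.split("")))
                .distinct()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> strs = Arrays.asList("hello", "alibaba", "world");
        System.out.println(distinctLetters(strs)); // [h, e, l, o, a, i, b, w, r, d]
    }
}
```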


Terminal Operations: Executing a Stream

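A terminal operation not only triggers execution of the pipeline but also collects its result. The result can be any non-stream value, such as a List, an array, a boolean, an Optional<T>, or even void (forEach()):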

API | Description
--- | ---
count() | Returns the count of elements in this stream.
max(Comparator<? super T> comparator) | Returns the maximum element of this stream according to the provided Comparator.
min(Comparator<? super T> comparator) | Returns the minimum element of this stream according to the provided Comparator.
allMatch(Predicate<? super T> predicate) | Returns whether all elements of this stream match the provided predicate.
anyMatch(Predicate<? super T> predicate) | Returns whether any elements of this stream match the provided predicate.
noneMatch(Predicate<? super T> predicate) | Returns whether no elements of this stream match the provided predicate.
findAny() | Returns an Optional describing some element of the stream, or an empty Optional if the stream is empty.
findFirst() | Returns an Optional describing the first element of this stream, or an empty Optional if the stream is empty.
reduce(BinaryOperator<T> accumulator) | Performs a reduction on the elements of this stream, using an associative accumulation function, and returns an Optional describing the reduced value, if any.
toArray() | Returns an array containing the elements of this stream.
forEach(Consumer<? super T> action) | Performs an action for each element of this stream.
forEachOrdered(Consumer<? super T> action) | Performs an action for each element of this stream, in the encounter order of the stream if the stream has a defined encounter order.
collect(Collector<? super T,A,R> collector) | Performs a mutable reduction operation on the elements of this stream using a Collector.

IntStream/LongStream/DoubleStream additionally provide operations such as average(), sum(), and summaryStatistics(), which return an aggregated summary of the Stream.
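
A brief sketch of two of these terminal operations, reduce() and summaryStatistics(), is given below. The class and method names are illustrative assumptions. Note that the single-argument reduce() returns an Optional because an empty stream has no value to reduce.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.IntSummaryStatistics;
import java.util.List;
import java.util.Optional;

public class TerminalDemo {

    // reduce(BinaryOperator) folds the elements pairwise; empty input gives Optional.empty().
    static Optional<Integer> product(List<Integer> values) {
        return values.stream().reduce((a, b) -> a * b);
    }

    // summaryStatistics() collects count, sum, min, max, and average in one pass.
    static IntSummaryStatistics stats(List<Integer> values) {
        return values.stream().mapToInt(Integer::intValue).summaryStatistics();
    }

    public static void main(String[] args) {
        List<Integer> values = Arrays.asList(1, 2, 3, 4);
        System.out.println(product(values));                           // Optional[24]
        System.out.println(product(Collections.<Integer>emptyList())); // Optional.empty
        IntSummaryStatistics s = stats(values);
        System.out.println(s.getMin() + " " + s.getMax() + " " + s.getSum() + " " + s.getAverage()); // 1 4 10 2.5
    }
}
```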


Other

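The java.util.stream.Stream interface extends the java.util.stream.BaseStream interface, and BaseStream also provides a number of utility methods (such as parallel(), which converts a sequential stream into a parallel one):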

API | Description
--- | ---
S onClose(Runnable closeHandler) | Returns an equivalent stream with an additional close handler.
void close() | Closes this stream, causing all close handlers for this stream pipeline to be called.
S unordered() | Returns an equivalent stream that is unordered.
Iterator<T> iterator() | Returns an iterator for the elements of this stream.
Spliterator<T> spliterator() | Returns a spliterator for the elements of this stream.
S sequential() | Returns an equivalent stream that is sequential.
S parallel() | Returns an equivalent stream that is parallel.
boolean isParallel() | Returns whether this stream, if a terminal operation were to be executed, would execute in parallel.
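
The sketch below exercises a few of these BaseStream methods. The class and method names are assumptions for illustration. Since BaseStream extends AutoCloseable, a stream can be used in try-with-resources, which is what invokes the close handler here; terminal operations do not close the stream on their own.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream;

public class BaseStreamDemo {

    // Registers a close handler and reports whether close() invoked it.
    static boolean closeHandlerRuns() {
        AtomicBoolean closed = new AtomicBoolean(false);
        try (Stream<String> stream = Stream.of("a", "b").onClose(() -> closed.set(true))) {
            stream.forEach(s -> { /* consume the elements */ });
        }
        return closed.get();
    }

    // parallel() switches the pipeline to parallel mode.
    static boolean parallelFlag() {
        return Stream.of(1, 2, 3).parallel().isParallel();
    }

    // sequential() switches it back; the last call made before the terminal operation wins.
    static boolean sequentialFlag() {
        return Stream.of(1, 2, 3).parallel().sequential().isParallel();
    }

    public static void main(String[] args) {
        System.out.println(closeHandlerRuns()); // true
        System.out.println(parallelFlag());     // true
        System.out.println(sequentialFlag());   // false
    }
}
```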

Putting It All Together

Next, let's pose a few questions about a set of transactions to practice the APIs listed above:

  • DO definitions
/**
 * Trader
 */
private class Trader {

    private String name;
    private String city;

    public Trader(String name, String city) {
        this.name = name;
        this.city = city;
    }

    public String getName() {
        return name;
    }

    public String getCity() {
        return city;
    }

    @Override
    public String toString() {
        return "Trader{" +
                "name='" + name + '\'' +
                ", city='" + city + '\'' +
                '}';
    }
}

/**
 * Transaction
 */
private class Transaction {

    private Trader trader;
    private int year;
    private int value;

    public Transaction(Trader trader, int year, int value) {
        this.trader = trader;
        this.year = year;
        this.value = value;
    }

    public Trader getTrader() {
        return this.trader;
    }

    public int getYear() {
        return this.year;
    }

    public int getValue() {
        return this.value;
    }

    @Override
    public String toString() {
        return "Transaction{" +
                "trader=" + trader +
                ", year=" + year +
                ", value=" + value +
                '}';
    }
}
  • Stream operations
/**
 * @author jifang.zjf
 * @since 2017/7/3 4:05 PM.
 */
public class StreamLambda {

    private List<Transaction> transactions;

    @Before
    public void setUp() {
        Trader raoul = new Trader("Raoul", "Cambridge");
        Trader mario = new Trader("Mario", "Milan");
        Trader alan = new Trader("Alan", "Cambridge");
        Trader brian = new Trader("Brian", "Cambridge");

        transactions = Arrays.asList(
                new Transaction(brian, 2011, 300),
                new Transaction(raoul, 2012, 1000),
                new Transaction(raoul, 2011, 400),
                new Transaction(mario, 2012, 710),
                new Transaction(mario, 2012, 700),
                new Transaction(alan, 2012, 950)
        );
    }

    @Test
    public void action() {
        // 1. Print all transactions from 2011, sorted by value (low to high)
        transactions.stream()
                .filter(transaction -> transaction.getYear() == 2011)
                .sorted(Comparator.comparing(Transaction::getValue))
                .forEachOrdered(System.out::println);

        // 2. Find the distinct cities the traders have worked in
        Set<String> distinctCities = transactions.stream()
                .map(transaction -> transaction.getTrader().getCity())
                .collect(Collectors.toSet());   // or .distinct().collect(Collectors.toList())
        System.out.println(distinctCities);

        // 3. Find all traders from Cambridge, sorted by name
        Trader[] traders = transactions.stream()
                .map(Transaction::getTrader)
                .filter(trader -> trader.getCity().equals("Cambridge"))
                .distinct()
                .sorted(Comparator.comparing(Trader::getName))
                .toArray(Trader[]::new);
        System.out.println(Arrays.toString(traders));

        // 4. Return a string of all traders' names, sorted alphabetically
        //    (joining(" ") avoids the leading space that reduce("", ...) would produce)
        String names = transactions.stream()
                .map(transaction -> transaction.getTrader().getName())
                .distinct()
                .sorted(Comparator.naturalOrder())
                .collect(Collectors.joining(" "));
        System.out.println(names);

        // 5. Return a string of all the letters in the traders' names, sorted alphabetically
        String letters = transactions.stream()
                .map(transaction -> transaction.getTrader().getName())
                .distinct()
                .map(name -> name.split(""))
                .flatMap(Arrays::stream)
                .sorted()
                .collect(Collectors.joining());
        System.out.println(letters);

        // 6. Is there any trader working in Milan?
        boolean workMilan = transactions.stream()
                .anyMatch(transaction -> transaction.getTrader().getCity().equals("Milan"));
        System.out.println(workMilan);

        // 7. Print the total value of all transactions by traders living in Cambridge
        long sum = transactions.stream()
                .filter(transaction -> transaction.getTrader().getCity().equals("Cambridge"))
                .mapToLong(Transaction::getValue)
                .sum();
        System.out.println(sum);

        // 8. What is the highest value among all transactions?
        OptionalInt max = transactions.stream()
                .mapToInt(Transaction::getValue)
                .max();
        // or transactions.stream().map(Transaction::getValue).max(Comparator.naturalOrder());
        System.out.println(max.orElse(0));

        // 9. Find the transaction with the smallest value
        Optional<Transaction> min = transactions.stream()
                .min(Comparator.comparingInt(Transaction::getValue));
        System.out.println(min.orElseThrow(IllegalArgumentException::new));
    }
}
