前面分别看了lambda表达式的定义Stream接口的使用以及Collectors的应用,对如何使用Java 8里面提供的函数式机制有了一定了解,这里回过头了解下相应的代码实现,梳理下Stream的执行脉络。

从一个简单的示例来看下Stream执行过程是如何实现的,

Stream.of(1, 2, 3).map(x -> x + 1).filter(x -> x > 2).reduce((x, y) -> x + y).ifPresent(System.out::println);

Stream的构造

Pipeline

首先是Stream的创建,Stream.of(1, 2, 3),Stream.of其代码实现内部是调用了Arrays.stream,

public static<T> Stream<T> of(T... values) {
    return Arrays.stream(values);
}

Arrays.stream里调用了,

public static <T> Stream<T> stream(T[] array) {
    return stream(array, 0, array.length);
}

public static <T> Stream<T> stream(T[] array, int startInclusive, int endExclusive) {
    return StreamSupport.stream(spliterator(array, startInclusive, endExclusive), false);
}

public static <T> Stream<T> stream(Spliterator<T> spliterator, boolean parallel) {
    Objects.requireNonNull(spliterator);
    return new ReferencePipeline.Head<>(spliterator, StreamOpFlag.fromCharacteristics(spliterator), parallel);
}

这部分代码告诉我们RefrerencePipeline是实际处理逻辑的实现类,与Stream接口区分了原生类型与引用类型一样,Pipeline也有对应的实现,

  • ReferencePipeline
  • IntPipeline
  • LongPipeline
  • DoublePipeline

不同Pipeline的处理逻辑大致相同,只要弄明白一个,其余的也就清楚了。

Spliterator

在构造Pipeline的时候需要传入Spliterator,这个东西也是之前版本中所没有的。Spliterator实际负责了如何从数据源中获取数据的责任,其定义如下,

Spliterator

需要注意的是其中定义的tryAdvance、forEachRemaining两个接口。在Pipeline执行过程中会通过这两个接口来实际获得数据。

map/filter的实现

map、filter虽然是两个不同的接口,但其内部流程比较相似,这里就一并来看下,

public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {
    Objects.requireNonNull(mapper);
    return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
                                 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
        @Override
        Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
            return new Sink.ChainedReference<P_OUT, R>(sink) {
                @Override
                public void accept(P_OUT u) {
                    downstream.accept(mapper.apply(u));
                }
            };
        }
    };
}

public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
    Objects.requireNonNull(predicate);
    return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
                                 StreamOpFlag.NOT_SIZED) {
        @Override
        Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
            return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
                @Override
                public void begin(long size) {
                    downstream.begin(-1);
                }

                @Override
                public void accept(P_OUT u) {
                    if (predicate.test(u))
                        downstream.accept(u);
                }
            };
        }
    };
}

map、reduce两个函数调用均返回了StatelessOp,这里可以验证文档里说的Stream提供的接口操作分为intermediate/terminal两种操作,intermediate操作调用后并不会执行传入的操作。所有的intermediate返回的都是StatelessOp/StatefulOp。StatelessOp/StatefulOp又是什么呢?这两个也是继承自Pipeline的,所以intermidate操作返回的都是Stream。

ChainedReference也是需要注意的一个类,通过它构建了Pipeline的不同阶段,在AbstractPipeline可以看到如下变量用以记录。

private final AbstractPipeline sourceStage;
private final AbstractPipeline previousStage;
private AbstractPipeline nextStage;

reduce的实现

reduce操作则是terminal操作,

// ReferencePipeline.java
public final Optional<P_OUT> reduce(BinaryOperator<P_OUT> accumulator) {
    return evaluate(ReduceOps.makeRef(accumulator));
}

// AbstractReferencePipeline.java
final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
    assert getOutputShape() == terminalOp.inputShape();
    if (linkedOrConsumed)
        throw new IllegalStateException(MSG_STREAM_LINKED);
    linkedOrConsumed = true;

    return isParallel()
           ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
           : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
}

// ReduceOps.java
public static <T, U> TerminalOp<T, U>
makeRef(U seed, BiFunction<U, ? super T, U> reducer, BinaryOperator<U> combiner) {
    Objects.requireNonNull(reducer);
    Objects.requireNonNull(combiner);
    class ReducingSink extends Box<U> implements AccumulatingSink<T, U, ReducingSink> {
        @Override
        public void begin(long size) {
            state = seed;
        }

        @Override
        public void accept(T t) {
            state = reducer.apply(state, t);
        }

        @Override
        public void combine(ReducingSink other) {
            state = combiner.apply(state, other.state);
        }
    }
    return new ReduceOp<T, U, ReducingSink>(StreamShape.REFERENCE) {
        @Override
        public ReducingSink makeSink() {
            return new ReducingSink();
        }
    };
}

上面这么一大段内容就是其中一个reduce接口涉及的代码。这段内容里面需要注意ReduceOps这个类。在Java 8中几乎每一个terminal操作都有这么个对应的类,

  • DistinctOps
  • FindOps
  • ForEachOps
  • MatchOps
  • ReduceOps
  • SliceOps
  • SortedOps

这些Ops内部实现在流程上都很类似。同样的,弄明白ReduceOps基本也就能弄明白其余的。最终执行reduce逻辑的代码是哪呢,在ReduceOps.java的ReduceOp类中,

private static abstract class ReduceOp<T, R, S extends AccumulatingSink<T, R, S>> implements TerminalOp<T, R> {
    private final StreamShape inputShape;

    ReduceOp(StreamShape shape) {
        inputShape = shape;
    }

    public abstract S makeSink();

    @Override
    public StreamShape inputShape() {
        return inputShape;
    }

    @Override
    public <P_IN> R evaluateSequential(PipelineHelper<T> helper,
                                       Spliterator<P_IN> spliterator) {
        return helper.wrapAndCopyInto(makeSink(), spliterator).get();
    }

    @Override
    public <P_IN> R evaluateParallel(PipelineHelper<T> helper,
                                     Spliterator<P_IN> spliterator) {
        return new ReduceTask<>(this, helper, spliterator).invoke().get();
    }
}

最后的实现逻辑实在AbstractPipeline.java中,

@Override
final <P_IN, S extends Sink<E_OUT>> S wrapAndCopyInto(S sink, Spliterator<P_IN> spliterator) {
    copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator);
    return sink;
}

@Override
final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
    Objects.requireNonNull(wrappedSink);

    if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
        wrappedSink.begin(spliterator.getExactSizeIfKnown());
        spliterator.forEachRemaining(wrappedSink);
        wrappedSink.end();
    }
    else {
        copyIntoWithCancel(wrappedSink, spliterator);
    }
}

Sink.java也是需要关注的接口,在最终执行过程中均是通过这个接口来操作数据的,

Sink

总结

Java的一大好处就是代码开源可以随便看,而且其大部分库都是Java语言实现的,这样读起来也不费劲,不像Python需要去看C++代码。阅读Stream执行过程的相关代码,对其特性会有进一步认识,比如就更容易理解为什么会有intermediate/terminal操作的区分,以及也可以去看下Stream的并行是如何实现的,应该是应用了ForkJoin框架。

比较大的一个感受就是Java 8中区分了IntStream、LongStream、DoubleStream、ReferenceStream,相关联的实现也基本就是四份,实现上来看相当冗余。如果Java能更好更有效率的处理原生类型,这部分代码实现就可以简化。

在实际开发中知道如何使用往往并不足够,能够了解底层的实现通常会对使用更有帮助。退一步说,看看这些代码也可以了解下JDK的实现者们是如何进行抽象设计的,或多或少也能学到点东西。