[Translation] This article takes you through Java 8 Streams; from here on, collection operations are So Easy


This article is translated from winterbe.com/posts/2014/...

Author: @Winterbe

Personal website: www.exception.site/java8/java8

Java 8 Streams can fairly be called the juiciest of the new features. With them, you can say goodbye to tedious for loops when operating on collections. But many friends are still not very familiar with Stream. Today, through this translation of @Winterbe's article, let's learn in depth how to use it.

Table of Contents

1. How does Stream work?

2. Different types of Streams

3. The processing order of Streams

4. Does the order of intermediate operations matter?

5. The stream reuse problem

6. Advanced operations

  • 6.1 Collect
  • 6.2 FlatMap
  • 6.3 Reduce

7. Parallel streams

8. Conclusion


To be honest, when I first read about the Java 8 Stream API, I was quite confused, because its name sounds similar to InputStream and OutputStream from the Java I/O framework. But in fact, they are completely different things.

Java8 Stream uses a functional programming model. Like its name, it can be used to chain stream operations on collections.

This article will show you how to use different types of Stream operations in Java 8. At the same time, you will also learn about the processing sequence of streams and how different sequences of stream operations affect runtime performance.

We will also learn the details of the terminal operations reduce and collect, as well as the operation flatMap, and finally we will explore Java 8 parallel streams in depth.

Note: If you are not yet familiar with Java 8 lambda expressions, functional interfaces, and method references, you may want to first read another of Xiaoha's translations, the "Java 8 New Features Tutorial".

Next, let us enter the topic!

1. How does Stream work?

A stream represents a sequence of elements on which we can perform different kinds of operations to compute results. That may sound a bit abstract, so let's let the code speak:

List<String> myList =
    Arrays.asList("a1", "a2", "b1", "c2", "c1");

myList
    .stream() // create a stream
    .filter(s -> s.startsWith("c")) // keep elements starting with "c"
    .map(String::toUpperCase) // convert to uppercase
    .sorted() // sort alphabetically
    .forEach(System.out::println); // print each element

//C1
//C2
 

We can perform either intermediate operations or terminal operations on a stream. You may be wondering: what is an intermediate operation? What is a terminal operation?

Intermediate operations and terminal operations of a Stream

  • Intermediate operations: an intermediate operation returns a stream again, so we can chain multiple intermediate operations without semicolons in between. In the figure above, filter (filtering), map (object transformation), and sorted (sorting) are intermediate operations.
  • Terminal operations: a terminal operation terminates the stream pipeline and generally returns void or a non-stream result. In the figure above, the forEach loop is a terminal operation.

After seeing the operations above, it really does feel like a pipeline.

In fact, most stream operations accept a lambda expression as a parameter. Strictly speaking, they accept an implementation of a functional interface as a parameter.
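To make this concrete, here is a minimal sketch (class and variable names are our own, not from the original article) showing that a lambda passed to filter is just shorthand for an implementation of the java.util.function.Predicate functional interface:

```java
import java.util.Arrays;
import java.util.function.Predicate;

public class PredicateDemo {
    public static void main(String[] args) {
        // An anonymous-class implementation of the functional interface Predicate...
        Predicate<String> startsWithC = new Predicate<String>() {
            @Override
            public boolean test(String s) {
                return s.startsWith("c");
            }
        };

        // ...and the equivalent lambda expression. Both can be passed to filter().
        long count1 = Arrays.asList("a1", "c2", "c1").stream()
                .filter(startsWithC)
                .count();
        long count2 = Arrays.asList("a1", "c2", "c1").stream()
                .filter(s -> s.startsWith("c"))
                .count();

        System.out.println(count1 + " " + count2); // 2 2
    }
}
```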

2. Different types of Streams

We can create a Stream from various data sources, of which collections are the most common. Collections such as List and Set support the stream() and parallelStream() methods to create a sequential or a parallel stream.

Parallel streams execute on multiple threads and can take full advantage of multi-core CPUs to improve performance. This article covers parallel streams at the end; for now, let's discuss sequential streams:

Arrays.asList("a1", "a2", "a3")
    .stream() // create a stream
    .findFirst() // find the first element
    .ifPresent(System.out::println); // print it if present

//a1
 

Calling the stream() method on a collection returns a regular object Stream. However, you don't have to create a collection first just to obtain a Stream; you can also use the following approach:

Stream.of("a1", "a2", "a3")
    .findFirst()
    .ifPresent(System.out::println);  //a1
 

So, as the example above shows, we can use Stream.of() to create a Stream from a bunch of object references.

In addition to regular object streams, Java 8 also ships with special kinds of streams for working with the primitive data types int, long, and double. Having said that, you may already have guessed their names: IntStream, LongStream, and DoubleStream.

Among them, the IntStream.range() method can be used to replace a conventional for loop, as follows:

IntStream.range(1, 4)
    .forEach(System.out::println); // equivalent to: for (int i = 1; i < 4; i++) {}

//1
//2
//3
 

These primitive streams work basically the same way as regular object streams, but with a few differences:

  • Primitive streams use their own specialized functional interfaces, e.g. IntFunction instead of Function, and IntPredicate instead of Predicate.

  • Primitive streams support additional terminal aggregate operations such as sum() and average(), as shown below:

Arrays.stream(new int[] {1, 2, 3})
    .map(n -> 2 * n + 1) // map each number to 2*n + 1
    .average() // compute the average
    .ifPresent(System.out::println); // print it if present
//5.0
 

However, occasionally we have the opposite need: to convert a regular object stream into a primitive stream. In that case, the intermediate operations mapToInt(), mapToLong(), and mapToDouble() come in handy:

Stream.of("a1", "a2", "a3")
    .map(s -> s.substring(1)) // strip the leading letter, leaving "1", "2", "3"
    .mapToInt(Integer::parseInt) // parse each string into an int
    .max() // find the maximum
    .ifPresent(System.out::println); // print it if present

//3
 

If you need to convert a primitive stream into an object stream, you can use mapToObj() to achieve that:

IntStream.range(1, 4)
    .mapToObj(i -> "a" + i) // map each int from 1 to 3 to a string prefixed with "a"
    .forEach(System.out::println); // print each element

//a1
//a2
//a3
 

Here is a combined example: a stream of doubles is first converted into an int stream, and then into an object stream of strings:

Stream.of(1.0, 2.0, 3.0)
    .mapToInt(Double::intValue) // convert each double to an int
    .mapToObj(i -> "a" + i) // prefix each int with "a"
    .forEach(System.out::println); // print each element

//a1
//a2
//a3
 

3. The processing order of Streams

In the previous section, we learned how to create different types of Streams. Next, let's take a deeper look at the order in which stream operations are executed.

Before discussing processing order, you need to be clear about one important characteristic of intermediate operations: laziness. Observe the following sample code, which has no terminal operation:

Stream.of("d2", "a2", "b1", "b3", "c")
    .filter(s -> {
        System.out.println("filter: " + s);
        return true;
    });
 

When executing this code snippet, you might expect the elements "d2", "a2", "b1", "b3", "c" to be printed in turn. However, when you actually run it, nothing is printed at all.

why?

The reason is: if and only if there is a terminal operation, the intermediate operation will be executed.

Don't believe it? Let's add a forEach terminal operation to the code above:

Stream.of("d2", "a2", "b1", "b3", "c")
    .filter(s -> {
        System.out.println("filter: " + s);
        return true;
    })
    .forEach(s -> System.out.println("forEach: " + s));
 

Run it again, and we will see the following output:

filter:  d2
forEach: d2
filter:  a2
forEach: a2
filter:  b1
forEach: b1
filter:  b3
forEach: b3
filter:  c
forEach: c
 

The order of the output may surprise you! You probably assumed that all the filter-prefixed lines would be printed first, followed by all the forEach-prefixed lines.

In fact, the output moves vertically along the chain. For example, when the Stream processes the element "d2", it executes forEach right after filter has finished, and only then moves on to the second element.

Isn't it surprising? Why is it designed this way?

The reason is performance. This design reduces the actual number of operations performed on each element. You will understand after reading the following code:

Stream.of("d2", "a2", "b1", "b3", "c")
    .map(s -> {
        System.out.println("map: " + s);
        return s.toUpperCase(); // convert to uppercase
    })
    .anyMatch(s -> {
        System.out.println("anyMatch: " + s);
        return s.startsWith("A"); // match elements starting with "A"
    });

//map:      d2
//anyMatch: D2
//map:      a2
//anyMatch: A2
 

The terminal operation anyMatch() returns true as soon as any element starts with the prefix "A", and then stops processing. So it starts matching from "d2", and when it reaches "a2" it returns true and the loop stops.

Since the chained calls of the stream are executed vertically, map only needs to run twice here. In other words, map is executed as few times as possible, instead of transforming every element of the stream.

4. Does the order of intermediate operations matter?

The following example consists of two intermediate operations, map and filter, and a terminal operation, forEach. Let's take a look at how these operations are executed:

Stream.of("d2", "a2", "b1", "b3", "c")
    .map(s -> {
        System.out.println("map: " + s);
        return s.toUpperCase(); // convert to uppercase
    })
    .filter(s -> {
        System.out.println("filter: " + s);
        return s.startsWith("A"); // keep elements starting with "A"
    })
    .forEach(s -> System.out.println("forEach: " + s)); // print each element

//map:     d2
//filter:  D2
//map:     a2
//filter:  A2
//forEach: A2
//map:     b1
//filter:  B1
//map:     b3
//filter:  B3
//map:     c
//filter:  C
 

After studying the previous section, you already know that map and filter are each called five times, once for every string in the collection, while forEach is called only once, because only "a2" passes the filter condition.

If we change the order of the intermediate operations and move filter to the very beginning of the chain, we can greatly reduce the actual number of executions:

Stream.of("d2", "a2", "b1", "b3", "c")
    .filter(s -> {
        System.out.println("filter: " + s);
        return s.startsWith("a"); // keep elements starting with "a"
    })
    .map(s -> {
        System.out.println("map: " + s);
        return s.toUpperCase(); // convert to uppercase
    })
    .forEach(s -> System.out.println("forEach: " + s)); // print each element

//filter:  d2
//filter:  a2
//map:     a2
//forEach: A2
//filter:  b1
//filter:  b3
//filter:  c
 

Now, map is called only once, and performance improves. This little trick is very useful when a stream contains a large number of elements.

Next, let's add another intermediate operation, sorted, to the code above:

Stream.of("d2", "a2", "b1", "b3", "c")
    .sorted((s1, s2) -> {
        System.out.printf("sort: %s; %s\n", s1, s2);
        return s1.compareTo(s2); // sort alphabetically
    })
    .filter(s -> {
        System.out.println("filter: " + s);
        return s.startsWith("a"); // keep elements starting with "a"
    })
    .map(s -> {
        System.out.println("map: " + s);
        return s.toUpperCase(); // convert to uppercase
    })
    .forEach(s -> System.out.println("forEach: " + s)); // print each element
 

sorted is a stateful operation, because it needs to hold on to state (the elements seen so far) in order to sort the collection during processing.

Execute the above code, the output is as follows:

sort:    a2; d2
sort:    b1; a2
sort:    b1; d2
sort:    b1; a2
sort:    b3; b1
sort:    b3; d2
sort:    c; b3
sort:    c; d2
filter:  a2
map:     a2
forEach: A2
filter:  b1
filter:  b3
filter:  c
filter:  d2
 

Huh? This time, why wasn't it executed vertically? What you need to know is that sorted is executed horizontally. So in this case, sorted is called eight times on combinations of elements in the collection. Here, we can again apply the optimization technique described above and move the filter operation to the beginning of the chain:

Stream.of("d2", "a2", "b1", "b3", "c")
    .filter(s -> {
        System.out.println("filter: " + s);
        return s.startsWith("a");
    })
    .sorted((s1, s2) -> {
        System.out.printf("sort: %s; %s\n", s1, s2);
        return s1.compareTo(s2);
    })
    .map(s -> {
        System.out.println("map: " + s);
        return s.toUpperCase();
    })
    .forEach(s -> System.out.println("forEach: " + s));

//filter:  d2
//filter:  a2
//filter:  b1
//filter:  b3
//filter:  c
//map:     a2
//forEach: A2
 

From the output above, we see that sorted is never called, because after filter only one element remains, and in that case no sorting work needs to be done. The performance is therefore greatly improved.

5. The stream reuse problem

Java 8 Streams cannot be reused. As soon as you call any terminal operation, the stream is closed:

Stream<String> stream =
    Stream.of("d2", "a2", "b1", "b3", "c")
        .filter(s -> s.startsWith("a"));

stream.anyMatch(s -> true);    //ok
stream.noneMatch(s -> true);   //exception
 

After we call the terminal operation anyMatch on the stream, the stream is closed; calling noneMatch afterwards throws an exception:

java.lang.IllegalStateException: stream has already been operated upon or closed
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:229)
    at java.util.stream.ReferencePipeline.noneMatch(ReferencePipeline.java:459)
    at com.winterbe.java8.Streams5.test7(Streams5.java:38)
    at com.winterbe.java8.Streams5.main(Streams5.java:28)
 

To overcome this limitation, we can create a new stream chain for every terminal operation we want to execute. For example, we can wrap the stream construction in a Supplier and build a new Stream via get() each time, as follows:

Supplier<Stream<String>> streamSupplier =
    () -> Stream.of("d2", "a2", "b1", "b3", "c")
            .filter(s -> s.startsWith("a"));

streamSupplier.get().anyMatch(s -> true);   //ok
streamSupplier.get().noneMatch(s -> true);  //ok
 

Constructing a new stream each time works around the restriction that streams cannot be reused; it's a handy trick.

6. Advanced operations

Streams support a rich set of operations. Besides the more common intermediate operations introduced above, such as filter and map (see the Stream Javadoc), there are also more complex operations such as collect, flatMap, and reduce. Let's dig into them next:

Most code examples in this section will use the following List<Person> for demonstration:

class Person {
    String name;
    int age;

    Person(String name, int age) {
        this.name = name;
        this.age = age;
    }

    @Override
    public String toString() {
        return name;
    }
}

// construct a list of Person objects
List<Person> persons =
    Arrays.asList(
        new Person("Max", 18),
        new Person("Peter", 23),
        new Person("Pamela", 23),
        new Person("David", 12));
 

6.1 Collect

collect is an extremely useful terminal operation that transforms the elements of the stream into a different kind of result, such as a List, Set, or Map. collect accepts a Collector as input, which consists of four different operations: a supplier, an accumulator, a combiner, and a finisher.

What are these? Don't panic: it looks very complicated, but fortunately, in most cases you don't need to implement a collector yourself. Java 8 provides various commonly used collectors built into the Collectors class, so you can just use them directly.

Let's start with a very common use case:

List<Person> filtered =
    persons
        .stream() // create a stream
        .filter(p -> p.name.startsWith("P")) // keep persons whose name starts with "P"
        .collect(Collectors.toList()); // collect the results into a List

System.out.println(filtered);    //[Peter, Pamela]
 

As you can see, building a List from a stream is very simple. If you need a Set instead, just use Collectors.toSet().
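For instance, here is a minimal sketch of our own (the class name is hypothetical, analogous to the List example above) that collects the distinct ages into a Set:

```java
import java.util.Arrays;
import java.util.Set;
import java.util.stream.Collectors;

public class ToSetDemo {
    public static void main(String[] args) {
        // A Set keeps no duplicates, so the two occurrences of 23 collapse into one.
        Set<Integer> distinctAges = Arrays.asList(18, 23, 23, 12).stream()
                .collect(Collectors.toSet());

        System.out.println(distinctAges.size()); // 3
    }
}
```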

The next example will group everyone by age:

Map<Integer, List<Person>> personsByAge = persons
    .stream()
    .collect(Collectors.groupingBy(p -> p.age)); // group by age as the map key

personsByAge
    .forEach((age, p) -> System.out.format("age %s: %s\n", age, p));

//age 18: [Max]
//age 23: [Peter, Pamela]
//age 12: [David]
 

Besides these operations, you can also perform aggregations on the stream, for example to calculate the average age of all persons:

Double averageAge = persons
    .stream()
    .collect(Collectors.averagingInt(p -> p.age)); // compute the average age

System.out.println(averageAge);     //19.0
 

If you want more comprehensive statistics, the summarizing collector returns a special built-in statistics object. With it, we can easily compute the minimum age, maximum age, average age, sum, and count.

IntSummaryStatistics ageSummary =
    persons
        .stream()
        .collect(Collectors.summarizingInt(p -> p.age)); // summarize the ages

System.out.println(ageSummary);
//IntSummaryStatistics{count=4, sum=76, min=12, average=19.000000, max=23}
 

In the next example, you can concatenate all names into a string:

String phrase = persons
    .stream()
    .filter(p -> p.age >= 18) // keep adults (age >= 18)
    .map(p -> p.name) // extract the name
    .collect(Collectors.joining(" and ", "In Germany ", " are of legal age.")); // delimiter " and ", prefix "In Germany ", suffix " are of legal age."

System.out.println(phrase);
//In Germany Max and Peter and Pamela are of legal age.
 

The joining collector accepts a delimiter, as well as an optional prefix and suffix.

To transform the stream elements into a Map, we have to specify how both the keys and the values should be mapped. Keep in mind that the mapped keys must be unique, otherwise an IllegalStateException is thrown.
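The following sketch (our own, with a hypothetical class name) shows the failing case: both names map to the same first-letter key, so collecting with the two-argument toMap throws:

```java
import java.util.Arrays;
import java.util.stream.Collectors;

public class ToMapDuplicateKeyDemo {
    public static void main(String[] args) {
        try {
            // "Peter" and "Pamela" both map to the key 'P'; without a merge
            // function, toMap cannot resolve the collision and throws.
            Arrays.asList("Peter", "Pamela").stream()
                    .collect(Collectors.toMap(
                            name -> name.charAt(0), // key: first letter (duplicated!)
                            name -> name));         // value: the name itself
        } catch (IllegalStateException e) {
            System.out.println("caught IllegalStateException");
        }
    }
}
```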

You can choose to pass a merge function as an additional parameter to avoid this exception:

Map<Integer, String> map = persons
    .stream()
    .collect(Collectors.toMap(
        p -> p.age,
        p -> p.name,
        (name1, name2) -> name1 + ";" + name2)); // merge function for duplicate keys

System.out.println(map);
//{18=Max, 23=Peter;Pamela, 12=David}
 

Now that we already know these powerful built-in collectors, let's try to build a custom collector.

For example, suppose we want to transform all persons in the stream into a single string of all upper-cased names, separated by |. To achieve this, we create a new collector via Collector.of(). We have to pass the four ingredients of a collector: a supplier, an accumulator, a combiner, and a finisher.

Collector<Person, StringJoiner, String> personNameCollector =
    Collector.of(
        () -> new StringJoiner(" | "),          // supplier: create the result container
        (j, p) -> j.add(p.name.toUpperCase()),  // accumulator: add each upper-cased name
        (j1, j2) -> j1.merge(j2),               // combiner: merge two partial containers
        StringJoiner::toString);                // finisher: build the final string

String names = persons
    .stream()
    .collect(personNameCollector); // apply the custom collector

System.out.println(names);  //MAX | PETER | PAMELA | DAVID
 

Since strings in Java are immutable, we need a helper class like StringJoiner to let the collector construct our string.

First, the supplier constructs a StringJoiner with the appropriate delimiter.

The accumulator converts each person's name to uppercase and adds it to the StringJoiner.

The combiner merges two StringJoiners into one.

Finally, the finisher constructs the desired string from the StringJoiner.

6.2 FlatMap

Above, we learned how to transform the objects of a stream into another type via the map operation. However, map can only map each object to exactly one other object.

What if we want to transform one object into multiple other objects, or none at all? That's where flatMap comes in handy.

flatMap transforms each element of the stream into a stream of other objects. So each object may be transformed into zero, one, or multiple other objects, returned as a stream. The contents of those streams are then flattened into the stream returned by flatMap.
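As a minimal sketch of the idea (our own example, with a hypothetical class name): flattening a list of lists, where each inner list contributes zero or more elements to the result stream:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class FlatMapSketch {
    public static void main(String[] args) {
        List<List<Integer>> nested = Arrays.asList(
                Arrays.asList(1, 2),          // becomes two elements
                Arrays.<Integer>asList(),     // becomes zero elements
                Arrays.asList(3, 4, 5));      // becomes three elements

        // flatMap turns each inner list into a stream and concatenates them
        // into one flat stream of integers.
        List<Integer> flat = nested.stream()
                .flatMap(List::stream)
                .collect(Collectors.toList());

        System.out.println(flat); // [1, 2, 3, 4, 5]
    }
}
```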

Before we see flatMap in action, let's create two new classes for testing:

class Foo {
    String name;
    List<Bar> bars = new ArrayList<>();

    Foo(String name) {
        this.name = name;
    }
}

class Bar {
    String name;

    Bar(String name) {
        this.name = name;
    }
}
 

Next, use the stream knowledge we learned above to instantiate some objects:

List<Foo> foos = new ArrayList<>();

// create three "Foo" objects
IntStream
    .range(1, 4)
    .forEach(i -> foos.add(new Foo("Foo" + i)));

// create three "Bar" objects for each Foo
foos.forEach(f ->
    IntStream
        .range(1, 4)
        .forEach(i -> f.bars.add(new Bar("Bar" + i + " <- " + f.name))));
 

We have created a list of three foos, each of which contains three bars.

flatMap accepts a function that returns a stream of objects. To process the bars of each foo, we pass in the corresponding stream:

foos.stream()
    .flatMap(f -> f.bars.stream())
    .forEach(b -> System.out.println(b.name));

//Bar1 <- Foo1
//Bar2 <- Foo1
//Bar3 <- Foo1
//Bar1 <- Foo2
//Bar2 <- Foo2
//Bar3 <- Foo2
//Bar1 <- Foo3
//Bar2 <- Foo3
//Bar3 <- Foo3
 

As shown above, we have successfully transformed the stream of three foo objects into a stream of nine bar objects.

Finally, the code above can be simplified into a single stream pipeline:

IntStream.range(1, 4)
    .mapToObj(i -> new Foo("Foo" + i))
    .peek(f -> IntStream.range(1, 4)
        .mapToObj(i -> new Bar("Bar" + i + " <- " + f.name))
        .forEach(f.bars::add))
    .flatMap(f -> f.bars.stream())
    .forEach(b -> System.out.println(b.name));
 

flatMap is also available on the Optional class introduced in Java 8. Optional's flatMap operation returns an Optional of another type, so it can be used to avoid tedious null checks.

Next, let's create deeper objects:

class Outer {
    Nested nested;
}

class Nested {
    Inner inner;
}

class Inner {
    String foo;
}
 

To retrieve the innermost foo string from an Outer instance, you would have to add multiple null checks to guard against a possible NullPointerException, as shown below:

Outer outer = new Outer();
if (outer != null && outer.nested != null && outer.nested.inner != null) {
    System.out.println(outer.nested.inner.foo);
}
 

We can achieve the same behavior more elegantly by using Optional's flatMap operation:

Optional.of(new Outer())
    .flatMap(o -> Optional.ofNullable(o.nested))
    .flatMap(n -> Optional.ofNullable(n.inner))
    .flatMap(i -> Optional.ofNullable(i.foo))
    .ifPresent(System.out::println);
 

Each flatMap call returns an Optional wrapping the desired object if it is present, or an empty Optional if it is absent.
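As an aside (our own note, not from the original article): since none of the fields here are themselves Optional, the same chain can also be written with Optional.map, which likewise yields an empty Optional whenever the mapper returns null:

```java
import java.util.Optional;

public class OptionalMapSketch {
    static class Inner  { String foo = "bar"; }
    static class Nested { Inner inner = new Inner(); }
    static class Outer  { Nested nested = new Nested(); }

    public static void main(String[] args) {
        // Optional.map wraps the mapper result with ofNullable semantics,
        // so a null anywhere in the chain produces an empty Optional
        // instead of a NullPointerException.
        String result = Optional.of(new Outer())
                .map(o -> o.nested)
                .map(n -> n.inner)
                .map(i -> i.foo)
                .orElse("not present");

        System.out.println(result); // bar
    }
}
```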

Translator's note: for more on Optional, see my other translation, "How Java 8 New Features Prevent Null Pointer Exceptions".

6.3 Reduce

The reduction operation combines all elements of the stream into a single result. Java 8 supports three different kinds of reduce methods. The first one reduces a stream of elements to exactly one element of the stream.

Let's see how we can use this method to determine the oldest person:

persons
    .stream()
    .reduce((p1, p2) -> p1.age > p2.age ? p1 : p2)
    .ifPresent(System.out::println);    //Pamela
 

This reduce method accepts a BinaryOperator accumulator function, which is effectively a BiFunction where both operands and the result share the same type. BiFunction is just like Function, but it accepts two arguments. In the sample code, we compare the ages of two persons and return the older one.
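The same kind of reduction can be written with the BinaryOperator spelled out explicitly. Here is a sketch of our own with plain integers (class and variable names are hypothetical):

```java
import java.util.Optional;
import java.util.function.BinaryOperator;
import java.util.stream.Stream;

public class ReduceBinaryOperatorSketch {
    public static void main(String[] args) {
        // BinaryOperator<T> is just a BiFunction<T, T, T>: both operands and
        // the result share one type, which is what single-argument reduce needs.
        BinaryOperator<Integer> older = (a, b) -> a > b ? a : b;

        Optional<Integer> maxAge = Stream.of(18, 23, 23, 12).reduce(older);

        System.out.println(maxAge.get()); // 23
    }
}
```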

The second reduce method accepts both an identity value and a BinaryOperator accumulator. This method can be used to construct a new Person that aggregates the names and ages of all the other persons in the stream:

Person result =
    persons
        .stream()
        .reduce(new Person("", 0), (p1, p2) -> {
            p1.age += p2.age;
            p1.name += p2.name;
            return p1;
        });

System.out.format("name=%s; age=%s", result.name, result.age);
//name=MaxPeterPamelaDavid; age=76
 

The third reduce method accepts three parameters: an identity value, a BiFunction accumulator, and a combiner function of type BinaryOperator. Since the identity value's type is not restricted to Person, we can use this reduction to compute the total age of all persons:

Integer ageSum = persons
    .stream()
    .reduce(0, (sum, p) -> sum += p.age, (sum1, sum2) -> sum1 + sum2);

System.out.println(ageSum);  //76
 

The result is 76, but what exactly happened under the hood? Let's print some debug logs:

Integer ageSum = persons
    .stream()
    .reduce(0,
        (sum, p) -> {
            System.out.format("accumulator: sum=%s; person=%s\n", sum, p);
            return sum += p.age;
        },
        (sum1, sum2) -> {
            System.out.format("combiner: sum1=%s; sum2=%s\n", sum1, sum2);
            return sum1 + sum2;
        });

//accumulator: sum=0; person=Max
//accumulator: sum=18; person=Peter
//accumulator: sum=41; person=Pamela
//accumulator: sum=64; person=David
 

As you can see, the accumulator function does all the work. It starts by adding the identity value 0 to the first person's age, and sum keeps growing over the next three steps until it reaches 76.

Wait a moment? Something seems off! The combiner is never called?

Let's run the above code in a parallel stream and look at the log output:

Integer ageSum = persons
    .parallelStream()
    .reduce(0,
        (sum, p) -> {
            System.out.format("accumulator: sum=%s; person=%s\n", sum, p);
            return sum += p.age;
        },
        (sum1, sum2) -> {
            System.out.format("combiner: sum1=%s; sum2=%s\n", sum1, sum2);
            return sum1 + sum2;
        });

//accumulator: sum=0; person=Pamela
//accumulator: sum=0; person=David
//accumulator: sum=0; person=Max
//accumulator: sum=0; person=Peter
//combiner: sum1=18; sum2=23
//combiner: sum1=23; sum2=12
//combiner: sum1=41; sum2=35
 

Executing the same stream in parallel is completely different: here the combiner is actually called. Because the accumulators run in parallel, the combiner is needed to sum up the separate partial accumulated values.

Let's explore parallel streams in depth in the next chapter.

7. Parallel streams

Streams can be executed in parallel to significantly increase performance when the stream contains a large number of elements. Parallel streams use a common ForkJoinPool under the hood, available via the ForkJoinPool.commonPool() method. The size of the underlying thread pool uses up to five threads, depending on the number of available CPU cores:

ForkJoinPool commonPool = ForkJoinPool.commonPool();
System.out.println(commonPool.getParallelism());    //3
 

On my machine, the common pool is initialized with a parallelism of 3 by default. You can decrease or increase this value by setting the following JVM parameter:

-Djava.util.concurrent.ForkJoinPool.common.parallelism=5
 

Collections support the parallelStream() method to create a parallel stream of elements. Alternatively, you can call the intermediate method parallel() on an existing stream to convert a sequential stream into a parallel one.
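Here is a minimal sketch of the second variant (our own, with a hypothetical class name): starting from a sequential stream and switching it to parallel with the intermediate parallel() call:

```java
import java.util.Arrays;

public class ParallelSwitchSketch {
    public static void main(String[] args) {
        long count = Arrays.asList("a1", "a2", "b1", "c2", "c1")
                .stream()   // starts out sequential
                .parallel() // the whole pipeline now runs in parallel
                .filter(s -> s.startsWith("c"))
                .count();

        System.out.println(count); // 2
    }
}
```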

In order to understand the execution behavior of the parallel stream in detail, we print the current thread information in the following sample code:

Arrays.asList("a1", "a2", "b1", "c2", "c1")
    .parallelStream()
    .filter(s -> {
        System.out.format("filter: %s [%s]\n",
            s, Thread.currentThread().getName());
        return true;
    })
    .map(s -> {
        System.out.format("map: %s [%s]\n",
            s, Thread.currentThread().getName());
        return s.toUpperCase();
    })
    .forEach(s -> System.out.format("forEach: %s [%s]\n",
        s, Thread.currentThread().getName()));
 

Through the log output, we can have a deeper understanding of which thread is used to perform streaming operations:

filter:  b1 [main]
filter:  a2 [ForkJoinPool.commonPool-worker-1]
map:     a2 [ForkJoinPool.commonPool-worker-1]
filter:  c2 [ForkJoinPool.commonPool-worker-3]
map:     c2 [ForkJoinPool.commonPool-worker-3]
filter:  c1 [ForkJoinPool.commonPool-worker-2]
map:     c1 [ForkJoinPool.commonPool-worker-2]
forEach: C2 [ForkJoinPool.commonPool-worker-3]
forEach: A2 [ForkJoinPool.commonPool-worker-1]
map:     b1 [main]
forEach: B1 [main]
filter:  a1 [ForkJoinPool.commonPool-worker-3]
map:     a1 [ForkJoinPool.commonPool-worker-3]
forEach: A1 [ForkJoinPool.commonPool-worker-3]
forEach: C1 [ForkJoinPool.commonPool-worker-2]
 

As you can see, the parallel stream uses all available threads from the common ForkJoinPool to execute the stream operations. The output may differ between consecutive runs, because which thread is actually used is non-deterministic.

Let's extend the example above by adding the intermediate operation sorted:

Arrays.asList("a1", "a2", "b1", "c2", "c1")
    .parallelStream()
    .filter(s -> {
        System.out.format("filter: %s [%s]\n",
            s, Thread.currentThread().getName());
        return true;
    })
    .map(s -> {
        System.out.format("map: %s [%s]\n",
            s, Thread.currentThread().getName());
        return s.toUpperCase();
    })
    .sorted((s1, s2) -> {
        System.out.format("sort: %s <> %s [%s]\n",
            s1, s2, Thread.currentThread().getName());
        return s1.compareTo(s2);
    })
    .forEach(s -> System.out.format("forEach: %s [%s]\n",
        s, Thread.currentThread().getName()));
 

Run the code, the output looks a little strange:

filter:  c2 [ForkJoinPool.commonPool-worker-3]
filter:  c1 [ForkJoinPool.commonPool-worker-2]
map:     c1 [ForkJoinPool.commonPool-worker-2]
filter:  a2 [ForkJoinPool.commonPool-worker-1]
map:     a2 [ForkJoinPool.commonPool-worker-1]
filter:  b1 [main]
map:     b1 [main]
filter:  a1 [ForkJoinPool.commonPool-worker-2]
map:     a1 [ForkJoinPool.commonPool-worker-2]
map:     c2 [ForkJoinPool.commonPool-worker-3]
sort:    A2 <> A1 [main]
sort:    B1 <> A2 [main]
sort:    C2 <> B1 [main]
sort:    C1 <> C2 [main]
sort:    C1 <> B1 [main]
sort:    C1 <> C2 [main]
forEach: A1 [ForkJoinPool.commonPool-worker-1]
forEach: C2 [ForkJoinPool.commonPool-worker-3]
forEach: B1 [main]
forEach: A2 [ForkJoinPool.commonPool-worker-2]
forEach: C1 [ForkJoinPool.commonPool-worker-1]
 

It seems that sorted is executed sequentially, only on the main thread. In fact, sorted on a parallel stream uses the new Java 8 method Arrays.parallelSort() under the hood. As the official Javadoc explains, this method decides, based on the length of the data, whether the sort is executed sequentially or in parallel:

If the length of the specified array is less than the minimum granularity, it is sorted using the appropriate Arrays.sort method.
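For reference, Arrays.parallelSort can also be called directly (a sketch of our own; the exact granularity threshold is an internal JDK detail and may differ between implementations):

```java
import java.util.Arrays;

public class ParallelSortSketch {
    public static void main(String[] args) {
        int[] data = {5, 3, 1, 4, 2};

        // For a tiny array like this, parallelSort falls back to a sequential
        // sort internally (the exact threshold is a JDK implementation detail);
        // the sorted result is identical either way.
        Arrays.parallelSort(data);

        System.out.println(Arrays.toString(data)); // [1, 2, 3, 4, 5]
    }
}
```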

Back to the reduce example from the previous section. We already found that the combiner function is only called in parallel streams, not in sequential ones.

Let's actually observe which thread is involved:

List<Person> persons = Arrays.asList(
    new Person("Max", 18),
    new Person("Peter", 23),
    new Person("Pamela", 23),
    new Person("David", 12));

persons
    .parallelStream()
    .reduce(0,
        (sum, p) -> {
            System.out.format("accumulator: sum=%s; person=%s [%s]\n",
                sum, p, Thread.currentThread().getName());
            return sum += p.age;
        },
        (sum1, sum2) -> {
            System.out.format("combiner: sum1=%s; sum2=%s [%s]\n",
                sum1, sum2, Thread.currentThread().getName());
            return sum1 + sum2;
        });
 

Through the console log output, the accumulator and combiner are executed in parallel on all available threads:

accumulator: sum=0; person=Pamela; [main]
accumulator: sum=0; person=Max;    [ForkJoinPool.commonPool-worker-3]
accumulator: sum=0; person=David;  [ForkJoinPool.commonPool-worker-2]
accumulator: sum=0; person=Peter;  [ForkJoinPool.commonPool-worker-1]
combiner:    sum1=18; sum2=23;     [ForkJoinPool.commonPool-worker-1]
combiner:    sum1=23; sum2=12;     [ForkJoinPool.commonPool-worker-2]
combiner:    sum1=41; sum2=35;     [ForkJoinPool.commonPool-worker-2]
 

In short, what you need to remember is that parallel streams can greatly improve performance for streams that contain a large number of elements. But also keep in mind that some parallel stream operations, such as reduce and collect, need additional computations (combine operations) that are not required when executed sequentially.

Furthermore, we learned that all parallel stream operations share the same JVM-wide common ForkJoinPool. So you probably want to avoid writing slow, blocking stream operations, since they could slow down other parts of your application that rely heavily on parallel streams.

8. Conclusion

This concludes the Java 8 Stream programming guide. If you are interested in learning more about Java 8 Streams, I suggest reading the official Stream Javadoc. If you want to learn more about the underlying mechanisms, you may also want to read Martin Fowler's article on Collection Pipelines.

Finally, happy learning!
