Streams

Goals

Concepts

Library

Dependencies

Lesson

You recently learned that Java's Optional<T> class can reduce the need for null references in your code, and consequently reduce the risk of generating a NullPointerException. In addition Optional<T> provides methods that internally make decisions based upon the presence of the object, using functional interfaces rather than imperative loops and tests to indicate behavior.

Let's review how Optional<T> can help make imperative, null-based code safer and more understandable. Remember that we added a method to Point for retrieving its Quadrant, an enum of values {I, II, III, IV}. Let us suppose we would like to collect the quadrants of all points that lie on lines at 45° angles from the origin; that is, points for which the absolute values of the X and Y coordinates are equal. Without Optional<T> you might use imperative code such as the following. The method Point.getQuadrant() could return null, as some points lie on the axes between quadrants.

Imperative approach to collecting the quadrant of a certain Point.
final Set<Quadrant> quadrants = new HashSet<>();
final Point point = …;
if(point != null) {
  if(abs(point.getX()) == abs(point.getY())) {  //filter point
    final Quadrant quadrant = point.getQuadrant();  //get its quadrant
    if(quadrant != null) {
      quadrants.add(quadrant);  //collect quadrants in the Set<Quadrant>
    }
  }
}

The use of Optional<T> not only obviates explicit null checks, but also facilitates filtering and mapping via functional interfaces, using lambda expressions and method references. Here Point.getQuadrant() is assumed to return an Optional<Quadrant> rather than a nullable Quadrant:

Functional approach to collecting the quadrant of a certain Point using Optional<T>.
final Set<Quadrant> quadrants = new HashSet<>();
final Optional<Point> point = …;
point
    .filter(p -> abs(p.getX()) == abs(p.getY()))  //filter point
    .flatMap(Point::getQuadrant)  //get its quadrant
    .ifPresent(quadrants::add);  //collect quadrants in the Set<Quadrant>

In addition to Optional<T>, you also recently learned that collections now provide internal iteration, which also uses functional interfaces. Rather than explicitly iterating through the list in a loop, you could pass a lambda to the collection's forEach(…) method. You could thus apply the above logic to a list of points, collecting all the relevant quadrants into a set:

Artificially creating an Optional<Point> to assist in processing a List<Point>.
final Set<Quadrant> quadrants = new HashSet<>();
final List<Point> points = …;
points.forEach(point -> {
  Optional.of(point)  //artificial Optional
      .filter(p -> abs(p.getX()) == abs(p.getY()))  //filter point
      .flatMap(Point::getQuadrant)  //get its quadrant
      .ifPresent(quadrants::add);  //collect quadrants in the Set<Quadrant>
});

But artificially introducing an Optional<Point> solely so we can functionally process elements in a list seems a clunky approach. Wouldn't it be nice if there were some way we could pass our sequence of filtering, mapping, and collection functions directly to the internal iteration logic of a list of elements?

Streams

Java streams represent a functional approach to processing a sequence of objects. A java.util.stream.Stream<T> represents a source of zero, one, or more objects; in this respect it is similar to a java.util.Iterator<E>. Unlike with an Iterator<E>, however, you do not pull things one at a time from a Stream<T>. Instead you provide a series of functions by chaining Stream<T> method calls, as you did with Optional<T>, and let the stream itself take care of the processing using internal iteration.

There are several qualities of streams you should know about so that you can get an idea of how the overall approach of streams is different than the imperative processing you've been used to.

single-use
Like an iterator, once you've processed the elements in a stream, you cannot use it again.
internal iteration
You do not step through the elements in a stream yourself; you instead indicate operations to be performed and the stream will iterate over the elements internally.
pipelining
You indicate a sequence of operations to be performed by chaining stream method calls, just like you did with methods like Optional.filter(…) and Optional.map(…). The resulting series of functions forms a processing pipeline through which objects pass. A stream method that returns another Stream<?> represents an intermediate operation, allowing you to chain further operations until the final terminal operation that closes the stream.
lazy
Importantly, a stream does not process intermediate operations immediately! By chaining intermediate operations you are actually defining operations to be performed lazily, only when and if needed, once the terminal operation is invoked.
short-circuiting
Some operations may short-circuit processing of the stream because the outcome would not change whatever the remaining stream content.
parallelizing
Streams have the ability, if you ask them, to transparently parallelize or divide up operations among several processors to arrive at the result faster, with no further work on the developer's part. Parallelization is an advanced concept, and will be discussed in depth in a future lesson. Stream parallelization is not a solution to all problems, and can even lower performance if not used appropriately.
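The single-use, lazy, and short-circuiting qualities listed above can be observed directly. The following is a minimal sketch, not part of the original lesson; the class name LazyStreamDemo and its log list are made up for illustration. It uses Stream.peek(…) to record which elements actually travel through the pipeline.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.stream.Stream;

class LazyStreamDemo {

  /** Records the order in which things happen (illustrative only). */
  static final List<String> log = new ArrayList<>();

  /** Defines a pipeline first, then runs it with a short-circuiting terminal operation. */
  static Optional<Integer> firstEvenSquare() {
    log.clear();
    final Stream<Integer> pipeline = Stream.of(1, 2, 3, 4, 5)
        .peek(n -> log.add("saw " + n))  //intermediate: not run yet
        .filter(n -> n % 2 == 0)
        .map(n -> n * n);
    log.add("pipeline defined");
    return pipeline.findFirst();  //terminal: pulls elements until the first match
  }

  /** Shows that a stream cannot be traversed a second time. */
  static boolean reuseFails() {
    final Stream<String> stream = Stream.of("a", "b");
    stream.forEach(s -> {});  //the first terminal operation consumes the stream
    try {
      stream.forEach(s -> {});
      return false;
    } catch(final IllegalStateException illegalStateException) {
      return true;
    }
  }

  public static void main(final String[] args) {
    System.out.println(firstEvenSquare());  //prints "Optional[4]"
    System.out.println(log);  //prints "[pipeline defined, saw 1, saw 2]"
    System.out.println(reuseFails());  //prints "true"
  }
}
```

Nothing is logged while the pipeline is being defined, and the elements 3, 4, and 5 are never examined because findFirst() already has its answer.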

Getting a Stream

A stream provides functional access to some series of objects, so you can always create a stream directly from the source objects if you have them handy, using Stream<T> static factory methods.

Stream.empty()
Returns an empty stream.
Stream.of(T value)
Creates a stream containing a single element.
Stream.of(T... values)
Creates a stream containing the given sequence of elements. The java.util.Arrays class also provides static Arrays.stream(…) factory methods for producing streams from existing arrays, including arrays of the primitive types int, long, and double.
Stream.generate(Supplier<T> supplier)
Creates an infinite stream that dynamically generates values as needed using the given supplier. For example Stream.generate(Math::random) will continually use the Math.random() method to provide a never-ending sequence of values. Because the values are produced lazily, a later pipelined operation may still short-circuit the stream. See Limiting below.

But perhaps the easiest and most common approach for getting a stream is to ask an existing collection of elements to produce one. The Collection.stream() method produces a Stream<E> for processing the elements in the Collection<E>. The stream produced by Collection.stream() will not modify the original collection; the collection will merely be used as the source of the elements processed by the stream.
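As a rough illustration of the factory methods just described, the sketch below obtains streams from literal values, from a primitive array, from a supplier, and from a collection. The class and method names are invented for this example, and it uses Stream.limit(…) and Stream.collect(…), which are covered later in this lesson.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class StreamSourcesDemo {

  /** A stream created directly from the values themselves. */
  static List<String> fromValues() {
    return Stream.of("one", "two", "three").collect(Collectors.toList());
  }

  /** Arrays.stream(int[]) produces an IntStream, a stream of primitive values. */
  static int sumOfArray(final int... numbers) {
    return Arrays.stream(numbers).sum();
  }

  /** An infinite generated stream, cut off after the requested number of values. */
  static List<Double> randoms(final int count) {
    return Stream.generate(Math::random).limit(count).collect(Collectors.toList());
  }

  /** A stream backed by a collection; the source list itself is left untouched. */
  static List<String> upperCased(final List<String> names) {
    return names.stream().map(String::toUpperCase).collect(Collectors.toList());
  }

  public static void main(final String[] args) {
    final List<String> names = Arrays.asList("Fulano", "Beltrano", "Sicrano");
    System.out.println(fromValues());  //prints "[one, two, three]"
    System.out.println(sumOfArray(1, 2, 3, 4));  //prints "10"
    System.out.println(randoms(3).size());  //prints "3"
    System.out.println(upperCased(names));  //prints "[FULANO, BELTRANO, SICRANO]"
    System.out.println(names);  //prints "[Fulano, Beltrano, Sicrano]"
  }
}
```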

Internally Iterating Elements in a Stream

Let's look at a simple example to get started. You already learned that Iterable.forEach(Consumer<? super T> action) allows you to pass a Consumer<T> for processing each element. Using a method reference, you can easily print out all the elements in a list using list.forEach(System.out::println). Streams provide a similar Stream.forEach(Consumer<? super T> action) method which works in exactly the same way.

Using Stream.forEach(…) to print all elements in a collection.
final List<String> names = Arrays.asList("Fulano", "Beltrano", "Sicrano");
names.stream().forEach(System.out::println);

Using a Consumer<T> to process each element in the stream using forEach(…) internal iteration is something you could have done with the collection itself. The real power of streams comes from the various intermediate operations you can chain in the pipeline, as well as the multitude of special terminal operations that put the icing on the processing cake.

Intermediate Operations

Each intermediate operation returns a Stream<?> on which you can chain further method calls, placing subsequent operations in the pipeline.

Filtering

The Stream.filter(Predicate<? super T> predicate) operation checks every element passing through the pipeline using the supplied Predicate<T>, and only lets through those elements for which the predicate returns true. The approach for filtering only points with equal absolute value X and Y coordinates looks similar to the approach for filtering a single element using Optional.filter(…).

Using Stream.filter(…) to print only certain points from a List<Point>.
final List<Point> points = …;
points.stream()
    .filter(p -> abs(p.getX()) == abs(p.getY()))
    .forEach(System.out::println);  //print each filtered point

Mapping

Stream.map(…)

When processing a stream of elements, you can provide a method for mapping each element to some other element. Just as Optional.map(…) will produce another Optional<?> using the result of the mapping function, Stream.map(Function<? super T,? extends R> mapper) will apply the mapping function to each element to produce a new stream which can then be used to chain other operations. The operation Stream.map(Point::getX) in the following example would therefore produce a Stream<Integer> of point X coordinates using the mapping function Point.getX().

Using Stream.map(…) to print the X coordinates of certain points from a List<Point>.
final List<Point> points = …;
points.stream()
    .filter(p -> abs(p.getX()) == abs(p.getY()))
    .map(Point::getX)
    .forEach(System.out::println);  //print each filtered point's X coordinate
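
The filtering and mapping steps above can be tried out in a self-contained form. The sketch below assumes a simplified Point class with int coordinates; it is only a stand-in for the Point class from the earlier lessons, and the class name FilterMapDemo is made up. It collects the results into a list (see Collecting below) so that they can be inspected.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class FilterMapDemo {

  /** Hypothetical, simplified stand-in for the lesson's Point class. */
  static final class Point {
    private final int x;
    private final int y;

    Point(final int x, final int y) {
      this.x = x;
      this.y = y;
    }

    int getX() {
      return x;
    }

    int getY() {
      return y;
    }
  }

  /** Returns the X coordinates of those points with |x| == |y|. */
  static List<Integer> diagonalXCoordinates(final List<Point> points) {
    return points.stream()
        .filter(p -> Math.abs(p.getX()) == Math.abs(p.getY()))  //Stream<Point>
        .map(Point::getX)  //Stream<Integer>
        .collect(Collectors.toList());
  }

  public static void main(final String[] args) {
    final List<Point> points = Arrays.asList(
        new Point(1, 1), new Point(2, 3), new Point(-4, 4), new Point(0, 5));
    System.out.println(diagonalXCoordinates(points));  //prints "[1, -4]"
  }
}
```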

Stream.flatMap(…)

The Stream.flatMap(Function<? super T,? extends Stream<? extends R>> mapper) operation is analogous to the Optional.flatMap(…) operation for optionals. If a mapping operation produces an Optional<T>, the operation Optional.flatMap(…) will unwrap the value inside any Optional<T>. Similarly if a mapping operation produces a Stream<T>, the operation Stream.flatMap(…) will unwrap the resulting stream and include the items themselves in the current stream.

Assume for a moment that there is a method Shape.points() that returns a Stream<Point> of the points defining that shape. Using Stream.map(…) with Shape::points as the argument would result in a Stream<Stream<Point>>, that is a stream of point streams. Using Stream.flatMap(…) instead would correctly produce a Stream<Point> for later processing, as in the following example.

Printing all the points making up the shapes in some list.
final List<Shape> shapes = …;
shapes.stream()
    .flatMap(Shape::points)
    .forEach(System.out::println);  //print the points of all shapes
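
To see the flattening in action, here is a minimal sketch with a hypothetical Shape class whose points() method returns a stream. The points are represented as plain strings to keep the example short; neither this Shape nor the class name FlatMapDemo comes from the lesson's code.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class FlatMapDemo {

  /** Hypothetical shape that stores its points as strings such as "(0,0)". */
  static final class Shape {
    private final List<String> points;

    Shape(final String... points) {
      this.points = Arrays.asList(points);
    }

    Stream<String> points() {
      return points.stream();
    }
  }

  /** Flattens the point streams of all shapes into a single list of points. */
  static List<String> allPoints(final List<Shape> shapes) {
    return shapes.stream()
        .flatMap(Shape::points)  //Stream<String>, not Stream<Stream<String>>
        .collect(Collectors.toList());
  }

  public static void main(final String[] args) {
    final List<Shape> shapes = Arrays.asList(
        new Shape("(0,0)", "(1,0)"), new Shape(), new Shape("(2,2)"));
    System.out.println(allPoints(shapes));  //prints "[(0,0), (1,0), (2,2)]"
  }
}
```

Note that a shape with no points simply contributes nothing to the resulting stream.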

Sorting

In previous lessons you've seen various examples of sorting by a Comparator<T>, so sorting a stream using Stream.sorted(Comparator<? super T> comparator) should appear instantly familiar. The following example sorts points by their X coordinates:

Using Stream.sorted(…) to sort points by X coordinates.
final List<Point> points = …;
points.stream()
    .sorted(Comparator.comparingInt(Point::getX))
    .forEach(System.out::println);
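
Comparators can also be combined before being handed to Stream.sorted(…). The following sketch sorts strings rather than points so that it stays self-contained; the class name SortedDemo and the sample words are invented. It orders words by length and breaks ties alphabetically.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

class SortedDemo {

  /** Sorts by length first, then alphabetically among words of equal length. */
  static List<String> byLengthThenAlphabetical(final List<String> words) {
    final Comparator<String> byLength = Comparator.comparingInt(String::length);
    final Comparator<String> combined = byLength.thenComparing(Comparator.naturalOrder());
    return words.stream()
        .sorted(combined)
        .collect(Collectors.toList());
  }

  public static void main(final String[] args) {
    final List<String> words = Arrays.asList("pear", "fig", "banana", "kiwi", "apple");
    System.out.println(byLengthThenAlphabetical(words));  //prints "[fig, kiwi, pear, apple, banana]"
    System.out.println(words);  //the source list keeps its order: "[pear, fig, banana, kiwi, apple]"
  }
}
```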

Limiting

If you only want a certain number of items back, you can use Stream.limit(long maxSize), which short-circuits stream processing after producing the requested number of elements (or fewer if there are not enough elements in the stream to begin with). Another way to limit the items returned is to skip over some of them in the stream using Stream.skip(long n).

Using Stream.skip(…) and Stream.limit(…) in a stream.
Stream.of("one", "two", "three", "four")
    .skip(1)
    .limit(2)
    .forEach(System.out::println);  //prints "two" and "three"
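
Limiting matters most with infinite streams, which would otherwise never finish. The sketch below uses Stream.iterate(…), another static factory method that was not listed above, to produce an endless sequence of powers of two; the class and method names are illustrative.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class LimitDemo {

  /** Takes a window from the infinite sequence 1, 2, 4, 8, … */
  static List<Integer> powersOfTwo(final long skipped, final long count) {
    return Stream.iterate(1, n -> n * 2)  //infinite: each value is double the previous one
        .skip(skipped)
        .limit(count)  //short-circuits, so processing terminates
        .collect(Collectors.toList());
  }

  public static void main(final String[] args) {
    System.out.println(powersOfTwo(0, 5));  //prints "[1, 2, 4, 8, 16]"
    System.out.println(powersOfTwo(2, 3));  //prints "[4, 8, 16]"
  }
}
```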

Terminal Operations

Terminal operations are those that finalize the processing of the stream and close it. You've already seen how Stream.forEach(…) passes each element in the stream to a Consumer<T>. Before creating your own Consumer<T>, however, review the other existing terminal operations; there might already be one that operates on the values in the same way, using a more functional approach to make your code easier to read and less susceptible to logical errors.

Collecting

One of the most common terminal operations is collecting the elements in the stream into some container, such as a Collection<T>. At the beginning of this lesson we showed how you could create a separate Set<Quadrant> and then add to it using the method reference quadrants::add from inside forEach(…). But streams provide a more general and sophisticated approach for collecting elements.

You must supply the method Stream.collect(Collector<? super T,A,R> collector) with a collector. Although you could create a custom implementation of Collector<T, A, R>, the interface encapsulating the strategy for collecting the elements, it is more common to use one of the static factory methods in java.util.stream.Collectors.

For example the Collectors.toSet() factory method will return a collector strategy that knows how to create a set and add elements to it. The actual collection is achieved by calling collect(Collectors.toSet()), as shown in the following example fully replicating the imperative example from the beginning of this lesson:

Collecting quadrants of certain points using Stream.collect(…).
final List<Point> points = …;
final Set<Quadrant> quadrants = points.stream()
    .filter(p -> abs(p.getX()) == abs(p.getY()))
    .map(Point::getQuadrant)
    .filter(Optional::isPresent)  //only accept optional quadrants that are present
    .map(Optional::get)  //map to the actual value
    .collect(Collectors.toSet());  //collect the quadrants into a set
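
Collectors.toSet() is only one of the available strategies. As a brief sketch, the example below also tries Collectors.joining(…) and Collectors.groupingBy(…), two other factory methods in java.util.stream.Collectors; the class name and sample data are made up for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

class CollectorsDemo {

  static final List<String> NAMES = Arrays.asList("Fulano", "Beltrano", "Sicrano", "Fulano");

  /** Collecting into a set discards the duplicate name. */
  static Set<String> distinctNames() {
    return NAMES.stream().collect(Collectors.toSet());
  }

  /** Joins the distinct names into a single string. */
  static String joinedNames() {
    return NAMES.stream().distinct().collect(Collectors.joining(", "));
  }

  /** Groups the distinct names by their length. */
  static Map<Integer, List<String>> namesByLength() {
    return NAMES.stream().distinct().collect(Collectors.groupingBy(String::length));
  }

  public static void main(final String[] args) {
    System.out.println(distinctNames().size());  //prints "3"
    System.out.println(joinedNames());  //prints "Fulano, Beltrano, Sicrano"
    System.out.println(namesByLength().get(8));  //prints "[Beltrano]"
  }
}
```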

Retrieving

If you simply want to retrieve one of the elements in the stream, use Stream.findAny(). This operation returns an Optional<T>, because there is no guarantee that there will be any items at all in the stream. Here is how you would retrieve one of the points that is in one of the four quadrants (i.e. that does not lie on an axis):

Retrieving any point that is in a quadrant using Stream.findAny().
final List<Point> points = …;
final Optional<Point> pointInQuadrant = points.stream()
    .filter(p -> p.getQuadrant().isPresent())
    .findAny();
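
A closely related operation is Stream.findFirst(), which returns the first matching element in encounter order, whereas findAny() is free to return any element (a distinction that mostly matters for parallel streams). The following sketch uses strings and an invented class name to show both the present and the empty result.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

class FindDemo {

  /** Returns the first name with at least the given length, if there is one. */
  static Optional<String> firstLongName(final List<String> names, final int minLength) {
    return names.stream()
        .filter(name -> name.length() >= minLength)
        .findFirst();
  }

  public static void main(final String[] args) {
    final List<String> names = Arrays.asList("Ana", "Beltrano", "Sicrano");
    System.out.println(firstLongName(names, 7));  //prints "Optional[Beltrano]"
    System.out.println(firstLongName(names, 20));  //prints "Optional.empty"
  }
}
```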

Matching

Streams have several terminal operations for checking whether some or all of the elements in the stream match some predicate. Each returns a boolean indicating the result of the match.

Stream.allMatch(Predicate<? super T> predicate)
Tests to see whether all elements in the stream match the given predicate.
Stream.anyMatch(Predicate<? super T> predicate)
Tests to see whether any element in the stream matches the given predicate. This operation short-circuits stream processing as soon as a matching element is found, so that no more filtering, mapping, etc. is performed and the result true is immediately returned.
Stream.noneMatch(Predicate<? super T> predicate)
Tests to see whether no elements in the stream match the given predicate.

Determining whether at least one point lies in the first quadrant using Stream.anyMatch().
final List<Point> points = …;
final boolean isPointInFirstQuadrant = points.stream()
    .anyMatch(p -> p.getQuadrant().filter(Quadrant.I::equals).isPresent());  //getQuadrant() returns an Optional<Quadrant>
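
One detail worth checking for yourself is how the three matching operations behave on an empty stream. The sketch below, which uses integers and invented names, shows that allMatch(…) and noneMatch(…) return true for an empty stream, while anyMatch(…) returns false.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class MatchDemo {

  static boolean allPositive(final List<Integer> numbers) {
    return numbers.stream().allMatch(n -> n > 0);
  }

  static boolean anyNegative(final List<Integer> numbers) {
    return numbers.stream().anyMatch(n -> n < 0);
  }

  static boolean noneZero(final List<Integer> numbers) {
    return numbers.stream().noneMatch(n -> n == 0);
  }

  public static void main(final String[] args) {
    final List<Integer> numbers = Arrays.asList(3, -1, 4);
    System.out.println(allPositive(numbers));  //prints "false"
    System.out.println(anyNegative(numbers));  //prints "true"
    System.out.println(noneZero(numbers));  //prints "true"

    final List<Integer> empty = Collections.emptyList();
    System.out.println(allPositive(empty));  //prints "true"
    System.out.println(anyNegative(empty));  //prints "false"
    System.out.println(noneZero(empty));  //prints "true"
  }
}
```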

Review

Gotchas

In the Real World

Streams are extremely new at present. Expect to encounter many APIs that work only with the Java Collection Framework, remembering that you can easily retrieve streams from collections and collect stream contents back into a collection.

Checked Exceptions

Streams are elegant and an important step towards functional programming, but the stream implementation in Java has one glaring deficiency: it doesn't work well with checked exceptions. Java's provided functional interfaces, as noted briefly in the lesson on lambda expressions, do not declare checked exceptions, but you are free to create your own versions that do. The situation becomes more precarious with Java streams, because stream operations prescribe the functional interfaces they use; you are not allowed to substitute your own functional interfaces that throw checked exceptions. Moreover keep in mind that a lambda does not throw an exception when you create it, but when it is called by the stream. A lambda providing for mapping, for example, might not be called until the terminal operation is invoked.

Consider a method Vehicle.isAlarmArmed() that indicates whether the vehicle alarm system is turned on to detect intrusion attempts. This method can throw a checked exception AlarmException to indicate that the alarm system has been disconnected altogether, or is malfunctioning. You might attempt to honk the horn of every vehicle with an alarm system turned on:

Filtering vehicles based on a method that can throw a checked exception.
final List<Vehicle> vehicles = …;
vehicles.stream()
    .filter(Vehicle::isAlarmArmed) //ERROR: throws checked exception AlarmException
    .forEach(Vehicle::honk);

Wrapping and Unwrapping Checked Exceptions

The problem is that the Predicate<T> accepted by Stream.filter() does not declare any checked exceptions, which means that the checked exception AlarmException declared by Vehicle.isAlarmArmed() must be caught inside the lambda. One solution would be to catch the exception and rethrow it wrapped in a runtime exception.

Wrapping a checked exception thrown in a stream operation.
final List<Vehicle> vehicles = …;
try {
  vehicles.stream()
      .filter(vehicle -> {
        try {
          return vehicle.isAlarmArmed();
        } catch(final AlarmException alarmException) {
          throw new RuntimeException(alarmException);
        }
      })
      .forEach(Vehicle::honk);
} catch(final RuntimeException runtimeException) {
  if(runtimeException.getCause() instanceof AlarmException) {
    throw (AlarmException)runtimeException.getCause();
  }
  throw runtimeException;
}

Obviously this bloats the code and defeats much of the purpose of using lambdas in the first place. Moreover it is not sufficient merely to wrap the exception; you must catch and “unwrap” the runtime exception later to rethrow or otherwise process the original AlarmException error.
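
Some of the bloat can be moved into a reusable helper. The following is only a sketch under stated assumptions: CheckedPredicate and wrapping(…) are hypothetical names invented here, not part of the JDK, and Vehicle and AlarmException are simplified stand-ins for the classes described above. The wrapping and unwrapping still take place, but the pipeline itself stays readable.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

class UncheckedDemo {

  /** Hypothetical checked exception standing in for the lesson's AlarmException. */
  static class AlarmException extends Exception {
    AlarmException(final String message) {
      super(message);
    }
  }

  /** Hypothetical, simplified vehicle. */
  static final class Vehicle {
    private final String name;
    private final boolean connected;
    private final boolean armed;

    Vehicle(final String name, final boolean connected, final boolean armed) {
      this.name = name;
      this.connected = connected;
      this.armed = armed;
    }

    String getName() {
      return name;
    }

    boolean isAlarmArmed() throws AlarmException {
      if(!connected) {
        throw new AlarmException(name + ": alarm disconnected");
      }
      return armed;
    }
  }

  /** Hypothetical interface: like Predicate<T>, but the test may throw a checked exception. */
  @FunctionalInterface
  interface CheckedPredicate<T> {
    boolean test(T value) throws Exception;
  }

  /** Adapts a CheckedPredicate<T> to a Predicate<T>, wrapping checked exceptions. */
  static <T> Predicate<T> wrapping(final CheckedPredicate<T> predicate) {
    return value -> {
      try {
        return predicate.test(value);
      } catch(final RuntimeException runtimeException) {
        throw runtimeException;  //already unchecked; pass through unchanged
      } catch(final Exception exception) {
        throw new RuntimeException(exception);
      }
    };
  }

  /** Filters using the adapter, then unwraps the original exception for callers. */
  static List<String> armedVehicleNames(final List<Vehicle> vehicles) throws AlarmException {
    try {
      return vehicles.stream()
          .filter(wrapping(Vehicle::isAlarmArmed))
          .map(Vehicle::getName)
          .collect(Collectors.toList());
    } catch(final RuntimeException runtimeException) {
      if(runtimeException.getCause() instanceof AlarmException) {
        throw (AlarmException)runtimeException.getCause();
      }
      throw runtimeException;
    }
  }

  public static void main(final String[] args) throws AlarmException {
    final List<Vehicle> vehicles = Arrays.asList(
        new Vehicle("car", true, true), new Vehicle("van", true, false));
    System.out.println(armedVehicleNames(vehicles));  //prints "[car]"
  }
}
```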

Faux Pas

There are many workarounds to the problem of using checked exceptions with Java streams, but one of the most elegant (and also the most dangerous) is to “sneaky throw” the exception. This technique tricks the Java compiler into letting checked exceptions propagate even from within methods that do not declare them. The Faux Pas library has many utilities to add sneaky throws to existing lambdas, allowing checked exceptions with the minimum of intrusion. Most utilities appear as static methods of the org.zalando.fauxpas.FauxPas class.

For the above example, you can use FauxPas.throwingPredicate(ThrowingPredicate<T,X> predicate) to wrap Vehicle.isAlarmArmed(), because Stream.filter() expects a Predicate<T> rather than a Function<T,R>. The example code below assumes that you use import static org.zalando.fauxpas.FauxPas.* to statically import all the FauxPas utility methods.

Sneaky-throwing a checked exception in a stream operation using Faux Pas.
final List<Vehicle> vehicles = …;
try {
  vehicles.stream()
      .filter(throwingPredicate(Vehicle::isAlarmArmed))
      .forEach(Vehicle::honk);
} catch(final AlarmException alarmException) {
  //TODO process the alarm problem
}

Notice that there is no need to wrap and unwrap the exception; it “magically” passes up the call stack, even past the Stream.filter() method, until it is caught.
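
To take some of the magic out of the technique, here is a minimal sketch of the general sneaky-throw idiom in plain Java. It is not the Faux Pas implementation; CheckedPredicate, sneakyThrow(…), sneaky(…), and isReadable(…) are all hypothetical names invented for this illustration. One wrinkle worth knowing: the compiler rejects a catch clause for a checked exception that nothing in the try block declares, so in this sketch the method containing the pipeline declares the exception itself.

```java
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

class SneakyThrowDemo {

  /** Hypothetical interface: like Predicate<T>, but the test may throw a checked exception. */
  @FunctionalInterface
  interface CheckedPredicate<T> {
    boolean test(T value) throws Exception;
  }

  /**
   * Throws the given throwable as is. The cast to the generic type X is
   * unchecked, so callers may pick an unchecked exception type for X and the
   * compiler will not demand a throws clause or a catch block.
   */
  @SuppressWarnings("unchecked")
  static <X extends Throwable> RuntimeException sneakyThrow(final Throwable throwable) throws X {
    throw (X)throwable;
  }

  /** Adapts a CheckedPredicate<T> to a Predicate<T> without wrapping exceptions. */
  static <T> Predicate<T> sneaky(final CheckedPredicate<T> predicate) {
    return value -> {
      try {
        return predicate.test(value);
      } catch(final Exception exception) {
        throw SneakyThrowDemo.<RuntimeException>sneakyThrow(exception);  //rethrown unwrapped
      }
    };
  }

  /** Hypothetical check that fails with a checked exception for empty names. */
  static boolean isReadable(final String name) throws IOException {
    if(name.isEmpty()) {
      throw new IOException("empty name");
    }
    return !name.startsWith("#");
  }

  /** Declares IOException itself, as the compiler cannot see it escaping the pipeline. */
  static long countReadable(final List<String> names) throws IOException {
    return names.stream()
        .filter(sneaky(SneakyThrowDemo::isReadable))
        .count();
  }

  public static void main(final String[] args) {
    try {
      System.out.println(countReadable(Arrays.asList("a", "#b", "c")));  //prints "2"
      countReadable(Arrays.asList("a", ""));
    } catch(final IOException ioException) {
      System.out.println("caught " + ioException.getMessage());  //prints "caught empty name"
    }
  }
}
```

The danger mentioned above is visible here: nothing forces countReadable(…) to declare IOException, so a forgotten throws clause would let a checked exception reach callers that have no idea it can occur.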

Think About It

Self Evaluation

Task

Add the ability to list only certain types of publications in Booker.

In the presentation layer, accept a --type (-t) parameter to the list command, recognizing the argument book or periodical.

Option    Alias    Description
list               Lists all available publications.
--help    -h       Prints out a help summary of available switches.
--name    -n       Indicates a filter by name for the list command.
--type    -t       Indicates the type of publication to list, either book or periodical. If not present, all publications will be listed.

In the data layer, add a PublicationRepository.publications() method to return a Stream<Publication>. Whether you keep your existing PublicationRepository.getPublications() is up to you.

Also in the data layer, add a PublicationRepository.publicationsByType(Class<P> pubClass) method to return a Stream<P> of the requested type of publication. Keep the presentation-layer identification of publication type separate from the domain model (the Publication interface and subclasses) in the business logic layer.

Back in the presentation layer, revamp your printing routines to use stream processing.

See Also

References

Resources

Acknowledgments