JDK22 - Gatherer

Overview

If you take a look at JDK 22, you will find a very interesting feature: JEP 461: Stream Gatherers (Preview). Gatherers are a way to enhance the Stream API, which has existed in the JDK since 2014 (JDK 8). The collectors have been enhanced over time (you can look up the differences via Javaalmanac; check for Collectors). The collectors provided by the JDK already cover a lot, but sometimes there are situations where they are not enough, not flexible enough, or would produce code that is hard to read (or, more accurately, hard to understand). The first thought could be to request an enhancement of the Collectors in the JDK itself, but that would mean adding more methods to Collectors, which already has 44 methods (if I have counted correctly). Apart from that, is a problem that specific really worth adding to the JDK itself? (I have my doubts about that.) So it might be a better solution to give users a way to enhance the Stream API based on their own needs. That is also the summary of JEP 461:

Enhance the Stream API to support custom intermediate operations. This will allow stream pipelines to transform data in ways that are not easily achievable with the existing built-in intermediate operations. This is a preview API.

Let us start with a simple example. We would like to group a list of strings by the length of the strings. That can easily be expressed with the existing Stream API and collectors like this:

@Test
void usingGroupingBy() {
  var result = Stream.of(
          "123456", "foo", "bar", "baz",
          "quux", "anton", "egon", "banton")
      .collect(Collectors.groupingBy(String::length));
  System.out.println("result = " + result);
}

The output of that is:

result = {3=[foo, bar, baz], 4=[quux, egon], 5=[anton], 6=[123456, banton]}

So nothing fancy here. Another example uses distinct() in a stream. That can look like this:

@Test
void exampleDistinctWithNumbers() {
  var integers = List.of(1, 10, 11, 10, 11);
  var result = integers.stream().distinct().toList();
  System.out.println("result = " + result);
}

And the output of the above:

result = [1, 10, 11]

So let us combine those two ideas and create a distinct based on the length. Unfortunately, there is no way to customize distinct() because it takes no parameter at all; there is only a single implementation. OK, there is a way to achieve that, which could look like this (yes, that code is taken from JEP 461):

record DistinctByLength(String str) {
  @Override public boolean equals(Object obj) {
    return obj instanceof DistinctByLength(String other)
           && str.length() == other.length();
  }

  @Override public int hashCode() {
    return str == null ? 0 : Integer.hashCode(str.length());
  }
}

@Test
void example_one() {
  var result = Stream.of(
          "123456", "foo", "bar", "baz",
          "quux", "anton", "egon", "banton")
      .map(DistinctByLength::new)
      .distinct()
      .map(DistinctByLength::str)
      .toList();
  System.out.println("result = " + result);
}

and the output:

result = [123456, foo, quux, anton]

So to achieve the goal, we first have to map via .map(DistinctByLength::new), then call .distinct(), and then map back to the string via .map(DistinctByLength::str). OK, it works! Is that maintainable? It might be. Flexible? The result shows that the first element encountered for each length is emitted. What if I would like to have the last one? Or the second one if more than two elements exist? Wouldn't it be easier if you could write the code like this:

var streamList = List.of(
    "123456", "foo", "bar", "baz",
    "quux", "anton", "egon", "banton");

var result = streamList
    .stream()
    .distinctBy(String::length)
    .toList();

That would be great. Stream has no such method, but JEP 461 comes quite close. With it, you write the code like this:

var streamList = List.of(
    "123456", "foo", "bar", "baz",
    "quux", "anton", "egon", "banton");

var result = streamList
    .stream()
    .gather(distinctBy(String::length))
    .toList();

So finally we have to implement distinctBy. How does that work? Let us start by taking a look at the .gather(..) method. You can see that its parameter is of the interface type Gatherer<T, A, R>. Here T is the type of the incoming elements, A the type of the gatherer's internal state and R the type of the emitted elements. A gatherer in general consists of four parts: the initializer, the integrator, the combiner and the finisher.
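To make the four parts a bit more tangible, here is a minimal sketch that implements the Gatherer interface directly instead of going through a factory method. The class name TotalGatherer and the int[] state are assumptions made up for this illustration; only the four overridden methods correspond to the four parts named above.

```java
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Supplier;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

// Hypothetical example: sums all integers and emits the single total at the end.
final class TotalGatherer implements Gatherer<Integer, int[], Integer> {

  // initializer: creates the mutable state (a one-element array holding the sum)
  @Override
  public Supplier<int[]> initializer() {
    return () -> new int[1];
  }

  // integrator: called for each element; here it only updates the state
  @Override
  public Gatherer.Integrator<int[], Integer, Integer> integrator() {
    return (state, element, downstream) -> {
      state[0] += element;
      return true; // keep consuming
    };
  }

  // combiner: merges two partial states (only relevant for parallel streams)
  @Override
  public BinaryOperator<int[]> combiner() {
    return (left, right) -> {
      left[0] += right[0];
      return left;
    };
  }

  // finisher: called once after the last element; emits the result
  @Override
  public BiConsumer<int[], Gatherer.Downstream<? super Integer>> finisher() {
    return (state, downstream) -> downstream.push(state[0]);
  }

  static List<Integer> demo() {
    return Stream.of(1, 2, 3, 4).gather(new TotalGatherer()).toList();
  }

  public static void main(String[] args) {
    System.out.println("total = " + demo()); // prints: total = [10]
  }
}
```

In everyday code you would rarely implement the interface yourself; the factory methods used in the rest of this post are shorter. The sketch is only meant to show where each part lives.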

Integrator

Let us start with the integrator. It is called for every element that passes through the stream, which means the integrator sees every element of the stream. Depending on the implementation, it can do anything with the element. For a better understanding, we start with an implementation of a mapping that does nothing. Let us call it mapNoOp (you might already have inferred it from the name: Map No Operation). It could be used like this:

@Test
void noOperation_withGathererOf() {
  var integerList = List.of(1, 2, 3, 4, 5, 6, 7, 8);

  var resultList = integerList.stream()
      .gather(Gatherer.of(mapNoOp))
      .toList();
  System.out.println("resultList = " + resultList);
}

The same could also be achieved with the existing Stream API like this:

@Test
void noOperationMapping() {
  var integerList = List.of(1, 2, 3, 4, 5, 6, 7, 8);

  var resultList = integerList.stream()
      .map(Function.identity())
      .toList();
  System.out.println("resultList = " + resultList);
}

Now let us take a look at what the implementation based on the Gatherer looks like:

static final Gatherer.Integrator<Void, Integer, Integer> mapNoOp =
    (state, element, downstream) -> {
      downstream.push(element);
      return true;
    };

So for this, we only have to implement an integrator. The integrator consumes each element via element and emits it via downstream.push(element). The return value tells the stream whether to continue: return true means it should consume more elements, return false means it should stop. The state is not used at all here (I have kept the name state for better understanding; it can be replaced with an underscore (_) to make clear that it is not used, see JEP 456). To use it, as already stated, you have to wrap it via Gatherer.of(mapNoOp). This can be made a bit more convenient:

static <T> Gatherer<T, ?, T> mapNoOp() {
  Gatherer.Integrator<Void, T, T> integrator = (_, element, downstream) -> {
    downstream.push(element);
    return true;
  };
  return Gatherer.ofSequential(integrator);
}

That also makes it easier to generalize. Now you can use it very simply:

@Test
void noOperation_Integration() {
  var integerList = List.of(1, 2, 3, 4, 5, 6, 7, 8);

  var resultList = integerList.stream()
      .gather(mapNoOp())
      .toList();
  System.out.println("resultList = " + resultList);
}

So based on that, it is no problem to use it with other types:

@Test
void noOperation_IntegrationDifferentType() {
  // The same gatherer, this time applied to a list of strings.
  var stringList = List.of("1", "2", "3", "4", "5", "6", "7", "8");

  var resultList = stringList.stream()
      .gather(mapNoOp())
      .toList();
  System.out.println("resultList = " + resultList);
}

That has become very convenient to use.
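The integrator's return value mentioned earlier deserves a small illustration of its own. The following is a minimal sketch, assuming a made-up helper called takeWhileSketch (it is not part of the JDK or of the original examples): it passes elements through until a predicate fails and then returns false, which tells the stream to stop consuming. It also relies on the fact that downstream.push(..) itself returns a boolean that signals whether the downstream wants more elements.

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

final class StopEarlyDemo {

  // Hypothetical helper: emits elements while the predicate holds,
  // then returns false so that no further elements are consumed.
  static <T> Gatherer<T, ?, T> takeWhileSketch(Predicate<? super T> predicate) {
    Gatherer.Integrator<Void, T, T> integrator = (_, element, downstream) ->
        predicate.test(element) && downstream.push(element);
    return Gatherer.ofSequential(integrator);
  }

  static List<Integer> demo() {
    return Stream.of(1, 2, 3, 10, 4, 5)
        .gather(takeWhileSketch(i -> i < 5))
        .toList();
  }

  public static void main(String[] args) {
    System.out.println("result = " + demo()); // prints: result = [1, 2, 3]
  }
}
```

The elements 4 and 5 never reach the integrator, because the element 10 already made it return false.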

Initializer and Finisher

So back to our initial idea. Let us reconsider the groupingBy and distinct examples from the beginning. groupingBy requires something internally to store intermediate results before emitting the final result. distinct also needs something to remember which elements have already gone through the stream. That is where the initializer comes into play, and where the state within the integrator becomes important. Providing this memory is exactly the purpose of the initializer: it creates the memory, while state is the access to it. The implementation looks like this:

static <T, A> Gatherer<T, ?, T> distinctBy(Function<? super T, ? extends A> classifier) {
  Supplier<HashMap<A, List<T>>> initializer = HashMap::new;
  //
  Gatherer.Integrator<HashMap<A, List<T>>, T, T> integrator = (state, element, downstream) -> {
    A apply = classifier.apply(element);
    state.computeIfAbsent(apply, (_) -> new ArrayList<>()).add(element);
    return true;
  };
  //
  return Gatherer.ofSequential(initializer, integrator);
}

So what do we do here? Via the initializer we create a memory (a HashMap) that stores the elements under the key computed by the classifier. The classifier is something like String::length, so we have a HashMap<Integer, List<String>>. It is filled in the integrator while iterating through the stream: first the classifier is applied (which results in the length), then the element (a String) is added to the list in the HashMap. If you take a close look at the above implementation, you might have spotted the issue: it consumes all elements from the stream, but it never emits any element. So we have to think about the circumstances under which we want to emit elements. The answer: when the list for a key contains exactly one entry, that is, when the element is the first one for that key. Expressed in code:

static <T, A> Gatherer<T, ?, T> distinctBy(Function<? super T, ? extends A> classifier) {
  Supplier<HashMap<A, List<T>>> initializer = HashMap::new;
  //
  Gatherer.Integrator<HashMap<A, List<T>>, T, T> integrator = (state, element, downstream) -> {
    A apply = classifier.apply(element);
    state.computeIfAbsent(apply, (_) -> new ArrayList<>()).add(element);
    if (state.get(apply).size() == 1) {
      downstream.push(element);
    }
    return true;
  };
  //
  return Gatherer.ofSequential(initializer, integrator);
}

The usage looks like this:

@Test
void usingDistinctByExample() {
  var streamList = List.of(
      "123456", "foo", "bar", "baz",
      "quux", "anton", "egon", "banton");

  var result = streamList
      .stream()
      .gather(distinctBy(String::length))
      .toList();

  System.out.println("result = " + result);
}

and the output is this:

result = [123456, foo, quux, anton]

So the original requirement is solved. On to the next requirement: emit the last element that appears for each key. That means we must be sure the whole stream has been processed, or in other words, all elements have been seen. That cannot be solved with the integrator alone; for that you need the finisher.
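A side note on the first-wins variant above: it stores every element in a list, although only the keys are needed to decide whether an element is the first of its kind. A leaner sketch, assuming a hypothetical name distinctByFirst and a HashSet of keys as state, could look like this:

```java
import java.util.HashSet;
import java.util.List;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

final class DistinctByFirstDemo {

  // Hypothetical variant: remembers only the keys that were already seen.
  static <T, K> Gatherer<T, ?, T> distinctByFirst(Function<? super T, ? extends K> classifier) {
    Supplier<HashSet<K>> initializer = HashSet::new;

    Gatherer.Integrator<HashSet<K>, T, T> integrator = (seen, element, downstream) -> {
      // Set.add returns true only the first time a key is inserted.
      if (seen.add(classifier.apply(element))) {
        return downstream.push(element);
      }
      return true;
    };

    return Gatherer.ofSequential(initializer, integrator);
  }

  static List<String> demo() {
    return Stream.of("123456", "foo", "bar", "baz", "quux", "anton", "egon", "banton")
        .gather(distinctByFirst(String::length))
        .toList();
  }

  public static void main(String[] args) {
    System.out.println("result = " + demo()); // prints: result = [123456, foo, quux, anton]
  }
}
```

The list-based state is still the better starting point for the requirements that follow, because those need access to more than the first element of each group.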

Finisher

The finisher is called after all elements of the stream have been processed and before the stream ends, so to speak. The finisher is defined as BiConsumer<A, Downstream<? super R>> finisher. In other words, the first parameter is the state in its final form, and the second is the downstream, which gives us the opportunity to emit elements. At that point all elements of the stream have gone through the integrator.

static <T, A> Gatherer<T, ?, T> distinctBy(Function<? super T, ? extends A> classifier) {
  Supplier<HashMap<A, List<T>>> initializer = HashMap::new;
  //
  Gatherer.Integrator<HashMap<A, List<T>>, T, T> integrator = (state, element, downstream) -> {
    A apply = classifier.apply(element);
    state.computeIfAbsent(apply, (_) -> new ArrayList<>()).add(element);
    return true;
  };
  //
  BiConsumer<HashMap<A, List<T>>, Gatherer.Downstream<? super T>> finisher = (state, downstream) -> {
    state.forEach((_, value) -> downstream.push(value.getLast()));
  };
  //
  return Gatherer.ofSequential(initializer, integrator, finisher);
}

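The above definition can be used as before, with the only difference that it emits the last element of each group and not the first. Note that the order of the result follows the iteration order of the HashMap, not the encounter order of the stream. The output looks like this: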

result = [baz, egon, anton, banton]

The other requirement I mentioned (the second element, etc.) can also be solved easily via the finisher. I leave that as an exercise for the gentle reader.
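For readers who want a starting point for that exercise, here is one possible sketch, not the only way to do it. It assumes a hypothetical two-argument distinctBy that takes an additional selector function which picks one element out of each group. It also swaps the HashMap for a LinkedHashMap, which is my own choice so that the groups are emitted in order of first appearance.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

final class DistinctBySelectorDemo {

  // Hypothetical generalization: group by key, then let a selector pick one element per group.
  static <T, K> Gatherer<T, ?, T> distinctBy(
      Function<? super T, ? extends K> classifier,
      Function<? super List<T>, ? extends T> selector) {
    // LinkedHashMap keeps the groups in order of first appearance.
    Supplier<LinkedHashMap<K, List<T>>> initializer = LinkedHashMap::new;

    // The integrator only collects; nothing is emitted before the stream is done.
    Gatherer.Integrator<LinkedHashMap<K, List<T>>, T, T> integrator = (state, element, downstream) -> {
      state.computeIfAbsent(classifier.apply(element), key -> new ArrayList<>()).add(element);
      return true;
    };

    // The finisher emits exactly one element per group, chosen by the selector.
    BiConsumer<LinkedHashMap<K, List<T>>, Gatherer.Downstream<? super T>> finisher =
        (state, downstream) -> state.values().forEach(group -> downstream.push(selector.apply(group)));

    return Gatherer.ofSequential(initializer, integrator, finisher);
  }

  static List<String> demo() {
    return Stream.of("123456", "foo", "bar", "baz", "quux", "anton", "egon", "banton")
        // Second element of each group, falling back to the first for single-element groups.
        .gather(distinctBy(String::length, group -> group.size() > 1 ? group.get(1) : group.get(0)))
        .toList();
  }

  public static void main(String[] args) {
    System.out.println("result = " + demo()); // prints: result = [banton, bar, egon, anton]
  }
}
```

With a selector such as List::getFirst or List::getLast the same gatherer would cover the first-wins and last-wins requirements as well.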

Combiner

So let us come to the last one: the combiner. It is responsible for combining (you guessed it ;-)) the partial states, which only exist if you run your gatherer in a parallel stream. As you might have already noticed, I used return Gatherer.ofSequential(initializer, integrator, finisher); which implies sequential processing (wasn't that obvious?). I selected a different requirement to demonstrate the usage of the combiner: we want to find the duplicates in a list, or more accurately, in a stream. OK, let us take a look at a simple solution (I bet there are other or even better solutions):

@Test
void exampleFindDuplicates() {
  var integers = List.of(100, 1, 10, 11, 5, 10, 11, 5, 100, 75, 78, 90);
  var duplicates = findDuplicates(integers);
  System.out.println("duplicates = " + duplicates);
}

List<Integer> findDuplicates(List<Integer> givenList) {
  long count = givenList.stream().distinct().count();
  if (count < givenList.size()) {
    return givenList.stream().filter(i -> Collections.frequency(givenList, i) > 1)
        .distinct().toList();
  } else {
    return List.of();
  }
}
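As one of those other solutions, here is a sketch that counts the occurrences with the existing collectors. The simple version above calls Collections.frequency for every element, which scans the whole list each time; counting with groupingBy is an alternative I am suggesting here, not part of the original examples. The class name is made up for the illustration.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

final class FindDuplicatesByCounting {

  // Counts each value in one pass, then keeps the values that occurred more than once.
  static <T> List<T> findDuplicates(List<T> givenList) {
    Map<T, Long> counts = givenList.stream()
        .collect(Collectors.groupingBy(
            Function.identity(),
            LinkedHashMap::new,      // keeps the order of first appearance
            Collectors.counting()));
    return counts.entrySet().stream()
        .filter(entry -> entry.getValue() > 1)
        .map(Map.Entry::getKey)
        .toList();
  }

  public static void main(String[] args) {
    var integers = List.of(100, 1, 10, 11, 5, 10, 11, 5, 100, 75, 78, 90);
    System.out.println("duplicates = " + findDuplicates(integers));
    // prints: duplicates = [100, 10, 11, 5]
  }
}
```

This still needs two stream pipelines and an intermediate map in user code, which is the kind of boilerplate a dedicated gatherer can hide.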

This can be done in a more general and more convenient way for use in streams via a Gatherer like this:

static <T> Gatherer<? super T, ?, T> duplicatesWithoutCombiner() {
  Supplier<HashMap<T, Integer>> initializer = HashMap::new;
  //
  Gatherer.Integrator<HashMap<T, Integer>, T, T> integrator = (state, element, _) -> {
    var orDefault = state.getOrDefault(element, 0);
    state.put(element, orDefault + 1);
    return true;
  };
  //
  BiConsumer<HashMap<T, Integer>, Gatherer.Downstream<? super T>> finisher = (state, downstream) -> {
    state.forEach((k, v) -> {
      if (v >= 2) {
        downstream.push(k);
      }
    });
  };
  //
  return Gatherer.ofSequential(initializer, integrator, finisher);
}

This solution uses an initializer to create the internal state, which is a HashMap<T, Integer>. The type T represents the element type of the stream. The Integer simply counts the number of occurrences. The integrator does not emit any value, because it cannot know for sure whether more elements will come. That means the result can only be emitted in the finisher, and of course only for elements whose number of occurrences is greater than or equal to two. If you run that on the numbers defined above, you will get:

resultList = [100, 5, 10, 11]

So now let us take a look at a solution with a combiner:

static <T> Gatherer<? super T, ?, T> duplicates() {
  Supplier<HashMap<T, Integer>> initializer = HashMap::new;
  //
  Gatherer.Integrator<HashMap<T, Integer>, T, T> integrator = (state, element, _) -> {
    var orDefault = state.getOrDefault(element, 0);
    state.put(element, orDefault + 1);
    return true;
  };
  //
  BiConsumer<HashMap<T, Integer>, Gatherer.Downstream<? super T>> finisher = (state, downstream) -> {
    state.forEach((k, v) -> {
      if (v >= 2) {
        downstream.push(k);
      }
    });
  };
  //
  BinaryOperator<HashMap<T, Integer>> combiner = (s1, s2) -> {
    s1.forEach((k, v) -> {
      var s1def = s2.getOrDefault(k, 0);
      s2.put(k, v + s1def);
    });
    return s2;
  };
  //
  return Gatherer.of(initializer, integrator, combiner, finisher);
}

There are two differences. First, the last line uses return Gatherer.of(initializer, integrator, combiner, finisher); instead of the previously mentioned ofSequential(..). Second, of course, there is the implementation of the combiner itself. The combiner is literally a combiner: it merges two states into one. It makes no difference here whether you return s2 or s1, as long as you adjust the implementation accordingly, like this:

BinaryOperator<HashMap<T, Integer>> combiner = (s1, s2) -> {
  s2.forEach((k, v) -> {
    var s1def = s1.getOrDefault(k, 0);
    s1.put(k, v + s1def);
  });
  return s1;
};

You only have to make sure that combining two of those states produces the correct result. So finally you can now use the gatherer like this:

@Test
void exampleFindDuplicatesWithGathererCombiner() {
  var integers = List.of(100, 1, 10, 11, 5, 10, 11, 5, 100, 75, 78, 90);
  var resultList = integers.parallelStream().gather(duplicates()).toList();

  assertThat(resultList).containsExactlyInAnyOrder(100, 10, 11, 5);
}

If you would like to see the combiner working, you can simply put a print statement in there (or better, use your IDE's debugger) and see that it is called several times.
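As a sketch of that idea, the following variant counts the combiner invocations with an AtomicInteger instead of printing. The names CombinerCallsDemo, duplicatesCounting and COMBINER_CALLS are made up for this illustration, and the use of Map.merge is my own shortcut for the getOrDefault/put pair used above. How often the combiner is called depends on how the runtime splits the work, so the sketch deliberately states no expected number.

```java
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Supplier;
import java.util.stream.Gatherer;

final class CombinerCallsDemo {

  // Counts combiner invocations; atomic because several threads may call the combiner.
  static final AtomicInteger COMBINER_CALLS = new AtomicInteger();

  static <T> Gatherer<T, ?, T> duplicatesCounting() {
    Supplier<HashMap<T, Integer>> initializer = HashMap::new;

    // Each partial state counts the occurrences of the elements it has seen.
    Gatherer.Integrator<HashMap<T, Integer>, T, T> integrator = (state, element, downstream) -> {
      state.merge(element, 1, Integer::sum);
      return true;
    };

    // Adds the counts of the right state to the left state and returns the left one.
    BinaryOperator<HashMap<T, Integer>> combiner = (left, right) -> {
      COMBINER_CALLS.incrementAndGet();
      right.forEach((key, count) -> left.merge(key, count, Integer::sum));
      return left;
    };

    // Runs once on the fully combined state and emits every element seen at least twice.
    BiConsumer<HashMap<T, Integer>, Gatherer.Downstream<? super T>> finisher = (state, downstream) ->
        state.forEach((key, count) -> {
          if (count >= 2) {
            downstream.push(key);
          }
        });

    return Gatherer.of(initializer, integrator, combiner, finisher);
  }

  static List<Integer> demo() {
    var integers = List.of(100, 1, 10, 11, 5, 10, 11, 5, 100, 75, 78, 90);
    return integers.parallelStream().gather(duplicatesCounting()).toList();
  }

  public static void main(String[] args) {
    System.out.println("duplicates = " + demo());
    // The exact number depends on how the runtime splits the work.
    System.out.println("combiner calls = " + COMBINER_CALLS.get());
  }
}
```

If you replace parallelStream() with stream(), the counter should stay at zero, because a sequential run has only a single state and nothing to combine.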

I think this is a great enhancement for the JDK that makes it more convenient to customize functionality based on the Stream API. Unfortunately, I have realized that IntStream, LongStream and DoubleStream do not contain such an enhancement. At the moment it looks like only a gather method is missing on those three interfaces. Or maybe I misunderstand the complexity of adding such seemingly simple additions. The code for the examples here can be found in Gatherer.
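Until the primitive streams get something similar, one workaround is to box the values, gather on the resulting Stream<Integer>, and map back afterwards. The following is only a sketch of that detour; the runningSum gatherer is a made-up example, and the boxing comes with some overhead compared to a pure IntStream pipeline.

```java
import java.util.Arrays;
import java.util.stream.Gatherer;
import java.util.stream.IntStream;

final class IntStreamGatherDemo {

  // Hypothetical gatherer: emits the running sum of the elements seen so far.
  static Gatherer<Integer, ?, Integer> runningSum() {
    Gatherer.Integrator<int[], Integer, Integer> integrator = (state, element, downstream) -> {
      state[0] += element;
      return downstream.push(state[0]);
    };
    return Gatherer.ofSequential(() -> new int[1], integrator);
  }

  static int[] demo() {
    return IntStream.rangeClosed(1, 5)
        .boxed()                      // IntStream -> Stream<Integer>
        .gather(runningSum())
        .mapToInt(Integer::intValue)  // back to IntStream
        .toArray();
  }

  public static void main(String[] args) {
    System.out.println("result = " + Arrays.toString(demo()));
    // prints: result = [1, 3, 6, 10, 15]
  }
}
```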

Happy coding.