8. Collectors

In the Stream API section we saw some of the terminal operations, such as toArray, reduce and forEach, that compute the end result from a pipeline of intermediate operations. The Stream interface also declares collect, one of the most frequently used terminal operations, which performs a reduction on the elements of the stream using the Collector interface.

A Collector is a mutable reduction operation: it accumulates the elements of the stream into a mutable result container and finally returns either that container itself or a transformed representation of it, depending on the collector's characteristics. The reduction can be performed either sequentially or in parallel.

Examples of mutable reduction operations include: accumulating elements into a Collection; concatenating strings using a StringBuilder; computing summary information about elements such as sum, min, max, or average; and collecting elements into groups.
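The reductions listed above can be tried with the ready-made collectors in java.util.stream.Collectors. The following is a minimal sketch; the class name MutableReductionExamples and the WORDS sample list are made up for this illustration and are not part of the JDK.

```java
import java.util.Arrays;
import java.util.IntSummaryStatistics;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class MutableReductionExamples {

    // Hypothetical sample data, used only for this illustration.
    static final List<String> WORDS =
            Arrays.asList("stream", "map", "filter", "reduce", "collect");

    // Accumulate the elements into a Collection.
    static List<String> asList() {
        return WORDS.stream().collect(Collectors.toList());
    }

    // Concatenate the elements into one String.
    static String joined() {
        return WORDS.stream().collect(Collectors.joining(", "));
    }

    // Compute summary information (count, sum, min, max, average) of the word lengths.
    static IntSummaryStatistics lengthStats() {
        return WORDS.stream().collect(Collectors.summarizingInt(String::length));
    }

    // Collect the elements into groups keyed by word length.
    static Map<Integer, List<String>> byLength() {
        return WORDS.stream().collect(Collectors.groupingBy(String::length));
    }

    public static void main(String[] args) {
        System.out.println(asList());
        System.out.println(joined());               // stream, map, filter, reduce, collect
        System.out.println(lengthStats().getSum()); // 28
        System.out.println(byLength().get(6));      // [stream, filter, reduce]
    }
}
```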

8.1. How does a Collector work?

A Collector divides the complete reduction process into four sub-tasks that, between them, fit any type of reduction operation. They are:

  1. supplying a new empty result container at the beginning
  2. accumulating new data element into the result container
  3. combining two result containers into one in case of parallelization
  4. performing an optional final transform on the container
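The first three sub-tasks map directly onto the three-argument overload Stream.collect(supplier, accumulator, combiner), which takes the functions without wrapping them in a Collector. The sketch below is only an illustration of that mapping; the class name ThreeArgCollectSketch and the squares method are invented for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

class ThreeArgCollectSketch {

    // Collects the squares of 1..5 into a list, optionally in parallel.
    static List<Integer> squares(boolean parallel) {
        Stream<Integer> numbers = Stream.of(1, 2, 3, 4, 5);
        if (parallel) {
            numbers = numbers.parallel();
        }
        return numbers.collect(
                ArrayList::new,                       // 1. supply a new empty container
                (list, n) -> list.add(n * n),         // 2. accumulate one element into it
                (left, right) -> left.addAll(right)); // 3. combine two partial containers
        // 4. this overload has no finisher: the container itself is the result
    }

    public static void main(String[] args) {
        System.out.println(squares(false)); // [1, 4, 9, 16, 25]
        System.out.println(squares(true));  // [1, 4, 9, 16, 25]
    }
}
```

Because the source stream is ordered, the parallel run produces the same list as the sequential one; the combiner is what stitches the partial lists back together in encounter order.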

Not every operation needs all of these sub-tasks, but together they form the general shape of the process. Collectors also carry a set of characteristics, such as Characteristics.CONCURRENT, that give the reduction implementation hints it can use to improve performance. The Collector.Characteristics enum defines three characteristics:

  • UNORDERED: Indicates that the collection operation does not commit to preserving the encounter order of input elements. This might be true if the result container has no intrinsic order, such as a Set.
  • CONCURRENT: Indicates that this collector is concurrent, meaning that the result container can support the accumulator function being called concurrently with the same result container from multiple threads. Note that marking a collector CONCURRENT does not guarantee concurrent evaluation: it is evaluated concurrently only if it is also marked UNORDERED or if it is applied to an unordered data source such as a Set.
  • IDENTITY_FINISH: Indicates that the finisher function is the identity function, so the result container is returned as the final result without calling the function returned by Collector.finisher().
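To see these flags on real collectors, one can simply ask a few built-in collectors for their characteristics. The sketch below does that; the class name CharacteristicsProbe is made up, and apart from what the Javadoc states (toSet is unordered, groupingByConcurrent is concurrent and unordered) the exact sets printed are an implementation detail of the JDK in use, so no expected output is shown.

```java
import java.util.Set;
import java.util.stream.Collector;
import java.util.stream.Collectors;

class CharacteristicsProbe {

    static Set<Collector.Characteristics> ofToList() {
        return Collectors.<String>toList().characteristics();
    }

    // Documented as an unordered collector.
    static Set<Collector.Characteristics> ofToSet() {
        return Collectors.<String>toSet().characteristics();
    }

    static Set<Collector.Characteristics> ofJoining() {
        return Collectors.joining().characteristics();
    }

    // Documented as a concurrent and unordered collector.
    static Set<Collector.Characteristics> ofGroupingByConcurrent() {
        return Collectors.<String, Integer>groupingByConcurrent(String::length)
                .characteristics();
    }

    public static void main(String[] args) {
        System.out.println("toList:               " + ofToList());
        System.out.println("toSet:                " + ofToSet());
        System.out.println("joining:              " + ofJoining());
        System.out.println("groupingByConcurrent: " + ofGroupingByConcurrent());
    }
}
```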

The Collector interface declares the following five methods to support the sub-tasks above.

interface Collector<T, A, R> {
    Supplier<A> supplier();
    BiConsumer<A, T> accumulator();
    BinaryOperator<A> combiner();
    Function<A, R> finisher();
    Set<Characteristics> characteristics();
}

In this listing the following definitions apply:

  • T denotes the generic type of the stream elements
  • A is the type of the mutable result container, that is, the type created by the supplier and updated by the accumulator, in which the partial results are accumulated
  • R is the type of result to be returned at the end. If the IDENTITY_FINISH characteristic is given then A and R will be the same type (more precisely, an unchecked cast from A to R must succeed).

Let’s discuss the methods declared by the Collector interface:

  • Supplier<A> supplier()
    The supplier method returns a function that creates a new, empty result container each time it is invoked. Remember, for a sequential reduction the container is created only once, whereas for a parallel reduction it may be created several times, once per chunk of the data.
  • BiConsumer<A, T> accumulator()
    The accumulator defines the behaviour of the accumulation process. As you might have noticed, it is a BiConsumer: it takes the partial result container and a new element as inputs, updates the container in place and returns nothing.
  • BinaryOperator<A> combiner()
    The combiner defines how two partial results are merged. In the parallel case the complete dataset is split into multiple chunks that are reduced separately, so the combiner merges two partial result containers into one. The functional descriptor of BinaryOperator matches this task exactly: (partial1, partial2) -> partial1.merge(partial2), where merge stands for whatever merge operation the container offers.
  • Function<A, R> finisher()
    This defines the final transformation to be done to the result container after all the elements are processed.
  • Set<Characteristics> characteristics()
    Returns the immutable set of Characteristics, defining the behavior of the collector.
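To make the roles of these methods concrete, the following sketch drives a collector by hand instead of going through Stream.collect. It is a simplified illustration, not how the JDK is implemented: the class ManualCollectorDriver and both methods are hypothetical, characteristics are ignored, and the "parallel" variant merely splits the list in two on a single thread to show where the combiner comes in.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.stream.Collector;
import java.util.stream.Collectors;

class ManualCollectorDriver {

    // Sequential reduction: one container, every element accumulated, then finished.
    static <T, A, R> R collectSequentially(Collector<T, A, R> collector,
                                           List<? extends T> items) {
        A container = collector.supplier().get();
        BiConsumer<A, T> accumulator = collector.accumulator();
        for (T item : items) {
            accumulator.accept(container, item);
        }
        return collector.finisher().apply(container);
    }

    // Imitates a two-way split: two containers, filled separately, then combined.
    static <T, A, R> R collectInTwoChunks(Collector<T, A, R> collector,
                                          List<? extends T> items) {
        int mid = items.size() / 2;
        A left = collector.supplier().get();   // supplier is called once per chunk
        A right = collector.supplier().get();
        BiConsumer<A, T> accumulator = collector.accumulator();
        for (T item : items.subList(0, mid)) {
            accumulator.accept(left, item);
        }
        for (T item : items.subList(mid, items.size())) {
            accumulator.accept(right, item);
        }
        A merged = collector.combiner().apply(left, right);
        return collector.finisher().apply(merged);
    }

    public static void main(String[] args) {
        List<String> letters = Arrays.asList("a", "b", "c", "d");
        System.out.println(collectSequentially(Collectors.joining("-"), letters)); // a-b-c-d
        System.out.println(collectInTwoChunks(Collectors.joining("-"), letters));  // a-b-c-d
    }
}
```

Both calls produce the same string, which is the contract a well-behaved collector has to honour: splitting the input and combining the partial results must be equivalent to accumulating everything into one container.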

8.2. Implementing collectors

Now that we have a good idea of the methods the Collector interface provides and how they work, let’s implement our own collector that takes a set of Employee objects and generates XML content.

public class Employee {
    public String name;
    public String empid;
    public String technology;
    public double salary;
}

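import java.util.EnumSet;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collector;

public class ToXMLCollector implements Collector<Employee, StringBuffer, String> {

    // Template for a single <employee> element.
    final String xmlstr = "\n   <employee eid='%s'>\n\t" + "<name>%s</name>\n\t"
                + "<tech>%s</tech>\n\t<salary>%s</salary>\n   </employee>";

    // Creates a new, empty result container.
    public Supplier<StringBuffer> supplier() {
        return StringBuffer::new;
    }

    // Appends one employee, rendered as XML, to the container.
    public BiConsumer<StringBuffer, Employee> accumulator() {
        return (sb, e) -> sb.append(String.format(xmlstr, e.empid, e.name, e.technology, e.salary));
    }

    // Merges two partial results into the first one.
    public BinaryOperator<StringBuffer> combiner() {
        return (sb1, sb2) -> sb1.append(sb2.toString());
    }

    // Wraps the accumulated elements in the <employees> root element.
    public Function<StringBuffer, String> finisher() {
        return sb -> String.format("<employees> %s \n</employees>", sb.toString());
    }

    // StringBuffer is synchronized, so a single container can accept appends
    // from several threads; the order of the employees is then unspecified.
    public Set<Characteristics> characteristics() {
        return EnumSet.of(Characteristics.CONCURRENT);
    }
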
    public static void main(String[] args) {
        Set<Employee> emps = Database.employees();
        String xmlstr = emps.parallelStream().collect(new ToXMLCollector());
        System.out.println(xmlstr);
    }
}


Output:
-------
<employees>
    <employee eid='E1001'>
       <name>Mr Bean</name>
       <tech>Cloud Computing</tech>
    </employee>
    <employee eid='E1002'>
       <name>J Smith</name>
       <tech>Java</tech>
    </employee>
</employees>

In this example we created a separate ToXMLCollector class that implements all of the Collector methods. The Collector interface also provides static Collector.of factory methods that accept the collector's functions and return a ready-made Collector instance.

  • Collector<T, A, R> of(Supplier<A> supplier, BiConsumer<A, T> accumulator,
    BinaryOperator<A> combiner, Function<A, R> finisher, Characteristics... characteristics)
  • Collector<T, R, R> of(Supplier<R> supplier, BiConsumer<R, T> accumulator,
    BinaryOperator<R> combiner, Characteristics... characteristics)

Using these helper methods, our ToXMLCollector can also be implemented as:

Collector.<Employee, StringBuffer, String>of(StringBuffer::new,
  (sb, e) -> sb.append(String.format(xmlstr, e.empid, e.name, e.technology, e.salary)),
  (sb1, sb2) -> sb1.append(sb2.toString()),
  // no Characteristics are passed: the trailing varargs parameter is simply left empty
  sb -> sb.insert(0, "<employees>").append("\n</employees>").toString());
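For readers who want to run the Collector.of variant end to end, here is a self-contained sketch. Several things in it are assumptions made for the example rather than part of the listing above: the class name XmlCollectorDemo, the nested Employee stand-in with a constructor, the compact ROW template, and the salary figures. It also swaps StringBuffer for StringBuilder, which is sufficient here because no CONCURRENT characteristic is declared and each chunk therefore gets its own container.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collector;

class XmlCollectorDemo {

    // Stand-in for the Employee class; the constructor is added only for brevity.
    static class Employee {
        final String empid;
        final String name;
        final String technology;
        final double salary;

        Employee(String empid, String name, String technology, double salary) {
            this.empid = empid;
            this.name = name;
            this.technology = technology;
            this.salary = salary;
        }
    }

    // Compact one-line template, chosen for this sketch.
    static final String ROW = "\n  <employee eid='%s'><name>%s</name>"
            + "<tech>%s</tech><salary>%s</salary></employee>";

    static Collector<Employee, StringBuilder, String> toXml() {
        return Collector.of(
                StringBuilder::new,                                   // supplier
                (sb, e) -> sb.append(String.format(ROW, e.empid, e.name,
                        e.technology, e.salary)),                     // accumulator
                (left, right) -> left.append(right),                  // combiner
                sb -> "<employees>" + sb + "\n</employees>");         // finisher
    }

    // Sample data; the salary values are invented.
    static List<Employee> sample() {
        return Arrays.asList(
                new Employee("E1001", "Mr Bean", "Cloud Computing", 5000.0),
                new Employee("E1002", "J Smith", "Java", 6000.0));
    }

    public static void main(String[] args) {
        System.out.println(sample().stream().collect(toXml()));
        // The source list is ordered, so the parallel run yields the same document.
        System.out.println(sample().parallelStream().collect(toXml()));
    }
}
```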