See the simple example below, which counts the number of occurrences of each word in a list:
Stream<String> words = Stream.of("a", "b", "a", "c");
Map<String, Integer> wordsCount = words.collect(toMap(s -> s, s -> 1,
(i, j) -> i + j));
At the end, wordsCount is {a=2, b=1, c=1}.
But my stream is very large and I want to parallelise the job, so I write:
Map<String, Integer> wordsCount = words.parallel()
.collect(toMap(s -> s, s -> 1,
(i, j) -> i + j));
However, I have noticed that wordsCount is a simple HashMap, so I wonder whether I need to explicitly ask for a concurrent map to ensure thread safety:
Map<String, Integer> wordsCount = words.parallel()
.collect(toConcurrentMap(s -> s, s -> 1,
(i, j) -> i + j));
Can non-concurrent collectors be safely used with a parallel stream or should I only use the concurrent versions when collecting from a parallel stream?
It is safe to use a non-concurrent collector in a collect operation of a parallel stream.
In the specification of the Collector interface, in the section with half a dozen bullet points, is this:
For non-concurrent collectors, any result returned from the result supplier, accumulator, or combiner functions must be serially thread-confined. This enables collection to occur in parallel without the Collector needing to implement any additional synchronization. The reduction implementation must manage that the input is properly partitioned, that partitions are processed in isolation, and combining happens only after accumulation is complete.
This means that the various implementations provided by the Collectors class can be used with parallel streams, even though some of those implementations might not be concurrent collectors. This also applies to any non-concurrent collectors that you implement yourself. They can be used safely with parallel streams, provided they don't interfere with the stream source, are side-effect free, are order independent, and so on.
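As an illustration of the point about your own collectors, here is a minimal sketch of a hand-written, non-concurrent word-count collector used with a parallel stream. The class name WordCountCollector and the helper methods wordCount and count are hypothetical names chosen for this example; only Collector.of, Map.merge and the stream calls are real library API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collector;
import java.util.stream.Stream;

public class WordCountCollector {

    // Hypothetical helper: a non-concurrent collector backed by a plain HashMap.
    // It declares no CONCURRENT characteristic, so the stream framework gives
    // each partition its own map and merges the maps with the combiner.
    static Collector<String, Map<String, Integer>, Map<String, Integer>> wordCount() {
        return Collector.of(
            HashMap::new,                                      // supplier: fresh map per partition
            (map, word) -> map.merge(word, 1, Integer::sum),   // accumulator: count one word
            (left, right) -> {                                 // combiner: fold right into left
                right.forEach((w, n) -> left.merge(w, n, Integer::sum));
                return left;
            });
    }

    // Hypothetical helper: counts words using the collector above on a parallel stream.
    static Map<String, Integer> count(Stream<String> words) {
        return words.parallel().collect(wordCount());
    }

    public static void main(String[] args) {
        System.out.println(count(Stream.of("a", "b", "a", "c")));
    }
}
```

The collector never touches the stream source and keeps no shared state outside the maps it is handed, which is what lets the framework run it in parallel without extra synchronization.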
I also recommend reading the Mutable Reduction section of the java.util.stream package documentation. In the middle of this section is an example that is stated to be parallelizable, but which collects results into an ArrayList, which is not thread-safe.
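A sketch along the lines of that documentation example, using the three-argument form of collect, might look like the following. The class name MutableReductionSketch and the method toStrings are hypothetical; the explicit parallel() call is added here to exercise the parallel case.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

public class MutableReductionSketch {

    // Hypothetical helper: converts each element to a String and collects the
    // results into an ArrayList, which is not thread-safe on its own.
    static List<String> toStrings(Stream<Integer> stream) {
        ArrayList<String> strings = stream.parallel().collect(
            () -> new ArrayList<>(),            // supplier: a new list per partition
            (c, e) -> c.add(e.toString()),      // accumulator: add one element
            (c1, c2) -> c1.addAll(c2));         // combiner: append one partial list to another
        return strings;
    }

    public static void main(String[] args) {
        System.out.println(toStrings(Stream.of(1, 2, 3)));
    }
}
```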
The way this works is that a parallel stream ending in a non-concurrent collector makes sure that different threads are always operating on different instances of the intermediate result collections. That's why a collector has a Supplier function, for creating as many intermediate collections as there are threads, so each thread can accumulate into its own. When intermediate results are to be merged, they are handed off safely between threads, and at any given time only a single thread is merging any pair of intermediate results.
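One way to observe this is to count how often the supplier is invoked. The sketch below assumes a hypothetical class SupplierCallCount with a counter named containersCreated; the exact number of containers depends on how the runtime splits the work, so the code reports it rather than relying on a particular value.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collector;
import java.util.stream.IntStream;

public class SupplierCallCount {

    // Counts supplier invocations. The counter itself is thread-safe,
    // but the ArrayLists produced by the supplier are not.
    static final AtomicInteger containersCreated = new AtomicInteger();

    // Hypothetical helper: collects 0..n-1 into a list with a non-concurrent
    // collector on a parallel stream.
    static List<Integer> collectInParallel(int n) {
        Collector<Integer, List<Integer>, List<Integer>> toList = Collector.of(
            () -> {
                containersCreated.incrementAndGet();   // one increment per intermediate list
                return new ArrayList<>();
            },
            (list, x) -> list.add(x),                  // each list is filled in isolation
            (left, right) -> {                         // merging happens after accumulation
                left.addAll(right);
                return left;
            });
        return IntStream.range(0, n).boxed().parallel().collect(toList);
    }

    public static void main(String[] args) {
        List<Integer> result = collectInParallel(100_000);
        System.out.println("elements collected: " + result.size());
        System.out.println("intermediate lists created: " + containersCreated.get());
    }
}
```

On a multi-core machine the second number is typically greater than one, while the collected list still contains every element exactly once and in encounter order.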