Java 17 parallel Stream not yielding better performance

I am using a parallel stream to process a list of 20,000 elements. For each element, a method is called that takes a Java read lock (readLock) to do some computation.

When I benchmark the serial and parallel versions with JMH, the parallel version is not faster; the JMH numbers are roughly the same.

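import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

import org.junit.jupiter.api.Test; // assuming JUnit 5; the question does not say which version
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;

import static org.junit.jupiter.api.Assertions.assertEquals;

public class ParallelEvaluateTest {

    // ruleSystem1 (the rule engine whose evaluate() takes the read lock) is not shown in the question.

    @Test
    public void benchmark() throws Exception {
        System.out.println("processors: " + Runtime.getRuntime().availableProcessors());
        // These options override the @Fork annotations below: forks(0) runs the benchmarks
        // inside this JVM, with a single warmup and a single measurement iteration.
        Options opt = new OptionsBuilder()
                .include(this.getClass().getSimpleName())
                .forks(0)
                .measurementIterations(1)
                .warmupIterations(1)
                .build();

        new Runner(opt).run();
    }

    /*
    2 warmup forks followed by 3 measured forks (separate JVMs, not iterations).
    Ignored when run through benchmark() above, because forks(0) takes precedence.
     */
    @Benchmark
    @Fork(value = 3, warmups = 2)
    @BenchmarkMode({Mode.Throughput, Mode.AverageTime})
    @OutputTimeUnit(TimeUnit.MILLISECONDS)
    @Test
    public void evaluateParallelTest() {
        final ConcurrentHashMap<Map<String, String>, String> resultMap = new ConcurrentHashMap<>();
        final List<Map<String, String>> ruleInputList = createTestDataInput();

        assertEquals(20000, ruleInputList.size());

        ruleInputList.parallelStream().forEach(ruleInput ->
                resultMap.put(ruleInput, ruleSystem1.evaluate(ruleInput)));
        validateTestOutput(resultMap);
    }

    @Benchmark
    @Fork(value = 3, warmups = 2)
    @BenchmarkMode({Mode.Throughput, Mode.AverageTime})
    @OutputTimeUnit(TimeUnit.MILLISECONDS)
    @Test
    public void evaluateSerialTest() {
        final ConcurrentHashMap<Map<String, String>, String> resultMap = new ConcurrentHashMap<>();
        final List<Map<String, String>> ruleInputList = createTestDataInput();

        assertEquals(20000, ruleInputList.size());

        ruleInputList.stream().forEach(ruleInput ->
                resultMap.put(ruleInput, ruleSystem1.evaluate(ruleInput)));
        validateTestOutput(resultMap);
    }

    private static List<Map<String, String>> createTestDataInput() {
        final List<Map<String, String>> ruleInputList = new ArrayList<>();
        for (int i = 1; i <= 20000; i++) {
            Map<String, String> inputMap = new HashMap<>();
            inputMap.put("input_1", String.valueOf(i));
            inputMap.put("input_2", String.valueOf(i));
            ruleInputList.add(inputMap);
        }
        return ruleInputList;
    }

    private void validateTestOutput(ConcurrentHashMap<Map<String, String>, String> resultMap) {
        assertEquals(20000, resultMap.size());
        // Drop entries whose rule evaluated to "" and expect 7 matches to remain.
        resultMap.entrySet().removeIf(entry -> entry.getValue().equals(""));
        assertEquals(7, resultMap.size());
    }
}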

I have also set the VM argument -Djava.util.concurrent.ForkJoinPool.common.parallelism=4, which equals the number of available processors on the machine.
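To confirm what the JVM actually uses, a small stdlib-only check along these lines may help. It is a sketch, not part of the question's code; `CommonPoolCheck` and `threadsUsed` are illustrative names. It prints the effective common-pool parallelism and the names of the threads that end up processing a parallel stream.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;

public class CommonPoolCheck {

    // Returns the names of all threads that processed at least one element
    // of a parallel stream over n integers.
    static Set<String> threadsUsed(int n) {
        Set<String> names = ConcurrentHashMap.newKeySet();
        IntStream.range(0, n).parallel()
                .forEach(i -> names.add(Thread.currentThread().getName()));
        return names;
    }

    public static void main(String[] args) {
        // Effective common-pool parallelism; it reflects
        // -Djava.util.concurrent.ForkJoinPool.common.parallelism if that was set at startup.
        System.out.println("processors:  " + Runtime.getRuntime().availableProcessors());
        System.out.println("parallelism: " + ForkJoinPool.getCommonPoolParallelism());

        // The calling thread ("main") normally works alongside the pool's workers,
        // so up to parallelism + 1 distinct names can appear here.
        Set<String> names = threadsUsed(1_000_000);
        System.out.println(names.size() + " threads: " + names);
    }
}
```

By default the common pool's parallelism is one less than the number of available processors, because the thread that starts the terminal operation also does work. With the property set to 4 on a 4-processor machine, up to five threads can therefore compete for four cores. Seeing the expected threads does not show that the work scales, but it rules out a misconfigured pool.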

Despite setting the common-pool parallelism to 4, these are the results I get:

Benchmark                                   Mode  Cnt    Score   Error   Units
ParallelEvaluateTest.evaluateParallelTest  thrpt         0.001          ops/ms
ParallelEvaluateTest.evaluateSerialTest    thrpt         0.001          ops/ms
ParallelEvaluateTest.evaluateParallelTest   avgt       790.351           ms/op
ParallelEvaluateTest.evaluateSerialTest     avgt       799.799           ms/op

In short, the parallel stream performs no better than the serial one.

The parallel stream part of the code is:

ruleInputList.parallelStream().forEach(ruleInput -> {
              resultMap.put(ruleInput, ruleSystem1.evaluate(ruleInput));
          }
);
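The question does not show `ruleSystem1` or how its read lock is used, so the following is only a hypothetical stand-in that mirrors the described setup: each call to `evaluate` takes a `ReentrantReadWriteLock` read lock. `ReadLockedRules`, its rule table, and the `"match-"` values are invented for illustration. A read lock can be held by many threads at once, so this pattern alone does not serialize the stream. However, every `lock()`/`unlock()` still updates shared lock state, so if the real work inside the lock is short, that bookkeeping may be a noticeable share of the cost. The sketch also checks that the serial and parallel runs produce the same results.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical stand-in for the question's ruleSystem1, whose code is not shown.
public class ReadLockedRules {

    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private final Map<String, String> rules = new HashMap<>();

    ReadLockedRules() {
        // Invented rule table: inputs "1".."7" match, every other input yields "".
        for (int i = 1; i <= 7; i++) {
            rules.put(String.valueOf(i), "match-" + i);
        }
    }

    // Many threads can hold the read lock at the same time; a writer (not shown)
    // would use lock.writeLock() to change the rules exclusively.
    String evaluate(Map<String, String> input) {
        lock.readLock().lock();
        try {
            return rules.getOrDefault(input.get("input_1"), "");
        } finally {
            lock.readLock().unlock();
        }
    }

    static List<Map<String, String>> createInput(int n) {
        return IntStream.rangeClosed(1, n)
                .mapToObj(i -> Map.of("input_1", String.valueOf(i), "input_2", String.valueOf(i)))
                .collect(Collectors.toList());
    }

    // Same shape as the question's benchmark body, minus the validation step.
    static Map<Map<String, String>, String> run(ReadLockedRules rs,
                                                List<Map<String, String>> input,
                                                boolean parallel) {
        Map<Map<String, String>, String> result = new ConcurrentHashMap<>(input.size());
        (parallel ? input.parallelStream() : input.stream())
                .forEach(m -> result.put(m, rs.evaluate(m)));
        return result;
    }

    public static void main(String[] args) {
        ReadLockedRules rs = new ReadLockedRules();
        List<Map<String, String>> input = createInput(20_000);
        Map<Map<String, String>, String> serial = run(rs, input, false);
        Map<Map<String, String>, String> parallel = run(rs, input, true);
        long matches = parallel.values().stream().filter(v -> !v.isEmpty()).count();
        System.out.println("same results: " + serial.equals(parallel)); // same results: true
        System.out.println("non-empty results: " + matches);            // non-empty results: 7
    }
}
```

The count of 7 matches the question's `validateTestOutput` expectation only because the invented rule table was chosen that way.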

Answer by khachik:

If you want to see how parallelism improves the runtime, you should measure only the part that runs in parallel; data generation and validation should be kept out of the measured code (I mentioned this in the comment; refer to Amdahl's law). Separately, 20k elements are not enough to see any significant difference. Check out the code below:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class Test {

    public static void main(String[] args) {
        System.out.println("processors: " + Runtime.getRuntime().availableProcessors());
        List<Map<String, String>> data = createTestDataInput();

        // Warm-up runs so the JIT has compiled both paths before timing.
        evaluate(data, true);
        evaluate(data, false);

        // Only the stream processing is timed; data generation happened above.
        long start = System.nanoTime();
        evaluate(data, true);
        long pend = System.nanoTime();
        evaluate(data, false);
        long send = System.nanoTime();

        System.err.println("Parallel: " + (pend - start) / 1_000_000 + "ms");
        System.err.println("Sequential: " + (send - pend) / 1_000_000 + "ms");
    }

    public static void evaluate(List<Map<String, String>> ruleInputList, boolean parallel) {
        final ConcurrentHashMap<Map<String, String>, String> resultMap = new ConcurrentHashMap<>();
        if (parallel) {
            ruleInputList.parallelStream().forEach(ruleInput -> resultMap.put(ruleInput, "a"));
        } else {
            ruleInputList.stream().forEach(ruleInput -> resultMap.put(ruleInput, "b"));
        }
    }

    private static List<Map<String, String>> createTestDataInput() {
        final List<Map<String, String>> ruleInputList = new ArrayList<>();
        for (int i = 1; i <= 2000000; i++) {
            Map<String, String> inputMap = new HashMap<>();
            inputMap.put("input_1", String.valueOf(i));
            inputMap.put("input_2", String.valueOf(i));
            ruleInputList.add(inputMap);
        }
        return ruleInputList;
    }
}

Output:
processors: 8
Parallel: 302ms
Sequential: 545ms
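The Amdahl's law reference can be made concrete with a tiny calculator. This is a sketch with illustrative values, not measurements: if a fraction p of the single-threaded running time can be parallelized across n threads, the best achievable speedup is 1 / ((1 - p) + p / n). In the question's benchmark, data creation and validation run serially inside each measured call, which lowers p. The class name `Amdahl` is illustrative.

```java
import java.util.Locale;

public class Amdahl {

    // Amdahl's law: best-case speedup on n threads when a fraction p (0..1)
    // of the single-threaded running time can run in parallel.
    static double speedup(double p, int n) {
        return 1.0 / ((1.0 - p) + p / n);
    }

    public static void main(String[] args) {
        int n = 4; // the common-pool parallelism set in the question
        for (double p : new double[] {0.5, 0.9, 1.0}) {
            System.out.println(String.format(Locale.ROOT, "p=%.1f -> %.2fx", p, speedup(p, n)));
        }
        // p=0.5 -> 1.60x
        // p=0.9 -> 3.08x
        // p=1.0 -> 4.00x
    }
}
```

Even with 90% of the work parallelized, four threads cannot reach a 4x speedup, which is why keeping serial setup and validation out of the measurement matters.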

Furthermore, if you want to reduce the overhead, you can create the result map with an initial capacity so it does not have to resize as it grows:

...
final ConcurrentHashMap<Map<String, String>, String> resultMap = new ConcurrentHashMap<>(ruleInputList.size());

Output:
processors: 8
Parallel: 124ms
Sequential: 273ms
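If the per-element work is a pure function of the input, the same presized map can also be filled with a collector instead of `forEach` plus `put`. This variant is not part of the answer's measurements; `PresizedCollect`, `createInput`, and `collect` are illustrative names. `Collectors.toConcurrentMap` accepts a map factory, so the presized `ConcurrentHashMap` can be passed in, and because it is an unordered concurrent collector, a parallel stream inserts into that single map rather than building per-thread maps and merging them. Only the collection step is timed, in line with the advice above.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class PresizedCollect {

    static List<Map<String, String>> createInput(int n) {
        return IntStream.rangeClosed(1, n)
                .mapToObj(i -> Map.of("input_1", String.valueOf(i), "input_2", String.valueOf(i)))
                .collect(Collectors.toList());
    }

    // Collects straight into one presized ConcurrentHashMap shared by all worker threads.
    static ConcurrentMap<Map<String, String>, String> collect(
            List<Map<String, String>> input, Function<Map<String, String>, String> evaluate) {
        return input.parallelStream().collect(Collectors.toConcurrentMap(
                Function.identity(),
                evaluate,
                (first, second) -> first, // keys are unique here; keep the first value
                () -> new ConcurrentHashMap<Map<String, String>, String>(input.size())));
    }

    public static void main(String[] args) {
        List<Map<String, String>> input = createInput(2_000_000);
        long start = System.nanoTime();
        ConcurrentMap<Map<String, String>, String> result = collect(input, m -> "a");
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;
        System.out.println(result.size() + " entries in " + elapsedMs + " ms");
    }
}
```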