I need a multi-producer, single-consumer (MPSC) queue in my application and chose the JCTools queues for it. I then wrote a JMH benchmark to compare them:
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.IntStream;
import org.jctools.queues.atomic.MpscLinkedAtomicQueue;
import org.jctools.queues.atomic.MpscUnboundedAtomicArrayQueue;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.infra.Blackhole;

public class MpscQueueTest {
    private static final int PRODUCERS = 10;
    private static final int SIZE = 256;
    private static final int ITERATION = 100000;

    private void doTest(Blackhole blackhole, Consumer<Integer> offer, Supplier<Integer> poll) throws InterruptedException {
        CountDownLatch c1 = new CountDownLatch(1);              // start gate for all threads
        CountDownLatch c2 = new CountDownLatch(PRODUCERS + 1);  // completion of producers + consumer
        // Producers run on virtual threads; each offers ITERATION random integers.
        List<Thread> producerList = IntStream.range(0, PRODUCERS).mapToObj(_ -> Thread.ofVirtual().unstarted(() -> {
            try {
                c1.await();
                for (int i = 0; i < ITERATION; i++) {
                    // Call current() on the producing thread; a ThreadLocalRandom
                    // instance should not be shared across threads.
                    offer.accept(ThreadLocalRandom.current().nextInt());
                }
                c2.countDown();
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        })).toList();
        // The single consumer runs on a platform thread and spins until it has
        // received every element.
        Thread consumer = Thread.ofPlatform().unstarted(() -> {
            try {
                c1.await();
                for (int i = 0; i < ITERATION * PRODUCERS; ) {
                    Integer value = poll.get();
                    blackhole.consume(value);
                    if (value != null) {
                        i++;
                    } else {
                        Thread.onSpinWait();
                    }
                }
                c2.countDown();
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        });
        producerList.forEach(Thread::start);
        consumer.start();
        c1.countDown();
        c2.await();
    }

    @Benchmark
    public void testMpscLinkedQueue(Blackhole blackhole) throws InterruptedException {
        MpscLinkedAtomicQueue<Integer> queue = new MpscLinkedAtomicQueue<>();
        doTest(blackhole, queue::offer, queue::poll);
    }

    @Benchmark
    public void testMpscArrayQueue(Blackhole blackhole) throws InterruptedException {
        MpscUnboundedAtomicArrayQueue<Integer> queue = new MpscUnboundedAtomicArrayQueue<>(SIZE);
        doTest(blackhole, queue::offer, queue::poll);
    }

    @Benchmark
    public void testConcurrentLinkedQueue(Blackhole blackhole) throws InterruptedException {
        ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>();
        doTest(blackhole, queue::offer, queue::poll);
    }

    @Benchmark
    public void testMpscLockQueue(Blackhole blackhole) throws InterruptedException {
        MpscLockQueue<Integer> queue = new MpscLockQueue<>();
        doTest(blackhole, queue::offer, queue::poll);
    }
}
The results are really unexpected:
Benchmark                                Mode  Cnt          Score         Error  Units
MpscQueueTest.testConcurrentLinkedQueue  avgt   50  231387059.667 ± 9583570.462  ns/op
MpscQueueTest.testMpscArrayQueue         avgt   50   77983705.821 ± 2081174.613  ns/op
MpscQueueTest.testMpscLinkedQueue        avgt   50   41121775.808 ±  979946.560  ns/op
MpscQueueTest.testMpscLockQueue          avgt   50   31875119.204 ± 1478117.885  ns/op
The MpscLockQueue is just a simple Deque with a ReentrantLock guarding it:
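import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class MpscLockQueue<T> {
    private final Lock lock = new ReentrantLock();
    private final Deque<T> deque = new ArrayDeque<>();

    // Appends an element to the tail while holding the lock.
    public void offer(T element) {
        lock.lock();
        try {
            deque.offer(element);
        } finally {
            lock.unlock();
        }
    }

    // Removes and returns the head, or null if the deque is empty.
    public T poll() {
        lock.lock();
        try {
            return deque.poll();
        } finally {
            lock.unlock();
        }
    }
}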
I am confused by these results. I suspected that the single consumer thread might be spending too much time on empty polls (poll() returning null), but it turns out that all of the queues keep the consumer thread busy, so the workload should be the same.
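One way to check the empty-poll suspicion would be to wrap the poll supplier in a small counter and compare hits against misses for each queue. The sketch below is only an illustration: EmptyPollCounter is a hypothetical helper (not part of JCTools or JMH), it assumes it is called from the single consumer thread only, and it uses ConcurrentLinkedQueue in the demo just to stay within the JDK.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Supplier;

// Hypothetical helper: wraps a poll supplier and counts how often it
// returns an element (hit) versus null (miss).
public class EmptyPollCounter<T> implements Supplier<T> {
    private final Supplier<T> delegate;
    // Plain fields are enough under the assumption of one consumer thread.
    private long hits;
    private long misses;

    public EmptyPollCounter(Supplier<T> delegate) {
        this.delegate = delegate;
    }

    @Override
    public T get() {
        T value = delegate.get();
        if (value == null) {
            misses++;
        } else {
            hits++;
        }
        return value;
    }

    public long hits() {
        return hits;
    }

    public long misses() {
        return misses;
    }

    public static void main(String[] args) {
        ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>();
        EmptyPollCounter<Integer> poll = new EmptyPollCounter<>(queue::poll);
        poll.get();        // queue is empty, counted as a miss
        queue.offer(1);
        poll.get();        // element available, counted as a hit
        System.out.println(poll.hits() + " hits, " + poll.misses() + " misses");
    }
}
```

In the benchmark, such a wrapper could be passed to doTest in place of queue::poll, and the counts read on the calling thread after c2.await() returns. The extra increment adds a little overhead of its own, so the counts are better read as a diagnostic than folded into the timed score.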
Is there a problem with my test program? If so, how should I benchmark an MPSC queue? If not, why is this happening?