Java Multithreading : Ensure data consistency along with optimum performance when writing asynchronously to database


I have a Java Kafka Consumer reading from a Kafka topic with a single partition. The goal of the Java Microservice is to read from Kafka in batches (of the order of 5000), transform the messages provided and then update a table/collection in MongoDB.

More background: The Kafka topic itself receives messages in the order of change events happening in a separate database, hence the order of messages in Kafka is important. The idea is to synchronize changes from that database to MongoDB, applying some transformation in the process.

The order of messages is maintained while reading from the Kafka topic. Once a batch of messages is read in the microservice, we wish to use Java concurrency and multithreading to minimize the time taken to execute the task (transformations and save to MongoDB). Current working proof of concept, which is quite performant -

@KafkaListener(topics = "a.topic.containing.messages.in.order", containerFactory = "kafkaListenerContainerFactory")
public void myListener(ConsumerRecord<String, MyMessageData> myMessageRecord, Acknowledgment ack) {
    MyMessageData myMessage = myMessageRecord.value();
    CompletableFuture.runAsync(() -> processMyRecords(ack, myMessage), executorService);
}

The ExecutorService is configured as below -

private static final ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 50);

As can be seen, we are using a fixedThreadPool and trying to maximize the use of threads and utilize all of the MongoDB connections available to us, as configured in the maxPoolSize.

The Problem

The runAsync() method processes the message asynchronously. This causes a data discrepancy when two messages carrying sequential updates for the same MongoDB record (identified by a unique field, say ID) get saved in the wrong order.

For example -

Message1 - ID 100 (updates record 100 in Mongo with changes to field A)
Message2 - ID 100 (updates record 100 in Mongo with further changes to field A)
Message3 - ID 101 (updates record 101; we don't care whether it is processed before or after the other two, as it touches a different record)

Correct order: Message1, then Message2. Observed (non-deterministic) order: Message2, then Message1.

Note, we can actually process Message3 without order restriction provided there is no other message update for ID 101 in the batch size pulled in from Kafka.

This incorrectness comes in when 2 or more message updates for the same ID are picked up by different threads of the fixedThreadPool.

Please note - we cannot use a single-thread executor because performance is important for this use case (think of a data cutover where thousands of records must be processed in as little time as possible).

I feel there might be a way to provide a custom strategy to the ExecutorService, where, among the fixed pool of threads, we define our own strategy for assigning tasks to threads:

  1. First check the message being processed.
  2. Assign a bucket for that message, calculated as, say, ID % totalNoOfThreadsInFixedThreadPool; these "buckets" are nothing but threads.
  3. Both Message1 and Message2 would then be processed by the same thread, as they share the same ID (100), so we can be sure the tasks assigned to a single thread are performed sequentially.
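The bucketing strategy above doesn't need hooks into ThreadPoolExecutor itself; it can be sketched as a thin wrapper over an array of single-threaded executors. Note this is a minimal illustration, not your actual service code - the class name KeyedExecutor and its methods are invented for the example:

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

/**
 * Routes every task carrying the same key to the same single-threaded
 * executor ("bucket"), so updates for one record run sequentially while
 * different records are still processed in parallel across buckets.
 */
public class KeyedExecutor {
    private final List<ExecutorService> buckets;

    public KeyedExecutor(int parallelism) {
        this.buckets = IntStream.range(0, parallelism)
                .mapToObj(i -> Executors.newSingleThreadExecutor())
                .collect(Collectors.toList());
    }

    /** Tasks sharing a key are serialized; different keys may run in parallel. */
    public void submit(Object key, Runnable task) {
        // floorMod avoids a negative index when hashCode() is negative
        int bucket = Math.floorMod(key.hashCode(), buckets.size());
        buckets.get(bucket).submit(task);
    }

    /** Shuts down all buckets and waits for queued tasks to drain. */
    public void shutdownAndAwait() throws InterruptedException {
        for (ExecutorService e : buckets) {
            e.shutdown();
        }
        for (ExecutorService e : buckets) {
            e.awaitTermination(10, TimeUnit.SECONDS);
        }
    }
}
```

In the listener, the call would then look something like `keyedExecutor.submit(myMessage.getId(), () -> processMyRecords(ack, myMessage))` (assuming MyMessageData exposes a getId() accessor). One caveat: throughput for a hot key is limited to one thread, and a skewed key distribution can leave some buckets idle.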

Is there a way we can realise the above solution by customizing the ExecutorService? Are there any hooks into ThreadPoolExecutor provided by the java.util.concurrent package? Is it a good idea to override this class or related classes of the ExecutorService?

The answer to this question talks about thread affinity, but its use case is different, the answer is somewhat hard for me to follow, and it discourages using an ExecutorService: Deterministic assignment of tasks to threads using ExecutorService.

If ExecutorService is not the answer, how else can our use case be achieved?

I'm no expert in Concurrency, but I feel this must be a problem that has been solved before. I couldn't find an exact answer on SO yet, hence reaching out with this question.
