How should I implement parallelism in java using threads for this scenario?


My scenario: I receive a message to execute a job. Each job has a sourceId. At any given time, only one job per sourceId should be running; the others should be queued. When a job starts, it should break its own work down into multiple small executions, and after every small execution I need to update the db with how many parts of the job have completed. Once the whole job ends I need to mark the job in the db as completed, and if there is an error I need to mark it as failed. Below is an approximate sketch I came up with. Can you tell me if I am missing something, and how should I do the db updates? Are there any Java language features that can make my life easier? One more thing: I need to do this in a Quarkus application.

class PrimaryWorker {
    // one queue (and one worker) per sourceId, so jobs for the same sourceId
    // run one at a time while different sourceIds run in parallel
    ConcurrentHashMap<String, ArrayBlockingQueue<BusinessJob>> staging;

    public void submit(BusinessJob job) {
        // computeIfAbsent is atomic: the queue and its worker are created
        // exactly once per sourceId, even under concurrent submits
        ArrayBlockingQueue<BusinessJob> queue = staging.computeIfAbsent(
                job.getSourceId(), sourceId -> {
                    ArrayBlockingQueue<BusinessJob> q = new ArrayBlockingQueue<>(100);
                    createSecondaryWorkerWithQueue(q);
                    return q;
                });
        queue.add(job); // queued jobs wait until the worker picks them up
    }
}

class BusinessJobWorker implements Runnable {
    ArrayBlockingQueue<BusinessJob> input;
    ArrayBlockingQueue<Batch> commonOutput;

    public BusinessJobWorker(ArrayBlockingQueue<BusinessJob> input,
                             ArrayBlockingQueue<Batch> commonOutput) {
        this.input = input;
        this.commonOutput = commonOutput;
    }

    @Override
    public void run() {
        BusinessJob job = input.poll();
        // a capacity-1 queue effectively serializes the batches;
        // a larger capacity lets several BatchWorkers run in parallel
        int capacity = canBeParallel(job) ? 10 : 1;
        ArrayBlockingQueue<Batch> workerQueue = new ArrayBlockingQueue<>(capacity);
        int pageNumber = 0;
        Batch batch;
        do {
            batch = getEntities(job, batchSize, pageNumber++);
            workerQueue.add(batch);
        } while (batch.hasNext());
        updateDatabase(); // mark the job COMPLETED (or FAILED on error)
    }

    public void updateDatabase() {
        // persist per-part progress and the final job state
    }

    public boolean canBeParallel(BusinessJob job) {
        // decide per job whether its batches may run concurrently
        return false;
    }
}

class BatchWorker implements Runnable {
    ArrayBlockingQueue<Batch> commonInput;
    ArrayBlockingQueue<?> output;

    public BatchWorker(ArrayBlockingQueue<Batch> commonInput) {
        this.commonInput = commonInput;
    }

    @Override
    public void run() {
        // take batches from commonInput, execute them, report progress
    }
}
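To make the per-sourceId idea in the sketch above concrete, here is a minimal plain-Java sketch (class and method names are my own, purely illustrative): instead of managing raw queues, each sourceId gets its own single-thread executor, which serializes jobs for that source while different sourceIds run in parallel.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical sketch: one single-thread executor per sourceId serializes
// jobs for that source; different sourceIds run concurrently.
class PerSourceScheduler {
    private final Map<String, ExecutorService> lanes = new ConcurrentHashMap<>();

    public Future<?> submit(String sourceId, Runnable job) {
        // computeIfAbsent is atomic, so two threads submitting the same
        // sourceId concurrently still end up sharing one executor (one lane)
        ExecutorService lane = lanes.computeIfAbsent(
                sourceId, id -> Executors.newSingleThreadExecutor());
        return lane.submit(job);
    }

    public void shutdown() {
        lanes.values().forEach(ExecutorService::shutdown);
    }
}
```

The single-thread executor's internal queue replaces the hand-rolled ArrayBlockingQueue, and the returned Future gives you a hook for the "mark job completed/failed" db update.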

One case I want to cover: if the server crashes or is terminated for some reason, the job execution should resume on restart, and the sourceId-level concurrency control should be maintained.
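Resume-on-restart generally means persisting progress per small execution, then re-reading non-terminal jobs at startup. A hedged sketch of that idea in plain Java follows; `JobRecord` and the in-memory `db` map are stand-ins I made up for a real table (in Quarkus this could be a Panache entity, and the recovery method could run on startup):

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: progress is persisted after every small execution,
// so after a crash the worker re-reads non-terminal jobs and continues
// from the last completed part instead of starting over.
class JobRecord {
    String jobId;
    String sourceId;
    String state;          // PENDING, RUNNING, COMPLETED, FAILED
    int completedParts;    // updated after every small execution
    int totalParts;

    JobRecord(String jobId, String sourceId, String state,
              int completedParts, int totalParts) {
        this.jobId = jobId; this.sourceId = sourceId; this.state = state;
        this.completedParts = completedParts; this.totalParts = totalParts;
    }
}

class RecoveringWorker {
    final Map<String, JobRecord> db = new ConcurrentHashMap<>(); // stand-in for a real table

    // Called once at startup; returns jobs interrupted by a crash
    public List<JobRecord> jobsToResume() {
        return db.values().stream()
                .filter(j -> j.state.equals("PENDING") || j.state.equals("RUNNING"))
                .toList();
    }

    public void runJob(JobRecord job) {
        job.state = "RUNNING";
        // resume from the part after the last one persisted as completed
        for (int part = job.completedParts; part < job.totalParts; part++) {
            // ... do the small execution for `part` ...
            job.completedParts = part + 1;   // persist progress per part
        }
        job.state = "COMPLETED";
    }
}
```

Feeding `jobsToResume()` back through the same per-sourceId submission path on startup preserves the sourceId-level concurrency control after a restart.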

I am still trying to figure out the right way to do this and would like an expert opinion on how a multi-threaded task like this should be done.

1 Answer

Answer by Serkan

Have you looked at Mutiny? It is the library within Quarkus for reactive programming; see this link:

https://smallrye.io/smallrye-mutiny/latest/tutorials/getting-mutiny/

Anyway, I haven't tested the code below, but it should look something like this.

Assuming you have a db with a MyJobEntity (I'm using Mongo in this example):

public class MyJobEntity extends ReactivePanacheMongoEntity {
    public State state;
    public List<Execution> executions;
}

Then you can process the jobs as follows:

public Uni<Void> processAllJobs() {
    return MyJobEntity.<MyJobEntity>streamAll()
            .call(job -> processJob(job))
            .call(job -> setJobState(job, "FINISHED"))
            .invoke(job -> Log.infof("Job(id=%s, state=FINISHED) finished successfully", job.id))
            .onItem().ignoreAsUni();
}

public Uni<Void> processJob(MyJobEntity job) {
    return Multi.createFrom().iterable(job.executions)
            .onItem().transformToUniAndConcatenate(execution -> executePartOfJob(execution)
                    // transform (not invoke) so the wrapped failure is actually propagated
                    .onFailure()
                    .transform(t -> new JobFailedException(
                            "Job(id=%s, executionId=%s) could not be completed"
                                    .formatted(job.id, execution.id)))
            )
            .onItem().ignoreAsUni()
            .onFailure(JobFailedException.class).call(() -> setJobState(job, "FAILED"));
}

This code streams the jobs one by one and processes each job's executions in order (that is what transformToUniAndConcatenate gives you). If all executions succeed, the state of the job is changed to FINISHED. Otherwise, it throws an error and sets the state to FAILED.
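The question also asks how to do the per-part db updates while batches run on multiple threads. One framework-independent option is a shared atomic counter that each worker bumps after finishing a part, with the actual db write injected as a callback. This is a sketch I am adding for illustration (the class, the callback, and all names are hypothetical, not part of the answer's Mutiny code):

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;

// Hypothetical sketch: a shared progress tracker that batch workers call
// after each completed part. The db write is injected as a callback
// (e.g. an UPDATE on the job row), so this compiles without any framework.
class JobProgress {
    private final String jobId;
    private final int totalParts;
    private final AtomicInteger completed = new AtomicInteger();
    private final BiConsumer<String, Integer> persist; // (jobId, completedParts) -> db

    JobProgress(String jobId, int totalParts, BiConsumer<String, Integer> persist) {
        this.jobId = jobId;
        this.totalParts = totalParts;
        this.persist = persist;
    }

    // Returns true when this call completed the final part,
    // which is the moment to mark the whole job COMPLETED.
    public boolean partDone() {
        int done = completed.incrementAndGet();   // safe across worker threads
        persist.accept(jobId, done);              // per-part db update
        return done == totalParts;
    }
}
```

Because incrementAndGet is atomic, exactly one worker sees the final count and can perform the "job completed" update, even when parts finish concurrently.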