Sleeping in Spring Boot

I have a task scheduled (@EnableScheduling) in a Spring Boot web service that repeats on a regular basis. When the task fires, it calls the run method of the registered Runnable. In that run method I need to do work, and I must not exit the method until the work is completed. The problem is that other threads are doing work that this run thread depends on. So in the run thread I have something like this:

@Component
public class DoWork implements Runnable {

    @Override
    public void run() {

        // Set up clients.

        // Call services.
        Mono<String> response1 = client1.post();
        response1.subscribe(new MyResponseCallback(), new MyErrorCallback());

        Mono<String> response2 = client2.post();
        response2.subscribe(new MyResponseCallback(), new MyErrorCallback());

        Mono<String> responseX = clientX.post();
        responseX.subscribe(new MyResponseCallback(), new MyErrorCallback());

        // Poll until the callbacks signal that their work is done.
        while (!callbacksWorkCompletedFlag) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // Thread.sleep throws a checked exception; restore the flag and stop.
                Thread.currentThread().interrupt();
                return;
            }
        }

        // Do computation with callback responses.

        // After computation is completed, exit run method.
    }
}

public class MyResponseCallback implements Consumer<String> {

    @Override
    public void accept(final String response) {

        // Do work with response.
    }
}

public class MyErrorCallback implements Consumer<Throwable> {

    @Override
    public void accept(final Throwable error) {

        // Log error.
    }
}

Is there a better way to do this in Java/Spring Boot?

There are 2 best solutions below

5
scrhartley On BEST ANSWER

Here is an example using CompletableFuture. It passes a completion callback as the third argument to Mono.subscribe, so that each future is completed when its Mono finishes.

@Override
public void run() {
    Mono<String> response1 = client1.post();
    CompletableFuture<?> future1 = new CompletableFuture<>();
    response1.subscribe(
                new MyResponseCallback(), new MyErrorCallback(),
                () -> future1.complete(null));

    Mono<String> response2 = client2.post();
    CompletableFuture<?> future2 = new CompletableFuture<>();
    response2.subscribe(
                new MyResponseCallback(), new MyErrorCallback(),
                () -> future2.complete(null));

    Mono<String> responseX = clientX.post();
    CompletableFuture<?> futureX = new CompletableFuture<>();
    responseX.subscribe(
                new MyResponseCallback(), new MyErrorCallback(),
                () -> futureX.complete(null));
    
    CompletableFuture.allOf(future1, future2, futureX).join();
}
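
A caveat worth noting for the example above: the completion callback passed to Mono.subscribe runs only when the Mono completes normally. If a request fails, only the error callback runs, the future is never completed, and join() waits forever. The sketch below shows one way to complete the future on both paths. It uses only java.util.concurrent; fakeCall is a hypothetical stand-in for client.post().subscribe(...), not a Reactor API, and the class and method names are made up for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Consumer;

class FutureBridgeDemo {

    // Hypothetical stand-in for an asynchronous client call: it invokes exactly
    // one of the two callbacks on another thread.
    static void fakeCall(String payload, boolean fail,
                         Consumer<String> onSuccess, Consumer<Throwable> onError) {
        CompletableFuture.runAsync(() -> {
            if (fail) {
                onError.accept(new IllegalStateException("call failed: " + payload));
            } else {
                onSuccess.accept(payload);
            }
        });
    }

    // Bridges the callback pair to a future that is completed on BOTH paths.
    static CompletableFuture<String> call(String payload, boolean fail) {
        CompletableFuture<String> future = new CompletableFuture<>();
        fakeCall(payload, fail, future::complete, future::completeExceptionally);
        return future;
    }

    // Waits for all three calls and combines their results.
    static String runAll(boolean failSecond) {
        CompletableFuture<String> f1 = call("a", false);
        CompletableFuture<String> f2 = call("b", failSecond);
        CompletableFuture<String> f3 = call("c", false);
        try {
            CompletableFuture.allOf(f1, f2, f3).join();
            // All futures are complete here, so these join() calls do not wait.
            return f1.join() + f2.join() + f3.join();
        } catch (CompletionException e) {
            // join() wraps the original failure; the wait still ended.
            return "failed: " + e.getCause().getMessage();
        }
    }
}
```

Because the future carries the response itself, the computation that follows the wait can read the results from the futures instead of from shared state set inside the callbacks.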

Here is a CountDownLatch example:

@Override
public void run() {
    CountDownLatch latch = new CountDownLatch(3);

    Mono<String> response1 = client1.post();
    response1.subscribe(new MyResponseCallback(), new MyErrorCallback(), latch::countDown);

    Mono<String> response2 = client2.post();
    response2.subscribe(new MyResponseCallback(), new MyErrorCallback(), latch::countDown);

    Mono<String> responseX = clientX.post();
    responseX.subscribe(new MyResponseCallback(), new MyErrorCallback(), latch::countDown);
    
    try {
        latch.await();
    } catch (InterruptedException ex) {
        Thread.currentThread().interrupt(); // preserve the interrupt status
    }
}
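
The latch version shares a blind spot with any approach that relies only on the completion callback: latch::countDown is not invoked when a request fails, so the count never reaches zero and await() never returns. Below is a minimal sketch of a more defensive wait, using only java.util.concurrent. The task runnables stand in for the service calls, and the class name, the awaitAll helper, and the timeout are illustrative assumptions rather than part of the original answer.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class LatchDemo {

    // Runs every task on a pool thread and reports whether all of them
    // finished (successfully or not) before the timeout expired.
    static boolean awaitAll(List<Runnable> tasks, long timeout, TimeUnit unit) {
        CountDownLatch latch = new CountDownLatch(tasks.size());
        ExecutorService pool = Executors.newCachedThreadPool();
        try {
            for (Runnable task : tasks) {
                pool.execute(() -> {
                    try {
                        task.run();
                    } catch (RuntimeException e) {
                        // Stand-in for the error callback: log and carry on.
                        System.err.println("task failed: " + e.getMessage());
                    } finally {
                        latch.countDown(); // count down on success AND on failure
                    }
                });
            }
            // Bounded wait: returns false if the count did not reach zero in time.
            return latch.await(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        } finally {
            pool.shutdownNow();
        }
    }
}
```

A call such as LatchDemo.awaitAll(tasks, 5, TimeUnit.SECONDS) returns true once every task has ended, and false if the timeout elapses first or the waiting thread is interrupted.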

Another CompletableFuture example:

@Override
public void run() {
    List<CompletableFuture<?>> futures = new ArrayList<>();
    Supplier<Runnable> onDone = () -> {
        CompletableFuture<?> future = new CompletableFuture<>();
        futures.add(future);
        return () -> future.complete(null);
    };

    Mono<String> response1 = client1.post();
    response1.subscribe(new MyResponseCallback(), new MyErrorCallback(), onDone.get());

    Mono<String> response2 = client2.post();
    response2.subscribe(new MyResponseCallback(), new MyErrorCallback(), onDone.get());

    Mono<String> responseX = clientX.post();
    responseX.subscribe(new MyResponseCallback(), new MyErrorCallback(), onDone.get());

    CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new)).join();
}

Are all the callbacks actually required? If not, you can block on each Mono directly:

@Override
public void run() {
    // Build the requests (if these Monos are cold, nothing is sent until they are subscribed)
    Mono<String> responseMono1 = client1.post();
    Mono<String> responseMono2 = client2.post();
    Mono<String> responseMonoX = clientX.post();
    try {
        // block() subscribes and waits for the result, one Mono after another
        String response1 = responseMono1.block();
        String response2 = responseMono2.block();
        String responseX = responseMonoX.block();

        ...
    }
    catch (RuntimeException e) {
        ...
    }
}

0
M. Deinum On

You are using reactive programming, but then you set it aside and try to solve the problem imperatively. You don't need to sleep; use the features Project Reactor already gives you.

You can use zip to combine the results of the different Monos, then map the combined result into what you need. There is no need to poll a boolean, use countdown latches, and so on.

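Mono<String> response1 = client1.post();
Mono<String> response2 = client2.post();
Mono<String> responseX = clientX.post();
Mono<String> result = Mono
  .zip(response1, response2, responseX)
  // Placeholder: do something with the results of the three Monos.
  .map(tuple -> tuple.getT1() + tuple.getT2() + tuple.getT3());
// subscribe() returns immediately; call result.block() instead if run() must wait for the result.
result.subscribe();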

What you are doing in your Consumer is a bit unclear, but you can probably use a map for that as well on each Mono (or if it is just to set a boolean you can remove them).