I have a use case where I need to collect requests from clients, batch them, and send the combined request for processing, with the result of the processing sent back to each client. I need to wait about 10 ms before batching. This is to be done with Dropwizard (Jersey for JAX-RS and Jetty as the web server).
Since requests are handled by a thread pool, my approach was to use a delay queue, as follows:
import com.codahale.metrics.annotation.Timed;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.DelayQueue;
@Path("/helloworld")
@Produces(MediaType.APPLICATION_JSON)
public class HelloWorldResource {
    private final String template;
    private final String defaultName;
    private Long endOfInterval;
    private BlockingQueue<ModifiedRequest> blockingDelayQueue;
    private List<ModifiedRequest> listOfRequests;
    private ArrayList<Response> listOfResponses;

    public HelloWorldResource(String template, String defaultName) {
        this.template = template;
        this.defaultName = defaultName;
        this.endOfInterval = System.currentTimeMillis() + 10;
        this.listOfResponses = new ArrayList<>(10);
        this.listOfRequests = new CopyOnWriteArrayList<>();
        this.blockingDelayQueue = new DelayQueue<>();
    }

    @GET
    @Timed
    @Produces(MediaType.APPLICATION_JSON)
    public Response sayHello(@QueryParam("name") Optional<String> name, @Context Response response) throws IOException, InterruptedException {
        String message = String.format(template, name.orElse(defaultName));
        // In a certain thread
        Long currentTimeStamp = System.currentTimeMillis();
        Long timeLeftInMS = endOfInterval - currentTimeStamp;
        if (timeLeftInMS < 0 && blockingDelayQueue.size() > 0) {
            this.endOfInterval = System.currentTimeMillis() + 10;
            // Drain ready requests
            blockingDelayQueue.drainTo(listOfRequests);
            // Do some work, get a single result which is most likely a list
            List<String> listOfProcessedResponses = getProcessedMessage(this.listOfRequests);
            for (int j = 0; j < listOfProcessedResponses.size(); j++) {
                return Response
                        .fromResponse(listOfResponses.get(j))
                        .entity(listOfProcessedResponses.get(j))
                        .build();
            }
            listOfProcessedResponses.clear();
            listOfResponses.clear();
        } else {
            blockingDelayQueue.add(new ModifiedRequest(timeLeftInMS, message));
            listOfResponses.add(response);
        }
    }

    public String modifyMessage(String message) {
        return "I got the string " + message + " of size: " + message.length();
    }

    public List<String> getProcessedMessage(List<ModifiedRequest> requests) {
        List<String> listOfStrings = new ArrayList<>();
        for (int i = 0; i < requests.size(); i++) {
            listOfStrings.add(modifyMessage(requests.get(i).getMessage()));
        }
        return listOfStrings;
    }
}
First, this design has a serious flaw: batching is only triggered when a new request arrives. If 9 requests arrive within 5 ms and the 10th arrives an hour later, the first 9 are starved until then. What is the ideal way to handle this scenario? I am thinking of a cron-style job that periodically checks whether the wait time has been exceeded and, if so, starts batching.
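To make the periodic-polling idea concrete, here is a minimal sketch using only the JDK, not something I have wired into Dropwizard. The class name `TimedBatcher`, its `submit` and `flush` methods, and the use of plain `String` messages are all hypothetical choices for illustration. The point is that a timer thread triggers the flush, so a batch never has to wait for the next request to arrive.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical helper: collects messages and hands them to a handler on a fixed timer.
final class TimedBatcher implements AutoCloseable {
    private final BlockingQueue<String> pending = new LinkedBlockingQueue<>();
    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
    private final Consumer<List<String>> batchHandler;

    TimedBatcher(long intervalMs, Consumer<List<String>> batchHandler) {
        this.batchHandler = batchHandler;
        // The timer thread drives flushing, independent of request arrival.
        timer.scheduleAtFixedRate(this::flush, intervalMs, intervalMs, TimeUnit.MILLISECONDS);
    }

    // Called from request threads; it only enqueues and never processes.
    void submit(String message) {
        pending.add(message);
    }

    // Drains whatever has accumulated and processes it as one batch.
    void flush() {
        List<String> batch = new ArrayList<>();
        pending.drainTo(batch);
        if (batch.isEmpty()) {
            return;
        }
        try {
            batchHandler.accept(batch);
        } catch (RuntimeException e) {
            // An uncaught exception would cancel all future scheduled runs, so swallow it here.
        }
    }

    @Override
    public void close() {
        timer.shutdownNow();
    }
}
```

With something like `new TimedBatcher(10, batch -> process(batch))`, where `process` stands in for the real batch call, the 9 early requests would be flushed at the next 10 ms tick regardless of when the 10th arrives.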
Second, the sayHello method must return a Response before its closing brace, but a request that has only been queued has nothing to return yet. How do we work around this? I need to send responses back to each waiting client from whichever thread hits the 10 ms mark.
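One direction I am considering for this second problem is to give every queued request its own future and let the batching thread complete it. The sketch below uses only the JDK; `PendingRequest`, `BatchResponder`, `enqueue`, and `flush` are hypothetical names, and the processing step just mirrors `modifyMessage` from my code. As far as I understand, the JAX-RS counterpart would be injecting an `AsyncResponse` with `@Suspended` and calling `resume(...)` on it later, but I have not tried that with Dropwizard.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical holder pairing a request's message with the future its client waits on.
final class PendingRequest {
    final String message;
    final CompletableFuture<String> result = new CompletableFuture<>();

    PendingRequest(String message) {
        this.message = message;
    }
}

// Hypothetical batcher: request threads enqueue, one batching thread completes the futures.
final class BatchResponder {
    private final ConcurrentLinkedQueue<PendingRequest> pending = new ConcurrentLinkedQueue<>();

    // Request thread: enqueue and hand back a future instead of a finished response.
    CompletableFuture<String> enqueue(String message) {
        PendingRequest request = new PendingRequest(message);
        pending.add(request);
        return request.result;
    }

    // Batching thread: drain, process everything together, then answer each caller.
    int flush() {
        List<PendingRequest> batch = new ArrayList<>();
        PendingRequest next;
        while ((next = pending.poll()) != null) {
            batch.add(next);
        }
        List<String> results = process(batch);
        for (int i = 0; i < batch.size(); i++) {
            batch.get(i).result.complete(results.get(i));
        }
        return batch.size();
    }

    // Stand-in for the real combined processing call; returns one result per request, in order.
    private static List<String> process(List<PendingRequest> batch) {
        List<String> results = new ArrayList<>();
        for (PendingRequest request : batch) {
            results.add("I got the string " + request.message + " of size: " + request.message.length());
        }
        return results;
    }
}
```

Here the request thread would call `enqueue(message)` and the timer thread would call `flush()`, so no request thread has to build responses for the others.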