MongoDB batch read implementation issue with change streams on a replica set


Issue:
An inference-generating process writes around 300 inference documents per second to a MongoDB collection. A second process uses MongoDB's change stream feature to read these inferences back and perform post-processing. Currently, only a single inference document is returned per call to the change stream API (mongoc_change_stream_next()), so about 300 such calls are needed to fetch all the inference data stored within one second. However, each read is followed by roughly 50 ms of post-processing, whether it covers one inference document or several. Because of the single-document return model, this adds up to 300 × 50 ms = 15 s of processing per second of incoming data, an effective latency of 15x. To tackle this, we are trying to implement a batch read mechanism on top of MongoDB's change stream feature. We have tried various options, but we still get only one document per change stream call. Is there any way to sort out this issue?

Platform:
OS: Ubuntu 16.04
mongo-c-driver: 1.15.1
MongoDB server: 4.0.12

Options tried out:
Setting the batch size of the cursor to more than 1.

#include <mongoc/mongoc.h>
#include <stdio.h>

int main (void)
{
    const char *uri_string = "mongodb://localhost:27017/?replicaSet=set0";
    mongoc_change_stream_t *stream;
    mongoc_collection_t *coll;
    bson_error_t error;
    mongoc_uri_t *uri;
    mongoc_client_t *client;
    bson_t *pipeline;
    bson_t *opts;
    const bson_t *doc;
    const bson_t *err_doc;

    mongoc_init ();

    /*
     * Add the MongoDB blocking read and call the inference parse
     * function with the JSON.
     */
    uri = mongoc_uri_new_with_error (uri_string, &error);
    if (!uri) {
        fprintf (stderr,
                 "failed to parse URI: %s\n"
                 "error message:       %s\n",
                 uri_string,
                 error.message);
        return -1;
    }

    client = mongoc_client_new_from_uri (uri);
    if (!client) {
        return -1;
    }

    coll = mongoc_client_get_collection (client, "<DB-NAME>", "<collection-name>");

    /* mongoc_change_stream_t is opaque, so the underlying cursor cannot be
     * reached via stream->cursor; the batch size is requested through the
     * options document instead. */
    pipeline = bson_new ();
    opts = BCON_NEW ("batchSize", BCON_INT32 (20));
    stream = mongoc_collection_watch (coll, pipeline, opts);

    while (1) {
        while (mongoc_change_stream_next (stream, &doc)) {
            char *as_json = bson_as_relaxed_extended_json (doc, NULL);
            ............
            ............
            // post-processing consuming 50 ms of time
            ............
            ............
            bson_free (as_json);
        }
        if (mongoc_change_stream_error_document (stream, &error, &err_doc)) {
            if (!bson_empty (err_doc)) {
                fprintf (stderr,
                         "Server Error: %s\n",
                         bson_as_relaxed_extended_json (err_doc, NULL));
            } else {
                fprintf (stderr, "Client Error: %s\n", error.message);
            }
            break;
        }
    }

    mongoc_change_stream_destroy (stream);
    bson_destroy (pipeline);
    bson_destroy (opts);
    mongoc_collection_destroy (coll);
    mongoc_client_destroy (client);
    mongoc_uri_destroy (uri);
    mongoc_cleanup ();
    return 0;
}
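
For reference, a program like this typically builds against the installed driver via pkg-config (assuming the source file is named change_stream.c):

gcc change_stream.c -o change_stream $(pkg-config --cflags --libs libmongoc-1.0)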

Answer:

"Currently, only a single inference document is returned per call to the change stream API (mongoc_change_stream_next())"

Technically, it's not that the server returns only a single document at a time. mongoc_change_stream_next() iterates the underlying cursor, pointing the supplied bson at the next document on each call. So even when the batch returned by the server contains more than one document, the API still hands them to you one at a time: the batch size controls the network round trips, not the iteration model.
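
For reference, the driver's supported way to ask for a larger batch is the options document passed to mongoc_collection_watch() (mongoc_change_stream_t is opaque, so stream->cursor is not accessible). A minimal sketch, assuming coll is the collection handle from the question:

bson_t *pipeline = bson_new ();
bson_t *opts = BCON_NEW ("batchSize", BCON_INT32 (20));
mongoc_change_stream_t *stream = mongoc_collection_watch (coll, pipeline, opts);

const bson_t *doc;
while (mongoc_change_stream_next (stream, &doc)) {
    /* still one document per call, even if the server shipped 20 per getMore */
}

A larger batchSize cuts down the number of getMore round trips, but it does not change the one-document-per-call iteration model.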

You could try:

  • Create separate threads to process the documents in parallel, so you don't pay the 50 ms per document serially (15 s in total for one second's worth of data).

  • Loop through a batch of documents, e.g. cache 50 of them, then perform the post-processing on the whole batch.

  • Batch-process them on separate threads (a combination of the two above); a sketch of this follows below.
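
A minimal sketch of that combined approach. Note the assumptions: process_batch() is a hypothetical stand-in for the 50 ms post-processing step, the batch size of 50 is arbitrary, and error handling and partial-batch flushing are elided:

#include <mongoc/mongoc.h>
#include <pthread.h>
#include <stdlib.h>

#define BATCH_MAX 50

typedef struct {
    bson_t *docs[BATCH_MAX];
    size_t len;
} batch_t;

/* hypothetical post-processing entry point, run on a worker thread */
static void *process_batch (void *arg)
{
    batch_t *batch = arg;
    for (size_t i = 0; i < batch->len; i++) {
        /* ... the 50 ms of post-processing, now paid once per batch ... */
        bson_destroy (batch->docs[i]);
    }
    free (batch);
    return NULL;
}

static void drain_stream (mongoc_change_stream_t *stream)
{
    const bson_t *doc;
    batch_t *batch = calloc (1, sizeof (batch_t));

    while (mongoc_change_stream_next (stream, &doc)) {
        /* doc is only valid until the next call, so take a copy */
        batch->docs[batch->len++] = bson_copy (doc);

        if (batch->len == BATCH_MAX) {
            /* hand the full batch to a detached worker thread */
            pthread_t worker;
            pthread_create (&worker, NULL, process_batch, batch);
            pthread_detach (worker);
            batch = calloc (1, sizeof (batch_t));
        }
    }
    /* when next() returns false, check mongoc_change_stream_error_document()
     * as in the question; flushing a partial batch is elided for brevity */
    free (batch);
}

mongoc_change_stream_next() returns false once no more notifications are currently available, so in a real application drain_stream() would be called again after checking for errors, just like the outer while (1) loop in the question.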