Bulk writing in Azure Cosmos DB Java SDK


I have written the code below to execute bulk operations (upserting data) against Cosmos DB (NoSQL API).

I have written a job using Spring Scheduler that stores 100,000 items in batches of 300. On average, a single item consumes 60-70 Request Units (RUs). The container uses autoscale throughput with a maximum of 10,000 RU/s.
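A rough throughput budget helps put the throttling in context. The sketch below assumes about 65 RU per upsert (the midpoint of the 60-70 RU quoted above) and that the full 10,000 RU/s autoscale maximum is available to this job. These are back-of-the-envelope numbers, not measurements. Provisioned throughput is also split evenly across physical partitions, so a single hot partition can be throttled well before the container-wide limit is reached.

```java
// Back-of-the-envelope RU budget for the bulk job described above.
// Assumptions (not measured): ~65 RU per upsert, the full autoscale
// maximum of 10,000 RU/s is available, and load is spread evenly.
final class RuBudget {
    static final int TOTAL_ITEMS = 100_000;
    static final int BATCH_SIZE = 300;
    static final double RU_PER_ITEM = 65.0;
    static final double MAX_RU_PER_SECOND = 10_000.0;

    /** RU consumed by one full batch. */
    static double ruPerBatch() {
        return BATCH_SIZE * RU_PER_ITEM;
    }

    /** Number of batches, counting a final partial batch. */
    static int batchCount() {
        return (TOTAL_ITEMS + BATCH_SIZE - 1) / BATCH_SIZE;
    }

    /** Total RU for the whole load. */
    static double totalRu() {
        return TOTAL_ITEMS * RU_PER_ITEM;
    }

    /** Lower bound on wall-clock seconds if the RU/s ceiling is never exceeded. */
    static double minimumSeconds() {
        return totalRu() / MAX_RU_PER_SECOND;
    }

    public static void main(String[] args) {
        System.out.printf("RU per batch:     %.0f%n", ruPerBatch());
        System.out.printf("Batches:          %d%n", batchCount());
        System.out.printf("Total RU:         %.0f%n", totalRu());
        System.out.printf("Minimum duration: %.0f s%n", minimumSeconds());
    }
}
```

Under these assumptions one 300-item batch needs about 19,500 RU, nearly twice the 10,000 RU/s ceiling, so submitting a whole batch at once is expected to hit 429s unless the writes are spread over roughly two seconds. The full 100,000-item load needs at least about 650 s (roughly 11 minutes) even at the maximum rate.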

```java
import com.azure.cosmos.models.*;
import reactor.core.publisher.Flux;
import java.util.concurrent.atomic.AtomicInteger;

AtomicInteger retry = new AtomicInteger(retryCount);
Flux<Data> gtinContainerFlux = Flux.fromIterable(dataset);
// One upsert operation per item, routed by the item's partition key value
Flux<CosmosItemOperation> cosmosItemOperationFlux = gtinContainerFlux
        .map(data -> CosmosBulkOperations.getUpsertItemOperation(data, new PartitionKey(data.getKey())));

cosmosAsyncContainer.executeBulkOperations(cosmosItemOperationFlux, new CosmosBulkExecutionOptions())
        .subscribe(cosmosBulkOperationResponse -> {
            CosmosBulkItemResponse cosmosBulkItemResponse = cosmosBulkOperationResponse.getResponse();
            CosmosItemOperation cosmosItemOperation = cosmosBulkOperationResponse.getOperation();
            Data item = cosmosItemOperation.getItem();
            if (cosmosBulkOperationResponse.getException() != null) {
                // The operation failed with an exception instead of a status code
                log.error("Bulk operation failed for Item ID: [{}]", item.getId(), cosmosBulkOperationResponse.getException());
                errorEvents.add(item);
            } else if (cosmosBulkItemResponse == null || !cosmosBulkItemResponse.isSuccessStatusCode()) {
                log.error("The operation for Item ID: [{}] Item PartitionKey Value: [{}] did not complete successfully with a {} response code.",
                        item.getId(), item.getGtinKey(),
                        cosmosBulkItemResponse != null ? cosmosBulkItemResponse.getStatusCode() : "n/a");
                errorEvents.add(item);
            } else {
                // toMillis() gives the total duration; getNano() would only return the sub-second part
                log.debug("Data posted successfully with RUs used: {} completed in {} ms",
                        cosmosBulkItemResponse.getRequestCharge(),
                        cosmosBulkItemResponse.getCosmosDiagnostics().getDuration().toMillis());
            }
        },
        // Without an error consumer, a terminal error on the Flux is not handled by this code
        error -> log.error("Bulk execution terminated with an error", error));
```
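A side note on logging the elapsed time from `getCosmosDiagnostics().getDuration()`: `java.time.Duration.getNano()` returns only the nanosecond-of-second part of a duration, not its total length, so a request that took 1.5 s would be reported as 500,000,000 and one that took exactly 2 s as 0. `toMillis()` returns the whole duration in milliseconds. The stdlib-only sketch below shows the difference; the `Duration` values are made up for illustration.

```java
import java.time.Duration;

// Illustrates why Duration.getNano() is the wrong accessor for "how long it took".
final class DurationPitfall {
    /** Only the sub-second component, in nanoseconds (0..999,999,999). */
    static long loggedByGetNano(Duration d) {
        return d.getNano();
    }

    /** The whole duration, in milliseconds. */
    static long loggedByToMillis(Duration d) {
        return d.toMillis();
    }

    public static void main(String[] args) {
        Duration took = Duration.ofMillis(1_500);
        System.out.println(loggedByGetNano(took));   // 500000000
        System.out.println(loggedByToMillis(took));  // 1500
    }
}
```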

When I save the data asynchronously in batches of 300, I start getting the following error:

Request rate is large. More Request Units may be needed, so no changes were made. Please retry this request later

Stack trace (logged at 2024-01-15T11:09:54Z):

```
    at com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdRequestManager.messageReceived(RntbdRequestManager.java:1121)
    at com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdRequestManager.channelRead(RntbdRequestManager.java:214)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
    at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
    at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:253)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
    at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
    at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1475)
    at io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1338)
    at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1387)
    at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:529)
    at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:468)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:290)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
    at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:800)
    at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:509)
    at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:407)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.base/java.lang.Thread.run(Thread.java:833)
```

None of the log statements inside the `if (cosmosBulkOperationResponse.getException() != null)` block are printed, so I am not able to handle the error properly.
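A likely explanation, offered tentatively: the Java SDK's bulk executor retries throttled (HTTP 429) operations on its own, and the trace above starts in `RntbdRequestManager`, the direct-mode transport, which suggests it is logged from inside the SDK during those retries rather than delivered to the per-operation callback. The callback then only sees the final outcome, either as `getException()` or as a non-success status code on the item response. Once that outcome arrives, it helps to separate transient failures from permanent ones. The plain-Java sketch below does that; `BulkOutcome` is a hypothetical stand-in for what the real callback reads, and the set of codes treated as retryable (408, 429, 503) is an assumption that should be checked against the Cosmos DB documentation for your scenario.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Sketch: split bulk results into "retry later" and "give up" buckets by status code.
// BulkOutcome is hypothetical; in the real callback the status code would come from
// CosmosBulkItemResponse.getStatusCode() or CosmosException.getStatusCode().
final class OutcomeTriage {
    // Assumed-transient codes: request timeout, throttled, service unavailable.
    static final Set<Integer> RETRYABLE = Set.of(408, 429, 503);

    record BulkOutcome(String itemId, int statusCode) {}

    static boolean isSuccess(int statusCode) {
        return statusCode >= 200 && statusCode < 300;
    }

    static boolean isRetryable(int statusCode) {
        return RETRYABLE.contains(statusCode);
    }

    /** Ids of failed items worth resubmitting later. */
    static List<String> retryableIds(List<BulkOutcome> outcomes) {
        List<String> ids = new ArrayList<>();
        for (BulkOutcome o : outcomes) {
            if (!isSuccess(o.statusCode()) && isRetryable(o.statusCode())) {
                ids.add(o.itemId());
            }
        }
        return ids;
    }

    /** Ids of items that failed with a status not treated as transient. */
    static List<String> permanentFailureIds(List<BulkOutcome> outcomes) {
        List<String> ids = new ArrayList<>();
        for (BulkOutcome o : outcomes) {
            if (!isSuccess(o.statusCode()) && !isRetryable(o.statusCode())) {
                ids.add(o.itemId());
            }
        }
        return ids;
    }
}
```

When `getException()` is non-null and is a `CosmosException`, its `getStatusCode()` can feed the same triage, so both failure paths end up in one place.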

Can anyone suggest code changes, settings, or options I need to handle this error gracefully?
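On the scheduling side, `subscribe(...)` returns immediately, so if the scheduled job submits the next batch of 300 before the previous one has drained, several batches may compete for the same throughput. One common pattern is to write one batch at a time and resubmit only the throttled items after an increasing delay. The sketch below shows that control flow in plain Java. `BatchWriter` is a hypothetical interface standing in for the bulk call (for example, collecting the result of `executeBulkOperations(...)` for one batch with `collectList().block()` and returning the items that came back as 429); the delay values are illustrative, not tuned.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch: write batches strictly one after another and retry throttled items
// with exponential backoff. BatchWriter is a hypothetical stand-in for the
// Cosmos bulk call; it returns the items that must be retried (e.g. 429s).
final class SequentialBulkLoader {
    interface BatchWriter<T> {
        List<T> write(List<T> batch) throws InterruptedException;
    }

    /** Splits items into consecutive batches of at most batchSize elements. */
    static <T> List<List<T>> partition(List<T> items, int batchSize) {
        List<List<T>> batches = new ArrayList<>();
        for (int i = 0; i < items.size(); i += batchSize) {
            batches.add(items.subList(i, Math.min(i + batchSize, items.size())));
        }
        return batches;
    }

    /**
     * Writes every batch, making at most maxAttempts write attempts per batch and
     * doubling the delay after each attempt that left items behind. Returns the
     * items that still failed after all attempts.
     */
    static <T> List<T> load(List<T> items, int batchSize, int maxAttempts,
                            long initialDelayMillis, BatchWriter<T> writer)
            throws InterruptedException {
        List<T> gaveUp = new ArrayList<>();
        for (List<T> batch : partition(items, batchSize)) {
            List<T> pending = batch;
            long delay = initialDelayMillis;
            for (int attempt = 1; attempt <= maxAttempts && !pending.isEmpty(); attempt++) {
                pending = writer.write(pending);
                if (!pending.isEmpty() && attempt < maxAttempts) {
                    Thread.sleep(delay); // back off before resubmitting leftovers
                    delay *= 2;
                }
            }
            gaveUp.addAll(pending);
        }
        return gaveUp;
    }
}
```

Writing batches strictly in sequence trades some peak speed for staying within the provisioned throughput. Separately, the client-level retry behavior for throttled requests is configured through `ThrottlingRetryOptions` (`setMaxRetryAttemptsOnThrottledRequests`, `setMaxRetryWaitTime`) passed to `CosmosClientBuilder.throttlingRetryOptions(...)`; how your SDK version applies these to bulk operations is worth checking before relying on them.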
