The technologies we are currently using are:
- Spring Boot (3.1.2)
- Azure Service Bus (Topic)
- Spring JMS
We followed the "Use JMS in Spring to access Azure Service Bus" documentation, which has further details if you need them.
As a high-level overview: we use JMS to handle message publishing (JmsTemplate) and listening (JmsListener) on the Azure Service Bus topic. JMS manages authorization and token generation, and we establish the connection using a clientId/secret.
We have to publish a very large number of messages (nearly millions) to the topic, so the publishing process runs for a long time. While publishing is in progress, we get the error below, and as a result some messages may never reach the topic.
org.springframework.jms.UncategorizedJmsException: Uncategorized exception occurred during JMS processing
at org.springframework.jms.support.JmsUtils.convertJmsAccessException(JmsUtils.java:311) ~[spring-jms-6.0.11.jar!/:6.0.11]
at org.springframework.jms.support.JmsAccessor.convertJmsAccessException(JmsAccessor.java:184) ~[spring-jms-6.0.11.jar!/:6.0.11]
at org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:510) ~[spring-jms-6.0.11.jar!/:6.0.11]
at org.springframework.jms.core.JmsTemplate.send(JmsTemplate.java:587) ~[spring-jms-6.0.11.jar!/:6.0.11]
at org.springframework.jms.core.JmsTemplate.convertAndSend(JmsTemplate.java:664) ~[spring-jms-6.0.11.jar!/:6.0.11]
at com.cofinity.gr.orchestration.azure.servicebus.AzureServiceBusProvider.publishMessage(AzureServiceBusProvider.java:34) ~[classes!/:0.0.1-SNAPSHOT]
at com.cofinity.gr.orchestration.azure.servicebus.AzureServiceBusProvider.publishMessage(AzureServiceBusProvider.java:27) ~[classes!/:0.0.1-SNAPSHOT]
at com.cofinity.gr.orchestration.service.RawDataService.waitForCurationMessage(RawDataService.java:932) ~[classes!/:0.0.1-SNAPSHOT]
at com.cofinity.gr.orchestration.service.RawDataService.processClusterEntitiesWithValidationFailed(RawDataService.java:272) ~[classes!/:0.0.1-SNAPSHOT]
at com.cofinity.gr.orchestration.service.RawDataService.processToGenerateBpnV1(RawDataService.java:214) ~[classes!/:0.0.1-SNAPSHOT]
at jdk.internal.reflect.GeneratedMethodAccessor188.invoke(Unknown Source) ~[na:na]
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) ~[na:na]
at java.base/java.lang.reflect.Method.invoke(Unknown Source) ~[na:na]
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:343) ~[spring-aop-6.0.11.jar!/:6.0.11]
at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:704) ~[spring-aop-6.0.11.jar!/:6.0.11]
at com.cofinity.gr.orchestration.service.RawDataService$$SpringCGLIB$$0.processToGenerateBpnV1(<generated>) ~[classes!/:0.0.1-SNAPSHOT]
at com.cofinity.gr.orchestration.tamr.bpn.BPNProcessor.process(BPNProcessor.java:32) ~[classes!/:0.0.1-SNAPSHOT]
at com.cofinity.gr.orchestration.tamr.bpn.BPNProcessor.process(BPNProcessor.java:20) ~[classes!/:0.0.1-SNAPSHOT]
at org.springframework.batch.core.step.item.SimpleChunkProcessor.doProcess(SimpleChunkProcessor.java:146) ~[spring-batch-core-5.0.2.jar!/:5.0.2]
at org.springframework.batch.core.step.item.SimpleChunkProcessor.transform(SimpleChunkProcessor.java:322) ~[spring-batch-core-5.0.2.jar!/:5.0.2]
at org.springframework.batch.core.step.item.SimpleChunkProcessor.process(SimpleChunkProcessor.java:220) ~[spring-batch-core-5.0.2.jar!/:5.0.2]
at org.springframework.batch.core.step.item.ChunkOrientedTasklet.execute(ChunkOrientedTasklet.java:75) ~[spring-batch-core-5.0.2.jar!/:5.0.2]
at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:389) ~[spring-batch-core-5.0.2.jar!/:5.0.2]
at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:313) ~[spring-batch-core-5.0.2.jar!/:5.0.2]
at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:140) ~[spring-tx-6.0.11.jar!/:6.0.11]
at org.springframework.batch.core.step.tasklet.TaskletStep$2.doInChunkContext(TaskletStep.java:256) ~[spring-batch-core-5.0.2.jar!/:5.0.2]
at org.springframework.batch.core.scope.context.StepContextRepeatCallback.doInIteration(StepContextRepeatCallback.java:82) ~[spring-batch-core-5.0.2.jar!/:5.0.2]
at org.springframework.batch.repeat.support.RepeatTemplate.getNextResult(RepeatTemplate.java:362) ~[spring-batch-infrastructure-5.0.2.jar!/:5.0.2]
at org.springframework.batch.repeat.support.RepeatTemplate.executeInternal(RepeatTemplate.java:206) ~[spring-batch-infrastructure-5.0.2.jar!/:5.0.2]
at org.springframework.batch.repeat.support.RepeatTemplate.iterate(RepeatTemplate.java:139) ~[spring-batch-infrastructure-5.0.2.jar!/:5.0.2]
at org.springframework.batch.core.step.tasklet.TaskletStep.doExecute(TaskletStep.java:241) ~[spring-batch-core-5.0.2.jar!/:5.0.2]
at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:227) ~[spring-batch-core-5.0.2.jar!/:5.0.2]
at org.springframework.batch.core.job.SimpleStepHandler.handleStep(SimpleStepHandler.java:153) ~[spring-batch-core-5.0.2.jar!/:5.0.2]
at org.springframework.batch.core.job.AbstractJob.handleStep(AbstractJob.java:417) ~[spring-batch-core-5.0.2.jar!/:5.0.2]
at org.springframework.batch.core.job.SimpleJob.doExecute(SimpleJob.java:132) ~[spring-batch-core-5.0.2.jar!/:5.0.2]
at org.springframework.batch.core.job.AbstractJob.execute(AbstractJob.java:316) ~[spring-batch-core-5.0.2.jar!/:5.0.2]
at org.springframework.batch.core.launch.support.SimpleJobLauncher$1.run(SimpleJobLauncher.java:157) ~[spring-batch-core-5.0.2.jar!/:5.0.2]
at org.springframework.core.task.SimpleAsyncTaskExecutor$ConcurrencyThrottlingRunnable.run(SimpleAsyncTaskExecutor.java:285) ~[spring-core-6.0.11.jar!/:6.0.11]
at java.base/java.lang.Thread.run(Unknown Source) ~[na:na]
Caused by: jakarta.jms.JMSException: ExpiredToken: The token is expired. TrackingId:17702e61-fdc7-4d78-9cb8-fd482356a53e_G13, SystemTracker:NoSystemTracker, Timestamp:2024-01-29T11:26:36 TrackingId:f32eed2ed4f849219cd6513f2ae5e58f_G13, SystemTracker:gateway7, Timestamp:2024-01-29T11:26:36 [condition = com.microsoft:auth-failed]
at org.apache.qpid.jms.provider.ProviderException.toJMSException(ProviderException.java:34) ~[qpid-jms-client-2.0.0.jar!/:na]
at org.apache.qpid.jms.exceptions.JmsExceptionSupport.create(JmsExceptionSupport.java:80) ~[qpid-jms-client-2.0.0.jar!/:na]
at org.apache.qpid.jms.exceptions.JmsExceptionSupport.create(JmsExceptionSupport.java:112) ~[qpid-jms-client-2.0.0.jar!/:na]
at org.apache.qpid.jms.JmsConnection.send(JmsConnection.java:778) ~[qpid-jms-client-2.0.0.jar!/:na]
at org.apache.qpid.jms.JmsNoTxTransactionContext.send(JmsNoTxTransactionContext.java:37) ~[qpid-jms-client-2.0.0.jar!/:na]
at org.apache.qpid.jms.JmsSession.send(JmsSession.java:976) ~[qpid-jms-client-2.0.0.jar!/:na]
at org.apache.qpid.jms.JmsSession.send(JmsSession.java:855) ~[qpid-jms-client-2.0.0.jar!/:na]
at org.apache.qpid.jms.JmsMessageProducer.sendMessage(JmsMessageProducer.java:252) ~[qpid-jms-client-2.0.0.jar!/:na]
at org.apache.qpid.jms.JmsMessageProducer.send(JmsMessageProducer.java:200) ~[qpid-jms-client-2.0.0.jar!/:na]
at org.messaginghub.pooled.jms.JmsPoolMessageProducer.sendMessage(JmsPoolMessageProducer.java:194) ~[pooled-jms-3.1.0.jar!/:na]
at org.messaginghub.pooled.jms.JmsPoolMessageProducer.send(JmsPoolMessageProducer.java:88) ~[pooled-jms-3.1.0.jar!/:na]
at org.messaginghub.pooled.jms.JmsPoolMessageProducer.send(JmsPoolMessageProducer.java:77) ~[pooled-jms-3.1.0.jar!/:na]
at org.springframework.jms.core.JmsTemplate.doSend(JmsTemplate.java:637) ~[spring-jms-6.0.11.jar!/:6.0.11]
at org.springframework.jms.core.JmsTemplate.doSend(JmsTemplate.java:611) ~[spring-jms-6.0.11.jar!/:6.0.11]
at org.springframework.jms.core.JmsTemplate.lambda$send$3(JmsTemplate.java:589) ~[spring-jms-6.0.11.jar!/:6.0.11]
at org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:507) ~[spring-jms-6.0.11.jar!/:6.0.11]
... 36 common frames omitted
Caused by: org.apache.qpid.jms.provider.ProviderException: ExpiredToken: The token is expired. TrackingId:17702e61-fdc7-4d78-9cb8-fd482356a53e_G13, SystemTracker:NoSystemTracker, Timestamp:2024-01-29T11:26:36 TrackingId:f32eed2ed4f849219cd6513f2ae5e58f_G13, SystemTracker:gateway7, Timestamp:2024-01-29T11:26:36 [condition = com.microsoft:auth-failed]
at org.apache.qpid.jms.provider.amqp.AmqpSupport.convertToNonFatalException(AmqpSupport.java:181) ~[qpid-jms-client-2.0.0.jar!/:na]
at org.apache.qpid.jms.provider.amqp.builders.AmqpResourceBuilder.getOpenAbortExceptionFromRemote(AmqpResourceBuilder.java:305) ~[qpid-jms-client-2.0.0.jar!/:na]
at org.apache.qpid.jms.provider.amqp.builders.AmqpResourceBuilder.handleClosed(AmqpResourceBuilder.java:191) ~[qpid-jms-client-2.0.0.jar!/:na]
at org.apache.qpid.jms.provider.amqp.builders.AmqpResourceBuilder.processRemoteClose(AmqpResourceBuilder.java:132) ~[qpid-jms-client-2.0.0.jar!/:na]
at org.apache.qpid.jms.provider.amqp.AmqpProvider.processUpdates(AmqpProvider.java:992) ~[qpid-jms-client-2.0.0.jar!/:na]
at org.apache.qpid.jms.provider.amqp.AmqpProvider.onData(AmqpProvider.java:878) ~[qpid-jms-client-2.0.0.jar!/:na]
at org.apache.qpid.jms.transports.netty.NettyTcpTransport$NettyTcpTransportHandler.channelRead0(NettyTcpTransport.java:548) ~[qpid-jms-client-2.0.0.jar!/:na]
at org.apache.qpid.jms.transports.netty.NettyTcpTransport$NettyTcpTransportHandler.channelRead0(NettyTcpTransport.java:541) ~[qpid-jms-client-2.0.0.jar!/:na]
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99) ~[netty-transport-4.1.94.Final.jar!/:4.1.94.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[netty-transport-4.1.94.Final.jar!/:4.1.94.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.94.Final.jar!/:4.1.94.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.94.Final.jar!/:4.1.94.Final]
at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1383) ~[netty-handler-4.1.94.Final.jar!/:4.1.94.Final]
at io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1246) ~[netty-handler-4.1.94.Final.jar!/:4.1.94.Final]
at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1295) ~[netty-handler-4.1.94.Final.jar!/:4.1.94.Final]
at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:529) ~[netty-codec-4.1.94.Final.jar!/:4.1.94.Final]
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:468) ~[netty-codec-4.1.94.Final.jar!/:4.1.94.Final]
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:290) ~[netty-codec-4.1.94.Final.jar!/:4.1.94.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[netty-transport-4.1.94.Final.jar!/:4.1.94.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.94.Final.jar!/:4.1.94.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.94.Final.jar!/:4.1.94.Final]
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[netty-transport-4.1.94.Final.jar!/:4.1.94.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440) ~[netty-transport-4.1.94.Final.jar!/:4.1.94.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.94.Final.jar!/:4.1.94.Final]
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[netty-transport-4.1.94.Final.jar!/:4.1.94.Final]
at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:800) ~[netty-transport-classes-epoll-4.1.94.Final.jar!/:4.1.94.Final]
at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:499) ~[netty-transport-classes-epoll-4.1.94.Final.jar!/:4.1.94.Final]
at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:397) ~[netty-transport-classes-epoll-4.1.94.Final.jar!/:4.1.94.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) ~[netty-common-4.1.94.Final.jar!/:4.1.94.Final]
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.94.Final.jar!/:4.1.94.Final]
... 1 common frames omitted
After a few minutes, JMS will again generate the token for the provider, and message publishing will resume.
You need to implement token refreshing logic. The token should be refreshed before it expires to maintain continuous communication with Azure Service Bus.
Token refreshing implementation:
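A minimal sketch of this pattern follows. It uses no Azure or JMS APIs: the class names TokenManager, ServiceBusClient, MessageSender and ExpiredTokenException are taken from this answer, but their bodies, the Token and Transport types, the refresh margin, and the retry limit are all assumptions made for illustration. In a real application the Transport would wrap the actual send call and the token source would call the identity provider.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.function.Supplier;

// Illustrative only: every type here is a hypothetical stand-in, not an Azure or JMS API.
class TokenRefreshSketch {

    /** Stand-in for the broker's "ExpiredToken" failure. */
    static class ExpiredTokenException extends RuntimeException {
        ExpiredTokenException(String message) { super(message); }
    }

    /** An access token together with its expiry time. */
    static final class Token {
        final String value;
        final Instant expiresAt;
        Token(String value, Instant expiresAt) { this.value = value; this.expiresAt = expiresAt; }
    }

    /** Stand-in for the real transport (for example, a JMS producer). */
    interface Transport {
        void send(String token, String message);
    }

    /** Caches the token and renews it shortly before it expires. */
    static class TokenManager {
        private final Supplier<Token> tokenSource; // assumed to call the identity provider
        private final Duration refreshMargin;
        private Token cached;

        TokenManager(Supplier<Token> tokenSource, Duration refreshMargin) {
            this.tokenSource = tokenSource;
            this.refreshMargin = refreshMargin;
        }

        synchronized Token getValidToken() {
            boolean nearExpiry = cached == null
                    || !Instant.now().isBefore(cached.expiresAt.minus(refreshMargin));
            return nearExpiry ? refresh() : cached;
        }

        synchronized Token refresh() {
            cached = tokenSource.get();
            return cached;
        }
    }

    /** Sends messages with the current token and updates that token when needed. */
    static class ServiceBusClient {
        private final TokenManager tokenManager;
        private final Transport transport;
        private Token currentToken;

        ServiceBusClient(TokenManager tokenManager, Transport transport) {
            this.tokenManager = tokenManager;
            this.transport = transport;
        }

        void send(String message) {
            currentToken = tokenManager.getValidToken();
            transport.send(currentToken.value, message);
        }

        void refreshToken() {
            currentToken = tokenManager.refresh();
        }
    }

    /** Retries a send after refreshing the token when the broker reports expiry. */
    static class MessageSender {
        private final ServiceBusClient client;
        private final int maxAttempts;

        MessageSender(ServiceBusClient client, int maxAttempts) {
            this.client = client;
            this.maxAttempts = maxAttempts;
        }

        void send(String message) {
            for (int attempt = 1; ; attempt++) {
                try {
                    client.send(message);
                    return;
                } catch (ExpiredTokenException e) {
                    if (attempt >= maxAttempts) {
                        throw e; // give up so the caller can record the failed message
                    }
                    client.refreshToken();
                }
            }
        }
    }
}
```

The sketch combines two safeguards: the token is renewed proactively once it is within the refresh margin of its expiry, and a send that still fails with an expired token triggers a forced refresh and a retry. Rethrowing after the last attempt lets the caller log or re-queue the message rather than lose it silently.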
TokenManager is responsible for refreshing the token, ServiceBusClient encapsulates the communication with Azure Service Bus, and MessageSender sends messages using the ServiceBusClient.
When an ExpiredTokenException is encountered, the token is refreshed, and the operation is retried with the new token.
The ServiceBusClient class encapsulates the logic for sending messages to Azure Service Bus. It maintains the current authentication token and updates it when necessary.
Reference:
Use JMS in Spring to access Azure Service Bus
Set lifetimes for tokens