slow sending messages with get_topic_sender to azure service bus with async python

235 Views Asked by At

I found this example with python, which sends messages to service bus topic, but this is very slow.

def send_messages():
    servicebus_client = ServiceBusClient.from_connection_string(conn_str=CONNECTION_STR, logging_enable=True)

    async with servicebus_client:
        sender = servicebus_client.get_topic_sender(topic_name=TOPIC_NAME)
        async with sender:
            await send_single_message(sender)

Every single example, when sends messages creates servicebus client and topic sender for every message. Is it possible to reuse topic sender? Creating topic sender is very slow. I create messages from iterator, I cannot batch them, I have to send messages one by one, but with this solution sending 100 messages takes over 120 seconds :/

I tried sync solution, with reusing topic sender and sending 100 messages takes only 5 seconds.

Ideally, I want to find solution to inject topic sender with dependency injector library.

1

There are 1 best solutions below

0
Sampath On
  • Example to Send messages service-bus topic sender with dependency injector.
import asyncio
import dependency_injector.containers as containers
import dependency_injector.providers as providers
from azure.servicebus import ServiceBusClient, ServiceBusMessage


class AzureServiceBusTopicSender:
    def __init__(self, connection_string, topic_name):
        self.client = ServiceBusClient.from_connection_string(connection_string)
        self.topic_name = topic_name

    def send_message(self, message):
        try:
            with self.client:
                topic_sender = self.client.get_topic_sender(topic_name=self.topic_name)
                topic_sender.send_messages(ServiceBusMessage(message))
                print(f"Successfully sent: {message}")
        except Exception as e:
            print(f"Failed to send: {message}, Error: {e}")


class Container(containers.DeclarativeContainer):
    azure_service_bus_connection_string = "Endpoint=sb://samaservicebus.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey="
    azure_service_bus_topic_name = "sampath122"
    azure_service_bus_topic_sender = providers.Singleton(
        AzureServiceBusTopicSender,
        connection_string=azure_service_bus_connection_string,
        topic_name=azure_service_bus_topic_name,
    )


async def main():
    messages = ["Message 1", "Message 2", "Message 3"]
    sender = Container.azure_service_bus_topic_sender()
    for message in messages:
        sender.send_message(message)


if __name__ == "__main__":
    asyncio.run(main())

enter image description here

enter image description here

  • Creating a new service bus client and topic sender for every single message is not an optimal approach, and it can indeed lead to slower message-sending performance due to the overhead of creating and tearing down the connections for each message. To achieve better performance, you should create the service bus client and topic sender once and then reuse them for sending multiple messages. Yes, it is possible and recommended to reuse the topic sender when sending multiple messages to a Service Bus topic.

  • Sending 100 messages to Azure Service Bus should not take long if the messages are relatively small and the network connection is stable. However, if you want to introduce some delay between sending each message to simulate a real-world scenario or to avoid rate-limiting issues, you can use asyncio.sleep().

from  azure.servicebus  import  ServiceBusClient,  ServiceBusMessage
import  time
import  json

class  CustomMessage:
    def  __init__(self,  timestamp,  message_number):
        self.timestamp = timestamp
        self.message_number = message_number
    
class  AzureServiceBusTopicSender:
    def  __init__(self,  connection_string,  topic_name):
        self.client = ServiceBusClient.from_connection_string(connection_string)
        self.topic_name = topic_name
    def  send_message(self,  custom_message):
        try:
            with  self.client:
                topic_sender = self.client.get_topic_sender(topic_name=self.topic_name)
                message = ServiceBusMessage(json.dumps(custom_message.__dict__))  
                topic_sender.send_messages(message)
                print(f"Successfully sent: {custom_message.__dict__}")
        except  Exception  as  e:
            print(f"Failed to send: {custom_message.__dict__}, Error: {e}")
            
class  Container(containers.DeclarativeContainer):
    azure_service_bus_connection_string = "Endpoint=sb://sampathservicebus.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=J"
    azure_service_bus_topic_name = "sampath122"
    azure_service_bus_topic_sender = providers.Singleton(
        AzureServiceBusTopicSender,
        connection_string=azure_service_bus_connection_string,
        topic_name=azure_service_bus_topic_name,
    )
    
async  def  main():
    sender = Container.azure_service_bus_topic_sender()
    for  i  in  range(1,  101):
        timestamp = time.time()
        custom_message = CustomMessage(timestamp=timestamp,  message_number=i)
        sender.send_message(custom_message)
        await  asyncio.sleep(0.1)  
        
if  __name__ == "__main__":
    asyncio.run(main())

enter image description here