Using MassTransit to invoke a DurableTaskClient function

48 Views Asked by At

I had an Azure Function that uses an Orchestrator to sync products from an external source. Until today this function was invoked by a TimerTrigger or an HttpTrigger passing an InventoryId which was limiting the function to sync the specific product.

Now I need to add the queue support for it but I'm not able to inject the DurableTaskClient inside the IConsumer.

What I've done till now works but it does not use MassTransit Consumer at all. For how the producer is made, we are using MassTransit all around and we don't want to change the behavior and send a plain message without the whole body.

It's possible somehow to inject it or in the worst case to deserialize the message in a cleaner way respect to what I've done here

  [Function(nameof(MessageReceiverHandler))]
  public async Task Run(
      [ServiceBusTrigger("%QueueName%", Connection = "ServiceBusConnection")] ServiceBusReceivedMessage message, [DurableClient] DurableTaskClient client, CancellationToken cancellationToken)
  {
      var str = message.Body.ToString();

      var definition = new { message = new { inventoryId =3} };
      var res = JsonConvert.DeserializeAnonymousType(str, definition);

      await receiver.HandleConsumer<ProductSyncConsumer>(options.Value.QueueName, message, cancellationToken);

      string instanceId = await client.ScheduleNewOrchestrationInstanceAsync(nameof(DurablePartnerSyncActions), res.message.inventoryId, cancellationToken);

      logger.LogInformation("Started Individual orchestration with ID = '{instanceId}'.", instanceId);
     
  }

 public class ProductSyncConsumer : IConsumer<ExternalProductSyncSingle>
 {
     private readonly ILogger<ProductSyncConsumer> _logger;

     public ProductSyncConsumer(ILogger<ProductSyncConsumer> logger)
     {
      
         _logger = logger;
     }

     public  Task Consume(ConsumeContext<ExternalProductSyncSingle> context)
     {
         return Task.FromResult(context.Message.InventoryId);
        // DurableTaskClient client = default(DurableTaskClient);
        // string instanceId = await client.ScheduleNewOrchestrationInstanceAsync(nameof(DurablePartnerSyncActions), context.Message.InventoryId);

         _logger.LogInformation("Started Individual orchestration with ID = '{instanceId}'.", 5);
       //  return await client.CreateCheckStatusResponseAsync(req, instanceId);
     }
 }
1

There are 1 best solutions below

2
Vivek Vaibhav Shandilya On

To trigger the Orchestration using message received in service bus queue you need to use Service Bus trigger, As you are using.

Mass Transit consumer is used to receive message and process it. For reference check this document.

You just need to use Queue name which Mass Transit consumer is using to consume, in your Service bus trigger. As Far As I Know DurableTaskClient cannot be used in Mass Transit consumer directly.

This worked for me.

Function.cs:

using Microsoft.Azure.Functions.Worker;
using Microsoft.Azure.Functions.Worker.Http;
using Microsoft.DurableTask;
using Microsoft.DurableTask.Client;
using Microsoft.Extensions.Logging;

namespace FunctionApp16
{
    public static class Function
    {
        [Function(nameof(Function))]
        public static async Task<List<string>> RunOrchestrator(
            [OrchestrationTrigger] TaskOrchestrationContext context)
        {
            ILogger logger = context.CreateReplaySafeLogger(nameof(Function));
            logger.LogInformation("Saying hello.");
            var outputs = new List<string>();

            outputs.Add(await context.CallActivityAsync<string>(nameof(SayHello), "Tokyo"));
            outputs.Add(await context.CallActivityAsync<string>(nameof(SayHello), "Seattle"));
            outputs.Add(await context.CallActivityAsync<string>(nameof(SayHello), "London"));

            return outputs;
        }

        [Function(nameof(SayHello))]
        public static string SayHello([ActivityTrigger] string name, FunctionContext executionContext)
        {
            ILogger logger = executionContext.GetLogger("SayHello");
            logger.LogInformation("Saying hello to {name}.", name);
            return $"Hello {name}!";
        }

        [Function("servicebus")]
        public static async Task Run(
            [ServiceBusTrigger(queueName:"hello",Connection ="sb_conn")]string message,
            [DurableClient] DurableTaskClient client,
            FunctionContext executionContext)
        {
            ILogger logger = executionContext.GetLogger("servicebus");
            var msg = message.Body.ToString();

            string instanceId = await client.ScheduleNewOrchestrationInstanceAsync(nameof(Function));
            
            logger.LogInformation($"Message content: {msg}");

            logger.LogInformation("Started orchestration with ID = '{instanceId}'.", instanceId);
        }
    }
}

OUTPUT:

you can use message value and process it as per your requirement.

Message content: {
[2024-03-25T09:57:57.128Z]   "messageId": "043e0000-7a6f-b022-cf98-08dc4cb20be6",
[2024-03-25T09:57:57.139Z]   "requestId": null,
[2024-03-25T09:57:57.142Z]   "correlationId": null,
[2024-03-25T09:57:57.145Z]   "conversationId": "043e0000-7a6f-b022-d097-08dc4cb20be6",
[2024-03-25T09:57:57.147Z]   "initiatorId": null,
[2024-03-25T09:57:57.151Z]   "sourceAddress": "sb://durablefuncsb.servicebus.windows.net/xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx?autodelete=300",
[2024-03-25T09:57:57.154Z]   "destinationAddress": "sb://durablefuncsb.servicebus.windows.net/Contracts/Hello?type=topic",
[2024-03-25T09:57:57.157Z]   "responseAddress": null,
[2024-03-25T09:57:57.159Z]   "faultAddress": null,
[2024-03-25T09:57:57.163Z]   "messageType": [
[2024-03-25T09:57:57.167Z]     "urn:message:Contracts:Hello"
[2024-03-25T09:57:57.170Z]   ],
[2024-03-25T09:57:57.172Z]   "message": {
[2024-03-25T09:57:57.175Z]     "name": "Vivek"
[2024-03-25T09:57:57.182Z]   },
[2024-03-25T09:57:57.185Z]   "expirationTime": null,
[2024-03-25T09:57:57.192Z]   "sentTime": "2024-03-25T09:57:56.7333272Z",
[2024-03-25T09:57:57.195Z]   "headers": {},
[2024-03-25T09:57:57.201Z]   "host": {
[2024-03-25T09:57:57.203Z]     "machineName": "xxxxxxxxxxx",
[2024-03-25T09:57:57.207Z]     "processName": "MT_servicebus",
[2024-03-25T09:57:57.210Z]     "processId": 15876,
[2024-03-25T09:57:57.215Z]     "assembly": "MT_servicebus",
[2024-03-25T09:57:57.218Z]     "assemblyVersion": "1.0.0.0",
[2024-03-25T09:57:57.221Z]     "frameworkVersion": "6.0.28",
[2024-03-25T09:57:57.225Z]     "massTransitVersion": "8.1.3.0",
[2024-03-25T09:57:57.232Z]     "operatingSystemVersion": "Microsoft Windows NT 10.0.22631.0"
[2024-03-25T09:57:57.235Z]   }
[2024-03-25T09:57:57.237Z] }