Using MassTransit on an Azure Function to receive a message and process it

95 Views Asked by At

I have an Azure Function that, until now, received a plain, conventionally formatted message sent by another application and processed it.

I've discovered MassTransit and want to use it in both the ASP.NET application and the Azure Function receiver, but I can't work out how to set up the function in the handler so that it correctly decodes and processes the message. I've also tried to follow this post, but it doesn't seem to work with version 8.x of MassTransit.

Here's my registration file and the handler

Handler

/// <summary>
/// Azure Function that receives messages from the
/// <c>sbq-klaviyo-orderitem-filled-form</c> Service Bus queue, deserializes the
/// body into a <c>MessageBody</c> and passes its <c>OrderItemId</c> to
/// <see cref="IKlaviyoNameFormSender"/>.
/// </summary>
public class MessageReceiverHandler
{
    private readonly ILogger<MessageReceiverHandler> _logger;
    private readonly IKlaviyoNameFormSender _klaviyoNameFormSender;

    // NOTE(review): this dependency is never used by Run; it is kept only so the
    // constructor signature stays unchanged. Confirm whether it can be removed.
    private readonly IConsumer<MyOrderItemFormFilledPayload> _consumer;

    /// <summary>Creates the handler with its injected dependencies.</summary>
    /// <param name="logger">Logger used for diagnostic output.</param>
    /// <param name="klaviyoNameFormSender">Service invoked with the received order item id.</param>
    /// <param name="consumer">Stored but currently unused.</param>
    public MessageReceiverHandler(ILogger<MessageReceiverHandler> logger, IKlaviyoNameFormSender klaviyoNameFormSender, IConsumer<MyOrderItemFormFilledPayload> consumer)
    {
        _logger = logger;
        _klaviyoNameFormSender = klaviyoNameFormSender;
        _consumer = consumer;
    }

    /// <summary>
    /// Processes one Service Bus message: logs it, deserializes the body, forwards the
    /// order item id and completes the message. A body that is not valid JSON, or that
    /// deserializes to <c>null</c>, is dead-lettered instead of being retried.
    /// </summary>
    /// <param name="message">The received Service Bus message.</param>
    /// <param name="messageActions">Settlement actions (complete / dead-letter) for the message.</param>
    [Function(nameof(MessageReceiverHandler))]
    public async Task Run(
        [ServiceBusTrigger("sbq-klaviyo-orderitem-filled-form", Connection = "ServiceBusConnection")]
        ServiceBusReceivedMessage message,
        ServiceBusMessageActions messageActions)
    {
        _logger.LogInformation("Message ID: {id}", message.MessageId);
        _logger.LogInformation("Message Body: {body}", message.Body);

        // NOTE(review): messages sent through MassTransit arrive wrapped in an envelope
        // (content type application/vnd.masstransit+json) with the payload under a
        // "message" property. If the sender uses MassTransit, MessageBody presumably has
        // to model that envelope -- confirm against the sender.
        MessageBody orderItem;
        try
        {
            orderItem = JsonConvert.DeserializeObject<MessageBody>(message.Body.ToString());
        }
        catch (Newtonsoft.Json.JsonException ex)
        {
            // A malformed payload will never succeed on retry, so dead-letter it right away.
            _logger.LogError(ex, "Message {id} does not contain valid JSON; dead-lettering.", message.MessageId);
            await messageActions.DeadLetterMessageAsync(
                message,
                deadLetterReason: "InvalidPayload",
                deadLetterErrorDescription: ex.Message);
            return;
        }

        // DeserializeObject returns null for an empty body or a literal JSON "null".
        if (orderItem is null)
        {
            _logger.LogError("Message {id} has an empty body; dead-lettering.", message.MessageId);
            await messageActions.DeadLetterMessageAsync(
                message,
                deadLetterReason: "EmptyPayload",
                deadLetterErrorDescription: "The message body deserialized to null.");
            return;
        }

        await _klaviyoNameFormSender.SendKlaviyoNameFormSender(orderItem.OrderItemId, default);

        // Settle the message only after the downstream call has succeeded.
        await messageActions.CompleteMessageAsync(message);
    }
}

Registration

/// <summary>
/// Registers the application's services on the host builder: the Klaviyo sender and
/// repository (scoped), the <c>AzureServiceBusSettings</c> and <c>KlaviyoOption</c>
/// options, the Application Insights registration calls, and a MassTransit bus on
/// Azure Service Bus with one receive endpoint.
/// </summary>
/// <param name="builder">The host builder to configure.</param>
/// <returns>The same <paramref name="builder"/> instance, to allow chaining.</returns>
public static IHostBuilder RegisterServices(this IHostBuilder builder)
{
    builder.ConfigureServices((context, services) =>
    {
        // Application services, registered with scoped lifetime.
        services.AddScoped<IKlaviyoNameFormSender, KlaviyoNameFormSender>();
        services.AddScoped<ITTGLinq2DbRepository, TTGLinq2DbRepository>();

        // Bind AzureServiceBusSettings from the configuration section of the same name.
        services.Configure<AzureServiceBusSettings>(context.Configuration.GetSection("AzureServiceBusSettings"));
        services.AddApplicationInsightsTelemetryWorkerService();
        services.ConfigureFunctionsApplicationInsights();
        // KlaviyoOption setup; the body is elided in the source.
        services.Configure<KlaviyoOption>(opt =>
        {
[...omiss...]
        });

        services.AddMassTransit(x =>
        {
            x.AddConsumer<KlaviyoNameCollectionMessageConsumer>();
            x.UsingAzureServiceBus((ctx, cfg) =>
                {
                    // Connection string and queue name come from the bound options above.
                    var settings = ctx.GetRequiredService<IOptions<AzureServiceBusSettings>>().Value;

                    cfg.Host(settings.ConnectionString);

                    // NOTE(review): if settings.QueueName is the same queue a
                    // ServiceBusTrigger function listens on, the two receivers would
                    // presumably compete for messages -- confirm the intended topology.
                    cfg.ReceiveEndpoint(settings.QueueName, e =>
                    {
                        e.ConfigureConsumer<KlaviyoNameCollectionMessageConsumer>(ctx);
                    });
                });
        });
    });
    return builder;
}

Consumer

 /// <summary>
 /// MassTransit consumer for <see cref="KlaviyoOrderItemFormFilledPayload"/> messages.
 /// Currently a stub that completes immediately without processing the message.
 /// </summary>
 public class KlaviyoNameCollectionMessageConsumer
     : IConsumer<KlaviyoOrderItemFormFilledPayload>
 {
     /// <summary>Handles a consumed message; not implemented yet.</summary>
     /// <param name="context">Consume context carrying the received message.</param>
     /// <returns>An already-completed task.</returns>
     public Task Consume(ConsumeContext<KlaviyoOrderItemFormFilledPayload> context)
     {
         // TODO: implement the message handling.
         // Returns a completed task instead of using an 'async' method with no 'await' (CS1998).
         return Task.CompletedTask;
     }
 }

 /// <summary>
 /// Message contract handled by <c>KlaviyoNameCollectionMessageConsumer</c>.
 /// </summary>
 public class KlaviyoOrderItemFormFilledPayload
 {
     /// <summary>Order item id carried by the message.</summary>
     public int OrderItemId
     {
         get;
         set;
     }
 }

Can someone explain to me how to do that? I've read that it would be best to have an App Service running a background service that listens on the queue, but our established setup is built entirely on Azure Functions and I would like to keep the same behavior.

1

There is 1 best solution below

2
Vivek Vaibhav Shandilya On

This worked for me. I created a MassTransit application by following the reference post you mentioned.

command:

# Install the MassTransit templates ("dotnet new -i" is the deprecated form in the .NET 7+ SDK).
dotnet new install MassTransit.Templates
# Scaffold the worker project, then a consumer named Welcome.
dotnet new mtworker -n MT_servicebus
dotnet new mtconsumer -n Welcome

I have created a Service Bus trigger function in .NET 8 isolated and used the Worker and Contracts code in the function.

My Directory:

I added the services from the MassTransit application's Program.cs file to the Function's Program.cs file.

Program.cs:

using Microsoft.Azure.Functions.Worker;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using MassTransit;
using System.Reflection;
using System.Threading.Tasks;
using MT_servicebus;

// Builds the isolated-worker Functions host, registers MassTransit on Azure Service Bus
// and starts the background Worker that publishes messages.
var host = new HostBuilder()
    .ConfigureFunctionsWebApplication()
    .ConfigureServices((hostContext, services) =>
    {
        services.AddApplicationInsightsTelemetryWorkerService();
        services.ConfigureFunctionsApplicationInsights();

        // Read the Service Bus connection string from configuration rather than
        // hard-coding a secret in source. "conn" is the setting name the
        // ServiceBusTrigger in Function1 uses.
        // NOTE(review): assumes "conn" holds a full connection string (not an
        // identity-based "conn__fullyQualifiedNamespace" setting) -- confirm.
        var serviceBusConnection = hostContext.Configuration["conn"];
        if (string.IsNullOrWhiteSpace(serviceBusConnection))
        {
            throw new System.InvalidOperationException(
                "The Service Bus connection string setting 'conn' is missing or empty.");
        }

        services.AddMassTransit(x =>
        {
            x.SetKebabCaseEndpointNameFormatter();

            x.SetInMemorySagaRepositoryProvider();

            // Discover consumers, sagas and activities declared in the entry assembly.
            var entryAssembly = Assembly.GetEntryAssembly();

            x.AddConsumers(entryAssembly);
            x.AddSagaStateMachines(entryAssembly);
            x.AddSagas(entryAssembly);
            x.AddActivities(entryAssembly);
            x.UsingAzureServiceBus(configure: (context, config) =>
            {
                config.Host(serviceBusConnection);
                config.ConfigureEndpoints(context);
            });
        });

        // Registered on the service collection directly, not inside the MassTransit
        // configuration callback, because it is not part of the bus configuration.
        services.AddHostedService<Worker>();
    })
    .Build();

host.Run();

Worker.cs:

using Contracts;
using MassTransit;
using Microsoft.Extensions.Hosting;
using System.Threading;
using System.Threading.Tasks;

namespace MT_servicebus
{
    /// <summary>
    /// Background service that publishes a <see cref="Welcome"/> message on the bus,
    /// waits one second, and repeats until the host requests cancellation.
    /// </summary>
    public class Worker : BackgroundService
    {
        // Pause between two consecutive publishes, in milliseconds.
        private const int PublishIntervalMilliseconds = 1000;

        private readonly IBus _bus;

        /// <summary>Creates the worker around the bus used for publishing.</summary>
        /// <param name="bus">Bus the messages are published on.</param>
        public Worker(IBus bus) => _bus = bus;

        /// <summary>Publish loop; exits once <paramref name="stoppingToken"/> is cancelled.</summary>
        /// <param name="stoppingToken">Token observed by the loop, the publish and the delay.</param>
        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            while (true)
            {
                if (stoppingToken.IsCancellationRequested)
                {
                    return;
                }

                var greeting = new Welcome
                {
                    Name = "Vivek"
                };

                await _bus.Publish(greeting, stoppingToken);
                await Task.Delay(PublishIntervalMilliseconds, stoppingToken);
            }
        }
    }
}

Contracts/Welcome.cs:

namespace Contracts
{
    /// <summary>
    /// Message contract published by the worker.
    /// </summary>
    public record Welcome
    {
        /// <summary>Name carried by the message; settable only during initialization.</summary>
        public string Name
        {
            get;
            init;
        }
    }
}

Function1.cs:

using System;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;
using Microsoft.Azure.Functions.Worker;
using Microsoft.Extensions.Logging;

namespace FunctionApp10
{
    /// <summary>
    /// Service Bus triggered function bound to the <c>welcome</c> entity. It logs the
    /// id, body and content type of each received message and then completes it.
    /// </summary>
    public class Function1
    {
        private readonly ILogger<Function1> _log;

        /// <summary>Creates the function class with its logger.</summary>
        /// <param name="logger">Logger used to write the message details.</param>
        public Function1(ILogger<Function1> logger) => _log = logger;

        /// <summary>Logs the incoming message and settles it as completed.</summary>
        /// <param name="message">The received Service Bus message.</param>
        /// <param name="messageActions">Settlement actions used to complete the message.</param>
        [Function(nameof(Function1))]
        public async Task Run(
            [ServiceBusTrigger("welcome", Connection = "conn")] ServiceBusReceivedMessage message,
            ServiceBusMessageActions messageActions)
        {
            var id = message.MessageId;
            _log.LogInformation("Message ID: {id}", id);

            var body = message.Body;
            _log.LogInformation("Message Body: {body}", body);

            var contentType = message.ContentType;
            _log.LogInformation("Message Content-Type: {contentType}", contentType);

            // Settle the message so it is removed from the entity.
            await messageActions.CompleteMessageAsync(message);
        }
    }
}

OUTPUT:

Functions:

        Function1: serviceBusTrigger

For detailed output, run func with --verbose flag.
[2024-03-13T11:31:01.985Z] Host lock lease acquired by instance ID '000000000000000000000000116B5F66'.
[2024-03-13T11:31:06.493Z] Executing 'Functions.Function1' (Reason='(null)', Id=79c28ced-01a0-4e5f-b8b0-7c26c997eb84)
[2024-03-13T11:31:06.497Z] Trigger Details: MessageId: 303d00007a6fb022aa1f08dc43510f5d, SequenceNumber: 1039, DeliveryCount: 1, EnqueuedTimeUtc: 2024-03-13T11:31:07.1550000+00:00, LockedUntilUtc: 2024-03-13T11:36:07.1550000+00:00, SessionId: (null)
[2024-03-13T11:31:06.664Z] Message ID: 303d00007a6fb022aa1f08dc43510f5d
[2024-03-13T11:31:06.664Z] Message Content-Type: application/vnd.masstransit+json
[2024-03-13T11:31:06.664Z] Message Body: {
[2024-03-13T11:31:06.668Z]   "messageId": "303d0000-7a6f-b022-aa1f-08dc43510f5d",
[2024-03-13T11:31:06.669Z]   "requestId": null,
[2024-03-13T11:31:06.670Z]   "correlationId": null,
[2024-03-13T11:31:06.671Z]   "conversationId": "303d0000-7a6f-b022-b9fc-08dc43510f60",
[2024-03-13T11:31:06.672Z] Start processing HTTP request POST http://127.0.0.1:64397/Settlement/Complete
[2024-03-13T11:31:06.672Z]   "initiatorId": null,
[2024-03-13T11:31:06.673Z] Sending HTTP request POST http://127.0.0.1:64397/Settlement/Complete
[2024-03-13T11:31:06.673Z]   "sourceAddress": "sb://servicebusouteventgrid.servicebus.windows.net/REDACTED_FunctionApp10_bus_gy6oyyd4p6anrc8cbdqrgwecbx?autodelete=300",
[2024-03-13T11:31:06.676Z]   "destinationAddress": "sb://servicebusouteventgrid.servicebus.windows.net/Contracts/Welcome?type=topic",
[2024-03-13T11:31:06.677Z]   "responseAddress": null,
[2024-03-13T11:31:06.678Z]   "faultAddress": null,
[2024-03-13T11:31:06.679Z]   "messageType": [
[2024-03-13T11:31:06.680Z]     "urn:message:Contracts:Welcome"
[2024-03-13T11:31:06.681Z]   ],
[2024-03-13T11:31:06.682Z]   "message": {
[2024-03-13T11:31:06.682Z]     "name": "Vivek"
[2024-03-13T11:31:06.683Z]   },
[2024-03-13T11:31:06.684Z]   "expirationTime": null,
[2024-03-13T11:31:06.684Z]   "sentTime": "2024-03-13T11:31:00.8976415Z",
[2024-03-13T11:31:06.685Z]   "headers": {},
[2024-03-13T11:31:06.686Z]   "host": {
[2024-03-13T11:31:06.686Z]     "machineName": "REDACTED",
[2024-03-13T11:31:06.687Z]     "processName": "FunctionApp10",
[2024-03-13T11:31:06.688Z]     "processId": 15664,
[2024-03-13T11:31:06.689Z]     "assembly": "FunctionApp10",
[2024-03-13T11:31:06.689Z]     "assemblyVersion": "1.0.0.0",
[2024-03-13T11:31:06.691Z]     "frameworkVersion": "8.0.3",
[2024-03-13T11:31:06.692Z]     "massTransitVersion": "8.1.3.0",
[2024-03-13T11:31:06.693Z]     "operatingSystemVersion": "Microsoft Windows NT 10.0.22631.0"
[2024-03-13T11:31:06.694Z]   }
[2024-03-13T11:31:06.695Z] }
[2024-03-13T11:31:07.083Z] Received HTTP response headers after 403.8996ms - 200
[2024-03-13T11:31:07.084Z] End processing HTTP request after 416.4544ms - 200
[2024-03-13T11:31:07.130Z] Executed 'Functions.Function1' (Succeeded, Id=79c28ced-01a0-4e5f-b8b0-7c26c997eb84, Duration=676ms)