I have an Azure Function that uses an orchestrator to sync products from an external source. Until today, the function was invoked by a TimerTrigger or by an HttpTrigger that passes an InventoryId, which limits the sync to that specific product.
Now I need to add queue support, but I'm not able to inject the DurableTaskClient into the IConsumer.
What I've done so far works, but it does not use the MassTransit consumer at all. The producer is built on MassTransit, as is everything else in our system, and we don't want to change that behavior and send a plain message without the whole body.
Is it possible to inject it somehow or, in the worst case, to deserialize the message in a cleaner way than what I've done here?
    [Function(nameof(MessageReceiverHandler))]
    public async Task Run(
        [ServiceBusTrigger("%QueueName%", Connection = "ServiceBusConnection")] ServiceBusReceivedMessage message,
        [DurableClient] DurableTaskClient client,
        CancellationToken cancellationToken)
    {
        var str = message.Body.ToString();
        var definition = new { message = new { inventoryId = 3 } };
        var res = JsonConvert.DeserializeAnonymousType(str, definition);
        await receiver.HandleConsumer<ProductSyncConsumer>(options.Value.QueueName, message, cancellationToken);
        string instanceId = await client.ScheduleNewOrchestrationInstanceAsync(nameof(DurablePartnerSyncActions), res.message.inventoryId, cancellationToken);
        logger.LogInformation("Started Individual orchestration with ID = '{instanceId}'.", instanceId);
    }

    public class ProductSyncConsumer : IConsumer<ExternalProductSyncSingle>
    {
        private readonly ILogger<ProductSyncConsumer> _logger;

        public ProductSyncConsumer(ILogger<ProductSyncConsumer> logger)
        {
            _logger = logger;
        }

        public Task Consume(ConsumeContext<ExternalProductSyncSingle> context)
        {
            return Task.FromResult(context.Message.InventoryId);
            // DurableTaskClient client = default(DurableTaskClient);
            // string instanceId = await client.ScheduleNewOrchestrationInstanceAsync(nameof(DurablePartnerSyncActions), context.Message.InventoryId);
            _logger.LogInformation("Started Individual orchestration with ID = '{instanceId}'.", 5);
            // return await client.CreateCheckStatusResponseAsync(req, instanceId);
        }
    }
To trigger the orchestration from a message received on a Service Bus queue, you need to use a Service Bus trigger, as you are already doing.
A MassTransit consumer is used to receive a message and process it. For reference, check this document.
In your Service Bus trigger, you just need to use the queue name that the MassTransit consumer consumes from. As far as I know, DurableTaskClient cannot be used in a MassTransit consumer directly. This worked for me.
Function.cs:
OUTPUT:
You can use the message value and process it as per your requirement.
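On the "cleaner deserialization" part of the question, here is a minimal sketch of one way to read the payload out of the MassTransit JSON envelope with System.Text.Json instead of an anonymous type. It assumes the default JSON envelope, where the payload sits under a top-level "message" property (as the anonymous type in the question already implies). EnvelopeReader and ReadMessage are hypothetical names, and the ExternalProductSyncSingle class below is only a guess at the shape of the real contract.

```csharp
using System;
using System.Text.Json;

// Assumed shape of the contract; the real ExternalProductSyncSingle may have more members.
public class ExternalProductSyncSingle
{
    public int InventoryId { get; set; }
}

// Hypothetical helper (not part of MassTransit): extracts the payload from the JSON envelope.
public static class EnvelopeReader
{
    // The envelope uses camelCase names, so match properties case-insensitively.
    private static readonly JsonSerializerOptions Options = new JsonSerializerOptions
    {
        PropertyNameCaseInsensitive = true
    };

    public static T ReadMessage<T>(string envelopeJson) where T : class
    {
        using JsonDocument document = JsonDocument.Parse(envelopeJson);

        // Assumption: the payload is stored under the top-level "message" property.
        if (!document.RootElement.TryGetProperty("message", out JsonElement payload))
        {
            throw new InvalidOperationException("Envelope has no 'message' property.");
        }

        return payload.Deserialize<T>(Options)
            ?? throw new InvalidOperationException("Envelope 'message' was null.");
    }
}
```

Inside the Service Bus triggered function, this could be called as EnvelopeReader.ReadMessage<ExternalProductSyncSingle>(message.Body.ToString()), and the resulting InventoryId passed to ScheduleNewOrchestrationInstanceAsync as in the question. The typed contract replaces the anonymous-type template, so a renamed or missing property surfaces as an explicit failure or a default value on the contract class rather than inside an ad hoc object.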