Mocking EventHubAsyncCollector in Azure function using NSubstitute


I have a simple Azure Function:

[FunctionName("TestFunction")]
public async Task Run([CosmosDBTrigger(
    databaseName: "Test",
    containerName: "test",
    CreateLeaseContainerIfNotExists = true,
    MaxItemsPerInvocation = 200,
    FeedPollDelay = 10000,
    Connection = "TestConnection",            
    LeaseContainerName = "testLeases")]IReadOnlyList<Document> inputs,
    [EventHub("test", Connection = "TestConnectionString")] IAsyncCollector<EventData> outputEvents,
    ILogger log)
{
    if (inputs != null && inputs.Count > 0)
    {
        try
        {
            await inputs.ForEachAsync(dop: 10, input =>
            {
                var id = input.GetPropertyValue<string>("id");
                var eventData = new EventData(Encoding.UTF8.GetBytes(input.ToString()));
                return outputEvents.AddAsync(eventData, id);
            });
        }
        catch (Exception exception)
        {
            _logger.LogError(exception, $"Error occurred while sending reports feed to datalake: {exception.Message}");
        }
    }
}

I'm trying to mock outputEvents and assert that AddAsync(eventData, id) has received a call. While it's no problem for AddAsync(eventData), it's a bit problematic for the above case, as I receive an exception from EventHubWebJobsExtensions:

public static Task AddAsync(this IAsyncCollector<EventData> instance, EventData eventData, string partitionKey, CancellationToken cancellationToken = default(CancellationToken)) =>
    instance switch
    {
        EventHubAsyncCollector ehCollector => ehCollector.AddAsync(eventData, partitionKey, cancellationToken),
        _ => throw new InvalidOperationException("Adding with a partition key is only available when using the Event Hubs extension package.")
    };

For AddAsync(eventData) I simply use the commonly suggested approach:

    public class MockAsyncCollector<T> : IAsyncCollector<T>
    {
        public readonly List<T> Items = new List<T>();

        public Task AddAsync(T item, CancellationToken cancellationToken = default)
        {
            Items.Add(item);
            return Task.FromResult(true);
        }

        public Task FlushAsync(CancellationToken cancellationToken = default)
        {
            return Task.FromResult(true);
        }
    }

This works fine, but I cannot figure out a solution for AddAsync(eventData, id).

Currently my test looks like this:

public class Tests
{
    private readonly MockLogger<Test> _logger;
    private readonly Test _processor;

    public Tests()
    {
        _logger = Substitute.For<MockLogger<Test>>();
        _processor = new Test(_logger);
    }
    
    [Fact]
    public async Task Run_GivenProperMessage_ShouldNotThrowAndCallAddAsync()
    {
        // Arrange
        var input = new List<Document>();
        var inputDocument = new Document();
        inputDocument.SetPropertyValue("id", Guid.NewGuid().ToString());
        input.Add(inputDocument);

        var output = Substitute.For<IAsyncCollector<EventData>>();
        
        // Act
        var act = async () => await _processor.Run(input, output, _logger);

        // Assert
        await act.Should().NotThrowAsync();
        await output.Received(1).AddAsync(Arg.Any<EventData>(), Arg.Any<string>());
    }
}

The reason for the exception seems to be the switch: it expects an EventHubAsyncCollector, but receives an NSubstitute ObjectProxy instead.

Answer by Suresh Chikkam:

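The exception occurs because AddAsync(eventData, partitionKey) is not a member of IAsyncCollector<EventData>. It is a static extension method, so NSubstitute cannot intercept it. The extension method checks whether the instance is an EventHubAsyncCollector and throws an InvalidOperationException for anything else, including the NSubstitute proxy used in your test.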
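
The behaviour can be reproduced without any Azure packages. The following is a minimal sketch, assuming stand-in types: ICollector, RealCollector, FakeCollector, AddWithKey and Demo are hypothetical names that only mirror the shape of the extension method quoted in the question, not the real Event Hubs types.

```csharp
using System;

public interface ICollector { }                     // stand-in for IAsyncCollector<EventData>
public sealed class RealCollector : ICollector { }  // stand-in for EventHubAsyncCollector
public sealed class FakeCollector : ICollector { }  // stand-in for a proxy or hand-written fake

public static class CollectorExtensions
{
    // Same shape as the quoted extension method: a type pattern for the one
    // supported class, then a default arm that throws.
    public static string AddWithKey(this ICollector instance, string partitionKey) =>
        instance switch
        {
            RealCollector _ => "added with key " + partitionKey,
            _ => throw new InvalidOperationException("Only RealCollector supports partition keys.")
        };
}

public static class Demo
{
    // Returns true when the extension method rejects the given collector.
    public static bool ThrowsFor(ICollector collector)
    {
        try
        {
            collector.AddWithKey("k");
            return false;
        }
        catch (InvalidOperationException)
        {
            return true;
        }
    }
}
```

Because the check is on the runtime type, any object that merely implements the interface ends up in the throwing arm.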

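  • Create a custom implementation of IAsyncCollector<EventData> that records each event together with its partition key:

public class MockEventHubAsyncCollector : IAsyncCollector<EventData>
{
    public readonly List<(EventData, string)> Items = new List<(EventData, string)>();

    // Member required by IAsyncCollector<EventData>; events added without a key are recorded with a null key.
    public Task AddAsync(EventData item, CancellationToken cancellationToken = default)
    {
        Items.Add((item, null));
        return Task.CompletedTask;
    }

    // Overload that also records the partition key. It is not part of the interface, so it is
    // only reachable through a reference typed as MockEventHubAsyncCollector.
    public Task AddAsync(EventData item, string partitionKey, CancellationToken cancellationToken = default)
    {
        Items.Add((item, partitionKey));
        return Task.CompletedTask;
    }

    public Task FlushAsync(CancellationToken cancellationToken = default) => Task.CompletedTask;
}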

Test:

// Arrange
var input = new List<Document>();
var inputDocument = new Document();
var id = Guid.NewGuid().ToString();
inputDocument.SetPropertyValue("id", id);
input.Add(inputDocument);

var output = new MockEventHubAsyncCollector();
    
// Act
var act = async () => await _processor.Run(input, output, _logger);

// Assert
await act.Should().NotThrowAsync();
output.Items.Should().HaveCount(1);
output.Items[0].Item2.Should().Be(id); // Check the partition key
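
One caveat with a hand-written collector: Run receives it as IAsyncCollector<EventData>, so the call outputEvents.AddAsync(eventData, id) still binds to the extension method quoted in the question, and its type check accepts only EventHubAsyncCollector. A possible way around this is to route the partition-keyed send through a small abstraction owned by the function project. The sketch below assumes such an abstraction; IPartitionedEventSender, RecordingEventSender and FeedForwarder are hypothetical names, not part of any Azure SDK, and the production adapter is shown only as a comment because it needs the Event Hubs extension package.

```csharp
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical abstraction owned by the function project.
public interface IPartitionedEventSender<T>
{
    Task SendAsync(T item, string partitionKey, CancellationToken cancellationToken = default);
}

// Hand-written fake for tests: records every (item, partitionKey) pair.
// The lock keeps it safe when the caller sends in parallel.
public sealed class RecordingEventSender<T> : IPartitionedEventSender<T>
{
    private readonly object _gate = new object();
    private readonly List<(T Item, string PartitionKey)> _sent = new List<(T Item, string PartitionKey)>();

    public IReadOnlyList<(T Item, string PartitionKey)> Sent
    {
        get { lock (_gate) { return _sent.ToArray(); } }
    }

    public Task SendAsync(T item, string partitionKey, CancellationToken cancellationToken = default)
    {
        lock (_gate) { _sent.Add((item, partitionKey)); }
        return Task.CompletedTask;
    }
}

// Hypothetical helper holding the forwarding loop, so a test can pass a fake sender.
public static class FeedForwarder
{
    public static async Task ForwardAsync<T>(
        IEnumerable<(string Id, T Payload)> inputs, IPartitionedEventSender<T> sender)
    {
        foreach (var (id, payload) in inputs)
        {
            await sender.SendAsync(payload, id);
        }
    }
}

// Production adapter (sketch only, requires the Event Hubs extension package):
// public sealed class CollectorEventSender : IPartitionedEventSender<EventData>
// {
//     private readonly IAsyncCollector<EventData> _collector;
//     public CollectorEventSender(IAsyncCollector<EventData> collector) => _collector = collector;
//     public Task SendAsync(EventData item, string partitionKey, CancellationToken ct = default)
//         => _collector.AddAsync(item, partitionKey, ct); // the extension method from the question
// }
```

With this shape, Run would wrap the bound collector in the adapter and delegate to the helper, while a test passes RecordingEventSender and inspects Sent. Since SendAsync is an ordinary interface member, a substitute created with Substitute.For<IPartitionedEventSender<EventData>>() should also be verifiable with Received(1).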

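Here is a fuller sample for testing purposes: a function that maps each changed document to an event, together with the event type and the mock collector: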

using System.Collections.Generic;
using Microsoft.Azure.Functions.Worker;
using Microsoft.Extensions.Logging;
using System.Threading.Tasks;

namespace FunctionApp23
{
    public class Function1
    {
        private readonly ILogger _logger;

        public Function1(ILoggerFactory loggerFactory)
        {
            _logger = loggerFactory.CreateLogger<Function1>();
        }

        [Function("Function1")]
        public async Task Run(
            [CosmosDBTrigger(
                databaseName: "databaseName",
                collectionName: "collectionName",
                ConnectionStringSetting = "CONN_STRING",
                LeaseCollectionName = "leases")] IReadOnlyList<MyDocument> input,
            [EventHub("test", Connection = "EventHubConnectionString")] IAsyncCollector<MyEventData> outputEvents)
        {
            if (input != null && input.Count > 0)
            {
                _logger.LogInformation("Documents modified: " + input.Count);
                _logger.LogInformation("First document Id: " + input[0].Id);

                // Map each changed document to an event and send it to Event Hub,
                // using the document Id as the partition key
                foreach (var document in input)
                {
                    var eventData = new MyEventData
                    {
                        Id = document.Id,
                        Text = document.Text,
                        Number = document.Number,
                        Boolean = document.Boolean
                    };

                    await outputEvents.AddAsync(eventData, document.Id);
                }
            }
        }
    }

    public class MyEventData
    {
        public string Id { get; set; }

        public string Text { get; set; }

        public int Number { get; set; }

        public bool Boolean { get; set; }
    }

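    public class MockEventHubAsyncCollector : IAsyncCollector<MyEventData>
    {
        public readonly List<(MyEventData, string)> Items = new List<(MyEventData, string)>();

        // Member required by IAsyncCollector<MyEventData>; events added without a key are recorded with a null key.
        public Task AddAsync(MyEventData item, System.Threading.CancellationToken cancellationToken = default)
        {
            Items.Add((item, null));
            return Task.CompletedTask;
        }

        // Overload that also records the partition key; it is not part of the interface.
        public Task AddAsync(MyEventData item, string partitionKey, System.Threading.CancellationToken cancellationToken = default)
        {
            Items.Add((item, partitionKey));
            return Task.CompletedTask;
        }

        public Task FlushAsync(System.Threading.CancellationToken cancellationToken = default) => Task.CompletedTask;
    }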
}

Sample data: (screenshot not preserved)

  • The expected result is a set of events (MyEventData instances), one per input document from the Cosmos DB trigger, each associated with its partition key (the document Id). The exact contents depend on the input documents passed to the function.

Produced events: (screenshot not preserved)