public class MessagePublisher : BackgroundService
{
private readonly ILogger<MessagePublisher> _logger;
private readonly IBusControl _busControl;
private readonly IMessageIcd _messageIcd;
private readonly IArtemisMqOptions _artemisMqOptions;
private ITestHarness _testHarness;
public MessagePublisher(ILogger<MessagePublisher> logger, IArtemisMqOptions artemisMqOptions, IBusControl busControl, IMessageIcd messageIcd, ITestHarness testHarness)
{
_logger = logger;
_artemisMqOptions = artemisMqOptions ?? throw new ArgumentNullException(nameof(artemisMqOptions));
_busControl = busControl ?? throw new ArgumentNullException(nameof(busControl));
_messageIcd = messageIcd ?? throw new ArgumentNullException(nameof(messageIcd));
_testHarness = testHarness;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
_logger.LogInformation("Message Publisher started");
try
{
await _testHarness.Start();
while (!stoppingToken.IsCancellationRequested)
{
if (_messageIcd != null)
{
await _testHarness.Bus.Publish(_messageIcd, ctx => ctx.DestinationAddress = new Uri($"queue:{_artemisMqOptions.InQueue}"));
var consumerTestHarness = _testHarness.GetConsumerHarness<MessageConsumer>();
Assert.True(await _testHarness.Published.Any<IMessageIcd>(), "Message not published");
Assert.True(await consumerTestHarness.Consumed.Any<IMessageIcd>(), "Message not consumed");
_logger.LogInformation("Message published successfully.");
}
else
{
_logger.LogWarning("MessageIcd is null. Skipping publishing.");
}
await Task.Delay(1000, stoppingToken); // Adjust the delay as needed
}
}
catch (OperationCanceledException)
{
_logger.LogInformation("Publishing operation cancelled.");
}
catch (Exception ex)
{
_logger.LogError(ex, "An error occurred while publishing the message.");
throw; // Rethrow the exception for further handling at a higher level
}
}
}
public class MessageConsumer : IConsumer<IMessageIcd>
{
private readonly ILogger<MessageConsumer> _logger;
private readonly SemaphoreSlim _consumedSignal;
public MessageConsumer(ILogger<MessageConsumer> logger)
{
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_consumedSignal = new SemaphoreSlim(0);
}
public async Task Consume(ConsumeContext<IMessageIcd> context)
{
// Implement your message handling logic here
var message = context.Message;
// Test logic
_logger.LogInformation("Message consumed: {Message}", message);
// Signal that message has been consumed
_consumedSignal.Release();
await Task.CompletedTask;
}
public async Task WaitForMessageConsumption(TimeSpan timeout)
{
// Wait for the consumed signal or timeout
await _consumedSignal.WaitAsync(timeout);
}
}
[Fact]
public async Task TestSendMessageAsync1()
{
try
{
ICommonLogicMethods common = new CommonLogicMethods(_messageIcd);
var sfa = _sfaTestData.GenerateData();
var messageicd = common.GenerateIcd<ISingleFormAuthority>(sfa, MessageType.Authority, 1001, "000", "SystemCRC", 23000, GlobalEnums.MessageAction.Create, GlobalEnums.PayLoadDataType.Object);
if (messageicd?.Result != null)
{
var timeout = TimeSpan.FromSeconds(30);
using var source = new CancellationTokenSource(timeout);
var publisher = new MessagePublisher(_loggerPublish, _artemisMqOptions, _busControl, messageicd.Result, _harness);
await publisher.StartAsync(source.Token);
await Task.Delay(5000);
// Get consumer harness
var consumerTestHarness = _harness.GetConsumerHarness<MessageConsumer>();
// Wait for message consumption
var consumedMessage = await consumerTestHarness.Consumed.Any<IMessageIcd>(source.Token);
// Perform assertions
Assert.True(consumedMessage, "Message not consumed");
_loggerPublish.LogInformation("Message published and consumed successfully.");
}
}
catch (Exception ex)
{
throw ex;
}
finally
{
await _harness.Stop();
}
}
Here is my test case with publish and consume the message. Unable to consume the message. Can someone help me out what I am missing here? It is coming null as in context. The test case I am written in test project and publisher and consumer is in main project. How to consume with specific queue name?