I've always used OpenTracing to get distributed tracing of my APIs. But recently, I needed to propagate the trace to a .Net project, which works as a consumer of a RabbitMQ queue, so I used this article as a basis.
To get this tracing, including for the messages fetched from the RabbitMQ queue, I needed to use OpenTelemetry.
To trace the full cycle of an end-to-end request (RabbitMQ Publisher API -> RabbitMQ Consumer API), I used the following code to configure this communication:
Class code that configures OpenTelemetry:
using framework.companyCSharp.Services.Queue.Folder;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using OpenTelemetry.Resources;
using OpenTelemetry.Trace;
using System;
namespace company.project.CSharp.Extensions
{
/// <summary>
/// Dependency-injection helpers that register OpenTelemetry tracing with a Jaeger exporter.
/// </summary>
public static class DistributedTracing
{
    /// <summary>
    /// Registers OpenTelemetry tracing on <paramref name="services"/>: subscribes to the
    /// activity source named by <c>OpenTelemetryExtensions.ServiceName</c>, adds ASP.NET Core
    /// instrumentation and exports spans to a Jaeger agent.
    /// </summary>
    /// <param name="services">The service collection the tracer provider is added to.</param>
    /// <param name="configuration">
    /// Configuration supplying <c>Jaeger:JAEGER_SERVICE_NAME</c>, <c>Jaeger:JAEGER_AGENT_HOST</c>
    /// and <c>Jaeger:JAEGER_AGENT_PORT</c>.
    /// </param>
    public static void ConfigureOpenTracing(this IServiceCollection services, IConfiguration configuration)
    {
        services.AddOpenTelemetryTracing(builder =>
        {
            // Resource attributes describing this service in the exported spans.
            var resource = ResourceBuilder.CreateDefault().AddService(
                serviceName: configuration["Jaeger:JAEGER_SERVICE_NAME"],
                serviceVersion: OpenTelemetryExtensions.ServiceVersion);

            builder
                .AddSource(OpenTelemetryExtensions.ServiceName)
                .SetResourceBuilder(resource)
                .AddAspNetCoreInstrumentation()
                .AddJaegerExporter(options =>
                {
                    var agentPort = configuration["Jaeger:JAEGER_AGENT_PORT"];
                    options.AgentHost = configuration["Jaeger:JAEGER_AGENT_HOST"];
                    options.AgentPort = Convert.ToInt32(agentPort);
                });
        });
    }
}
}
using System;
using System.Diagnostics;
using System.Runtime.InteropServices;
namespace framework.companyCSharp.Services.Queue.Folder
{
/// <summary>
/// Process-wide tracing metadata (host, OS, runtime, service name/version) and the
/// <see cref="ActivitySource"/> used to start the messaging activities.
/// </summary>
public static class OpenTelemetryExtensions
{
    // One source for the whole process: ActivitySource is meant to be created once and reused.
    // Creating a new one per message allocates an instance that is never disposed.
    private static readonly ActivitySource SharedActivitySource;

    /// <summary>Machine name of the current host.</summary>
    public static string Local { get; }

    /// <summary>Operating system version string.</summary>
    public static string Kernel { get; }

    /// <summary>Description of the .NET runtime the process is running on.</summary>
    public static string Framework { get; }

    /// <summary>Name of the assembly containing this class, suffixed with "RabbitMQ".</summary>
    public static string ServiceName { get; }

    /// <summary>Version of the assembly containing this class (empty when it has none).</summary>
    public static string ServiceVersion { get; }

    static OpenTelemetryExtensions()
    {
        var assemblyName = typeof(OpenTelemetryExtensions).Assembly.GetName();

        Local = Environment.MachineName;
        Kernel = Environment.OSVersion.VersionString;
        Framework = RuntimeInformation.FrameworkDescription;
        ServiceName = assemblyName.Name + "RabbitMQ";
        // AssemblyName.Version may be null; a throw here would surface as TypeInitializationException.
        ServiceVersion = assemblyName.Version?.ToString() ?? string.Empty;
        SharedActivitySource = new ActivitySource(ServiceName, ServiceVersion);
    }

    /// <summary>
    /// Returns the shared <see cref="ActivitySource"/> named <see cref="ServiceName"/>.
    /// The same instance is returned on every call, so callers must not dispose it.
    /// </summary>
    public static ActivitySource CreateActivitySource() => SharedActivitySource;
}
}
Class code that sends messages to the RabbitMQ queue:
using framework.companyCSharp.Interfaces;
using framework.companyCSharp.Services.Queue.Folder;
using framework.companyCSharp.Services.Queue.Model;
using Microsoft.Extensions.Options;
using Newtonsoft.Json;
using OpenTelemetry.Context.Propagation;
using OpenTelemetry;
using RabbitMQ.Client;
using System;
using System.Diagnostics;
using System.Text;
using System.Threading.Channels;
using System.Collections.Generic;
using framework.companyCSharp.Extensions;
namespace framework.companyCSharp.Services.Queue
{
/// <summary>
/// Publishes JSON-serialized messages to the configured RabbitMQ queue and injects the
/// current trace context into the message headers so the consumer can continue the trace.
/// </summary>
public class PublisherQueueService : QueueService, IPublisherQueueService
{
    public PublisherQueueService(IOptions<QueueConfig> queueConfig) : base(queueConfig)
    {}

    /// <summary>
    /// Serializes <paramref name="Mensagem"/> to JSON and publishes it as a persistent message
    /// to the default exchange, routed to <c>QueueConfig.QueueName</c>.
    /// </summary>
    /// <param name="Mensagem">The payload to publish; must not be null.</param>
    /// <exception cref="Exception">Thrown when <paramref name="Mensagem"/> is null.</exception>
    public void SendMessage(object Mensagem)
    {
        if (Mensagem == null)
        {
            throw new Exception("PublisherQueueService: Mensagem nula");
        }

        var activityName = $"{this.NomeDaFila} send";

        // The producer activity must be disposed: an activity that is never stopped is never
        // ended/exported and remains Activity.Current, so the consumer span would reference a
        // parent span the tracing backend never receives.
        using (var activity = OpenTelemetryExtensions.CreateActivitySource()
            .StartActivity(activityName, ActivityKind.Producer))
        {
            IBasicProperties properties = Canal.CreateBasicProperties();
            properties.Persistent = true;

            // StartActivity returns null when nobody is listening to the source; fall back to
            // the ambient activity (if any) so the context is still propagated.
            ActivityContext contextToInject = default;
            if (activity != null)
            {
                contextToInject = activity.Context;
            }
            else if (Activity.Current != null)
            {
                contextToInject = Activity.Current.Context;
            }

            Propagator.Inject(
                new PropagationContext(contextToInject, Baggage.Current),
                properties,
                InjectTraceContextIntoBasicProperties);

            string mensagem = JsonConvert.SerializeObject(Mensagem);
            // The consumer decodes the body as UTF-8, so encode explicitly as UTF-8 instead of
            // relying on the platform-dependent Encoding.Default.
            byte[] body = Encoding.UTF8.GetBytes(mensagem);

            Canal.BasicPublish(exchange: String.Empty, routingKey: QueueConfig.QueueName, basicProperties: properties, body: body);
        }
    }

    /// <summary>
    /// Propagator setter callback: stores one trace-context entry in the message headers,
    /// creating the header dictionary when the properties have none.
    /// </summary>
    /// <exception cref="Exception">Wraps any failure while writing the header.</exception>
    private void InjectTraceContextIntoBasicProperties(IBasicProperties props, string key, string value)
    {
        try
        {
            if (props.Headers == null)
            {
                props.Headers = new Dictionary<string, object>();
            }
            props.Headers[key] = value;
        }
        catch (Exception ex)
        {
            // Keep the original exception as InnerException so its stack trace is not lost.
            throw new Exception("Failed to inject trace context." + ex.GetAllInnerExceptionMessage(), ex);
        }
    }
}
}
using framework.companyCSharp.Interfaces;
using framework.companyCSharp.Services.Queue.Model;
using Microsoft.Extensions.Options;
using OpenTelemetry.Context.Propagation;
using RabbitMQ.Client;
namespace framework.companyCSharp.Services.Queue
{
/// <summary>
/// Base class for the RabbitMQ services: validates the queue configuration, opens the
/// connection and channel, and declares the durable queue.
/// </summary>
public class QueueService : IQueueService
{
    protected readonly TextMapPropagator Propagator = Propagators.DefaultTextMapPropagator;
    private string Servidor { get; set; }
    private string Usuario { get; set; }
    private string Senha { get; set; }
    protected string NomeDaFila { get; set; }
    protected IModel Canal { get; private set; }
    protected IConnection Conexao { get; private set; }
    public QueueConfig QueueConfig { get; private set; }

    /// <summary>
    /// Validates <paramref name="queueConfig"/> and initializes the connection, channel and queue.
    /// </summary>
    /// <exception cref="System.Exception">
    /// Thrown when the options (or their value) are null or a required setting is missing.
    /// </exception>
    public QueueService(IOptions<QueueConfig> queueConfig)
    {
        if (queueConfig == null || queueConfig.Value == null)
            throw new System.Exception("QueueService: Falha ao injetar queueConfig valor nulo");
        this.QueueConfig = queueConfig.Value;
        if (IsValidConfigs())
        {
            InicializaServidorDeFilas();
        }
    }

    /// <summary>
    /// Checks that server, user name, password and queue name are all configured.
    /// Returns true when they are; throws for the first missing setting.
    /// </summary>
    private bool IsValidConfigs()
    {
        if (string.IsNullOrEmpty(QueueConfig.Server))
            throw new System.Exception("O Nome do servidor de fila deve ser informado em appsettings");
        if (string.IsNullOrEmpty(QueueConfig.UserName))
            throw new System.Exception("UserName do servidor de fila deve ser informado em appsettings");
        if (string.IsNullOrEmpty(QueueConfig.Password))
            throw new System.Exception("Password do servidor de fila deve ser informado em appsettings");
        if (string.IsNullOrEmpty(QueueConfig.QueueName))
            throw new System.Exception("QueueName do servidor de fila deve ser informado em appsettings");
        return true;
    }

    /// <summary>
    /// Opens the connection and channel and declares the durable queue.
    /// Does nothing when the channel is already open.
    /// </summary>
    public void InicializaServidorDeFilas()
    {
        if (Canal != null && Canal.IsOpen) return;

        this.Servidor = this.QueueConfig.Server;
        this.Usuario = this.QueueConfig.UserName;
        this.Senha = this.QueueConfig.Password;
        this.NomeDaFila = this.QueueConfig.QueueName;

        var factory = new ConnectionFactory()
        {
            HostName = this.Servidor,
            UserName = this.Usuario,
            Password = this.Senha
        };

        // Keep the connection in the Conexao property (not a local) so derived classes can
        // close it; otherwise Conexao stays null and the connection is never released.
        Conexao = factory.CreateConnection();
        Canal = Conexao.CreateModel();
        Canal.QueueDeclare(queue: this.NomeDaFila, durable: true, exclusive: false, autoDelete: false, arguments: null);
    }
}
}
Class code that gets and processes messages for the RabbitMQ queue:
using api.companyEFD.Domain.NotasFiscais.InterfaceServicos;
using api.companyEFD.Shared.ViewModels;
using framework.companyCSharp.Services.Queue;
using framework.companyCSharp.Services.Queue.Model;
using Microsoft.Extensions.Options;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;
using framework.companyCSharp.Extensions;
using framework.companyCSharp.Services;
using api.companyEFD.Shared.Resources;
using OpenTelemetry;
using System.Diagnostics;
using framework.companyCSharp.Services.Queue.Folder;
namespace api.companyEFD.Services
{
/// <summary>
/// Hosted service that consumes messages from the configured RabbitMQ queue, continues the
/// distributed trace carried in the message headers and persists NFe payloads.
/// </summary>
public class ConsumerQueueService : QueueService, IHostedService, IDisposable
{
    private readonly IServiceProvider _serviceProvider;
    // Allows a single message to be processed at a time across all instances.
    static readonly SemaphoreSlim semaphoreSlim = new(1, 1);
    private readonly IConfiguration _config;

    public ConsumerQueueService(
        IOptions<QueueConfig> queueConfig,
        IServiceProvider serviceProvider,
        IConfiguration config
    ) : base(queueConfig)
    {
        _serviceProvider = serviceProvider;
        _config = config;
    }

    /// <summary>Closes the channel and the connection, when they exist.</summary>
    public void Dispose()
    {
        Canal?.Close();
        Conexao?.Close();
    }

    /// <summary>
    /// Registers the consumer on the queue with manual acknowledgement. Failures are logged
    /// through <see cref="TratamentoDeErro"/> and do not propagate to the host.
    /// </summary>
    public Task StartAsync(CancellationToken cancellationToken)
    {
        try
        {
            cancellationToken.ThrowIfCancellationRequested();
            var consumer = new EventingBasicConsumer(Canal);
            // OnRecebeMensagem catches its own exceptions, so this async handler does not
            // let exceptions escape.
            consumer.Received += new EventHandler<BasicDeliverEventArgs>(
                async delegate (Object canal, BasicDeliverEventArgs evento)
                {
                    await OnRecebeMensagem(canal, evento);
                });
            consumer.Shutdown += OnConsumerShutdown;
            consumer.Registered += OnConsumerRegistered;
            consumer.Unregistered += OnConsumerUnregistered;
            consumer.ConsumerCancelled += OnConsumerConsumerCancelled;
            Canal.BasicConsume(NomeDaFila, false, consumer);
            return Task.CompletedTask;
        }
        catch (Exception ex)
        {
            TratamentoDeErro(ex);
            return Task.CompletedTask;
        }
    }

    /// <summary>Logs the exception together with a fresh correlation id and the tracing id.</summary>
    private void TratamentoDeErro(Exception ex)
    {
        var TraceId = Guid.NewGuid().ToString();
        var OpenTracingId = GetOpenTracingId();
        var erro = new
        {
            TraceId,
            OpenTracingId,
            Message = ex.GetAllInnerExceptionMessage(),
            ex?.StackTrace
        };
        LogService.Log(erro.ToString());
    }

    /// <summary>
    /// Handles one delivery: extracts the parent trace context from the headers, starts a
    /// consumer activity, fills in the database credentials from configuration, saves NFe
    /// messages and acknowledges the delivery. Any failure is logged and the delivery is
    /// left unacknowledged.
    /// </summary>
    private async Task OnRecebeMensagem(object canal, BasicDeliverEventArgs evento)
    {
        try
        {
            var parentContext = Propagator.Extract(default, evento.BasicProperties, this.ExtractTraceContextFromBasicProperties);
            Baggage.Current = parentContext.Baggage;
            var activityName = $"{evento.RoutingKey} receive";
            using var activity = OpenTelemetryExtensions.CreateActivitySource()
                .StartActivity(activityName, ActivityKind.Consumer, parentContext.ActivityContext);
            await semaphoreSlim.WaitAsync();
            try
            {
                var content = Encoding.UTF8.GetString(evento.Body.ToArray());
                QueueVM<JObject> mensagem = JsonConvert.DeserializeObject<QueueVM<JObject>>(content);
                // DeserializeObject returns null for an empty/"null" body; fail with a clear
                // message instead of a NullReferenceException further down.
                if (mensagem == null)
                {
                    throw new Exception("Mensagem recebida da fila é nula ou inválida.");
                }

                string Usuario = _config.GetValue<string>("SQLServer:Usuario");
                string Senha = _config.GetValue<string>("SQLServer:Senha");
                if (string.IsNullOrEmpty(Usuario) || string.IsNullOrEmpty(Senha))
                {
                    throw new Exception(EfdResources.ClienteBdLoginNaoConfigurado);
                }

                mensagem.ConfigBd.usuario = Usuario;
                mensagem.ConfigBd.senha = Senha;

                if (mensagem.Tipo == Shared.ENums.ETipoInformacao.NFe)
                {
                    // Resolve the service per message and keep it local: it belongs to the
                    // scope and must not outlive it.
                    using IServiceScope scope = _serviceProvider.CreateScope();
                    var salvarNotaFiscalNoBanco = scope.ServiceProvider.GetRequiredService<ISalvarNotaFiscalNoBanco>();
                    await salvarNotaFiscalNoBanco.SalvarNoBanco(mensagem.GetFirstData().ToObject<NotaFiscalVM>(), mensagem.ConfigBd);
                }
                Canal.BasicAck(evento.DeliveryTag, false);
            }
            finally
            {
                semaphoreSlim.Release();
            }
        }
        catch (Exception ex)
        {
            // NOTE(review): the delivery is neither acked nor nacked on failure, so it stays
            // unacknowledged on this channel. Confirm whether a BasicNack (and which requeue
            // policy) is wanted here.
            TratamentoDeErro(new Exception("Erro ao processar mensagem do RabbitMQ." + "\n" + ex.GetAllInnerExceptionMessage()));
        }
    }

    /// <summary>
    /// Propagator getter callback: returns the header value stored under
    /// <paramref name="key"/> as a single-element sequence, or an empty sequence when the
    /// header is missing or is neither a UTF-8 byte array nor a string.
    /// </summary>
    private IEnumerable<string> ExtractTraceContextFromBasicProperties(IBasicProperties props, string key)
    {
        try
        {
            if (props.Headers == null) return Enumerable.Empty<string>();
            if (props.Headers.TryGetValue(key, out var value))
            {
                // Accept byte arrays and plain strings; any other value type is ignored
                // instead of failing the whole message.
                if (value is byte[] bytes)
                {
                    return new[] { Encoding.UTF8.GetString(bytes) };
                }
                if (value is string text)
                {
                    return new[] { text };
                }
            }
        }
        catch (Exception ex)
        {
            throw new Exception("Falha durante a extração do trace context: " + ex.GetAllInnerExceptionMessage());
        }
        return Enumerable.Empty<string>();
    }

    /// <summary>No-op; the channel and connection are closed in <see cref="Dispose"/>.</summary>
    public Task StopAsync(CancellationToken cancellationToken)
    {
        return Task.CompletedTask;
    }

    #region Eventos
    private void RabbitMQ_ConnectionShutdown(object sender, ShutdownEventArgs e) { }
    private void OnConsumerConsumerCancelled(object sender, ConsumerEventArgs e) { }
    private void OnConsumerUnregistered(object sender, ConsumerEventArgs e) { }
    private void OnConsumerRegistered(object sender, ConsumerEventArgs e) { }
    private void OnConsumerShutdown(object sender, ShutdownEventArgs e) { }
    #endregion
}
}
After switching from OpenTracing to OpenTelemetry, when viewing the traces in the Jaeger UI, I can see that end-to-end tracing works almost satisfactorily, except that the hierarchy is no longer shown: the processes are displayed stacked one above the other, which no longer allows verifying the order in which each communication occurred.
Using OpenTracing:
<PackageReference Include="Jaeger" Version="1.0.3" />
<PackageReference Include="OpenTracing.Contrib.NetCore" Version="0.8.0" />
Using OpenTelemetry:
<PackageReference Include="OpenTelemetry.Exporter.Jaeger" Version="1.3.0" />
<PackageReference Include="OpenTelemetry.Extensions.Hosting" Version="1.0.0-rc9.4" />
<PackageReference Include="OpenTelemetry.Instrumentation.AspNetCore" Version="1.0.0-rc9.4" />
<PackageReference Include="OpenTelemetry.Instrumentation.Http" Version="1.0.0-rc9.4" />
Any help will be welcome.


Try using Odigos. It will automatically instrument your applications, generate distributed traces (with context propagation), and send them to Jaeger (or any other destination you desire). It's a 5-minute installation. https://github.com/keyval-dev/odigos