I have a system that pushes messages to a RabbitMQ exchange and several services that consume that data.
The publishing code (F#) is the following:
type RabbitPublisher(connectionString, exchangeType: RMQExchangeType, exchangeName) =
    let connection = createConnection connectionString
    let channel = connection.CreateModel()

    do
        // set up the exchange
        channel.ExchangeDeclare(exchangeName, exchangeType.Converted, durable = true)
        info $"started publisher for exchange '{exchangeName}'"

    member this.Send(message: byte array, ?key: string) =
        try
            let key = defaultArg key String.Empty
            channel.BasicPublish(exchangeName, key, null, message)
        with ex ->
            error $"error while sending message to '{exchangeName}': {ex.Humanize()}"

    member this.Purge(queueName) =
        try
            channel.QueuePurge(queueName) |> ignore
        with ex ->
            error $"error while purging queue '{queueName}': {ex.Humanize()}"
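For context, here is a minimal sketch of how the publisher gets used (the connection string, exchange name, and the `RMQExchangeType.Fanout` case are placeholders, not the real values from my system):

    // hypothetical usage; createConnection and RMQExchangeType are my own helpers
    let publisher =
        RabbitPublisher("amqp://guest:guest@localhost:5672", RMQExchangeType.Fanout, "telemetry")

    // send with the default (empty) routing key
    publisher.Send(System.Text.Encoding.UTF8.GetBytes "hello")

    // send with an explicit routing key
    publisher.Send(System.Text.Encoding.UTF8.GetBytes "hello", key = "sensors.temp")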
On the consuming side, I have this code, which invokes a callback for every message:
type RabbitExchangeSubscriber(connectionString, exchangeType: RMQExchangeType, exchangeName, callback: string -> byte array -> unit, ?keys: string list, ?queueLength: int) =
    let keys = defaultArg keys []
    let queueLength = defaultArg queueLength 10
    let connection = createConnection connectionString
    let channel = connection.CreateModel()
    let consumer = EventingBasicConsumer(channel)

    member this.Start () =
        // properties
        let properties =
            [
                "x-max-length", box queueLength
            ]
            |> dict
        // set up the exchange
        channel.ExchangeDeclare(exchangeName, exchangeType.Converted, durable = true)
        let queue = channel.QueueDeclare(arguments = properties)
        match keys with
        | [] -> channel.QueueBind(queue.QueueName, exchangeName, String.Empty)
        | k -> k |> List.iter (fun x -> channel.QueueBind(queue.QueueName, exchangeName, x))
        // purge the queue on startup
        channel.QueuePurge(queue.QueueName) |> ignore
        // register the callback
        consumer.Received.AddHandler(fun model ea -> callback ea.RoutingKey (ea.Body.ToArray()))
        // get ready to consume events
        channel.BasicConsume(queue.QueueName, true, consumer) |> ignore // auto-ack

    member this.Stop () =
        channel.Close ()
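And a minimal sketch of how a consumer service wires this up (again, the connection string, exchange name, and `RMQExchangeType.Fanout` are placeholders):

    // hypothetical usage; the callback just logs each message
    let onMessage (routingKey: string) (body: byte array) =
        printfn "[%s] %s" routingKey (System.Text.Encoding.UTF8.GetString body)

    let subscriber =
        RabbitExchangeSubscriber(
            "amqp://guest:guest@localhost:5672",
            RMQExchangeType.Fanout,
            "telemetry",
            onMessage,
            queueLength = 100)

    subscriber.Start()
    // ... run until shutdown, then:
    subscriber.Stop()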
The publisher is constantly pushing data. It is important to note that the data CAN be lost: if a message is not consumed, it is acceptable for it to disappear.
The consumers receive a lot of duplicate messages on startup, and I do not understand why.
Is there anything obviously wrong with this setup?