I have an issue with a Redis client that I'm trying to integrate into a larger message broker.
The problem is that I am using the Pub/Sub functionality of Redis to subscribe to a topic, and the async implementation shown in the docs example does not react properly when the connection to the server is lost.
Basically, a loop such as loop { tokio::select!{ Some(msg) = pubsub_stream.next() => { handle_message(msg); } } } handles new messages properly, but when the server goes down or becomes unreachable, I am not notified and pubsub_stream.next() waits forever on a dead connection. I assume the client would drop this connection as soon as a command is sent to Redis, but this is a listen-only service that never issues other commands.
So I tried an approach that I learned while adding WebSocket support via axum to this broker, where an unbounded mpsc channel is used to deliver messages to a specific WebSocket client. There it works.
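To make the channel idea concrete, here is a minimal std-only sketch of the pattern I have in mind (no tokio and no Redis, so it is not my actual code). A producer thread stands in for the subscription callback and forwards messages into a channel, while the consumer waits with a timeout so that a silent connection is noticed instead of blocking forever. The names forward_and_collect and Outcome are made up for this illustration.

```rust
use std::sync::mpsc::{self, RecvTimeoutError};
use std::thread;
use std::time::Duration;

/// What the consumer observed before it stopped (hypothetical type).
#[derive(Debug, PartialEq)]
enum Outcome {
    /// The producer dropped its sender: the "connection" closed cleanly.
    Closed(Vec<String>),
    /// Nothing arrived within the timeout: the "connection" looks dead.
    TimedOut(Vec<String>),
}

/// Spawns a producer that sends `payloads`, then either hangs up
/// (`hang_up == true`) or stays silent while keeping the sender alive.
/// Returns everything the consumer received and why it stopped.
fn forward_and_collect(payloads: Vec<String>, hang_up: bool, timeout: Duration) -> Outcome {
    let (tx, rx) = mpsc::channel::<String>();

    let producer = thread::spawn(move || {
        for p in payloads {
            // Stand-in for the subscription callback forwarding a message.
            if tx.send(p).is_err() {
                return; // the consumer is gone
            }
        }
        if !hang_up {
            // Simulate a dead connection: sender stays alive but silent.
            thread::sleep(timeout * 4);
        }
        // `tx` is dropped here, which closes the channel.
    });

    let mut received = Vec::new();
    let outcome = loop {
        match rx.recv_timeout(timeout) {
            Ok(msg) => received.push(msg),
            Err(RecvTimeoutError::Disconnected) => break Outcome::Closed(received),
            Err(RecvTimeoutError::Timeout) => break Outcome::TimedOut(received),
        }
    };
    drop(rx);
    let _ = producer.join();
    outcome
}
```

Calling forward_and_collect with hang_up set to true should end in Outcome::Closed, and with hang_up set to false in Outcome::TimedOut, in both cases carrying the messages that were forwarded before the consumer stopped. In the real service the consumer side would be a branch of the select! loop rather than a blocking recv_timeout.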
The following is the approach I'm trying to get to work, but for some reason the mpsc branch of the select! loop is never executed. I intend to add more branches for other channels to the select! loop, but I've removed them to keep this example clean.
In short, I see the - 1 REDIS subscription event printouts but never the - 2 REDIS subscription event printouts.
// Imports assumed for this excerpt; Storage and Event are the broker's
// own types and are defined elsewhere.
use std::sync::Arc;
use std::time::Duration;

use redis::{ControlFlow, Msg, PubSubCommands};
use tokio::sync::mpsc;
use tokio::time::sleep;
use tokio_stream::{wrappers::IntervalStream, StreamExt};

pub async fn redis_async_task(storage: Arc<Storage>) {
    //-----------------------------------------------------------------
    let mut eb_broadcast_rx = storage.eb_broadcast_tx.subscribe();
    let (mpsc_tx, mut mpsc_rx) = mpsc::unbounded_channel::<Msg>();
    let mut interval_5s = IntervalStream::new(tokio::time::interval(Duration::from_secs(5)));
    //-----------------------------------------------------------------
    let _task = tokio::spawn({
        async move {
            loop {
                tokio::select! {
                    Some(msg) = mpsc_rx.recv() => {
                        // Why is this never called?
                        let channel = msg.get_channel_name().to_string();
                        let payload = msg.get_payload::<String>().unwrap();
                        println!(" - 2 REDIS subscription event: channel: {} payload: {}", channel, payload);
                    },
                    Some(_ts) = interval_5s.next() => {
                        // compute messages per second
                        println!("timer");
                    },
                    Ok(evt) = eb_broadcast_rx.recv() => {
                        // Some other events unrelated to Redis
                        if let Event::WsClientConnected{id: _id, name: _name} = evt {}
                        else if let Event::WsClientDisconnected{id: _id, name: _name} = evt {}
                    },
                }
            }
        }
    });
    //-----------------------------------------------------------------
    loop {
        // Loop which runs forever, reconnecting to the Redis server upon
        // disconnect and resubscribing to the topic.
        println!("REDIS connecting");
        let client = redis::Client::open("redis://127.0.0.1:6379/").unwrap();
        if let Ok(mut connection) = client.get_connection() {
            // We have a connection
            println!("REDIS connected");
            if let Err(error) = connection.subscribe(&["tokio"], |msg| {
                // We are subscribed to the topic and receiving messages
                if let Ok(payload) = msg.get_payload::<String>() {
                    let channel = msg.get_channel_name().to_string();
                    println!(" - 1 REDIS subscription event: channel: {} payload: {}", channel, payload);
                    // Send the message through the channel into the select! loop
                    if let Err(error) = mpsc_tx.send(msg) {
                        eprintln!("REDIS: error sending: {}", error);
                    }
                };
                // ControlFlow::Break(())
                ControlFlow::<i32>::Continue
            }) {
                // Connection to Redis is lost, subscription aborts
                eprintln!("REDIS subscription error: {:?} ", error);
            };
        } else {
            // Connection to Redis failed, the server is probably not reachable.
            println!("REDIS connection failed");
        }
        // Sleep for 1 second before reconnecting.
        sleep(Duration::from_millis(1000)).await;
    }
}
The code above is spawned from main as follows, in parallel with other clients such as WebSocket and MQTT, which do work.
#[tokio::main]
async fn main() {
    // ...
    tokio::spawn(task_redis::redis_async_task(storage.clone()));
    // ...
}