What am I trying to achieve?
Call an async function from inside an actix Handler implementation.
Code
Here is the struct:
pub struct WsConnectionManager {
    sessions: HashMap<String, Socket>,
    redis_broadcast_manager: RedisData, // redis client
}
WsConnectionManager has a few methods that access sessions and redis_broadcast_manager and perform certain actions:
impl WsConnectionManager {
    pub fn new(redis_broadcast_manager: RedisData) -> WsConnectionManager {
        WsConnectionManager {
            sessions: HashMap::new(),
            redis_broadcast_manager,
        }
    }
    pub fn send_message(&mut self, message: &str, message_type: String, id_to: &String) {
        if let Some(socket_recipient) = self.sessions.get(id_to) {
            let _ = socket_recipient.do_send(WsMessage {
                message_type,
                data: message.to_owned(),
            });
        } else {
            println!("attempting to send message but couldn't find user id.");
        }
    }
    pub async fn remove_stream(&mut self, stream_id: &str) {
        self.redis_broadcast_manager
            .remove_value(stream_id)
            .await
            .unwrap();
    }
    pub async fn get_stream_viewers(&mut self, stream_id: &str) -> Vec<BroadcastState> {
        let raw_stream_viewers = self
            .redis_broadcast_manager
            .get_value(stream_id)
            .await
            .unwrap();
        let stream_viewers: Vec<BroadcastState> =
            serde_json::from_str(raw_stream_viewers.as_ref().unwrap().as_str()).unwrap();
        stream_viewers
    }
    pub async fn set_stream_viewers(&mut self, stream_id: &str, viewers: Vec<BroadcastState>) {
        let viewers_json = serde_json::to_string(&viewers).unwrap();
        self.redis_broadcast_manager
            .set_value(stream_id, &viewers_json)
            .await
            .unwrap();
    }
}
Then I implemented the Actor trait:
impl Actor for WsConnectionManager {
    type Context = Context<Self>;
}
After that, I implemented the Handler trait for each message type:
impl Handler<Disconnect> for WsConnectionManager {
    type Result = ();
    fn handle(&mut self, msg: Disconnect, _: &mut Context<Self>) {
        let key = format!("{}:{}", msg.user_id, msg.client_id);
        if self.sessions.remove(&key).is_some() {
            // send message to all other users in the room?
            println!("DISCONNECTED: {}", key);
        }
    }
}
impl Handler<Connect> for WsConnectionManager {
    type Result = ();
    fn handle(&mut self, msg: Connect, _: &mut Context<Self>) -> Self::Result {
        let key = format!("{}:{}", msg.user_id, msg.client_id);
        self.sessions.insert(key.clone(), msg.addr);
        // self.send_message(&format!("your id is {}", key.clone()), &key);
        println!("ACTIVE SESSION COUNT: {}", self.sessions.len());
    }
}
impl Handler<BroadcastEvent> for WsConnectionManager {
    type Result = ();
    fn handle(&mut self, msg: BroadcastEvent, ctx: &mut Context<Self>) -> Self::Result {
        let key = format!("player:{}:{}", msg.user_id, msg.client_id);
        if msg.data.action == "broadcast_start" {
            self.set_stream_viewers(&key, vec![]).await; // <--- error here
        } else {
            println!("UNKNOWN ACTION: {}", msg.data.action);
        }
    }
}
Since self.set_stream_viewers(&key, vec![]) is async, I need to await it, but handle cannot be declared as an async fn in impl Handler<BroadcastEvent> for WsConnectionManager.
How can I call an async function inside a Handler implementation? Is there another way to approach this?
What have I tried so far?
- Moving self.set_stream_viewers(&key, vec![]).await inside an async move block, but that gives the error: lifetime may not live long enough; returning this value requires that '1 must outlive 'static. This may be solved by cloning self, but that is not efficient.
- Using block_in_place:
block_in_place(|| {
    tokio::runtime::Runtime::new().unwrap().block_on(async {
        self.set_stream_viewers(&key, vec![]).await;
        println!("STARTING STREAM: {}", key);
    });
})
This gives the runtime error:
can call blocking only when running on the multi-threaded runtime
- Using actix::spawn:
actix::spawn(async move {
    // remove entry from redis and broadcast end stream to everyone
    self.remove_stream(&key).await;
    println!("STOPPING STREAM: {}", key);
});
This fails to compile with:
borrowed data escapes outside of method
`self` escapes the method body here
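On the cloning idea from the first attempt: the whole actor does not have to be cloned, only the handle the future needs. The following is a minimal sketch using only the standard library, not actix itself. FakeRedis, Manager, BoxedFuture, handle_broadcast_start and block_on are hypothetical names made up for illustration, and the sketch assumes RedisData can be cloned cheaply (for example because it wraps a shared connection handle), which the code above does not confirm.

```rust
use std::collections::HashMap;
use std::future::Future;
use std::pin::{pin, Pin};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical stand-in for RedisData: cloning copies only an Arc pointer,
// so every clone talks to the same underlying store.
#[derive(Clone, Default)]
pub struct FakeRedis {
    store: Arc<Mutex<HashMap<String, String>>>,
}

impl FakeRedis {
    // Async like the real client methods, but completes immediately.
    pub async fn set_value(&self, key: &str, value: &str) {
        self.store
            .lock()
            .unwrap()
            .insert(key.to_owned(), value.to_owned());
    }

    // Synchronous read, used only to inspect the result.
    pub fn get_value_sync(&self, key: &str) -> Option<String> {
        self.store.lock().unwrap().get(key).cloned()
    }
}

// Hypothetical model of the actor: only the Redis handle is shown.
pub struct Manager {
    pub redis: FakeRedis,
}

// A boxed future that owns everything it uses (no borrow of the actor).
pub type BoxedFuture = Pin<Box<dyn Future<Output = ()> + 'static>>;

impl Manager {
    // Synchronous method, like Handler::handle: it cannot await, but it can
    // build a future and hand it back to whoever drives futures.
    pub fn handle_broadcast_start(&mut self, key: String) -> BoxedFuture {
        // Clone the cheap handle instead of moving or borrowing self.
        let redis = self.redis.clone();
        Box::pin(async move {
            redis.set_value(&key, "[]").await;
        })
    }
}

// Toy executor for this sketch: polls in a loop with a waker that does
// nothing. Only suitable for futures that are ready almost immediately.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

pub fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}
```

Because the returned future owns its clone of the handle and the key, it no longer borrows self, which is the requirement behind the 'static errors above. In actix, the analogous handler would presumably declare type Result = ResponseFuture<()> and return the same kind of boxed future. Note that a future built this way cannot touch self.sessions, since it holds no reference to the actor.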
Here's my Cargo.toml:
[dependencies]
actix-web = "4.5.1"
actix-web-actors = "4.3.0"
actix = "0.13.3"
uuid = { version = "0.8", features = ["v4", "serde"] }
serde = { version = "1.0.196", features = ["derive"] }
serde_json = "1.0.113"
redis = { version = "0.24.0", features = ["tokio-comp"] }
env_logger = "0.11.2"
dotenv = "0.15.0"
actix-cors = "0.7.0"
actix-files = "0.6.5"
tokio = { version = "1", features = ["full"] }
GitHub repo: https://github.com/souvikinator/ws-server-rust