Here is my WebSocket server implementation:
    val route = get {
      pathEndOrSingleSlash {
        handleWebSocketMessages(websocketFlow)
      }
    }

    def websocketFlow: Flow[Message, Message, Any] =
      Flow[Message]
        .collect { case TextMessage.Strict(textMessage) => protocol.hydrate(textMessage) }
        .via(chatActorFlow(UUID.randomUUID()))
        .map(event => TextMessage.Strict(protocol.serialize(event)))

    def chatActorFlow(connectionId: UUID): Flow[Protocol.Message, Protocol.Event, Any] = {
      val sink = Flow[Protocol.Message]
        .map(msg => Protocol.SignedMessage(connectionId, msg))
        .to(Sink.actorRef(chatRef, Protocol.CloseConnection(connectionId)))

      val source = Source
        // Assumed: buffer size and overflow strategy are not given in the post.
        .actorRef[Protocol.Event](16, OverflowStrategy.fail)
        .mapMaterializedValue { actor: ActorRef =>
          chatRef ! Protocol.OpenConnection(actor, connectionId)
        }

      Flow.fromSinkAndSource(sink, source)
    }

I'm wondering whether there is any way to close the connection once a message of type ConnectionClosed is sent by chatRef.
The solution below lets you drop connections from the server side by terminating the Actor materialized by the Source.actorRef stage. This is done simply by sending a PoisonPill to it.

Now, it is still not clear to me how you'd like to identify a "banned" client at connection time, so the example is, on purpose, very simple: the server drops any new connection once a maximum number of clients is connected. If you want to use any other strategy to kick out clients at any time, you can still apply the same logic and send a PoisonPill to their own source actors.
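
As a rough sketch of that idea (not the answerer's original code), a chat actor along the following lines could enforce the limit. The `ChatRoom` class, its `maxClients` parameter and the minimal `Protocol` object are hypothetical stand-ins for the question's own types. It also assumes the classic Akka 2.4/2.5 `Source.actorRef`, whose materialized actor completes the stream when it receives a `PoisonPill`; later Akka versions may expect a completion message such as `Status.Success` instead.

```scala
import java.util.UUID
import akka.actor.{Actor, ActorRef, PoisonPill}

// Hypothetical, minimal stand-ins for the question's Protocol messages.
object Protocol {
  final case class OpenConnection(actor: ActorRef, connectionId: UUID)
  final case class CloseConnection(connectionId: UUID)
}

// Sketch of a chat actor that rejects any client beyond maxClients.
class ChatRoom(maxClients: Int) extends Actor {
  // Source actors of the currently accepted connections, keyed by connection id.
  private var connections = Map.empty[UUID, ActorRef]

  def receive: Receive = {
    case Protocol.OpenConnection(actor, _) if connections.size >= maxClients =>
      // Stopping the actor materialized by Source.actorRef completes the
      // outgoing side of the flow, so the server closes the WebSocket.
      actor ! PoisonPill

    case Protocol.OpenConnection(actor, id) =>
      connections += id -> actor

    case Protocol.CloseConnection(id) =>
      // Sent by Sink.actorRef when the incoming side of the flow completes.
      connections -= id
  }
}
```

With the question's code, `chatRef` could then be created with something like `system.actorOf(Props(new ChatRoom(maxClients = 100)))`, where 100 is an arbitrary example value. To kick out a client later, the same `PoisonPill` can be sent to the `ActorRef` stored for its connection id.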