I am currently using a websocket channel to implement a chat api, the websocket sink and stream work just fine for the most part, that is until i try to log out. I have it so that when I log out i close the channel using channel?.sink.close(), after that, whenever I log back in, i reconnect to the channel and re-declare the value of channel, then after I do that my channel sink still works just fine and i can add to it whenever, however the stream no longer updates or notifies me of the new messages.
I've tried changing how I approach the websocket stream, using it without asBroadcastStream(), saving it in it's own variable as a broadcast stream, and then add a listener that could tell me if it errored or is finished. however after closing the sink and then reconnecting, the stream doesn't update whatsoever.
below is the important code snippets.
BlocBuilder<MessageBloc, MessageState>(
builder: (BuildContext context, MessageState messageState) {
if (messageState is ListenedMessageState) {
messageController = messageState.messageController;
messageStream = messageState.messageController!.stream
.asBroadcastStream();
messageStream!.listen((event) {
print('stream event: $event');
}, onDone: () {
print('stream done.');
}, onError: (e, s) {
print('stream errored');
print(e);
print(s);
});
print('controller: $messageController');
}
return StreamBuilder<dynamic>(
stream: messageStream,
builder: (context, sp) {
print(sp.data);
if (sp.hasData) {
_addToMessageList(sp.data);
}....
@override
Future<void> cancelListenMessage() async {
await channel?.sink.close();
}
@override
Future<void> initializeWebSocket() async {
channel = WebSocketChannel.connect(Uri.parse('ws://13.48.221.106:8080'));
await channel!.ready.whenComplete(
() => print('Websocket chat channel successfully connected'));
// channel!.stream.asBroadcastStream().listen((event) {
// print('current stream event: $event');
// });
}
Future<MessageState?> _listenMessageSuccessOrFailure(
Either<Failure, WebSocketChannel> result, List<String> chatIds) async {
return result.fold((l) => ListenedMessageState(messageController: null),
(r) async {
try {
final StreamController _controller1 =
StreamController<dynamic>.broadcast();
_controller1.addStream(r.stream);
await _showNotification(_controller1, chatIds);
return ListenedMessageState(messageController: _controller1);
} catch (_) {
print(
'Failed to broadcast stream, mightve been already listened to....');
return null;
}
});
}