I have a class that contains a stream that listens for objects to be created.
class Conversation {
// variables for live query
late QueryBuilder<ParseObject> message_live_query_;
final LiveQuery live_query = LiveQuery(debug: true);
// variables for streams
final message_added_stream_controller = StreamController<ParseObject>();
// constructor that gets messages and starts livequery
Conversation(this.conversation_, this.receiving_user_) {
// init and active live query
message_live_query_ = QueryBuilder<ParseObject>(ParseObject('Message'));
message_live_query_.whereEqualTo('conversation', conversation_);
initLiveQuery();
}
// returns the stream controller to be listened to
Stream get messageAddedStream => message_added_stream_controller.stream;
// listens to live query and if a message is detected, the stream notifies the listeners
void initLiveQuery() async {
final subscription = await live_query.client.subscribe(message_live_query_);
subscription.on(LiveQueryEvent.create, (message) {
print('*** MESSAGE CREATED FOR CONVERSATION OBJECT $getObjectID');
print('$message');
// notify the stream and add to local list
if(message_added_stream_controller.hasListener) { /// check if there is a listener, if not, don't add the value
message_added_stream_controller.add(message);
}
latest_message_ = message;
messages.add(message);
});
}
}
In a view I start listening to the stream with a stream listener, and when the user hits the back button I call another function to cancel the stream listener.
class _DirectMessageState extends State<DirectMessage> {
// list that holds messages
late List<types.Message> chatapp_message_list;
// variables for listening for new messages
StreamSubscription<dynamic>? messageAddedStreamListener = null;
// starts listening to stream
void listenMessageCreationStream() async {
// calls the classes getter function and starts listening to the stream
messageAddedStreamListener = widget.local_conversation.messageAddedStream.listen((message) {
print("MESSAGE HAS BEEN CREATED");
print(message);
if (message['sending_user_info']['objectId'] != widget.local_user.getUserInfoObjectID) {
setState(() {
chatapp_message_list.insert(0,convertToChatAppMessageObj(
message, chatapp_sending_user, chatapp_receiving_user));
});
}
});
}
@override
void initState() {
super.initState();
listenMessageCreationStream();
}
// cancels the listener and sets it to null
Future<void> disposeListeners() async {
await messageAddedStreamListener?.cancel();
messageAddedStreamListener = null;
}
@override
Widget build(BuildContext context) {
return Scaffold(
backgroundColor: BODY_BACKGROUND_COLOR,
appBar: AppBar(
backgroundColor: HEADER_FOOTER_BACKGROUND_COLOR,
title: const Text(
"Direct Message",
style: TextStyle(
color: TEXT_COLOR,
fontSize: SECONDARY_PAGE_HEADER_SIZE,
),
),
centerTitle: true,
leading: IconButton(
icon: const Icon(Icons.arrow_back),
onPressed: () async {
await disposeListeners(); // WAIT TO CANCEL SUBSCRIPTION HERE
globals.GlobalVars().navigatorKey.currentState?.pop(newlyCreatedListing);
}),
),
body: ...,
);
}
}
However when I navigate to the prior page, and re-navigate back to the DirectMessage page I get the following error that the stream is already being listened to
E/flutter ( 9347): [ERROR:flutter/runtime/dart_vm_initializer.cc(41)] Unhandled Exception: Bad state: Stream has already been listened to.
E/flutter ( 9347): #0 _StreamController._subscribe (dart:async/stream_controller.dart:676:7)
E/flutter ( 9347): #1 _ControllerStream._createSubscription (dart:async/stream_controller.dart:827:19)
E/flutter ( 9347): #2 _StreamImpl.listen (dart:async/stream_impl.dart:471:9)
E/flutter ( 9347): #3 _DirectMessageState.listenMessageCreationStream (package:supply_my_degree/social/direct_message.dart:88:79)
E/flutter ( 9347): #4 _DirectMessageState.initState (package:supply_my_degree/social/direct_message.dart:107:5)
E/flutter ( 9347): #5 StatefulElement._firstBuild (package:flutter/src/widgets/framework.dart:5015:57)
E/flutter ( 9347): #6 ComponentElement.mount (package:flutter/src/widgets/framework.dart:4853:5)
E/flutter ( 9347): #7 Element.inflateWidget (package:flutter/src/widgets/framework.dart:3863:16)
E/flutter ( 9347): #8 Element.updateChild (package:flutter/src/widgets/framework.dart:3592:18)
E/flutter ( 9347): #9 SingleChildRenderObjectElement.mount (package:flutter/src/widgets/framework.dart:6300:14)
E/flutter ( 9347): #10 Element.inflateWidget (package:flutter/src/widgets/framework.dart:3863:16)
E/flutter ( 9347): #11 Element.updateChild (package:flutter/src/widgets/framework.dart:3592:18)
E/flutter ( 9347): #12 ComponentElement.performRebuild (package:flutter/src/widgets/framework.dart:4904:16)
E/flutter ( 9347): #13 Element.rebuild (package:flutter/src/widgets/framework.dart:4604:5)
E/flutter ( 9347): #14 ComponentElement._firstBuild (package:flutter/src/widgets/framework.dart:4859:5)
E/flutter ( 9347): #15 ComponentElement.mount (package:flutter/src/widgets/framework.dart:4853:5)
I'm not understanding how the stream isn't being cancelled by calling the disposeListeners() function before I exit the screen.