When implementing a thread that is intended to periodically read from a stream, I cannot manage to make the thread stop correctly. This is only the case when the callback function that I use is implemented as a method of the agent (Worker). See this example (python v3.10.11):
import threading
from time import sleep
import weakref
class Consumer(threading.Thread):
"""This class periodically reads from a stream."""
def __init__(self, stream_key, callback):
super().__init__()
self._stream_key: str = stream_key
self._handlers = {callback}
self._running = True
def run(self):
"""Poll the event stream and call each handler with each event item returned."""
counter = 0
while self._running:
for number, handler in enumerate(self._handlers):
handler(number, counter)
print("reading from stream: ", self._stream_key)
counter += 1
sleep(2)
def stop(self):
"""Stop polling the event stream."""
self._running = False
self.join()
def start(self) -> None:
self._running = True
return super().start()
def add_handler(self, callback):
self._handlers.add(callback)
def remove_handler(self, callback):
self._handlers.remove(callback)
class EventHandler:
def __init__(self):
self.consumers = weakref.WeakValueDictionary()
def subscribe(self, stream_key: str, callback):
if stream_key in self.consumers:
self.consumers[stream_key].add_handler(callback)
else:
consumer = Consumer(stream_key=stream_key, callback=callback)
self.consumers[stream_key] = consumer
self.consumers[stream_key].start()
def __del__(self):
for consumer in self.consumers.values():
consumer.stop()
class Worker:
def __init__(self) -> None:
self._eventhandler = EventHandler()
self.registered = False
self._subscriptions = {("test-stream-key", self.handlerfunc)}
def register(self):
self._start_listeners()
self.registered = True
def _start_listeners(self):
for subscription in self._subscriptions:
self._eventhandler.subscribe(*subscription)
def handlerfunc(self, number, counter):
print(f"handler {number} doing things, counting: {counter}")
worker = Worker()
worker.register()
del worker
it keeps producing output like
reading from stream: test-stream-key
handler 0 doing things, counting: 1
reading from stream: test-stream-key
handler 0 doing things, counting: 2
...
After the del command I expect the garbage collection to do its magic and thereby stop the agent (incl. the EventHandler that has also a __del__ method).
Interestingly, this works fine in case I do not define the handlerfunc as a method of Worker but in the global scope:
import threading
from time import sleep
import weakref
class Consumer(threading.Thread):
"""This class periodically reads from a stream."""
def __init__(self, stream_key, callback):
super().__init__()
self._stream_key: str = stream_key
self._handlers = {callback}
self._running = True
def run(self):
"""Poll the event stream and call each handler with each event item returned."""
counter = 0
while self._running:
for number, handler in enumerate(self._handlers):
handler(number, counter)
print("reading from stream: ", self._stream_key)
counter += 1
sleep(2)
def stop(self):
"""Stop polling the event stream."""
self._running = False
self.join()
def start(self) -> None:
self._running = True
return super().start()
def add_handler(self, callback):
self._handlers.add(callback)
def remove_handler(self, callback):
self._handlers.remove(callback)
class EventHandler:
def __init__(self):
self.consumers = weakref.WeakValueDictionary()
def subscribe(self, stream_key: str, callback):
if stream_key in self.consumers:
self.consumers[stream_key].add_handler(callback)
else:
consumer = Consumer(stream_key=stream_key, callback=callback)
self.consumers[stream_key] = consumer
self.consumers[stream_key].start()
def __del__(self):
for consumer in self.consumers.values():
consumer.stop()
class Worker:
def __init__(self) -> None:
self._eventhandler = EventHandler()
self.registered = False
self._subscriptions = {("test-stream-key", handlerfunc)}
def register(self):
self._start_listeners()
self.registered = True
def _start_listeners(self):
for subscription in self._subscriptions:
self._eventhandler.subscribe(*subscription)
def handlerfunc(number, counter):
print(f"handler {number} doing things, counting: {counter}")
worker = Worker()
worker.register()
del worker
in that case it stops after one message, more or less immediately. this is what I would expect with the class scoped method as well.
What is happening here? And is it correct to use weakref.WeakValueDictionary()? (obviously not) But is it at least the idea of using weakref correct?