garbage collection in python threading

92 Views Asked by At

When implementing a thread that is intended to periodically read from a stream, I cannot manage to make the thread stop correctly. This is only the case when the callback function that I use is implemented as a method of the agent (Worker). See this example (python v3.10.11):

import threading
from time import sleep
import weakref


class Consumer(threading.Thread):
    """Thread that periodically reads from a stream and notifies handlers.

    Callbacks are kept in a plain ``set``, i.e. as strong references. A bound
    method registered here therefore keeps the instance it belongs to alive
    for as long as it stays registered.

    Args:
        stream_key: Identifier of the stream to poll.
        callback: First handler; called as ``callback(number, counter)``.
        poll_interval: Seconds to wait after each handler call (default 2).
    """

    def __init__(self, stream_key, callback, poll_interval=2):
        super().__init__()

        self._stream_key: str = stream_key
        self._poll_interval = poll_interval
        self._handlers = {callback}
        # Handlers are added/removed from other threads while run() iterates.
        self._handlers_lock = threading.Lock()
        # An Event (instead of a bare bool + sleep) lets stop() interrupt the
        # wait between polls immediately.
        self._stop_requested = threading.Event()

    def run(self):
        """Poll the event stream and call each handler with each event item returned."""
        counter = 0
        while not self._stop_requested.is_set():
            # Iterate over a snapshot so a concurrent add/remove cannot raise
            # "Set changed size during iteration".
            with self._handlers_lock:
                handlers = tuple(self._handlers)
            if not handlers:
                # Nothing to call: wait instead of spinning at full speed.
                self._stop_requested.wait(self._poll_interval)
                continue
            for number, handler in enumerate(handlers):
                handler(number, counter)
                print("reading from stream: ", self._stream_key)
                counter += 1
                # wait() returns True as soon as stop() is called.
                if self._stop_requested.wait(self._poll_interval):
                    break

    def stop(self):
        """Stop polling the event stream and wait for the thread to finish.

        Safe to call before ``start()`` and from inside the consumer thread
        itself; in both cases only the stop request is recorded, because
        ``join()`` would raise ``RuntimeError`` there.
        """
        self._stop_requested.set()
        if self.is_alive() and threading.current_thread() is not self:
            self.join()

    def start(self) -> None:
        """Clear any earlier stop request and start the thread."""
        self._stop_requested.clear()
        return super().start()

    def add_handler(self, callback):
        """Register an additional callback (kept as a strong reference)."""
        with self._handlers_lock:
            self._handlers.add(callback)

    def remove_handler(self, callback):
        """Unregister a callback.

        Raises:
            KeyError: If ``callback`` is not registered.
        """
        with self._handlers_lock:
            self._handlers.remove(callback)


class EventHandler:
    """Registry mapping each stream key to the consumer polling that stream.

    Consumers are stored in a ``WeakValueDictionary``: the registry itself
    does not keep a consumer alive, and an entry disappears once nothing else
    references its consumer.
    """

    def __init__(self):
        self.consumers = weakref.WeakValueDictionary()

    def subscribe(self, stream_key: str, callback):
        """Attach ``callback`` to the consumer of ``stream_key``.

        A new consumer is created and started when none is registered for
        the key yet.
        """
        # One .get() yields a strong reference (or None). A separate
        # ``in`` test followed by ``[...]`` could raise KeyError if the weak
        # entry vanished between the two lookups.
        consumer = self.consumers.get(stream_key)
        if consumer is not None:
            consumer.add_handler(callback)
            return

        consumer = Consumer(stream_key=stream_key, callback=callback)
        self.consumers[stream_key] = consumer
        # Start through the local strong reference rather than re-reading
        # the weak dictionary.
        consumer.start()

    def __del__(self):
        """Stop every consumer that is still registered.

        Only runs when this object is actually destroyed, i.e. once no
        strong reference to it remains.
        """
        # Materialise the values first so each consumer is strongly held
        # while it is being stopped.
        for consumer in list(self.consumers.values()):
            consumer.stop()


class Worker:
    """Agent that subscribes one of its own methods to a stream."""

    def __init__(self) -> None:
        self._eventhandler = EventHandler()
        self.registered = False
        # NOTE: ``self.handlerfunc`` is a bound method, so it carries a strong
        # reference to this instance; whatever stores the callback keeps the
        # Worker alive.
        own_callback = self.handlerfunc
        self._subscriptions = {("test-stream-key", own_callback)}

    def register(self):
        """Start all listeners, then flag the worker as registered."""
        self._start_listeners()
        self.registered = True

    def _start_listeners(self):
        """Hand every (stream key, callback) pair to the event handler."""
        for stream_key, callback in self._subscriptions:
            self._eventhandler.subscribe(stream_key, callback)

    def handlerfunc(self, number, counter):
        """Print which handler ran and the counter value it received."""
        message = f"handler {number} doing things, counting: {counter}"
        print(message)


# Build the worker; its constructor creates the EventHandler and the
# subscription set containing the bound method ``worker.handlerfunc``.
worker = Worker()


# Subscribe every (stream key, callback) pair; the first subscription for a
# key creates and starts a Consumer thread.
worker.register()


# Remove the module-level name ``worker``. The object is only destroyed (and
# EventHandler.__del__ only runs) once no strong reference to it remains.
del worker

After `del worker`, the program keeps producing output like this:

reading from stream:  test-stream-key
handler 0 doing things, counting: 1
reading from stream:  test-stream-key
handler 0 doing things, counting: 2
...

After the `del` statement I expect garbage collection to do its magic and thereby stop the agent (including the EventHandler, which also has a `__del__` method).

Interestingly, this works fine if I define `handlerfunc` in the global scope instead of as a method of Worker:

import threading
from time import sleep
import weakref


class Consumer(threading.Thread):
    """Thread that periodically reads from a stream and notifies handlers.

    Callbacks are kept in a plain ``set``, i.e. as strong references. A bound
    method registered here therefore keeps the instance it belongs to alive
    for as long as it stays registered.

    Args:
        stream_key: Identifier of the stream to poll.
        callback: First handler; called as ``callback(number, counter)``.
        poll_interval: Seconds to wait after each handler call (default 2).
    """

    def __init__(self, stream_key, callback, poll_interval=2):
        super().__init__()

        self._stream_key: str = stream_key
        self._poll_interval = poll_interval
        self._handlers = {callback}
        # Handlers are added/removed from other threads while run() iterates.
        self._handlers_lock = threading.Lock()
        # An Event (instead of a bare bool + sleep) lets stop() interrupt the
        # wait between polls immediately.
        self._stop_requested = threading.Event()

    def run(self):
        """Poll the event stream and call each handler with each event item returned."""
        counter = 0
        while not self._stop_requested.is_set():
            # Iterate over a snapshot so a concurrent add/remove cannot raise
            # "Set changed size during iteration".
            with self._handlers_lock:
                handlers = tuple(self._handlers)
            if not handlers:
                # Nothing to call: wait instead of spinning at full speed.
                self._stop_requested.wait(self._poll_interval)
                continue
            for number, handler in enumerate(handlers):
                handler(number, counter)
                print("reading from stream: ", self._stream_key)
                counter += 1
                # wait() returns True as soon as stop() is called.
                if self._stop_requested.wait(self._poll_interval):
                    break

    def stop(self):
        """Stop polling the event stream and wait for the thread to finish.

        Safe to call before ``start()`` and from inside the consumer thread
        itself; in both cases only the stop request is recorded, because
        ``join()`` would raise ``RuntimeError`` there.
        """
        self._stop_requested.set()
        if self.is_alive() and threading.current_thread() is not self:
            self.join()

    def start(self) -> None:
        """Clear any earlier stop request and start the thread."""
        self._stop_requested.clear()
        return super().start()

    def add_handler(self, callback):
        """Register an additional callback (kept as a strong reference)."""
        with self._handlers_lock:
            self._handlers.add(callback)

    def remove_handler(self, callback):
        """Unregister a callback.

        Raises:
            KeyError: If ``callback`` is not registered.
        """
        with self._handlers_lock:
            self._handlers.remove(callback)


class EventHandler:
    """Registry mapping each stream key to the consumer polling that stream.

    Consumers are stored in a ``WeakValueDictionary``: the registry itself
    does not keep a consumer alive, and an entry disappears once nothing else
    references its consumer.
    """

    def __init__(self):
        self.consumers = weakref.WeakValueDictionary()

    def subscribe(self, stream_key: str, callback):
        """Attach ``callback`` to the consumer of ``stream_key``.

        A new consumer is created and started when none is registered for
        the key yet.
        """
        # One .get() yields a strong reference (or None). A separate
        # ``in`` test followed by ``[...]`` could raise KeyError if the weak
        # entry vanished between the two lookups.
        consumer = self.consumers.get(stream_key)
        if consumer is not None:
            consumer.add_handler(callback)
            return

        consumer = Consumer(stream_key=stream_key, callback=callback)
        self.consumers[stream_key] = consumer
        # Start through the local strong reference rather than re-reading
        # the weak dictionary.
        consumer.start()

    def __del__(self):
        """Stop every consumer that is still registered.

        Only runs when this object is actually destroyed, i.e. once no
        strong reference to it remains.
        """
        # Materialise the values first so each consumer is strongly held
        # while it is being stopped.
        for consumer in list(self.consumers.values()):
            consumer.stop()


class Worker:
    """Agent that subscribes the module-level ``handlerfunc`` to a stream."""

    def __init__(self) -> None:
        self._eventhandler = EventHandler()
        self.registered = False
        # The callback is a plain function, so the subscription holds no
        # reference back to this instance.
        subscription = ("test-stream-key", handlerfunc)
        self._subscriptions = {subscription}

    def register(self):
        """Start all listeners, then flag the worker as registered."""
        self._start_listeners()
        self.registered = True

    def _start_listeners(self):
        """Hand every (stream key, callback) pair to the event handler."""
        for stream_key, callback in self._subscriptions:
            self._eventhandler.subscribe(stream_key, callback)


def handlerfunc(number, counter):
    """Print which handler ran and the counter value it received."""
    message = f"handler {number} doing things, counting: {counter}"
    print(message)


# Build the worker; its constructor creates the EventHandler and the
# subscription set containing the module-level ``handlerfunc``.
worker = Worker()


# Subscribe every (stream key, callback) pair; the first subscription for a
# key creates and starts a Consumer thread.
worker.register()


# Remove the module-level name ``worker``. The object is only destroyed (and
# EventHandler.__del__ only runs) once no strong reference to it remains.
del worker

In that case it stops after one message, more or less immediately. This is what I would expect with the class-scoped method as well.

What is happening here? And is it correct to use weakref.WeakValueDictionary()? (Obviously not.) But is the idea of using weakref at least correct?

0

There are 0 best solutions below