Why does threading.Condition.notify_all not wake up a waiting thread?


With the following code, I want to show how to synchronize with a thread.

  • I want to have a separate thread that updates an image.
  • From these images, I want to have an asynchronous generator.
  • The image should only be updated after the asynchronous generator has used the previous one.
  • The async generator should wait until a new image has been created.

Below, you find the code for that. It gets stuck waiting for the first image.

Why is the notify_all not releasing the image_created.wait?

# Output
create new image
waiting for new image
start waiter
notify_all
wait for someone to take it
waiting for image_created

import asyncio
import random
import threading
import time


class ImageUpdater:
    def __init__(self):
        self.image = None
        self.image_used = threading.Event()
        self.image_created = threading.Condition()

    def update_image(self):
        while True:
            self.image_used.clear()
            with self.image_created:
                print("create new image")
                time.sleep(0.6)
                self.image = str(random.random())
                print("notify_all")
                self.image_created.notify_all()
            print("wait for someone to take it")
            self.image_used.wait()
            print("someone took it")

    async def image_generator(self):
        def waiter():
            print("start waiter")
            time.sleep(0.1)
            with self.image_created:
                print("waiting for image_created")
                self.image_created.wait()
            print("waiter finished")
            self.image_used.set()

        while True:
            print("waiting for new image")
            await asyncio.to_thread(waiter)

            yield self.image


async def main():
    updater = ImageUpdater()

    update_thread = threading.Thread(target=updater.update_image)
    update_thread.start()

    async for image in updater.image_generator():
        print(f"Received new image: {image}")


if __name__ == "__main__":
    asyncio.run(main())

Paul Cornelius On BEST ANSWER

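In your function waiter, the __enter__ method of the context manager (with self.image_created) acquires self.image_created's lock. When you call wait, the lock is released and the thread waits there until another thread calls self.image_created.notify or notify_all. A notification only wakes threads that are already waiting; it is not remembered for later. Here the updater holds the lock while it creates the image, so the waiter cannot enter its with block until notify_all has already been called (your output shows "notify_all" before "waiting for image_created"). No further notification ever comes, because the updater is now blocked, waiting for self.image_used to be set. The two threads are deadlocked.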

The context manager for self.image_created acquires the lock on entry to the with: block and releases it on exit. So this code:

with self.image_created:
    self.image_created.wait()

is basically equivalent to:

self.image_created.acquire()  
# when you reach here, this thread owns the lock
self.image_created.wait()  # releases the lock
                           # waits for another thread to call notify
# You will never get this far since the other thread is blocked
self.image_created.release()

Instead of calling wait inside with self.image_created, you could just process the image and call self.image_used.set(). That eliminates the call to self.image_created.wait, which is causing the deadlock. But in that case you aren't using the functionality of the Condition object at all - you're just using it like another threading.Event.
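If you do want to keep a Condition, the usual pattern is to pair it with some shared state and wait on a predicate, so that a notification sent before the waiter arrives is not lost. Here is a minimal sketch of that idea; the ImageSlot class and its put/take methods are hypothetical names made up for this example, not part of your code:

```python
import threading


class ImageSlot:
    """Hypothetical helper: hands one image at a time from producer to consumer."""

    def __init__(self):
        self.cond = threading.Condition()
        self.image = None  # None means the slot is empty

    def put(self, image):
        with self.cond:
            # Block until the previous image has been taken.
            self.cond.wait_for(lambda: self.image is None)
            self.image = image
            self.cond.notify_all()

    def take(self):
        with self.cond:
            # wait_for checks the predicate before waiting, so an image that
            # was stored before we got here is picked up immediately.
            self.cond.wait_for(lambda: self.image is not None)
            image, self.image = self.image, None
            self.cond.notify_all()
            return image


def produce(slot, count):
    for i in range(count):
        slot.put(f"image-{i}")


slot = ImageSlot()
producer = threading.Thread(target=produce, args=(slot, 3))
producer.start()
received = [slot.take() for _ in range(3)]
producer.join()
print(received)  # ['image-0', 'image-1', 'image-2']
```

In the async generator you could then call something like await asyncio.to_thread(slot.take) instead of the waiter function.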

This simpler program, which uses two Event objects to ensure that the threads take turns, works:

import asyncio
import random
import threading
import time


class ImageUpdater:
    def __init__(self):
        self.image = None
        self.image_used = threading.Event()
        self.image_created = threading.Event()

    def update_image(self):
        while True:
            self.image_used.clear()
            print("create new image")
            time.sleep(0.6)
            self.image = str(random.random())
            print("new image created")
            self.image_created.set()
            print("wait for someone to take it")
            self.image_used.wait()
            print("someone took it")

    async def image_generator(self):
        def waiter():
            print("start waiter")
            time.sleep(0.1)
            print("waiting for image_created")
            self.image_created.wait()
            self.image_created.clear()
            time.sleep(0.3)
            self.image_used.set()    

        while True:
            print("waiting for new image")
            await asyncio.to_thread(waiter)

            yield self.image

async def main():
    updater = ImageUpdater()

    update_thread = threading.Thread(target=updater.update_image)
    update_thread.start()

    async for image in updater.image_generator():
        print(f"Received new image: {image}")


if __name__ == "__main__":
    asyncio.run(main())

You said that you eventually wanted to have more than one thread processing the images, so perhaps a producer-consumer architecture would be cleaner in the long run. See quamrana's answer.

Miguel Angel Villarreal torres On

The problem in your code lies in the fact that image_created.wait() is not being woken up properly in the main thread. The reason behind this is that the image creation and notification are being done inside a block with self.image_created, and this block is closed before notify_all() is called. As a result, when the asynchronous generator tries to wait on image_created.wait(), no notification has occurred.

def update_image(self):
    while True:
        self.image_used.clear()
        with self.image_created:
            print("create new image")
            time.sleep(0.6)
            self.image = str(random.random())
        print("notify_all")
        self.image_created.notify_all()
        print("wait for someone to take it")
        self.image_used.wait()
        print("someone took it")
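One thing to check before trying the variant above: according to the threading documentation, notify() and notify_all() raise RuntimeError when the calling thread does not hold the condition's lock, so the notify_all() call has to stay inside the with block. A minimal sketch that shows the behaviour (the names cond and outcome are only for illustration):

```python
import threading

cond = threading.Condition()

try:
    cond.notify_all()  # called without holding the condition's lock
    outcome = "no error"
except RuntimeError as exc:
    outcome = f"RuntimeError: {exc}"

print(outcome)

with cond:
    # Fine: the lock is held. With no thread waiting, this simply does nothing.
    cond.notify_all()
```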
Miguel Angel Villarreal torres On

Can you try this code and tell me if you still get the same thing? It seems you get different errors than mine.

import asyncio
import random
import threading
import time


class ImageUpdater:
    def __init__(self):
        self.image = None
        self.image_used = threading.Event()
        self.image_created = threading.Event()
        self.lock = threading.Lock()

    def update_image(self):
        while True:
            time.sleep(0.6)
            with self.lock:
                self.image = str(random.random())
                self.image_created.set()
                self.image_used.wait()
                self.image_created.clear()

    async def image_generator(self):
        while True:
            with self.lock:
                self.image_created.clear()
                self.image_used.clear()
                print("waiting for new image")
                await asyncio.sleep(0)
                yield self.image
                self.image_used.set()

async def main():
    updater = ImageUpdater()

    update_thread = threading.Thread(target=updater.update_image)
    update_thread.start()


    async for image in updater.image_generator():
        print(f"Received new image: {image}")

if __name__ == "__main__":
    asyncio.run(main())
quamrana On

Here is an alternative which uses a queue to synchronise the producer and consumer(s):

import asyncio
import random
import threading
import queue
import time

class ImageUpdater:
    def __init__(self):
        self.q = queue.Queue(1)

    def update_image(self):
        while True:
            time.sleep(0.6)
            print('Creating new image')
            image = str(random.random())
            self.q.put(image)

    async def image_generator(self):
        while True:
            print("waiting for new image")
            # Run the blocking get() in a worker thread so the event loop stays free
            image = await asyncio.to_thread(self.q.get)
            yield image

async def main():
    updater = ImageUpdater()

    update_thread = threading.Thread(target=updater.update_image)
    update_thread.start()


    async for image in updater.image_generator():
        print(f"Received new image: {image}")

if __name__ == "__main__":
    asyncio.run(main())

Sample output:

waiting for new image
Creating new image
Received new image: 0.004152636324671333
waiting for new image
Creating new image
Received new image: 0.23799099029083715
waiting for new image
Creating new image
Received new image: 0.17644888185774932
waiting for new image

Update: Updated the code to create the queue with max capacity of 1 so that the update_image() method thread blocks until there is enough room in the queue for the next image.
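To see the effect of the capacity of 1 in isolation, here is a small sketch without any threads. It uses put_nowait, which raises queue.Full in exactly the situation where put would block; the variable names are only for illustration:

```python
import queue

q = queue.Queue(maxsize=1)
q.put("first image")  # fits: the queue was empty

try:
    # put() would block here until a consumer calls get();
    # put_nowait() raises queue.Full instead of blocking.
    q.put_nowait("second image")
    second_accepted = True
except queue.Full:
    second_accepted = False

taken = q.get()               # the consumer takes the image and frees the slot
q.put_nowait("second image")  # now there is room again
```

So the producer thread can never get more than one image ahead of the consumer.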

Miguel Angel Villarreal torres On

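time.sleep blocks the thread that calls it. In the update_image method that is harmless as long as the method runs in its own thread, but if it is ever run on the event loop it will block the loop. In that case it is better to use await asyncio.sleep instead.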

The image_generator method must be an asynchronous generator function for async for to work correctly.

You can use asyncio.Event instead to signal when a new image is available.
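Keep in mind that asyncio.Event is not thread-safe, so a worker thread should not call set() on it directly. A minimal sketch, assuming the image is still produced in a separate thread, hands the call to the loop with call_soon_threadsafe (the worker function and the result dict are made up for this example):

```python
import asyncio
import threading
import time


async def main():
    loop = asyncio.get_running_loop()
    image_created = asyncio.Event()
    result = {}

    def worker():
        time.sleep(0.1)  # stand-in for creating an image
        result["image"] = "image-0"
        # Ask the event loop to set the event from its own thread.
        loop.call_soon_threadsafe(image_created.set)

    thread = threading.Thread(target=worker)
    thread.start()
    await image_created.wait()  # does not block the event loop
    thread.join()               # the worker has already finished its work
    return result["image"]


image = asyncio.run(main())
print(image)  # image-0
```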