tqdm nested progress bars with multiprocessing


I'm using multiprocessing to do multiple long jobs, and an outer progress bar tracks how many jobs are completed. With an inner progress bar, I want to show the progress of an individual job, and also be able to print out when the inner progress bar completes.

This is what it should look like.

The problem is that when the inner progress bar completes, it disappears, because leave=False. leave=True also does not work because I have to be able to restart the inner progress bar. Therefore my solution has been to simply print out the completed bar manually.

My solution is shown below. Because it uses `sleep(.04)`, the .04 needs to be tuned for the computer, the number of workers, the job length, etc. Even then it doesn't always work. I'm therefore looking for a non-hacky answer that will work on any computer.

from tqdm import tqdm
from time import sleep
import multiprocessing


def do_the_thing(my_args):
    if my_args:
        pbar_inner = tqdm(total=15, position=1, leave=False)
        for i in range(15):
            sleep(.1)
            pbar_inner.update()
    else:
        sleep(1.5)


if __name__ == '__main__':
    postfix = ' [Use this line/progress bar to print some stuff out.]'
    pbar_outer = tqdm(total=60, position=0, leave=True)
    for n in range(3):
        pool = multiprocessing.Pool(2)
        args = [True if i % 8 == 0 else False for i in range(20)]
        for count, m in enumerate(pool.imap_unordered(do_the_thing, args)):
            pbar_outer.update()
            if args[count]:
                sleep(.04)
                my_pbar_inner = tqdm(total=15, position=1, leave=False,
                                     bar_format='{l_bar}{bar}| {n_fmt}/{total_fmt}' + postfix)
                my_pbar_inner.update(15)
                my_pbar_inner.set_postfix_str('')
        pool.close()
        pool.join()

2 Answers

ken (accepted answer)

I believe I understand what you want to do. However, this is not a feature provided by tqdm.

tqdm is designed to close the bar when the instance is deleted, via `__del__`. Therefore, the bar is always closed (cleared when leave=False, or left behind when leave=True) as soon as the do_the_thing function returns.

There are several workarounds for this. The easiest is a bit hacky. Although I said the bar is always closed, tqdm's implementation guards against closing a bar twice via the `disable` attribute: `close()` returns immediately when `disable` is true. So, by setting `disable` to True in advance, you can prevent the bar from being cleared.
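In isolation, the effect can be seen without any multiprocessing. This minimal sketch writes the bar to a StringIO instead of a terminal so the output can be inspected:

```python
import io

from tqdm import tqdm

buf = io.StringIO()  # stand-in for the terminal so the output can be inspected
bar = tqdm(total=5, leave=False, file=buf)
bar.update(5)
bar.refresh()        # force a final "5/5" render
bar.disable = True   # the trick: close() now returns immediately
before = buf.getvalue()
bar.close()          # with leave=False this would normally erase the bar
```

After `close()`, the buffer still contains the completed "5/5" bar, unchanged.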

import multiprocessing
from time import sleep

from tqdm import tqdm


def do_the_thing(args):
    my_args, postfix = args
    if my_args:
        pbar_inner = tqdm(total=15, position=1, leave=False)
        for i in range(15):
            sleep(.1)
            pbar_inner.update()
        pbar_inner.set_postfix_str(postfix)
        pbar_inner.disable = True  # Disable further updates, including clearing of the bar.
    else:
        sleep(1.5)


if __name__ == '__main__':
    postfix = ' [Use this line/progress bar to print some stuff out.]'
    pbar_outer = tqdm(total=60, position=0, leave=True)
    for n in range(3):
        pool = multiprocessing.Pool(2)
        args = [(i % 8 == 0, postfix) for i in range(20)]
        for count, m in enumerate(pool.imap_unordered(do_the_thing, args)):
            pbar_outer.update()
        pool.close()
        pool.join()

    # Setting disable attribute prevents it from being deleted even at the end,
    # so insert a dummy inner bar to overwrite and delete it (if you need).
    tqdm(range(1), position=1, leave=False)

Simple, but this solution depends on the implementation of tqdm, so you may not like it.

Another workaround is more "correct" but a bit more complicated. The problem is that the bar instance is deleted when the do_the_thing function exits, so we manage the bar instance elsewhere. That is, we create a background process to manage the bar separately from the Pool workers and only pass operations to the bar via a queue.

import multiprocessing
from time import sleep

from tqdm import tqdm


def pbar_inner_worker(queue: multiprocessing.Queue, position: int):
    """Worker that manages the inner bar."""
    pbar = tqdm(position=position, leave=False)
    while True:
        op = queue.get()
        if op is None:
            return
        for method, kwargs in op.items():
            getattr(pbar, method)(**kwargs)


def do_the_thing(args):
    my_args, postfix, pbar_inner = args
    if my_args:
        # This will invoke `pbar.reset(total=15)` in the pbar_inner_worker.
        pbar_inner.put({"reset": dict(total=15)})
        for i in range(15):
            sleep(0.1)
            pbar_inner.put({"update": dict(n=1)})
        pbar_inner.put({"set_postfix_str": dict(s=postfix)})
    else:
        sleep(1.5)


if __name__ == "__main__":
    postfix = " [Use this line/progress bar to print some stuff out.]"
    pbar_outer = tqdm(total=60, position=0, leave=True)

    # Create the inner bar.
    pbar_inner_queue = multiprocessing.Manager().Queue()
    pbar_inner_process = multiprocessing.Process(
        target=pbar_inner_worker,
        kwargs=dict(queue=pbar_inner_queue, position=1),
    )
    pbar_inner_process.start()

    for n in range(3):
        pool = multiprocessing.Pool(2)
        args = [(i % 8 == 0, postfix, pbar_inner_queue) for i in range(20)]
        for count, m in enumerate(pool.imap_unordered(do_the_thing, args)):
            pbar_outer.update()
        pool.close()
        pool.join()

    # Close the inner bar.
    pbar_inner_queue.put(None)
    pbar_inner_process.join()
    pbar_inner_process.close()

Please note that multiprocessing.Manager().Queue() can be slow, so if you update the inner bar very frequently, it may cause a performance degradation.
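If the queue overhead matters, one mitigation is to batch updates locally and flush them every few iterations, so the Manager queue sees far fewer messages. The helper below is hypothetical (not part of tqdm or the code above); the batch size of 5 is an arbitrary choice:

```python
def batched_updates(queue, total, batch=5):
    """Do `total` iterations, but send at most one queue message per
    `batch` iterations (plus a final flush) instead of one per iteration."""
    pending = 0
    for i in range(total):
        yield i
        pending += 1
        if pending >= batch or i == total - 1:
            # One message covers `pending` iterations' worth of progress.
            queue.put({"update": dict(n=pending)})
            pending = 0
```

In do_the_thing, the loop would then read `for i in batched_updates(pbar_inner, 15): sleep(0.1)`, and pbar_inner_worker needs no changes, since it already accepts `update` with an `n` argument.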

Also note that neither solution is intended for multiple processes to update the inner bar simultaneously.

Taw (the asker)

This is my solution for now. It doesn't always work, but adding the sleep definitely helps. For whatever reason, the bar still disappears most of the time in this test script; it works much better in my actual code than in this script.

from tqdm import tqdm
from time import sleep
import multiprocessing


def do_the_thing(my_args):
    if my_args:
        pbar_inner = tqdm(total=15, position=1, leave=False)
        for i in range(15):
            sleep(.1)
            pbar_inner.update()
    else:
        sleep(1.5)


if __name__ == '__main__':
    postfix = ' [Use this line/progress bar to print some stuff out.]'
    pbar_outer = tqdm(total=60, position=0, leave=True)
    for n in range(3):
        pool = multiprocessing.Pool(2)
        args = [True if i % 8 == 0 else False for i in range(20)]
        for count, m in enumerate(pool.imap_unordered(do_the_thing, args)):
            pbar_outer.update()
            if args[count]:
                sleep(.04)  # magic 
                my_pbar_inner = tqdm(total=15, position=1, leave=False,
                                     bar_format='{l_bar}{bar}| {n_fmt}/{total_fmt}' + postfix)
                my_pbar_inner.update(15)
                my_pbar_inner.set_postfix_str('')
        pool.close()
        pool.join()

The point of this is that if the outer progress bar runs for hours or days, it's nice to have an inner progress bar on the timescale of minutes (the inner bar is restarted repeatedly throughout the run).

By default, the progress bar disappears once completed, so the point of this "solution" is simply to print it out again manually, in a completed state. It can only be seen in the (potentially brief) window between the previous bar finishing and the next one starting.