`ThreadPoolExecutor.shutdown` not working, code gets stuck indefinitely


If a certain exception occurs in one of the threads, I need to be able to either pause all threads or terminate them all immediately.

from concurrent.futures import ThreadPoolExecutor, as_completed
from time import sleep


class ThreadTerminationRequired(Exception):
    pass


def work(i):
    if i in range(50, 100):
        raise ThreadTerminationRequired
    print(f'sleeping for {i}')
    sleep(i)


if __name__ == '__main__':
    with ThreadPoolExecutor(64) as executor:
        futures = {executor.submit(work, i): i for i in range(1000)}

        try:
            for future in as_completed(futures):
                future.result()
        except ThreadTerminationRequired:
            executor.shutdown(wait=False, cancel_futures=True)

Apparently, wait=False and cancel_futures=True have no effect: passing either, both, or neither leads to the same outcome. I also tried:

for future in futures:
    future.cancel()

Either way, the code gets stuck, whereas I need it to terminate immediately. I also don't want to use sys.exit, because if ThreadTerminationRequired occurs I need to make changes and then restart all threads. Even so, calling exit(1) or sys.exit(1) gives the same outcome.

fanta fles On

The behaviour of shutdown is described in the docs:

Signal the executor that it should free any resources that it is using when the currently pending futures are done executing.

Futures that are already running will keep running.

Also:

Regardless of the value of wait, the entire Python program will not exit until all pending futures are done executing.

and:

If both cancel_futures and wait are True, all futures that the executor has started running will be completed prior to this method returning. The remaining futures are cancelled.

In short, shutdown only cancels futures that have not started yet; it does not interrupt futures that are already running.
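To make that behaviour concrete, here is a minimal sketch, assuming Python 3.9 or later (where cancel_futures is available). The names slow and demo_cancel are made up for illustration. With two workers and six tasks, the two tasks that have already started should run to completion, while the four still waiting in the queue should be reported as cancelled.

```python
from concurrent.futures import ThreadPoolExecutor
from time import sleep


def slow(i):
    # Hypothetical task: long enough that it is still running when shutdown is called.
    sleep(0.2)
    return i


def demo_cancel():
    executor = ThreadPoolExecutor(max_workers=2)
    futures = [executor.submit(slow, i) for i in range(6)]
    sleep(0.05)  # give the two workers time to pick up the first two tasks
    # Cancels the queued futures, then waits for the running ones to finish.
    executor.shutdown(wait=True, cancel_futures=True)
    return [future.cancelled() for future in futures]


if __name__ == '__main__':
    # One boolean per future: False for tasks that ran, True for tasks that were cancelled.
    print(demo_cancel())
```

Nothing here interrupts slow once it is running; the call only prevents the queued tasks from ever starting.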

To stop the running threads, you could use something like a threading.Event and have each thread exit once a shutdown event is set. This requires the worker to check the state of the event regularly.

There are other ways to stop a thread, but which one fits depends on your specific needs.

from concurrent.futures import ThreadPoolExecutor, as_completed
import threading
from time import sleep
import time


class ThreadTerminationRequired(Exception):
    pass


def work(i, shutdownEvent):
    start = time.time()
    if i in range(50, 100):
        raise ThreadTerminationRequired
    print(f'sleeping for {i}')

    # Loop until either the shutdown event is set or i seconds have elapsed.
    # Put any periodic work inside the loop.
    while not (shutdownEvent.is_set() or (time.time() > (start + i))):
        sleep(0)  # sleep(0) yields to other threads; do whatever you need to do here.


if __name__ == '__main__':
    with ThreadPoolExecutor(64) as executor:
        shutdown = threading.Event()
        futures = {executor.submit(work, i, shutdown): i for i in range(1000)}

        try:
            for future in as_completed(futures):
                future.result()
        except ThreadTerminationRequired:
            shutdown.set()
            executor.shutdown(wait=False, cancel_futures=True)
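As a variation on the code above, if the worker only needs to sleep, Event.wait(timeout) can serve as an interruptible sleep, so no polling loop is needed. The sketch below is one possible arrangement, not the only one: run_round and bad_inputs are hypothetical names, and each round creates a fresh executor and event so the work can be restarted after a failure, as the question requires.

```python
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed


class ThreadTerminationRequired(Exception):
    pass


def work(i, shutdown_event, bad_inputs):
    if i in bad_inputs:
        raise ThreadTerminationRequired
    # wait() returns True as soon as the event is set, or False once i seconds have elapsed.
    interrupted = shutdown_event.wait(timeout=i)
    return 'interrupted' if interrupted else 'finished'


def run_round(inputs, bad_inputs, max_workers=8):
    """Run one batch; return True if it completed, False if it was aborted."""
    shutdown_event = threading.Event()
    executor = ThreadPoolExecutor(max_workers)
    futures = [executor.submit(work, i, shutdown_event, bad_inputs) for i in inputs]
    try:
        for future in as_completed(futures):
            future.result()  # re-raises any exception raised inside the worker
        return True
    except ThreadTerminationRequired:
        shutdown_event.set()  # wake every worker that is currently waiting
        return False
    finally:
        # Queued futures are cancelled; running workers return promptly once the event is set.
        executor.shutdown(wait=True, cancel_futures=True)


if __name__ == '__main__':
    bad = {3}
    # First round aborts because input 3 is marked bad; the 30-second waits are interrupted.
    print(run_round([30, 30, 3], bad))
    bad.clear()  # stand-in for "make changes" before restarting
    print(run_round([0.1, 0.2, 0.3], bad))
```

Because the workers block in wait() rather than spinning, they use no CPU while idle and still react to the shutdown event immediately.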