I've taken over some code a former colleague wrote, which frequently got stuck when one or more parallelised functions threw a NameError exception that wasn't caught. (The parallelisation is handled by multiprocessing.Pool.) Because the exception is due to certain arguments not being defined, the only way I've been able to catch it is to put the pool.apply_async calls into try...except blocks, like so:
    import os
    from multiprocessing import Pool

    # Define worker functions
    def workerfn1(args1):
        pass  # commands

    def workerfn2(args2):
        pass  # more commands

    def workerfn3(args3):
        pass  # even more commands

    # Execute worker functions in parallel
    with Pool(processes=os.cpu_count()-1) as pool:
        try:
            r1 = pool.apply_async(workerfn1, args1)
        except NameError as e:
            print("Worker function r1 failed")
            print(e)
        try:
            r2 = pool.apply_async(workerfn2, args2)
        except NameError as e:
            print("Worker function r2 failed")
            print(e)
        try:
            r3 = pool.apply_async(workerfn3, args3)
        except NameError as e:
            print("Worker function r3 failed")
            print(e)
Obviously, the try...except blocks are not parallelised, but the interpreter has to read the apply_async commands sequentially anyway while it assigns them to different CPUs...so will these three functions still be executed in parallel (if they don't throw the NameError exception), or does the use of try...except prevent this from happening?
First, please take more care to post code that is free of spelling and other errors.
The method multiprocessing.pool.Pool.apply_async (not apply_sync) returns a multiprocessing.pool.AsyncResult instance. Only when you call the get method on this instance do you either receive the return value of your worker function or have any exception that occurred in the worker function re-raised in the calling process.
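As an illustration of that behaviour, here is a minimal sketch rather than the original code. The worker names ok_worker and failing_worker, the helper collect, and the undefined name undefined_name are all hypothetical and exist only for this example. The try/except is placed around get, not around apply_async, because get is where a worker's exception surfaces.

```python
from multiprocessing import Pool


def ok_worker(x):
    """Hypothetical worker that succeeds."""
    return x * 2


def failing_worker(x):
    """Hypothetical worker that raises NameError inside the worker."""
    return x + undefined_name  # deliberately undefined (assumption for the demo)


def collect(async_result):
    """Return (value, None) on success or (None, exception) on NameError."""
    try:
        # get() blocks until the task finishes, then returns its value
        # or re-raises the exception that occurred in the worker.
        return async_result.get(), None
    except NameError as e:
        return None, e


def run_demo():
    with Pool(processes=2) as pool:
        # Both tasks are submitted before either result is awaited,
        # so they can run in parallel.
        r1 = pool.apply_async(ok_worker, (21,))
        r2 = pool.apply_async(failing_worker, (1,))
        # The results are collected while the pool is still alive.
        return collect(r1), collect(r2)


if __name__ == "__main__":
    (value1, error1), (value2, error2) = run_demo()
    print("r1:", value1, error1)
    print("r2:", value2, repr(error2))
```

Because all tasks are submitted first and only then awaited, the try/except around get does not serialise the work.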
Note
Without calling get on the AsyncResult returned from apply_async, you are not waiting for the submitted task to complete, and there is no point in surrounding the call with try/except. When you then fall through the with block, terminate is called implicitly on the pool instance, which immediately kills all running pool processes: any running tasks are halted and any tasks waiting to run are purged. You can call pool.close() followed by pool.join() within the block, and that sequence will wait for all submitted tasks to complete. But without explicitly calling get on the AsyncResult instances, you will not be able to get return values or exceptions.
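The close/join sequence described in the note could look roughly like the following sketch. The names square, broken, gather and run_all, and the undefined name missing_name, are hypothetical and chosen only for illustration. The sketch waits for every task with close and join, then calls get on each AsyncResult to retrieve either the value or the exception.

```python
from multiprocessing import Pool


def square(x):
    """Hypothetical worker that succeeds."""
    return x * x


def broken(x):
    """Hypothetical worker that fails with NameError inside the worker."""
    return x + missing_name  # deliberately undefined (assumption for the demo)


def gather(pending):
    """Map each task name to ("ok", value) or ("error", exception)."""
    outcomes = {}
    for name, async_result in pending.items():
        try:
            outcomes[name] = ("ok", async_result.get())
        except Exception as e:
            outcomes[name] = ("error", e)
    return outcomes


def run_all():
    with Pool(processes=2) as pool:
        pending = {
            "r1": pool.apply_async(square, (3,)),
            "r2": pool.apply_async(broken, (3,)),
        }
        pool.close()  # no further tasks may be submitted
        pool.join()   # block until every submitted task has finished
        # All tasks are complete here, so each get() returns or raises at once.
        return gather(pending)


if __name__ == "__main__":
    for name, (status, payload) in run_all().items():
        print(name, status, repr(payload))
```

A failure in one task does not stop the others: each outcome is recorded separately, so the successful results remain available.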