Context: I'm trying to simulate the long-term repair of damaged bridges by checking the bridges' states (damaged or undamaged) at a number of timesteps over a time horizon. To do so, I've written a Python function called repair_over_horizon that takes (among other things) a list of damaged bridges and outputs a list of lists, each of which contains the bridges still damaged at a particular timestep. For example, if the input list were:
[100, 423, 667, 904, 221, 56, 495, 70, 109, 65]
and I used repair_over_horizon with 3 timesteps, I would expect the output to be something like:
[[100, 423, 667, 904, 221, 56, 495, 70, 109, 65], [100, 423, 904, 56, 70, 109, 65], [423, 904]].
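For concreteness, here is a simplified serial sketch of the behavior I expect for a single map (repair_once is a stand-in for the real stochastic repair logic, and the 0.3 threshold is arbitrary):

import random

def repair_once(damaged, threshold):
    # a bridge stays damaged if its random draw is at or above the repair threshold
    return [b for b in damaged if random.random() >= threshold]

def repair_over_horizon_serial(damaged, n_steps, threshold=0.3):
    states = [list(damaged)]  # timestep 0: all input bridges still damaged
    for _ in range(n_steps - 1):
        states.append(repair_once(states[-1], threshold))
    return states

print repair_over_horizon_serial([100, 423, 667, 904, 221, 56, 495, 70, 109, 65], 3)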
The function repair_over_horizon actually takes multiple input lists, or "maps", at a time and computes their repairs in parallel. The parallelization is implemented for each timestep, as shown in the code below. I've also included the functions called within repair_over_horizon -- namely, compute_repairs_at_timestep and repair -- for reference.
Problem: While the function produces the expected output, each iteration (i.e. timestep) takes significantly longer than the previous one. The slowdown also grows with the number of maps per timestep: with 50 maps per timestep and 10 timesteps, the first iteration takes ~6s and the last takes ~21s; with 500 maps per timestep and 5 timesteps, the first iteration takes ~36s and the fifth takes ~110s.
Troubleshooting: I've profiled the code (see results for 500 maps per timestep and 5 timesteps below) and checked memory usage, but it's not apparent to me from the results where I'm going wrong in the implementation.
Question: What could be causing this massive slowdown between iterations and as the number of maps increases? Any ideas on how to fix it would be much appreciated.
UPDATE: To test whether there was a memory issue, I inserted gc.collect() at various points toward the ends of functions. However, I still found that the repair takes longer with each iteration, as stated in the Problem section above.
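One of the placements I tried looked roughly like this (run_timestep here is a stand-in for the per-timestep repair work):

import gc
import time

def run_all_timesteps(n_steps, run_timestep):
    t_per_timestep = []
    for k in range(n_steps):
        start = time.time()
        run_timestep(k)  # per-timestep repair computation
        gc.collect()     # force a full collection at the end of each timestep
        t_per_timestep.append(time.time() - start)
    return t_per_timestep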
Partial profiling results:
Ordered by: cumulative time
ncalls tottime percall cumtime percall filename:lineno(function)
2500 1.102 0.000 289.855 0.116 /Library/Python/2.7/site-packages/pp.py:380(submit)
2505 285.901 0.114 285.901 0.114 {cPickle.dumps}
2500 0.020 0.000 127.742 0.051 /Library/Python/2.7/site-packages/pp.py:96(__call__)
2500 0.040 0.000 127.719 0.051 /Library/Python/2.7/site-packages/pp.py:116(__unpickle)
2500 127.675 0.051 127.675 0.051 {cPickle.loads}
5 0.001 0.000 2.282 0.456 /Library/Python/2.7/site-packages/pp.py:280(__init__)
5 0.001 0.000 2.279 0.456 /Library/Python/2.7/site-packages/pp.py:491(set_ncpus)
20 0.001 0.000 2.278 0.114 /Library/Python/2.7/site-packages/pp.py:134(__init__)
20 0.006 0.000 2.276 0.114 /Library/Python/2.7/site-packages/pp.py:140(start)
20 0.001 0.000 1.938 0.097 /Library/Python/2.7/site-packages/pptransport.py:133(receive)
40 1.935 0.048 1.935 0.048 {method 'read' of 'file' objects}
2500 0.174 0.000 1.666 0.001 /Library/Python/2.7/site-packages/pp.py:656(__scheduler)
16084 1.364 0.000 1.364 0.000 {method 'acquire' of 'thread.lock' objects}
1261 0.034 0.000 0.910 0.001 /Library/Python/2.7/site-packages/ppcommon.py:38(start_thread)
1261 0.063 0.000 0.580 0.000 /System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py:726(start)
1261 0.057 0.000 0.370 0.000 /System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py:603(wait)
20 0.007 0.000 0.330 0.017 /System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/subprocess.py:650(__init__)
20 0.014 0.001 0.316 0.016 /System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/subprocess.py:1195(_execute_child)
1261 0.058 0.000 0.288 0.000 /System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py:656(__init__)
2500 0.107 0.000 0.244 0.000 /Library/Python/2.7/site-packages/pp.py:73(__init__)
60344 0.232 0.000 0.232 0.000 {isinstance}
889 0.045 0.000 0.224 0.000 /System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py:309(wait)
20 0.201 0.010 0.201 0.010 {posix.fork}
2522 0.032 0.000 0.151 0.000 /System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py:242(Condition)
1261 0.145 0.000 0.145 0.000 {thread.start_new_thread}
1261 0.011 0.000 0.142 0.000 /System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py:542(Event)
1261 0.025 0.000 0.131 0.000 /System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py:561(__init__)
2522 0.113 0.000 0.119 0.000 /System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py:260(__init__)
repair_over_horizon
import time
import pp

def repair_over_horizon(n_gm_maps, n_damage_maps, bridge_array_internal_assembled,
                        bridge_array_new_assembled, horizon=365, n_steps=10,
                        trajectories=10, plot=False, quiet=False):
    from itertools import product
    t_per_timestep = []
    sim_times = get_times(horizon, n_steps)  # get_times is defined elsewhere
    thresholds = [get_repair_threshold(t) for t in sim_times]  # get_repair_threshold is defined elsewhere
    ppservers = ()  # tuple of remote pp servers; empty means local workers only
    for k in range(n_steps):
        start = time.time()
        jobs = []
        job_server = pp.Server(ppservers=ppservers)
        print "Starting pp for recovery modelling at k =", k, "with", job_server.get_ncpus(), "workers"
        for m, n, l in product(xrange(n_gm_maps), xrange(n_damage_maps), xrange(trajectories)):
            jobs.append(job_server.submit(compute_repairs_at_timestep,
                                          (m, n, k, l, bridge_array_internal_assembled,
                                           bridge_array_new_assembled, thresholds[k], n_steps),
                                          depfuncs=(repair,),
                                          modules=('numpy', 'random', 'copy'),
                                          group=k))
        # collect the results of timestep k, since the next timestep depends on them
        for job in jobs:
            temp_dict = job()
            if temp_dict is not None and k < n_steps - 1:
                mstar, nstar, kstar, lstar = temp_dict['indices']
                kstar += 1  # write the repaired state into the next timestep's slot
                bridge_array_internal_assembled[mstar][nstar][kstar][lstar][:] = temp_dict['internal'][mstar][nstar][kstar][lstar]
                bridge_array_new_assembled[mstar][nstar][kstar][lstar][:] = temp_dict['new'][mstar][nstar][kstar][lstar]
        t_per_timestep.append(time.time() - start)
        print "Done with timestep k =", k, "in", time.time() - start, "seconds"
        job_server.destroy()  # close this timestep's server
    if not quiet:
        print 'Great. You have modeled stochastic recovery of bridges and created time-dependent damage maps.'
    if plot:
        pass
    # return the two updated lists of lists plus the per-timestep timings
    return {'internal': bridge_array_internal_assembled,
            'new': bridge_array_new_assembled,
            'timesteps': t_per_timestep}
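For reference, this is roughly how I build the inputs and call the function. The get_times and get_repair_threshold helpers are my own and aren't shown; the stand-ins below are placeholders just to make the sketch self-contained:

def get_times(horizon, n_steps):
    # placeholder: evenly spaced simulation times over the horizon
    return [horizon * (k + 1) / float(n_steps) for k in range(n_steps)]

def get_repair_threshold(t):
    # placeholder: repair probability grows with elapsed time
    return min(1.0, t / 365.0)

bridges = [100, 423, 667, 904, 221, 56, 495, 70, 109, 65]
n_steps = 3
# structure: [gm map][damage map][timestep][trajectory] -> list of damaged bridge IDs
internal = [[[[list(bridges)] for k in range(n_steps)]]]
new = [[[[list(bridges)] for k in range(n_steps)]]]
out = repair_over_horizon(1, 1, internal, new, n_steps=n_steps, trajectories=1)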
compute_repairs_at_timestep
def compute_repairs_at_timestep(m, n, k, l, bridge_array_internal_assembled,
                                bridge_array_new_assembled, threshold, n_steps,
                                limit=None):
    # keep track of ground motion map, associated damage map, timestep,
    # and recovery trajectory
    inds = [m, n, k, l]
    # repair bridges stochastically at a single timestep
    temp_dict = repair(bridge_array_internal_assembled[m][n][k][l],
                       bridge_array_new_assembled[m][n][k][l], threshold, limit)
    # propagate the repair to the next timestep, unless this is the last one
    if k < n_steps - 1:
        bridge_array_internal_assembled[m][n][k + 1][l][:] = temp_dict['internal']
        bridge_array_new_assembled[m][n][k + 1][l][:] = temp_dict['new']
    # in both cases, return the indices along with the full (possibly updated) arrays
    return {'indices': inds,
            'internal': bridge_array_internal_assembled,
            'new': bridge_array_new_assembled}
repair
def repair(bridge_array_internal, bridge_array_new, ext_threshold, limit=None):
    import numpy as np
    import copy
    d = len(bridge_array_internal)
    if d == 0:
        # all bridges have already been repaired
        return {'internal': [], 'new': []}
    temp_internal = copy.deepcopy(bridge_array_internal)
    temp_new = copy.deepcopy(bridge_array_new)
    # repair bridges stochastically per the Shinozuka et al. (2003) model
    temp = np.random.rand(d).tolist()
    rep_inds_internal = []  # internal ID numbers of bridges that get repaired
    rep_inds_new = []  # new ID numbers of bridges that get repaired
    if limit is None:
        for i in range(d):
            if temp[i] < ext_threshold:
                rep_inds_internal.append(temp_internal[i])
                rep_inds_new.append(temp_new[i])
    else:  # limit is a number: repair at most `limit` bridges
        rep_count = 0
        for i in range(d):
            if temp[i] < ext_threshold and rep_count < limit:
                rep_inds_internal.append(temp_internal[i])
                rep_inds_new.append(temp_new[i])
                rep_count += 1
            elif rep_count == limit:  # we've repaired as many bridges as allowed
                break
    # remove any repaired bridges from the lists of damaged bridges
    for i in range(len(rep_inds_internal)):
        try:
            temp_internal.remove(rep_inds_internal[i])
            temp_new.remove(rep_inds_new[i])
        except ValueError:
            pass
    # these arrays have been updated to exclude the repaired bridges
    return {'internal': temp_internal, 'new': temp_new}
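As a quick sanity check, repair behaves as I expect on the example list from the top of the post (using the same list for the internal and new IDs for simplicity; the 0.5 threshold is arbitrary):

bridges = [100, 423, 667, 904, 221, 56, 495, 70, 109, 65]
result = repair(bridges, list(bridges), 0.5)
print result['internal']  # e.g. [423, 904, 495, 109] -- varies from run to run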