I want to run a distributed TensorFlow program that uses multiprocessing on Slurm. The script should use the Python multiprocessing library to open different sessions on different nodes in parallel. This approach works when I test it in Slurm interactive sessions, but it does not seem to work when submitted as an sbatch job.
The script works correctly on Slurm if multiprocessing is not used.
My bash script:
#!/bin/bash
# Slurm batch job: reserves two nodes and launches slurmpythonparallel.py
# once per node as two concurrent single-task job steps.
#SBATCH --nodes=2
#SBATCH --ntasks=2
#SBATCH --cpus-per-task=8
#SBATCH --mem-per-cpu=8000
#SBATCH --exclusive
#SBATCH --time=01:00:00

# OpenMP/MKL threads live inside one process on one node, so size the thread
# pools to the CPUs Slurm gives each task (--cpus-per-task) rather than to
# nodes * CPUs, which would oversubscribe every task's cores.
THREADS_PER_TASK=${SLURM_CPUS_PER_TASK:-8}
export OMP_NUM_THREADS=$THREADS_PER_TASK
export MKL_NUM_THREADS=$THREADS_PER_TASK

module load TensorFlow/1.8.0-foss-2018a-Python-3.6.4-CUDA-9.2.88

# Execute jobs in parallel: the first step is backgrounded so that both
# steps run at the same time; `wait` then blocks until it has finished too.
srun -N 1 -n 1 python slurmpythonparallel.py &
srun -N 1 -n 1 python slurmpythonparallel.py
wait
My python script is:
def run(worker_hosts,task_index,results, train_x, train_y,val_x,val_y,fold):
    """Build the model on one worker task, train it, and record its accuracy.

    The graph is created under the device ``/job:worker/task:<task_index>``
    and the resource container ``Process<task_index>``.  A session is then
    opened against ``grpc://<worker_hosts[task_index]>``, training is
    delegated to ``run_train``, and the accuracy evaluated on
    ``(val_x, val_y)`` is written to ``results[fold]``.

    Args:
        worker_hosts: Indexable collection of worker addresses; the entry at
            ``task_index`` is used as the session target.
        task_index: Index of the worker task that hosts the graph.
        results: Indexable container that receives the accuracy at ``fold``.
        train_x: Training inputs, forwarded unchanged to ``run_train``.
        train_y: Training labels, forwarded unchanged to ``run_train``.
        val_x: Inputs fed to the ``x`` placeholder for the final evaluation.
        val_y: Labels fed to the ``y`` placeholder for the final evaluation.
        fold: Slot in ``results`` to write; also forwarded to ``run_train``.
    """
    # The graph tensors are published as module globals.
    # NOTE(review): presumably run_train reads them from there - confirm.
    global x, y, prediction, cost, optimizer, correct, accuracy

    device_name = "/job:worker/task:%d" % task_index
    with tf.device(device_name), tf.container("Process%d"%task_index):
        # Input placeholders: 784 features per example, labels unconstrained.
        x = tf.placeholder('float',[None, 784],name="x")
        y = tf.placeholder('float',name="y_pred")

        # Forward pass, loss and training op.
        prediction = neural_network_model(x)
        cross_entropy = tf.nn.softmax_cross_entropy_with_logits_v2(
            logits=prediction,
            labels=y,
        )
        cost = tf.reduce_mean(cross_entropy)
        optimizer = tf.train.AdamOptimizer().minimize(cost)

        # Fraction of examples whose arg-max prediction matches the label.
        correct = tf.equal(tf.argmax(prediction,1),tf.argmax(y,1))
        accuracy = tf.reduce_mean(tf.cast(correct,'float'))

        session_target = "grpc://"+worker_hosts[task_index]
        with tf.Session(session_target) as sess:
            run_train(sess, train_x, train_y,task_index,fold)
            validation_feed = {x: val_x, y: val_y}
            results[fold] = sess.run(accuracy, feed_dict=validation_feed)
# Script driver: starts this task's TensorFlow server and, on task 0 only,
# fans the folds out to child processes that each call run().
sys.path.append(os.getcwd())
folds = 2
start_time = time.time()

# Everything below must execute only in the process that Slurm launched.
# With the 'forkserver' (and 'spawn') start method, multiprocessing re-imports
# this file in the fork server and in every child process; without the guard
# that re-import would try to create a second tf.train.Server on the port
# this process already owns ("Address already in use").
if __name__ == "__main__":
    cluster, job_name, task_index, worker_hosts = slurm()
    cluster_spec = tf.train.ClusterSpec(cluster)
    server = tf.train.Server(cluster_spec, job_name=job_name, task_index=task_index)
    for host in worker_hosts:
        print("Worker Host:", host)

    if task_index != 0:
        # Non-chief tasks only serve graph executions requested by task 0.
        server.join()
    else:
        multiprocessing.set_start_method('forkserver', force=True)
        # Shared array of doubles: one accuracy slot per fold.
        results = multiprocessing.Array('d', folds)
        num_of_workers = len(worker_hosts)
        # NOTE(review): every fold is sent to worker task 0; if folds are
        # meant to be spread across workers this should vary per fold.
        index = 0

        # Run the folds in batches of at most one process per worker.  The
        # batch list is emptied after each round so that a Process object is
        # never started twice and a trailing partial batch is still launched.
        p = []
        for fold in range(folds):
            p.append(Process(target=run, args=(worker_hosts, index, results, train_x_all, train_y_all, train_x_all, train_y_all, fold)))
            batch_is_full = len(p) == num_of_workers
            is_last_fold = fold == folds - 1
            if batch_is_full or is_last_fold:
                for proc in p:
                    proc.daemon = False
                    proc.start()
                for proc in p:
                    proc.join()
                p = []
When I use multiprocessing, I get the error:
Address already in use
and
Could not start gRPC server
My guess is that when Python starts a new process, it tries to bind the same address that is already in use, either by the parent process or by the other run of the script. I don't understand why this happens in Slurm sbatch jobs but not when running the script on my own computer or in interactive sessions.