How to run multiprocessing Python with distributed TensorFlow on Slurm


I want to run a multiprocessing distributed TensorFlow program on Slurm. The script should use the Python multiprocessing library to open different sessions on different nodes in parallel. This approach works when I test it in Slurm interactive sessions, but it doesn't seem to work when submitted as an sbatch job.

The script works correctly on Slurm if multiprocessing is not used.

My bash script:

#!/bin/bash
#SBATCH -N 2
#SBATCH -n 2 
#SBATCH -c 8
#SBATCH --mem-per-cpu 8000
#SBATCH --exclusive
#SBATCH -t 01:00:00
NPROCS=$(( $SLURM_NNODES * $SLURM_CPUS_PER_TASK ))
export OMP_NUM_THREADS=$NPROCS
export MKL_NUM_THREADS=8
module load TensorFlow/1.8.0-foss-2018a-Python-3.6.4-CUDA-9.2.88
# Execute jobs in parallel
srun -N 1 -n 1  python slurmpythonparallel.py &
srun -N 1 -n 1  python slurmpythonparallel.py 
wait 

My Python script is:


import os
import sys
import time
import multiprocessing
from multiprocessing import Process

import tensorflow as tf


def run(worker_hosts,task_index,results, train_x, train_y,val_x,val_y,fold):
    with tf.device("/job:worker/task:%d" % task_index):
        with tf.container("Process%d"%task_index):
            global x
            global y
            global prediction
            global cost
            global optimizer
            global correct
            global accuracy

            x = tf.placeholder('float',[None, 784],name="x")
            y = tf.placeholder('float',name="y_pred")


            prediction=neural_network_model(x)
            cost = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits_v2(logits=prediction,labels=y))
            optimizer = tf.train.AdamOptimizer().minimize(cost)
            correct = tf.equal(tf.argmax(prediction,1),tf.argmax(y,1))
            accuracy = tf.reduce_mean(tf.cast(correct,'float'))
        with tf.Session("grpc://"+worker_hosts[task_index]) as sess:
            run_train(sess, train_x, train_y,task_index,fold)
            results[fold]=sess.run(accuracy, feed_dict={x: val_x, y: val_y})




sys.path.append(os.getcwd())
folds=2
global worker_hosts
start_time = time.time()
cluster, job_name, task_index,worker_hosts = slurm()
cluster_spec = tf.train.ClusterSpec(cluster)
server = tf.train.Server(cluster_spec,job_name=job_name,task_index=task_index)
for i in range(len(worker_hosts)):
    print("Worker Host:",worker_hosts[i])
if task_index != 0:
    server.join()
else:
    multiprocessing.set_start_method('forkserver', force=True)
    results = multiprocessing.Array('d',folds)
    p=[]
    num_of_workers=len(worker_hosts)
    index=0
    for fold in range(folds):
        p.append(Process(target=run, args=(worker_hosts,index,results,train_x_all,train_y_all,train_x_all,train_y_all,fold)))
        # Launch and join the processes in batches of num_of_workers.
        if (fold+1)%num_of_workers==0:
            for j in range(num_of_workers):
                p[j].daemon = False
                p[j].start()
            for j in range(num_of_workers):
                p[j].join()

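(For reference, slurm() is a small helper that derives the cluster description from the Slurm environment. A simplified sketch of that kind of helper is below; the node-list parsing and port handling are illustrative only, and my real version differs in the details.)

# Simplified, illustrative sketch of a slurm()-style helper.
import os
import subprocess

def slurm(port=2222):
    # Expand the compact Slurm node list (e.g. "node[01-02]") into hostnames.
    nodelist = os.environ['SLURM_JOB_NODELIST']
    hostnames = subprocess.check_output(
        ['scontrol', 'show', 'hostnames', nodelist]).decode().split()

    # One worker per node; the port only needs to be unique per host.
    worker_hosts = ['%s:%d' % (h, port) for h in hostnames]

    # Rank of this task within its job step.
    task_index = int(os.environ.get('SLURM_PROCID', 0))

    cluster = {'worker': worker_hosts}
    return cluster, 'worker', task_index, worker_hosts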



When I use multiprocessing, I get the error:

Address already in use

and

Could not start gRPC server

My guess is that when Python forks a new process, it tries to use the same address that was already bound by the parent process or by the other invocation of the script. What I don't understand is why this happens under sbatch on Slurm, but not when running the script on my own computer or in interactive sessions.
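
For illustration, this is the kind of collision I suspect (a minimal standalone sketch, not my actual script): a second gRPC server trying to bind a host:port that is already in use fails at construction time.

import tensorflow as tf

# Minimal sketch of the suspected collision: two servers for the same task
# both try to bind localhost:2222.
cluster = tf.train.ClusterSpec({'worker': ['localhost:2222']})

# The first server binds localhost:2222 successfully.
server_a = tf.train.Server(cluster, job_name='worker', task_index=0)

# The second server tries to bind the same port and fails with an error like
# "Could not start gRPC server" / "Address already in use".
server_b = tf.train.Server(cluster, job_name='worker', task_index=0)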
