Problem with the efficiency of foreach and collect in PySpark


I have the following code:

for i in range(I):
    # new accumulator for this iteration, using a custom ListParam AccumulatorParam
    # (ListParam, fitnessEval, modifyAccum and posEval are defined elsewhere in the program)
    local_accum = sc.accumulator([], ListParam())

    # evaluate the fitness of every element
    rdd_fitness = rdd_master.map(lambda x: fitnessEval(x, n))

    # update local_accum with the fitness results
    rdd_fitness.foreach(lambda x: modifyAccum(x, n))

    # scan the accumulated values for a new best global fitness / position
    for j in range(m):
        blf = local_accum.value[0][j][1]
        if blf < best_global_fitness:
            best_global_fitness = blf
            mejor_pos_global = local_accum.value[0][j][0]

    # update positions using the best global position found so far
    resultado = rdd_fitness.map(lambda x: posEval(x, mejor_pos_global, n))

    resultado = resultado.collect()

    # rebuild the master RDD from the collected results for the next iteration
    rdd_master = sc.parallelize(resultado)

I have a problem with the efficiency of the foreach and collect operations. I measured the execution time of every part of the program and found that the times for these two lines:

rdd_fitness.foreach(lambda x: modifyAccum(x,n))

resultado = resultado.collect()

are ridiculously high.
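For context, the measurement looks roughly like this (a sketch, not my exact timing code; it just wraps the two calls with time.time()):

import time

start = time.time()
rdd_fitness.foreach(lambda x: modifyAccum(x, n))    # first slow line
print("foreach took", time.time() - start, "s")

start = time.time()
resultado = resultado.collect()                     # second slow line
print("collect took", time.time() - start, "s")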

I am wondering how I can modify this to improve the efficiency.

I also tried to modify local_accum inside the map, but it does not update the accumulator.
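That attempt looked roughly like this (a sketch; fitness_and_accumulate is a hypothetical name, the idea is simply to fold the accumulator update into the map instead of doing a separate foreach):

def fitness_and_accumulate(x):
    # hypothetical helper: do the fitness evaluation and the accumulator
    # update in one pass instead of a separate foreach
    result = fitnessEval(x, n)
    modifyAccum(result, n)
    return result

rdd_fitness = rdd_master.map(fitness_and_accumulate)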
