Error message in a for loop in PySpark using regexp_replace


I'm making a loop in PySpark, and I get this error message:
"Column is not iterable"

This is the code:

regexp_replace(data_join_result[varibale_choisie],
               random.choice(data_join_result.collect()[j][varibale_choisie]),
               data_join_result.collect()[j][lettre_choisie])

According to the error message, the problem comes from this part:

data_join_result.collect()[j][lettre_choisie]

My input:

VARIABLEA | VARIABLEB
BLUE      | WHITE
PINK      | DARK

My expected output:

VARIABLEA | VARIABLEB
BLTE      | WHITE
PINK      | DARM
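
Here is a minimal way to build this sample input (assuming a running SparkSession), in case it helps to reproduce:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

data_join_result = spark.createDataFrame(
    [("BLUE", "WHITE"), ("PINK", "DARK")],
    schema=["VARIABLEA", "VARIABLEB"],
)
data_join_result.show()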

If someone knows how to fix it, thanks!


There are 2 best solutions below

Raghu:

Collecting the data in the driver is not advisable, and neither is iterating over a DataFrame row by row. Spark offers multiple APIs that let us perform such tasks in a parallelized manner. In your case, you can try these approaches:

For a single-character replacement, try this (performance-intensive) option:

import pyspark.sql.functions as F
import string
import random

test1 = spark.createDataFrame(
    [("Mike", "apple", "oranges", "red wine"),
     ("Kate", "Whitewine", "green beans", "waterrr"),
     ("Leah", "red wine", "juice", "rice")],
    schema=["col1", "col2", "col3", "col4"])
cols = test1.columns
alp = list(string.ascii_lowercase)

# Struct UDF: for each row, pick one column at random and replace the first
# occurrence of a random letter with another random letter.
@F.udf(test1.schema)
def randomize(row):
    row_d = row.asDict()
    pos_sel = random.randint(0, len(cols) - 1)
    col_select = cols[pos_sel]
    row_d[col_select] = row_d[col_select].replace(
        alp[random.randint(1, 24)], alp[random.randint(1, 24)], 1)
    return row_d

test2 = test1.withColumn("struct_coln", randomize(F.struct(cols))).select("struct_coln.*")

results:

+----+---------+-----------+--------+
|col1|col2     |col3       |col4    |
+----+---------+-----------+--------+
|Mike|apple    |orangos    |red wine|
|Kate|Whitewine|green beans|waterrr |
|Leah|red wine |juice      |rice    |
+----+---------+-----------+--------+

You can see that "oranges" has been corrupted into "orangos". The chance of a visible corruption increases if you limit the alphabet used for replacement to just the vowels.
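
For example, a quick standalone sketch of that idea (the vowels list and the sample word below are just illustrative):

import random

# Restricting the replacement pool to vowels makes it more likely that the
# randomly chosen character actually occurs in the word being corrupted.
vowels = list("aeiou")
word = "oranges"
old_ch, new_ch = random.choice(vowels), random.choice(vowels)
print(word.replace(old_ch, new_ch, 1))  # e.g. "orenges"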

If you don't need the change limited to a single character, try this:

test1 = spark.createDataFrame(
    [("Mike", "apple", "oranges", "red wine"),
     ("Kate", "Whitewine", "green beans", "waterrr"),
     ("Leah", "red wine", "juice", "rice")],
    schema=["col1", "col2", "col3", "col4"])
cols = test1.columns
alp = list(string.ascii_lowercase)

# Each iteration picks a random column and swaps every occurrence of one
# random letter for another using the column-level translate function.
for i in range(30):
    pos_sel = random.randint(0, len(cols) - 1)
    col_select = cols[pos_sel]
    tst_rep = test1.withColumn(
        col_select,
        F.translate(F.col(col_select), alp[random.randint(1, 24)], alp[random.randint(1, 24)]))
    test1 = tst_rep

Here you have a bit of control, by adjusting the number of loop iterations.

results:

test1.show()
+----+---------+-----------+--------+
|col1|     col2|       col3|    col4|
+----+---------+-----------+--------+
|Mike|    applu|    oranges|rjd winj|
|Kate|Whifuwinu|green beans| watjrrr|
|Leah| rud winu|      juihe|    ricj|
+----+---------+-----------+--------+

Neoooar:


Finally, I found how to create a **loop to corrupt a dataset**. I'm sharing it in case someone needs it one day!

First, you need to define the errors you want to create, the letters to use for replacements, and the variables you want to corrupt; I also add errors with special characters:

lettre = [ "A", "B", "C", "D", "E", "F", "G", "H", "I", "J", "K", "L", "M", "N", "O", "P", "Q", "R", "S", "T", "U", "V", "W", "X", "Y", "Z"]

code_erreur= [ "replace","inserte","delete","espace","caract_spe", "NA","inverse"]

nombre_erreur=["1","1","1","2"]

varibale =["VARIABLEA","VARIABLEB"]

caract_spe =["_", "^", "¨", "", ".", "é", "-", "*","ù","ï","à","è","î","â"]

I created the list "nombre_erreur" because I want 75% of my dataset to have 1 error and 25% to have 2 errors.
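
A quick check of that weighting (the printed counts are just one illustrative run):

import random
from collections import Counter

# Each entry of nombre_erreur is equally likely, so three "1"s and one "2"
# give roughly 75% single-error and 25% double-error draws.
nombre_erreur = ["1", "1", "1", "2"]
draws = Counter(random.choice(nombre_erreur) for _ in range(10000))
print(draws)  # e.g. Counter({'1': 7521, '2': 2479})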

Next, create the definition:

import random

# The parameters are the value to corrupt (col1) and the per-call choices:
# the error type, the number of errors, the letter and the special character.
def def_code_erreur(col1, type_erreur, nb_erreur, lettre_choisie, caract_spe_choisi):

  # Delete a character at a random position.
  if type_erreur == "delete":
    for i in range(0, int(nb_erreur)):
      longueur = len(col1)
      pos = random.choice(range(1, longueur))
      col1 = col1[:pos] + col1[(pos + 1):]

  # Insert a space at a random position.
  if type_erreur == "espace":
    for i in range(0, int(nb_erreur)):
      longueur = len(col1)
      pos = random.choice(range(1, longueur))
      col1 = col1[:pos] + " " + col1[pos:]

  # Insert the chosen letter at a random position.
  if type_erreur == "inserte":
    for i in range(0, int(nb_erreur)):
      longueur = len(col1)
      pos = random.choice(range(1, longueur))
      col1 = col1[:pos] + lettre_choisie + col1[pos:]

  # Insert the chosen special character at a random position.
  if type_erreur == "caract_spe":
    for i in range(0, int(nb_erreur)):
      longueur = len(col1)
      pos = random.choice(range(1, longueur))
      col1 = col1[:pos] + caract_spe_choisi + col1[pos:]

  # Replace the character at a random position with the chosen letter.
  if type_erreur == "replace":
    for i in range(0, int(nb_erreur)):
      longueur = len(col1)
      pos = random.choice(range(1, longueur))
      col1 = col1[:pos - 1] + lettre_choisie + col1[pos:]

  # Swap two adjacent characters.
  if type_erreur == "inverse":
    for i in range(0, int(nb_erreur)):
      longueur = len(col1)
      pos = random.choice(range(1, longueur))
      col1 = col1[:pos - 1] + col1[pos:pos + 1] + col1[pos - 1:pos] + col1[(pos + 1):]

  # "NA": leave the value unchanged.
  if type_erreur == "NA":
    for i in range(0, int(nb_erreur)):
      col1 = col1

  return col1


from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

udf_def_code_erreur = udf(def_code_erreur, StringType())

Finally, you have to call "udf_def_code_erreur"! You can call it in a loop if you want to corrupt the whole dataset, as shown in the sketch below.
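
A minimal usage sketch (this is my assumption of how the pieces fit together; "df" stands for the DataFrame to corrupt, and the random choices are drawn once per column on the driver):

import random
import pyspark.sql.functions as F

# Corrupt every column listed in `varibale`: draw an error type, an error
# count, a letter and a special character, then pass them to the UDF as
# literal columns with F.lit.
for col_name in varibale:
    type_erreur = random.choice(code_erreur)
    nb_erreur = random.choice(nombre_erreur)
    lettre_choisie = random.choice(lettre)
    caract_spe_choisi = random.choice(caract_spe)
    df = df.withColumn(
        col_name,
        udf_def_code_erreur(
            F.col(col_name),
            F.lit(type_erreur),
            F.lit(nb_erreur),
            F.lit(lettre_choisie),
            F.lit(caract_spe_choisi),
        ),
    )

df.show()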