Multiple Filtering in PySpark

I have imported a data set into a Jupyter notebook / PySpark to process through EMR, for example:

data sample

I want to clean up the data with the filter function before using it. This includes:

  1. Removing rows where the cost or date is blank, '0', or NA. I think the filter would be something like: .filter(lambda (a,b,c,d): b = ?, c % 1 == c, d = ?). I'm unsure how to filter fruit and store here.
  2. Removing incorrect values, e.g. '3' is not a fruit name. This is easy for numbers (just do number % 1 == number), but I'm unsure how to filter out the words.
  3. Removing rows that are statistical outliers, i.e. more than 3 standard deviations from the mean. Here cell C4 would clearly need to be removed, but I am unsure how to incorporate this logic into a filter.

Do I need to perform one filter at a time or is there a way to filter the data set (in lambda notation) all in one go?

Or would it be easier to write a Spark SQL query with many filters in the 'where' clause (though #3 above is still difficult to write in SQL)?

mayank agrawal On BEST ANSWER

The documentation (http://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html#pyspark.sql.DataFrame.filter) states:

where() is an alias for filter().

So you can safely use 'filter' instead of 'where' for multiple conditions too.
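On the question of applying every check in one go, here is a minimal sketch of a single predicate that combines the per-column checks. It assumes each row is a (date, fruit, cost, store) tuple and that dates use the YYYY-MM-DD format; KNOWN_FRUITS, is_valid_date, is_valid_cost and is_valid_row are hypothetical names, and the rows are made up. It is shown with Python's built-in filter on a list so it runs without a cluster; the same function could be passed to RDD.filter, e.g. rdd.filter(is_valid_row).

```python
from datetime import datetime

# Hypothetical whitelist: extend it with the fruit names you expect.
KNOWN_FRUITS = {"apple", "banana", "orange"}

def is_valid_date(text, fmt="%Y-%m-%d"):
    """Return True if text parses as a date in the assumed format."""
    try:
        datetime.strptime(text, fmt)
        return True
    except (TypeError, ValueError):
        return False

def is_valid_cost(value):
    """Return True for a positive number; blank, 'NA', None and 0 all fail."""
    try:
        return float(value) > 0
    except (TypeError, ValueError):
        return False

def is_valid_row(row):
    """Combine every per-column check into one predicate."""
    date, fruit, cost, store = row
    return (
        is_valid_date(date)
        and fruit in KNOWN_FRUITS
        and is_valid_cost(cost)
        and bool(store)
    )

# Made-up sample rows for illustration only
rows = [
    ("2017-01-01", "apple", "1.50", "A"),
    ("2017-01-02", "3", "2.00", "B"),      # '3' is not a fruit name
    ("2017-01-03", "banana", "NA", "A"),   # missing cost
    ("", "orange", "0.75", "C"),           # blank date
    ("2017-01-05", "orange", "0", "B"),    # zero cost
]

clean_rows = list(filter(is_valid_row, rows))
```

With a DataFrame, the equivalent is to join Column conditions with & and | inside a single filter() call, wrapping each condition in parentheses.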

EDIT: If you want to filter on many conditions for many columns, I would prefer this method.

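from dateutil.parser import parse
import pyspark.sql.functions as F
from pyspark.sql.types import DoubleType, StringType

def is_date(string):
    # Return True if dateutil can parse the value as a date
    try:
        parse(string)
        return True
    except (TypeError, ValueError, OverflowError):
        # TypeError covers None (null) values
        return False

def check_date(d):
    # Keep valid dates; replace anything else with None (null)
    if is_date(d):
        return d
    else:
        return None

date_udf = F.udf(check_date, StringType())

def check_fruit(name):
    # Placeholder list: fill in the fruit names you expect (easy to find online).
    # Filtering out words is difficult otherwise; anything not listed becomes None.
    fruits_list = ['apple', 'banana', 'orange']
    if name in fruits_list:
        return name
    else:
        return None

fruit_udf = F.udf(check_fruit, StringType())

# mean and std of the cost column must be calculated beforehand, for example:
# mean, std = df.select(F.mean('cost'), F.stddev('cost')).first()

def check_cost(value):
    # Assumes cost is a numeric column
    threshold_upper = mean + (3 * std)
    threshold_lower = mean - (3 * std)

    # Keep only values within 3 standard deviations of the mean
    if value is not None and threshold_lower < value < threshold_upper:
        return float(value)
    else:
        return None

cost_udf = F.udf(check_cost, DoubleType())

# Similarly create store_udf

df = df.select([date_udf(F.col('date')).alias('date'),
                fruit_udf(F.col('fruit')).alias('fruit'),
                cost_udf(F.col('cost')).alias('cost'),
                store_udf(F.col('store')).alias('store')]).dropna()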

This processes all columns in a single pass: each UDF replaces an invalid value with null, and dropna() then removes every row that contains a null.
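The check_cost function relies on a mean and standard deviation calculated beforehand. As a rough illustration of that step, the sketch below computes the 3-standard-deviation bounds in plain Python with the statistics module; three_sigma_bounds is a hypothetical helper and the costs are made up. On a DataFrame, the same two statistics could be obtained with F.mean and F.stddev, where stddev is likewise the sample standard deviation.

```python
from statistics import mean, stdev

def three_sigma_bounds(values):
    """Return (lower, upper) bounds at 3 sample standard deviations from the mean."""
    mu = mean(values)
    sigma = stdev(values)  # sample standard deviation
    return mu - 3 * sigma, mu + 3 * sigma

# Made-up costs: 19 ordinary prices and one obvious outlier
costs = [1.5, 2.0, 2.5] * 6 + [2.0, 50.0]

lower, upper = three_sigma_bounds(costs)

# Keep only the values strictly inside the bounds, as check_cost does
kept = [c for c in costs if lower < c < upper]
```

Here the value 50.0 falls above the upper bound and is dropped, while the 19 ordinary prices are kept.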