Getting a distinct count from a dataframe using Apache Spark


I have data that looks like this

+--------------+---------+-------+---------+
|       dataOne|OtherData|dataTwo|dataThree|
+--------------+---------+-------+---------+
|          Best|     tree|      5|      533|
|            OK|     bush|      e|     3535|
|           MEH|      cow|      -|     3353|
|           MEH|      oak|   none|       12|
+--------------+---------+-------+---------+

and I'm trying to get it into the output of

+--------------+---------+
|       dataOne|    Count|
+--------------+---------+
|          Best|        1|
|            OK|        1|
|           MEH|        2|
+--------------+---------+

I have no problem getting the dataOne column into a dataframe by itself and showing its contents to make sure I'm grabbing only that column. However, I can't seem to find the correct syntax for turning that SQL query into the data I need. I tried creating the following dataframe from the temp view created from the entire data set:

Dataset<Row> dataOneCount = spark.sql(
        "select dataOne, count(*) from dataFrame group by dataOne");
dataOneCount.show();

But this fails with an error. The documentation I was able to find only showed how to do this type of aggregation in Spark 1.6 and earlier, so any help would be appreciated.

Here's the error message I get. However, I've checked the data and there is no indexing error in it.

 java.lang.ArrayIndexOutOfBoundsException: 11

I've also tried applying the countDistinct method from functions:

Column countNum = countDistinct(dataFrame.col("dataOne"));
Dataset<Row> result = dataOneDataFrame.withColumn("count",countNum);
result.show();

where dataOneDataFrame is a dataFrame created from running

select dataOne from dataFrame

But it throws an AnalysisException. I'm still new to Spark, so I'm not sure whether there's an error in how or when I'm evaluating the countDistinct method.
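
For reference, countDistinct is an aggregate that yields the number of distinct values in a column, which is a different quantity from the per-value row count in the desired output table. The sketch below uses plain Java collections rather than Spark to show the two side by side; the class and method names (CountKinds, distinctCount, countPerValue) are hypothetical and exist only for this illustration.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical illustration class; it does not use or depend on Spark.
class CountKinds {

    // Number of distinct values: the quantity a distinct count computes.
    static long distinctCount(List<String> values) {
        return values.stream().distinct().count();
    }

    // Number of rows per value: the quantity a group-by count computes.
    // LinkedHashMap keeps the keys in first-seen order.
    static Map<String, Long> countPerValue(List<String> values) {
        return values.stream().collect(
                Collectors.groupingBy(Function.identity(),
                        LinkedHashMap::new,
                        Collectors.counting()));
    }

    public static void main(String[] args) {
        // The dataOne column from the table in the question.
        List<String> dataOne = Arrays.asList("Best", "OK", "MEH", "MEH");
        System.out.println(distinctCount(dataOne));  // prints 3
        System.out.println(countPerValue(dataOne));  // prints {Best=1, OK=1, MEH=2}
    }
}
```

The second result matches the desired output, which is why a group-by with a plain count is the closer fit here than a distinct count.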

edit: To clarify, the first table shown is the dataFrame I created by reading the text file and applying a custom schema to it (the columns are still all strings)

Dataset<Row> dataFrame 

Here is my full code

public static void main(String[] args) {


    SparkSession spark = SparkSession
            .builder()
            .appName("Log File Reader")
            .getOrCreate();

    //args[0] is the textfile location
    JavaRDD<String> logsRDD = spark.sparkContext()
            .textFile(args[0],1)
            .toJavaRDD();

    String schemaString = "dataOne OtherData dataTwo dataThree";

    List<StructField> fields = new ArrayList<>();
    String[] fieldName = schemaString.split(" ");


    for (String field : fieldName){
        fields.add(DataTypes.createStructField(field, DataTypes.StringType, true));
    }
    StructType schema = DataTypes.createStructType(fields);

    JavaRDD<Row> rowRDD = logsRDD.map((Function<String, Row>) record -> {
       String[] attributes = record.split(" ");
       return RowFactory.create(attributes[0],attributes[1],attributes[2],attributes[3]);
    });


    Dataset<Row> dF = spark.createDataFrame(rowRDD, schema);

    //first attempt
    dF.groupBy(col("dataOne")).count().show();

    //Trying with a sql statement
    dF.createOrReplaceTempView("view");
    dF.sparkSession().sql("select dataOne, count(*) from view group by dataOne").show();

The most likely culprit that comes to mind is the lambda function that builds each row with RowFactory. The idea seems sound, but I'm not sure how well it holds up or whether there's another way to do it. Other than that, I'm quite puzzled.

sample data

best tree 5 533
OK bush e 3535
MEH cow - 3353
MEH oak none 12

Best answer

Using Scala syntax for convenience. It's very similar to the Java syntax:

// Input data
val df = {
  import org.apache.spark.sql._
  import org.apache.spark.sql.types._
  import scala.collection.JavaConverters._

  val simpleSchema = StructType(
    StructField("dataOne", StringType) ::
    StructField("OtherData", StringType) ::
    StructField("dataTwo", StringType) ::
    StructField("dataThree", IntegerType) :: Nil)

  val data = List(
    Row("Best", "tree", "5", 533),
    Row("OK", "bush", "e", 3535),
    Row("MEH", "cow", "-", 3353),
    Row("MEH", "oak", "none", 12)
  )

  spark.createDataFrame(data.asJava, simpleSchema)
}

df.show
+-------+---------+-------+---------+
|dataOne|OtherData|dataTwo|dataThree|
+-------+---------+-------+---------+
|   Best|     tree|      5|      533|
|     OK|     bush|      e|     3535|
|    MEH|      cow|      -|     3353|
|    MEH|      oak|   none|       12|
+-------+---------+-------+---------+
df.groupBy(col("dataOne")).count().show()
+-------+-----+
|dataOne|count|
+-------+-----+
|    MEH|    2|
|   Best|    1|
|     OK|    1|
+-------+-----+

Submitting the Java code given above as follows, with the four-row data file on S3, works fine for me:

$SPARK_HOME/bin/spark-submit \
  --class sparktest.FromStackOverflow \
  --packages "org.apache.hadoop:hadoop-aws:2.7.3" \
  target/scala-2.11/sparktest_2.11-1.0.0-SNAPSHOT.jar "s3a://my-bucket-name/sample.txt"
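
Since the posted code runs cleanly on the four-row sample, one plausible explanation for the ArrayIndexOutOfBoundsException (an assumption on my part, not something the question confirms) is that some line in the real file has fewer space-separated fields than the row-building lambda indexes, for example a blank or truncated line. Below is a minimal sketch in plain Java of a defensive version of that split. LogLineParser, parseOrNull, parseAll and EXPECTED_FIELDS are hypothetical names introduced only for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of the original code and independent of Spark.
class LogLineParser {

    // Assumed field count, matching the four-column schema in the question.
    static final int EXPECTED_FIELDS = 4;

    // Returns the first EXPECTED_FIELDS fields of the line,
    // or null when the line has too few fields to index safely.
    static String[] parseOrNull(String record) {
        String[] attributes = record.trim().split("\\s+");
        if (attributes.length < EXPECTED_FIELDS) {
            return null;
        }
        return Arrays.copyOf(attributes, EXPECTED_FIELDS);
    }

    // Parses every line and silently drops the malformed ones.
    static List<String[]> parseAll(List<String> lines) {
        List<String[]> rows = new ArrayList<>();
        for (String line : lines) {
            String[] fields = parseOrNull(line);
            if (fields != null) {
                rows.add(fields);
            }
        }
        return rows;
    }

    public static void main(String[] args) {
        // Two valid lines, one truncated line and one blank line.
        List<String> lines = Arrays.asList(
                "best tree 5 533", "OK bush", "", "MEH oak none 12");
        for (String[] row : parseAll(lines)) {
            System.out.println(String.join("|", row));
        }
    }
}
```

In the Spark job, the same length check could be applied in a filter on logsRDD before the map that calls RowFactory.create, so that short lines never reach the indexing code. Logging or counting the rejected lines instead of dropping them would also show whether this is really the cause.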