I have data that looks like this
+--------------+---------+-------+---------+
|       dataOne|OtherData|dataTwo|dataThree|
+--------------+---------+-------+---------+
|          Best|     tree|      5|      533|
|            OK|     bush|      e|     3535|
|           MEH|      cow|      -|     3353|
|           MEH|      oak|   none|       12|
+--------------+---------+-------+---------+
and I'm trying to get it into the output of
+--------------+---------+
|       dataOne|    Count|
+--------------+---------+
|          Best|        1|
|            OK|        1|
|           MEH|        2|
+--------------+---------+
I have no problem getting the dataOne column into a dataframe by itself and showing its contents to make sure I'm grabbing just that column. However, I can't seem to find the correct syntax for turning that SQL query into the data I need. I tried creating the following dataframe from the temp view created from the entire data set:
Dataset<Row> dataOneCount = spark.sql("select dataOne, count(*) from dataFrame group by dataOne");
dataOneCount.show();
The documentation I was able to find on this only showed how to do this type of aggregation in Spark 1.6 and prior, so any help would be appreciated.
Here's the error message I get. However, I've checked the data and there is no indexing error in it:
java.lang.ArrayIndexOutOfBoundsException: 11
I've also tried applying the countDistinct method from functions:
Column countNum = countDistinct(dataFrame.col("dataOne"));
Dataset<Row> result = dataOneDataFrame.withColumn("count",countNum);
result.show();
where dataOneDataFrame is a dataFrame created from running
select dataOne from dataFrame
But it returns an analysis exception. I'm still new to Spark, so I'm not sure if there's an error in how or when I'm evaluating the countDistinct method.
edit: To clarify, the first table shown is the result of the dataFrame I've created from reading the text file and applying a custom schema to it (they are still all strings)
Dataset<Row> dataFrame
Here is my full code
public static void main(String[] args) {
    SparkSession spark = SparkSession
        .builder()
        .appName("Log File Reader")
        .getOrCreate();

    // args[0] is the text file location
    JavaRDD<String> logsRDD = spark.sparkContext()
        .textFile(args[0], 1)
        .toJavaRDD();

    // Build a schema in which every column is a nullable string
    String schemaString = "dataOne OtherData dataTwo dataThree";
    List<StructField> fields = new ArrayList<>();
    String[] fieldName = schemaString.split(" ");
    for (String field : fieldName) {
        fields.add(DataTypes.createStructField(field, DataTypes.StringType, true));
    }
    StructType schema = DataTypes.createStructType(fields);

    // Split each line on spaces and map the first four tokens to a Row
    JavaRDD<Row> rowRDD = logsRDD.map((Function<String, Row>) record -> {
        String[] attributes = record.split(" ");
        return RowFactory.create(attributes[0], attributes[1], attributes[2], attributes[3]);
    });

    Dataset<Row> dF = spark.createDataFrame(rowRDD, schema);

    // First attempt
    dF.groupBy(col("dataOne")).count().show();

    // Trying with a SQL statement
    dF.createOrReplaceTempView("view");
    dF.sparkSession().sql("select dataOne, count(*) from view group by dataOne").show();
}
The most likely culprit that comes to mind is the lambda function that returns the row using RowFactory. The idea seems sound, but I'm not sure how well it really holds up or whether there's another way I could do it. Other than that, I'm quite puzzled.
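One way to test that suspicion without involving Spark is a minimal plain-Java sketch like the one below. It assumes the exception comes from a line that splits into fewer tokens than the lambda indexes into, which is only a guess. The class FieldCountCheck, the helper countByFirstField, and the "truncated line" input are all made up for illustration; the other four lines are the sample rows from this post.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class FieldCountCheck {

    // Hypothetical helper: counts lines by their first field and skips any
    // line that has fewer than expectedFields whitespace-separated tokens,
    // instead of letting attributes[i] throw ArrayIndexOutOfBoundsException.
    static Map<String, Long> countByFirstField(List<String> lines, int expectedFields) {
        Map<String, Long> counts = new LinkedHashMap<>();
        for (String line : lines) {
            String[] attributes = line.trim().split("\\s+");
            if (attributes.length < expectedFields) {
                System.err.println("Skipping malformed line: " + line);
                continue;
            }
            counts.merge(attributes[0], 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
            "best tree 5 533",
            "OK bush e 3535",
            "MEH cow - 3353",
            "MEH oak none 12",
            "truncated line"); // made-up malformed line with only two tokens
        System.out.println(countByFirstField(lines, 4)); // prints {best=1, OK=1, MEH=2}
    }
}
```

If a check like this reports skipped lines on the real file, the same length guard could go inside the map lambda before the call to RowFactory.create.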
sample data
best tree 5 533
OK bush e 3535
MEH cow - 3353
MEH oak none 12
Using Scala syntax for convenience. It's very similar to the Java syntax:
I can submit the Java code given above as follows with the four row data file on S3 and it works fine: