I create the following dataset using groupBy:
Dataset<Row> rangeSet = inputSet.select(key, partitionIDColName).groupBy(partitionIDColName)
.agg(functions.min(key).as(minKeyColName), functions.max(key).as(maxKeyColName))
.orderBy(functions.col(partitionIDColName).asc());
key is a String variable holding a column name, and the column it refers to stores string values. I group the dataset by the integer column partitionIDColName and take the minimum and maximum of the key column for each partition ID.
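To make the expected behaviour concrete, here is a minimal, self-contained sketch (not my real pipeline; it assumes a live SparkSession named spark and uses the literal column names "key" and "partitionID" in place of key, partitionIDColName, minKeyColName and maxKeyColName):

import java.util.Arrays;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

static void exampleRangeSet(SparkSession spark) {
    StructType schema = new StructType()
            .add("key", DataTypes.StringType)
            .add("partitionID", DataTypes.IntegerType);

    Dataset<Row> inputSet = spark.createDataFrame(Arrays.asList(
            RowFactory.create("apple", 1),
            RowFactory.create("banana", 1),
            RowFactory.create("cherry", 2)), schema);

    Dataset<Row> rangeSet = inputSet.select("key", "partitionID").groupBy("partitionID")
            .agg(functions.min("key").as("minKey"), functions.max("key").as("maxKey"))
            .orderBy(functions.col("partitionID").asc());

    // I expect: partitionID=1 -> minKey="apple", maxKey="banana"
    //           partitionID=2 -> minKey="cherry", maxKey="cherry"
    rangeSet.show();
}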
I then collect the rows of rangeSet onto the driver and loop over them for validation:
List<Row> ranges = rangeSet.collectAsList();
for (Row r : ranges) {
    Integer partitionID = r.<Integer>getAs(partitionIDColName);
    String minKeyVal = r.<String>getAs(minKeyColName);
    String maxKeyVal = r.<String>getAs(maxKeyColName);
    if (minKeyVal.compareTo(maxKeyVal) > 0) {
        throw new Exception("minKeyVal is greater than maxKeyVal for range " + partitionID
                + ", minKeyVal: '" + minKeyVal + "', maxKeyVal: '" + maxKeyVal + "'");
    }
}
For some of the rows the exception is triggered, meaning the minimum returned by the aggregation compares greater than the maximum when checked with String.compareTo() on the driver. How is this possible? Is Spark applying a different lexicographical ordering during the aggregation than the one String.compareTo() uses? Is it a Java versioning issue between Spark and my driver? I have verified that I am using jdk-11.0.19 to run Spark, Hadoop, and my driver, and the driver is also built and executed with jdk-11.0.19.
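For reference, this is a small diagnostic I could run on a failing row. The UTF-8 byte comparison is only a guess at what a binary string ordering might look like; the helper dumpComparison and the use of Arrays.compareUnsigned are mine, not anything Spark-specific:

import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical diagnostic: print the code points of both values and compare their
// UTF-8 byte sequences, to see whether non-ASCII characters explain the mismatch.
static void dumpComparison(String minKeyVal, String maxKeyVal) {
    System.out.println("min code points: " + Arrays.toString(minKeyVal.codePoints().toArray()));
    System.out.println("max code points: " + Arrays.toString(maxKeyVal.codePoints().toArray()));

    int javaOrder = minKeyVal.compareTo(maxKeyVal); // compares UTF-16 code units
    int byteOrder = Arrays.compareUnsigned(
            minKeyVal.getBytes(StandardCharsets.UTF_8),
            maxKeyVal.getBytes(StandardCharsets.UTF_8)); // unsigned byte-wise comparison (Java 9+)
    System.out.println("String.compareTo: " + javaOrder + ", UTF-8 byte compare: " + byteOrder);
}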