Why does reading from CSV fail with NumberFormatException?


I use Spark 1.6.0 and Scala 2.10.5.

$ spark-shell --packages com.databricks:spark-csv_2.10:1.5.0

import org.apache.spark.sql.SQLContext   
import sqlContext.implicits._    
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}

val bankSchema = StructType(Array(
  StructField("age", IntegerType, true),
  StructField("job", StringType, true),
  StructField("marital", StringType, true),
  StructField("education", StringType, true),
  StructField("default", StringType, true),
  StructField("balance", IntegerType, true),
  StructField("housing", StringType, true),
  StructField("loan", StringType, true),
  StructField("contact", StringType, true),
  StructField("day", IntegerType, true),
  StructField("month", StringType, true),
  StructField("duration", IntegerType, true),
  StructField("campaign", IntegerType, true),
  StructField("pdays", IntegerType, true),
  StructField("previous", IntegerType, true),
  StructField("poutcome", StringType, true),
  StructField("y", StringType, true)))

val market_details = sqlContext.
  read.
  format("com.databricks.spark.csv").
  option("header", "true").
  schema(bankSchema).
  load("/user/sachnil.2007_gmail/Project1_dataset_bank-full.csv")    
market_details.registerTempTable("phone_table")    
val temp = sqlContext.sql("SELECT * FROM phone_table").show()

The error I am getting is:

17/05/14 06:11:42 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1) java.lang.NumberFormatException: For input string: "58;"management";"married";"tertiary";"no";2143;"yes";"no";"unknown";5;"may";261;1;-1;0;"unknown";"no"" at 
    java.lang.NumberFormatException.forInputString(NumberFormatException.java:65) at 
    java.lang.Integer.parseInt(Integer.java:580) at 
    java.lang.Integer.parseInt(Integer.java:615) at 
    scala.collection.immutable.StringLike$class.toInt(StringLike.scala:229) at 
    scala.collection.immutable.StringOps.toInt(StringOps.scala:31) at 
    com.databricks.spark.csv.util.TypeCast$.castTo(TypeCast.scala:61) at 
    com.databricks.spark.csv.CsvRelation$$anonfun$buildScan$2.apply(CsvRelation.scala:121) at 
    com.databricks.spark.csv.CsvRelation$$anonfun$buildScan$2.apply(CsvRelation.scala:108) at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at 
    scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:308) at 
    scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at 
    scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) at 
    scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) at scala.collection.AbstractIterator.to(Iterator.scala:1157) at 
    scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) at 
    scala.collection.AbstractIterator.toArray(Iterator.scala:1157) at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212) at 
    org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212)

The CSV content looks like:

"age";"job";"marital";"education";"default";"balance";"housing";"loan";"contact";"day";"month";"duration";"campaign";"pdays";"previous";"poutcome";"y"
58;"management";"married";"tertiary";"no";2143;"yes";"no";"unknown";5;"may";261;1;-1;0;"unknown";"no"
44;"technician";"single";"secondary";"no";29;"yes";"no";"unknown";5;"may";151;1;-1;0;"unknown";"no"
33;"entrepreneur";"married";"secondary";"no";2;"yes";"yes";"unknown";5;"may";76;1;-1;0;"unknown";"no"
47;"blue-collar";"married";"unknown";"no";1506;"yes";"no";"unknown";5;"may";92;1;-1;0;"unknown";"no"

How can I solve it?

2 Answers

Accepted answer:

Spark 1.6.0 is so old that almost no one supports it these days (unless it's covered by commercial support). I strongly recommend upgrading to the latest version, 2.1.1, which gives you plenty of options.


Let me start off with this: in my custom 2.3.0-SNAPSHOT build, loading your CSV file just works, so I think you may have run into some unsupported feature of spark-csv in the version you use.

Please note that the spark-csv module has been integrated into Spark as of Spark 2.x (one of the many reasons why you should upgrade).


If you insist on a custom schema (which you could instead let Spark figure out itself with the inferSchema option), at least use the DSL to cut down on keystrokes:

import org.apache.spark.sql.types._

val bankSchema = StructType(
  $"age".int ::
  $"job".string ::
  $"marital".string ::
  $"education".string ::
  $"default".string ::
  $"balance".int ::
  $"housing".string ::
  $"loan".string ::
  $"contact".string ::
  $"day".int ::
  $"month".string ::
  $"duration".int ::
  $"campaign".int ::
  $"pdays".int ::
  $"previous".int ::
  $"poutcome".string ::
  $"y".string ::
  Nil)

scala> println(bankSchema.treeString)
root
 |-- age: integer (nullable = true)
 |-- job: string (nullable = true)
 |-- marital: string (nullable = true)
 |-- education: string (nullable = true)
 |-- default: string (nullable = true)
 |-- balance: integer (nullable = true)
 |-- housing: string (nullable = true)
 |-- loan: string (nullable = true)
 |-- contact: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- duration: integer (nullable = true)
 |-- campaign: integer (nullable = true)
 |-- pdays: integer (nullable = true)
 |-- previous: integer (nullable = true)
 |-- poutcome: string (nullable = true)
 |-- y: string (nullable = true)

If you develop Spark applications in Scala, I'd strongly recommend describing the schema with a case class and leveraging encoders (again, Spark 2+).

case class Market(
  age: Int,
  job: String,
  marital: String,
  education: String,
  default: String,
  balance: Int,
  housing: String,
  loan: String,
  contact: String,
  day: Int,
  month: String,
  duration: Int,
  campaign: Int,
  pdays: Int,
  previous: Int,
  poutcome: String,
  y: String)
import org.apache.spark.sql.Encoders
scala> val bankSchema = Encoders.product[Market]
java.lang.UnsupportedOperationException: `default` is a reserved keyword and cannot be used as field name
- root class: "Market"
  at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$10.apply(ScalaReflection.scala:611)
  at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$10.apply(ScalaReflection.scala:609)
  at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
  at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
  at scala.collection.immutable.List.foreach(List.scala:381)
  at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
  at scala.collection.immutable.List.flatMap(List.scala:344)
  at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:609)
  at org.apache.spark.sql.catalyst.ScalaReflection$.serializerFor(ScalaReflection.scala:440)
  at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:71)
  at org.apache.spark.sql.Encoders$.product(Encoders.scala:275)
  ... 48 elided

(In this particular case it is not possible because default is a reserved keyword, which you may therefore want to avoid in your hand-built schema too.)
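One way around the reserved keyword is to rename the offending field in the case class. This is only a sketch: the field name creditDefault is my own choice, not anything from the Spark API, and after loading the CSV you would align the DataFrame column with it (e.g. via withColumnRenamed("default", "creditDefault")) before converting to a Dataset.

```scala
// Sketch: avoid the reserved word `default` by renaming the case class field.
// `creditDefault` is a hypothetical name of my choosing.
case class Market(
  age: Int,
  job: String,
  marital: String,
  education: String,
  creditDefault: String, // was `default`, a reserved keyword in Spark SQL
  balance: Int,
  housing: String,
  loan: String,
  contact: String,
  day: Int,
  month: String,
  duration: Int,
  campaign: Int,
  pdays: Int,
  previous: Int,
  poutcome: String,
  y: String)

// The case class itself works fine once the field is renamed:
val sample = Market(58, "management", "married", "tertiary", "no", 2143,
  "yes", "no", "unknown", 5, "may", 261, 1, -1, 0, "unknown", "no")
```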


Once you have the schema, reading the sample you've included in the question gives no errors:

val marketDetails = spark.
  read.
  schema(bankSchema).
  option("header", true).
  option("delimiter", ";").
  csv("market_details.csv")

scala> marketDetails.show
+---+------------+-------+---------+-------+-------+-------+----+-------+---+-----+--------+--------+-----+--------+--------+---+
|age|         job|marital|education|default|balance|housing|loan|contact|day|month|duration|campaign|pdays|previous|poutcome|  y|
+---+------------+-------+---------+-------+-------+-------+----+-------+---+-----+--------+--------+-----+--------+--------+---+
| 58|  management|married| tertiary|     no|   2143|    yes|  no|unknown|  5|  may|     261|       1|   -1|       0| unknown| no|
| 44|  technician| single|secondary|     no|     29|    yes|  no|unknown|  5|  may|     151|       1|   -1|       0| unknown| no|
| 33|entrepreneur|married|secondary|     no|      2|    yes| yes|unknown|  5|  may|      76|       1|   -1|       0| unknown| no|
| 47| blue-collar|married|  unknown|     no|   1506|    yes|  no|unknown|  5|  may|      92|       1|   -1|       0| unknown| no|
+---+------------+-------+---------+-------+-------+-------+----+-------+---+-----+--------+--------+-----+--------+--------+---+

What I really like about Spark SQL is that you can stick to using pure SQL if that's your preferred "language" in Spark.

val q = """
  CREATE OR REPLACE TEMPORARY VIEW phone_table
  USING csv
  OPTIONS (
    inferSchema true,
    header true,
    delimiter ';',
    path 'market_details.csv')"""

// execute the above query and discard the result
// we're only interested in the side effect of creating a temp view
sql(q).collect

scala> sql("select * from phone_table").show
+---+------------+-------+---------+-------+-------+-------+----+-------+---+-----+--------+--------+-----+--------+--------+---+
|age|         job|marital|education|default|balance|housing|loan|contact|day|month|duration|campaign|pdays|previous|poutcome|  y|
+---+------------+-------+---------+-------+-------+-------+----+-------+---+-----+--------+--------+-----+--------+--------+---+
| 58|  management|married| tertiary|     no|   2143|    yes|  no|unknown|  5|  may|     261|       1|   -1|       0| unknown| no|
| 44|  technician| single|secondary|     no|     29|    yes|  no|unknown|  5|  may|     151|       1|   -1|       0| unknown| no|
| 33|entrepreneur|married|secondary|     no|      2|    yes| yes|unknown|  5|  may|      76|       1|   -1|       0| unknown| no|
| 47| blue-collar|married|  unknown|     no|   1506|    yes|  no|unknown|  5|  may|      92|       1|   -1|       0| unknown| no|
+---+------------+-------+---------+-------+-------+-------+----+-------+---+-----+--------+--------+-----+--------+--------+---+

PROTIP: Use spark-sql and you can leave Scala aside completely.

Second answer:

There seem to be two issues here:

  1. CSV delimiter

Your CSV data uses ; as the delimiter, so you should add the following

.option("delimiter", ";")

to the read operation to instruct Spark to use the right delimiter:

val market_details = sqlContext.read.format("com.databricks.spark.csv")
.option("header", "true")
.schema(bankSchema)
.option("delimiter", ";")
.load("/user/sachnil.2007_gmail/Project1_dataset_bank-full.csv")    

More info in the spark-csv documentation:

delimiter: by default columns are delimited using ,, but the delimiter can be set to any character
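To see why the exception message contains the entire row: with the default , delimiter, spark-csv treats each whole line as a single column, and casting that string to the Int age column calls toInt on the full row. A plain-Scala sketch of what goes wrong:

```scala
// A row from the file, semicolon-delimited
val row = """58;"management";"married";"tertiary";"no";2143"""

// With the wrong delimiter the whole line stays one field...
val wrongFields = row.split(",")
// wrongFields.length == 1, and wrongFields(0).toInt throws
// java.lang.NumberFormatException, exactly as in the stack trace

// ...with the right delimiter the first field parses cleanly
val rightFields = row.split(";")
val age = rightFields(0).toInt // 58
```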

  2. Input data includes quote signs (")

Your input data includes unneeded " characters. Note that spark-csv already treats " as the default quote character, so quoted fields should parse once the delimiter is correct; alternatively, remove the " characters from your CSV input file and run it again (see the example input below):

age;job;marital;education;default;balance;housing;loan;contact;day;month;duration;campaign;pdays;previous;poutcome;y
58;management;married;tertiary;no;2143;yes;no;unknown;5;may;261;1;-1;0;unknown;no
44;technician;single;secondary;no;29;yes;no;unknown;5;may;151;1;-1;0;unknown;no
33;entrepreneur;married;secondary;no;2;yes;yes;unknown;5;may;76;1;-1;0;unknown;no
47;blue-collar;married;unknown;no;1506;yes;no;unknown;5;may;92;1;-1;0;unknown;no
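If you'd rather clean the quotes programmatically instead of editing the file by hand, here is a minimal plain-Scala sketch (stripQuotes is a hypothetical helper, not part of any Spark API):

```scala
// Strip a single pair of surrounding double quotes from a field, if present
def stripQuotes(field: String): String =
  field.stripPrefix("\"").stripSuffix("\"")

val row = "58;\"management\";\"married\";\"tertiary\";\"no\";2143"
val fields = row.split(";").map(stripQuotes)
// fields: Array(58, management, married, tertiary, no, 2143)
```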

Here you can find spark-sql CSV examples.

The Baby Names example uses the following CSV input (a header row followed by sample rows, without quote signs):

Year,First Name,County,Sex,Count
2013,GAVIN,ST LAWRENCE,M,9
2013,LEVI,ST LAWRENCE,M,9
2013,LOGAN,NEW YORK,M,44