There are several similar questions on the internet, but none of them has an answer.
I am using the following code to save MongoDB data to Hive, but the exception shown at the end occurs. How can I work around this problem?
I am using:
spark-mongo-connector (Spark 2.1.0, Scala 2.11)
java-mongo-driver 3.10.2
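For reference, a minimal build.sbt sketch for this setup might look like the following; the exact artifact coordinates and versions are assumptions based on the versions listed above, so adjust them to whatever you actually have on the classpath.

// Assumed sbt coordinates for Spark 2.1.0 / Scala 2.11 with the MongoDB connector.
scalaVersion := "2.11.12"

libraryDependencies ++= Seq(
  "org.apache.spark"  %% "spark-sql"             % "2.1.0",
  "org.apache.spark"  %% "spark-hive"            % "2.1.0",
  "org.mongodb.spark" %% "mongo-spark-connector" % "2.1.0",
  "org.mongodb"       %  "mongo-java-driver"     % "3.10.2"
)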
import com.mongodb.spark.MongoSpark
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StructType

object MongoConnector_Test {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .set("spark.mongodb.input.uri", "mongodb://user:pass@mongo1:123456/db1.t1")
      .setMaster("local[4]")
      .setAppName("MongoConnectorTest")
    val session = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()

    // Declare every field as string.
    val schema: StructType = new StructType()
      .add("_id", "string")
      .add("x", "string")
      .add("y", "string")
      .add("z", "string")

    val df = MongoSpark.read(session).schema(schema).load()
    df.write.saveAsTable("MongoConnector_Test" + System.currentTimeMillis())
  }
}
But the following exception occurs:
Caused by: org.bson.BsonInvalidOperationException: Invalid state INITIAL
at org.bson.json.StrictCharacterStreamJsonWriter.checkState(StrictCharacterStreamJsonWriter.java:395)
at org.bson.json.StrictCharacterStreamJsonWriter.writeNull(StrictCharacterStreamJsonWriter.java:192)
at org.bson.json.JsonNullConverter.convert(JsonNullConverter.java:24)
at org.bson.json.JsonNullConverter.convert(JsonNullConverter.java:21)
at org.bson.json.JsonWriter.doWriteNull(JsonWriter.java:206)
at org.bson.AbstractBsonWriter.writeNull(AbstractBsonWriter.java:557)
at org.bson.codecs.BsonNullCodec.encode(BsonNullCodec.java:38)
at org.bson.codecs.BsonNullCodec.encode(BsonNullCodec.java:28)
at org.bson.codecs.EncoderContext.encodeWithChildContext(EncoderContext.java:91)
at org.bson.codecs.BsonValueCodec.encode(BsonValueCodec.java:62)
at com.mongodb.spark.sql.BsonValueToJson$.apply(BsonValueToJson.scala:29)
at com.mongodb.spark.sql.MapFunctions$.bsonValueToString(MapFunctions.scala:103)
at com.mongodb.spark.sql.MapFunctions$.com$mongodb$spark$sql$MapFunctions$$convertToDataType(MapFunctions.scala:78)
at com.mongodb.spark.sql.MapFunctions$$anonfun$3.apply(MapFunctions.scala:39)
at com.mongodb.spark.sql.MapFunctions$$anonfun$3.apply(MapFunctions.scala:37)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
at com.mongodb.spark.sql.MapFunctions$.documentToRow(MapFunctions.scala:37)
at com.mongodb.spark.sql.MongoRelation$$anonfun$buildScan$2.apply(MongoRelation.scala:45)
at com.mongodb.spark.sql.MongoRelation$$anonfun$buildScan$2.apply(MongoRelation.scala:45)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.execute(FileFormatWriter.scala:243)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:190)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:188)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1341)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:193)
... 8 more
MongoDB stores data as documents, and the schema is not fixed across all documents in a collection. Some documents may be missing fields (or have them set to null), and that is the root cause of your issue: the connector fails when it tries to convert such a missing/null value into the string type declared in your schema. Ignore the fields that are not present in every document and the issue will be fixed.
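Concretely, applying that workaround to your code would look something like the sketch below. It assumes z is the field that is absent (or null) in some documents, so it is simply left out of the requested schema; adjust the field names to match your actual collection.

import com.mongodb.spark.MongoSpark
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StructType

object MongoConnector_Test {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .set("spark.mongodb.input.uri", "mongodb://user:pass@mongo1:123456/db1.t1")
      .setMaster("local[4]")
      .setAppName("MongoConnectorTest")
    val session = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()

    // Only request fields that exist (and are non-null) in every document.
    // "z" is assumed to be the field missing from some documents and is left out here.
    val schema: StructType = new StructType()
      .add("_id", "string")
      .add("x", "string")
      .add("y", "string")

    val df = MongoSpark.read(session).schema(schema).load()
    df.write.saveAsTable("MongoConnector_Test" + System.currentTimeMillis())
  }
}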