org.bson.BsonInvalidOperationException: Value expected to be of type INT64 is of unexpected type OBJECT_ID


I am trying to write a simple Apache Flink MongoDB connector job that reads and writes JSON data in MongoDB. First, here is the MongoDB sink code:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
        
List<Tuple2<String, Integer>> data = new ArrayList<>();
data.add(new Tuple2<>("Hello", 1));
data.add(new Tuple2<>("Hi", 2));
data.add(new Tuple2<>("Hey", 3));
        
DataStream<Tuple2<String, Integer>> stream = env.fromCollection(data);

MongoSink<Tuple2<String, Integer>> sink = MongoSink.<Tuple2<String, Integer>>builder()
            .setUri("mongodb://127.0.0.1:27017")
            .setDatabase("test_db")
            .setCollection("test_coll")
            .setBatchSize(1000)
            .setBatchIntervalMs(1000)
            .setMaxRetries(3)
            .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
            .setSerializationSchema(
                    (input, context) 
                        -> {
                            Document doc = new Document(input.f0, input.f1);
                            return new InsertOneModel<>(BsonDocument.parse(doc.toJson()));
                        })
            .build();

stream.sinkTo(sink);

This sink code inserts JSON documents into MongoDB successfully. A generated document looks like this:

{
  "_id": {
    "$oid": "65f67f3b9779060fd2390d0e"
  },
  "Hello": 1
}

But the MongoDB source code throws an error:

MongoSource<Tuple2<String,Integer>> source = MongoSource.<Tuple2<String,Integer>>builder()
            .setUri("mongodb://127.0.0.1:27017")
            .setDatabase("test_db")
            .setCollection("test_coll")
            .setDeserializationSchema(new MongoDeserializationSchema<Tuple2<String, Integer>>() {
                @Override
                public Tuple2<String, Integer> deserialize(BsonDocument document) {
                    String key = document.getFirstKey();
                    Integer value = document.getInt64(key).intValue();  // this line throws the error message

                    return new Tuple2<String, Integer>(key, value);
                }

                @Override
                public TypeInformation<Tuple2<String, Integer>> getProducedType() {
                    return Types.TUPLE(Types.STRING, Types.INT);
                }
            })
            .build();

DataStream<Tuple2<String, Integer>> ds = env.fromSource(source, WatermarkStrategy.noWatermarks(), "MongoDB-Source");
ds.print();

The error message is:

Caused by: org.bson.BsonInvalidOperationException: Value expected to be of type INT64 is of unexpected type OBJECT_ID
        at org.bson.BsonValue.throwIfInvalidType(BsonValue.java:419)
        at org.bson.BsonValue.asInt64(BsonValue.java:105)
        at org.bson.BsonDocument.getInt64(BsonDocument.java:203)
        at com.aaa.test.FlinkMongoTest$1.deserialize(FlinkMongoTest.java:63)
        at com.aaa.test.FlinkMongoTest$1.deserialize(FlinkMongoTest.java:1)
        at org.apache.flink.connector.mongodb.source.reader.deserializer.MongoDeserializationSchema.deserialize(MongoDeserializationSchema.java:58)
        at org.apache.flink.connector.mongodb.source.reader.emitter.MongoRecordEmitter.emitRecord(MongoRecordEmitter.java:54) 
        at org.apache.flink.connector.mongodb.source.reader.emitter.MongoRecordEmitter.emitRecord(MongoRecordEmitter.java:34) 
        at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:160)

I think the requested value should be of type INT64, but the returned type is OBJECT_ID, so the code throws this error. How can I read the integer value of the MongoDB document instead of the OBJECT_ID? Any reply would be appreciated.
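
A likely cause, judging from the stored document shown earlier: `_id` is the first field, so `document.getFirstKey()` returns "_id" and `getInt64("_id")` is applied to an ObjectId. The sketch below illustrates one way around it: skip `_id` and take the first remaining numeric field. It is not the connector's API. To stay runnable without the MongoDB driver, a `LinkedHashMap` stands in for `BsonDocument`, and `FirstFieldReader` and `firstNonIdField` are hypothetical names. In the real `deserialize` method the same loop would presumably run over `document.entrySet()`, using `BsonValue.isNumber()` and `asNumber().intValue()` in place of the `instanceof Number` check. This also matters because a value written as `1` is probably stored as INT32, in which case `getInt64` would fail on it as well.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.LinkedHashMap;
import java.util.Map;

// Stand-in sketch: a LinkedHashMap plays the role of BsonDocument here,
// so the example runs without the MongoDB driver on the classpath.
final class FirstFieldReader {

    // Returns the first field that is not "_id" and holds a numeric value.
    // Hypothetical helper, not part of Flink or the MongoDB driver.
    static Map.Entry<String, Integer> firstNonIdField(Map<String, Object> document) {
        for (Map.Entry<String, Object> field : document.entrySet()) {
            if ("_id".equals(field.getKey())) {
                continue; // MongoDB puts the generated _id first, so skip it
            }
            Object value = field.getValue();
            if (value instanceof Number) {
                // Accepts both 32-bit and 64-bit integers instead of assuming INT64
                return new SimpleImmutableEntry<>(field.getKey(), ((Number) value).intValue());
            }
        }
        throw new IllegalArgumentException("No numeric field other than _id in " + document);
    }

    public static void main(String[] args) {
        // Mirrors the document the sink produced: _id first, then "Hello": 1
        Map<String, Object> document = new LinkedHashMap<>();
        document.put("_id", "65f67f3b9779060fd2390d0e");
        document.put("Hello", 1);

        Map.Entry<String, Integer> field = firstNonIdField(document);
        System.out.println(field.getKey() + " -> " + field.getValue()); // prints "Hello -> 1"
    }
}
```

Reading the value through a generic numeric accessor rather than `getInt64` keeps the source working whether the field was stored as a 32-bit or a 64-bit integer.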
