Unknown field on DocumentDB change streams (changeStream.fullDocumentBeforeChange)


I am trying to integrate Amazon DocumentDB (5.0.0) with a Kafka Redshift connector pipeline. When doing so, I keep getting an error complaining that '$changeStream.fullDocumentBeforeChange' is an unknown field.

My configuration works fine when I use a MongoDB Atlas instance; I can't find any issues there. But my DocumentDB instance gives this error:

[ERROR] 2023-07-11 17:24:23,810 [task-thread-ent-analytics-0] com.mongodb.kafka.connect.source.MongoSourceTask tryCreateCursor - Invalid operation: BSON field '$changeStream.fullDocumentBeforeChange' is an unknown field. 40415. It is likely that you are trying to use functionality unsupported by your version of MongoDB.
[ERROR] 2023-07-11 17:24:23,811 [task-thread-ent-analytics-0] org.apache.kafka.connect.runtime.WorkerTask doRun - WorkerSourceTask{id=ent-analytics-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted
...
Caused by: org.apache.kafka.connect.errors.ConnectException: Invalid operation: BSON field '$changeStream.fullDocumentBeforeChange' is an unknown field. 40415. It is likely that you are trying to use functionality unsupported by your version of MongoDB.
    at com.mongodb.kafka.connect.source.StartedMongoSourceTask.tryCreateCursor(StartedMongoSourceTask.java:456)

I expected this to work just like Atlas. I think $changeStream.fullDocumentBeforeChange may not be available on DocumentDB, but I went through the documentation and can't seem to find anything about it.
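To check whether the server supports pre-images at all, independent of the connector, I believe one can request a pre-image on a change stream directly in mongosh. A sketch (the collection name `myColl` is a placeholder), which on a server without pre-image support should reproduce the same 40415 "unknown field" error:

```javascript
// mongosh sketch - "myColl" is a placeholder collection name.
// On a server that lacks pre-image support, requesting
// fullDocumentBeforeChange should fail with code 40415, matching
// the error the connector reports.
db.myColl.watch([], { fullDocumentBeforeChange: "whenAvailable" })
```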

Below is my configuration for reference:

{
  "name": "example-name",
  "config": {
    "topic.creation.default.partitions": "1",
    "output.data.format": "AVRO",
    "topic.creation.default.replication.factor": "1",
    "value.converter.schema.registry.url": "http://schemaregistry.confluent.svc.cluster.local:8081",
    "key.converter.schema.registry.url": "http://schemaregistry.confluent.svc.cluster.local:8081",
    "name": "example-name",
    "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
    "tasks.max": "1",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "transforms": "",
    "errors.log.enable": "true",
    "errors.log.include.messages": "true",
    "topic.creation.groups": "example",
    "connection.uri": "***",
    "database": "***",
    "publish.full.document.only": "true",
    "publish.full.document.only.tombstone.on.delete": "true",
    "change.stream.full.document.before.change": "whenAvailable",
    "change.stream.full.document": "whenAvailable",
    "topic.separator": "_",
    "topic.prefix": "",
    "output.format.key": "schema",
    "output.format.value": "schema",
    "output.schema.infer.value": "true",
    "output.json.formatter": "com.mongodb.kafka.connect.source.json.formatter.SimplifiedJson",
    "mongo.errors.log.enable": "true",
    "heartbeat.topic.name": "_mongo_heartbeat"
  }
}
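In case it helps narrow things down: my understanding is that pre-images (fullDocumentBeforeChange) were introduced in MongoDB 6.0, so a 5.0-compatible engine would reject the option. If that is right, a workaround sketch (untested on my side) would be to keep only the post-image setting and drop the before-change ones; I understand `publish.full.document.only.tombstone.on.delete` may also need to go, since it relies on the pre-image:

```json
"change.stream.full.document": "whenAvailable"
```

with `change.stream.full.document.before.change` and `publish.full.document.only.tombstone.on.delete` removed from the config.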