I am trying to integrate DocumentDB (5.0.0) with the Kafka Redshift connector. When I do, I keep getting an error complaining that '$changeStream.fullDocumentBeforeChange' is an unknown field.
My configuration works fine against an Atlas database instance, and I can't find any issues there, but my DocumentDB instance gives this error:
[ERROR] 2023-07-11 17:24:23,810 [task-thread-ent-analytics-0] com.mongodb.kafka.connect.source.MongoSourceTask tryCreateCursor - Invalid operation: BSON field '$changeStream.fullDocumentBeforeChange' is an unknown field. 40415. It is likely that you are trying to use functionality unsupported by your version of MongoDB.
[ERROR] 2023-07-11 17:24:23,811 [task-thread-ent-analytics-0] org.apache.kafka.connect.runtime.WorkerTask doRun - WorkerSourceTask{id=ent-analytics-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted
...
Caused by: org.apache.kafka.connect.errors.ConnectException: Invalid operation: BSON field '$changeStream.fullDocumentBeforeChange' is an unknown field. 40415. It is likely that you are trying to use functionality unsupported by your version of MongoDB.
at com.mongodb.kafka.connect.source.StartedMongoSourceTask.tryCreateCursor(StartedMongoSourceTask.java:456)
I expected this to work just as it does on Atlas. I suspect that $changeStream.fullDocumentBeforeChange is not available on DocumentDB, but I went through the documentation and can't find anything about it.
Below is my configuration for reference:
{
"name": "example-name",
"config": {
"topic.creation.default.partitions": "1",
"output.data.format": "AVRO",
"topic.creation.default.replication.factor": "1",
"value.converter.schema.registry.url": "http://schemaregistry.confluent.svc.cluster.local:8081",
"key.converter.schema.registry.url": "http://schemaregistry.confluent.svc.cluster.local:8081",
"name": "example-name",
"connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
"tasks.max": "1",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"transforms": "",
"errors.log.enable": "true",
"errors.log.include.messages": "true",
"topic.creation.groups": "example",
"connection.uri": "***",
"database": "***",
"publish.full.document.only": "true",
"publish.full.document.only.tombstone.on.delete": "true",
"change.stream.full.document.before.change": "whenAvailable",
"change.stream.full.document": "whenAvailable",
"topic.separator": "_",
"topic.prefix": "",
"output.format.key": "schema",
"output.format.value": "schema",
"output.schema.infer.value": "true",
"output.json.formatter": "com.mongodb.kafka.connect.source.json.formatter.SimplifiedJson",
"mongo.errors.log.enable": "true",
"heartbeat.topic.name": "_mongo_heartbeat"
}
}
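To test the theory that the pre-image option is what triggers the error, one thing I could try is a reduced config that leaves out "change.stream.full.document.before.change", so the connector should no longer send fullDocumentBeforeChange in the $changeStream stage. This is only a sketch under assumptions: I am assuming that omitting the property is enough to stop the field from being sent, and I have swapped "change.stream.full.document" to "updateLookup" on the guess that DocumentDB may also reject "whenAvailable" there. I have not confirmed either point against DocumentDB. Only the keys relevant to the change stream are shown; the converters and topic settings would stay as in the config above.

```json
{
  "name": "example-name",
  "config": {
    "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
    "tasks.max": "1",
    "connection.uri": "***",
    "database": "***",
    "publish.full.document.only": "true",
    "publish.full.document.only.tombstone.on.delete": "true",
    "change.stream.full.document": "updateLookup"
  }
}
```

If the task starts cleanly with this variant, that would point to the before-change setting (and possibly "whenAvailable") as the part DocumentDB does not understand.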