Kafka Connect: Debezium MongoDB source connector fails when a field inside an array of objects can be null


My MongoDB documents have a field that holds an array of objects, and some fields inside those objects can be null. For example, field "a" holds a date value in the object at index 0 of the array but is null in the object at index 1:

arrayObject : [
    {
        "a" : 2021-01-10T00:00:00.000+00:00
    },
    {
        "a" : null
    }
]
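
To check which fields in such an array actually mix null and non-null values, a small script along these lines may help. It is only a sketch: the helper name `mixed_type_fields` is made up for illustration, and the date is written as a Python `datetime` on the assumption that this is how a driver would return it.

```python
from datetime import datetime, timezone


def mixed_type_fields(elements):
    """Return the fields that appear with more than one value type
    across the objects of an array, mapped to the type names seen."""
    seen = {}
    for element in elements:
        for field, value in element.items():
            seen.setdefault(field, set()).add(type(value).__name__)
    return {field: types for field, types in seen.items() if len(types) > 1}


# The array from the question; the datetime stands in for the stored date.
array_object = [
    {"a": datetime(2021, 1, 10, tzinfo=timezone.utc)},
    {"a": None},
]

# Field "a" is reported because it is a datetime in one element and None in the other.
print(mixed_type_fields(array_object))
```

An array whose elements all use the same type for every field produces an empty result.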

The source connector now fails with this error:

java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.connect.errors.DataException: Invalid Java object for schema with type STRING: class java.util.Date for field: "a"
	at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:242)
	at org.apache.kafka.connect.data.Struct.put(Struct.java:216)
	at org.apache.kafka.connect.data.Struct.put(Struct.java:203)
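
The message says that a `java.util.Date` value was put into field "a" while that field's schema type was STRING. The sketch below is a toy Python model of that kind of check, not the Kafka Connect implementation: the names `DataException`, `ACCEPTED` and `validate_value` are hypothetical stand-ins, and the trace itself does not show why the schema ended up as STRING.

```python
from datetime import datetime


class DataException(Exception):
    """Stand-in for the exception type named in the trace."""


# Hypothetical mapping from a schema type name to the Python types it accepts.
ACCEPTED = {"STRING": (str,), "TIMESTAMP": (datetime,)}


def validate_value(schema_type, value, field, optional=True):
    """Raise DataException when the value does not fit the declared type."""
    if value is None:
        if optional:
            return  # a null is acceptable for an optional field
        raise DataException(f"Null value for required field: {field!r}")
    if not isinstance(value, ACCEPTED[schema_type]):
        raise DataException(
            f"Invalid object for schema with type {schema_type}: "
            f"{type(value).__name__} for field: {field!r}"
        )


# A null passes an optional STRING field, but a date does not.
validate_value("STRING", None, "a")
try:
    validate_value("STRING", datetime(2021, 1, 10), "a")
except DataException as error:
    print(error)
```

In this model the null element is not what gets rejected. The date is rejected because the declared type is STRING, which matches the wording of the error above.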

Here is my Kafka source connector configuration:

{
    "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
    "auto.create.topics.enable": "false",
    "topic.creation.enable": "true",
    "topic.creation.default.partitions": "3",
    "topic.prefix": "test-prefix",
    "topic.creation.default.replication.factor": "3",
    "topic.creation.default.compression.type": "gzip",
    "topic.creation.default.file.delete.delay.ms": "432000000",
    "topic.creation.default.cleanup.policy": "delete",
    "topic.creation.default.retention.ms": "432000000",
    "tombstones.on.delete": "false",
    "mongodb.connection.string" : "xxxxxxxxxx",
    "mongodb.name": "xxxxxxxxxx",
    "mongodb.user" : "xxxxxxxxxx",
    "mongodb.password" : "xxxxxxxxxx",
    "mongodb.authSource": "xxxxxxxxxx",
    "mongodb.connection.mode": "replica_set",
    "database.include.list" : "xxxxxxxxxx",
    "name": "test-connector",
    "collection.include.list": "xxxxxxxxxx",
    "schema.history.internal.kafka.bootstrap.servers": "kafka-0.kafka-headless.kafka-connector:9092",
    "schema.history.internal.kafka.topic": "test-connector-schema",
    "transforms": "ReplaceField, unwrap, RenameField, convertTS, RenameTSMS",
    "transforms.ReplaceField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
    "transforms.ReplaceField.exclude": "source",
    "transforms.unwrap.type": "io.debezium.connector.mongodb.transforms.ExtractNewDocumentState",
    "transforms.unwrap.collection.expand.json.payload": "true",
    "transforms.unwrap.add.fields": "op,ts_ms",
    "transforms.unwrap.add.fields.prefix": "",
    "transforms.RenameField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
    "transforms.RenameField.renames": "_id:id",
    "transforms.convertTS.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
    "transforms.convertTS.field": "ts_ms",
    "transforms.convertTS.format": "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'",
    "transforms.convertTS.target.type": "string",
    "transforms.RenameTSMS.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
    "transforms.RenameTSMS.renames": "ts_ms:@timestamp",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.nullable": "true",
    "value.converter.schemas.enable": "false",
    "capture.mode": "change_streams_update_full_with_pre_image",
    "snapshot.mode": "never",
    "capture.scope": "deployment",
    "tasks.max": "1"
}

Is there a configuration setting that allows null values here, or that makes the connector ignore this error?

Thanks
