I have a Kafka topic where ScyllaDB is publishing its CDC logs. In addition, every minute or so Scylla does a health check via the source connector and sends a health-check message to the same Kafka topic. In my connector config I am trying to use the Filter SMT to include all messages related to CDC logs and ignore the health-check messages.
CDC Kafka message -
Message Key
Struct{id=some_id, name="ABC"}
Message Value
{
"source": {
"version": "1.1.3",
"connector": "scylla",
"name": "scylla_cluster",
"ts_ms": 1705542883564,
"snapshot": "false",
"db": "my_db",
"keyspace_name": "my_db",
"table_name": "my_table",
"ts_us": 1705542883564623
},
"before": null,
"after": {
"id": "some_id",
"type": {
"value": "type1"
},
"created_on": {
"value": 1705542883677
},
"name": "ABC",
"payload": {
"value": "JSON_PAYLOAD_IN_STRING_FORMAT"
}
},
"op": "c",
"ts_ms": 1705542940611,
"transaction": null
}
Health check Kafka message -
Message Key
Struct{serverName=scylla_cluster}
Message Value
{"ts_ms":1705543090612}
Now, in my scylla-cdc-source-connector config I am using transforms to filter for the correct messages and then ExtractField to set the Kafka message key to the value of "id", but the connector keeps failing intermittently, either with an "id" field not found exception or with a wrong filter condition error.
"transforms": "filterOutEmptyMessage,extractKey",
"transforms.filterOutEmptyMessage.type": "io.confluent.connect.transforms.Filter$Value",
"transforms.filterOutEmptyMessage.filter.condition": ""$.[?(@.after != null)]",
"transforms.filterOutEmptyMessage.filter.type": "include",
"transforms.filterOutEmptyMessage.missing.or.null.behavior": "exclude",
"transforms.extractKey.type": "org.apache.kafka.connect.transforms.ExtractField$Value",
"transforms.extractKey.field": "id",
Also, I want to flatten the nested values in my Kafka message value, such that it becomes -
KEY
"some_id"
VALUE
{
"source": {
"version": "1.1.3",
"connector": "scylla",
"name": "scylla_cluster",
"ts_ms": 1705542883564,
"snapshot": "false",
"db": "my_db",
"keyspace_name": "my_db",
"table_name": "my_table",
"ts_us": 1705542883564623
},
"before": null,
"after": {
"id": "some_id",
"type": "type1",
"created_on": 1705542883677
"name": "ABC",
"payload": "JSON_PAYLOAD_IN_STRING_FORMAT"
},
"op": "c",
"ts_ms": 1705542940611,
"transaction": null
}
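My rough guess is that this needs another SMT in the chain, for example the built-in Flatten transform (untested sketch below; the flattenValue name is just a placeholder), but from what I have read Flatten produces dotted field names such as "after.type.value" instead of the nested shape above, so I am not sure it is the right tool:
"transforms": "filterOutEmptyMessage,extractKey,flattenValue",
"transforms.flattenValue.type": "org.apache.kafka.connect.transforms.Flatten$Value",
"transforms.flattenValue.delimiter": ".",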
P.S. - I am fairly new to Kafka and Kafka Connect. Any help or guidance is appreciated.
Things I have tried -
- I tried changing the value.converter from JSON to String (a snippet of this and the next attempt is after this list)
- Different filter conditions like "exists($.after)", "$.after", etc.
- SMTs from both Apache and Confluent, such as Filter, ExtractField, and ScyllaExtractNewState.
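For reference, the first two attempts above looked roughly like this (each was tried separately, not together; reconstructed from memory, so the exact values may differ):
"value.converter": "org.apache.kafka.connect.storage.StringConverter",
"transforms.filterOutEmptyMessage.filter.condition": "exists($.after)",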