I have the following images running in Docker:
quay.io/debezium/connect:2.0
quay.io/debezium/kafka:2.0
quay.io/debezium/zookeeper:2.0
container-registry.oracle.com/database/enterprise:latest
Debezium source connector config:
"name": "customers-connector",
"config": {
"connector.class": "io.debezium.connector.oracle.OracleConnector",
"tasks.max": "1",
"topic.prefix": "server1",
"database.hostname": "dbz_oracle21",
"database.port": "1521",
"database.user": "c##dbzuser",
"database.password": "dbz",
"database.dbname": "ORCLCDB",
"database.pdb.name": "ORCLPDB1",
"database.server.name": "server1",
"table.include.list": "C##DBZUSER.CUSTOMERS",
"schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
"schema.history.internal.kafka.topic": "schema-changes.customers",
"internal.key.converter": "org.apache.kafka.connect.json.JsonConverter",
"internal.value.converter":"org.apache.kafka.connect.json.JsonConverter",
"internal.key.converter.schemas.enable":false,
"internal.value.converter.schemas.enable":false,
"transforms": "route",
"transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
"transforms.route.replacement": "$3"
Oracle JDBC sink connector config:
"name": "jdbc-sink-2",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"topics": "CUSTOMERS",
"table.name.format": "kafka_customers",
"connection.url": "jdbc:oracle:thin:@dbz_oracle21:1521/orclpdb1",
"connection.user": "c##sinkuser",
"connection.password": "sinkpw",
"auto.create":true,
"auto.evolve":true,
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"pk.fields": "id",
"insert.mode":"insert",
"pk.mode": "record_key"
}
}
I can see the CDC events getting published to the Kafka topic "CUSTOMERS" (the RegexRouter transform keeps only the table name from server1.C##DBZUSER.CUSTOMERS):
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"ID"}],"optional":false,"name":"server1.C__DBZUSER.CUSTOMERS.Key"},"payload":{"ID":1011}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"ID"},{"type":"string","optional":true,"field":"NAME"}],"optional":true,"name":"server1.C__DBZUSER.CUSTOMERS.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"ID"},{"type":"string","optional":true,"field":"NAME"}],"optional":true,"name":"server1.C__DBZUSER.CUSTOMERS.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"string","optional":true,"field":"txId"},{"type":"string","optional":true,"field":"scn"},{"type":"string","optional":true,"field":"commit_scn"},{"type":"string","optional":true,"field":"lcr_position"},{"type":"string","optional":true,"field":"rs_id"},{"type":"int32","optional":true,"field":"ssn"},{"type":"int32","optional":true,"field":"redo_thread"},{"type":"string","optional":true,"field":"user_name"}],"optional":false,"name":"io.debezium.connector.oracle.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"}],"optional":false,"name":"server1.C__DBZUSER.CUSTOMERS.Envelope","version":1},"payload":{"before":{"ID":1011,"NAME":"r 3"},"after":{"ID":1011,"NAME":"233"},"source":{"version":"2.0.1.Final","connector":"oracle","name":"server1","ts_ms":1674978001000,"snapshot":"false","db":"ORCLPDB1","sequence":null,"schema":"C##DBZUSER","table":"CUSTOMERS","txId":"0a001b007a020000","scn":"3252353","commit_scn":"3252452","lcr_position":null,"rs_id":null,"ssn":0,"redo_thread":1,"user_name":"C##DBZUSER"},"op":"u","ts_ms":1674978030086,"transaction":null}}
When I try to sink these CDC events from the "CUSTOMERS" topic with the sink connector config above, I see this error in the connector log:
2023-01-29 14:57:13,174 INFO || [Consumer clientId=connector-consumer-jdbc-sink-2-0, groupId=connect-jdbc-sink-2] Resetting offset for partition CUSTOMERS-0 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[172.17.0.3:9092 (id: 1 rack: null)], epoch=0}}. [org.apache.kafka.clients.consumer.internals.SubscriptionState]
2023-01-29 14:57:13,181 INFO || Attempting to open connection #1 to Oracle [io.confluent.connect.jdbc.util.CachedConnectionProvider]
2023-01-29 14:57:13,222 INFO || JdbcDbWriter Connected [io.confluent.connect.jdbc.sink.JdbcDbWriter]
2023-01-29 14:57:13,263 ERROR || WorkerSinkTask{id=jdbc-sink-2-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: PK mode for table 'kafka_customers' is RECORD_KEY with configured PK fields [id], but record key schema does not contain field: id [org.apache.kafka.connect.runtime.WorkerSinkTask]
org.apache.kafka.connect.errors.ConnectException: PK mode for table 'kafka_customers' is RECORD_KEY with configured PK fields [id], but record key schema does not contain field: id
at io.confluent.connect.jdbc.sink.metadata.FieldsMetadata.extractRecordKeyPk(FieldsMetadata.java:208)
at io.confluent.connect.jdbc.sink.metadata.FieldsMetadata.extract(FieldsMetadata.java:97)
at io.confluent.connect.jdbc.sink.metadata.FieldsMetadata.extract(FieldsMetadata.java:63)
at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:114)
at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:66)
at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:74)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:244)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
2023-01-29 14:57:13,263 ERROR || WorkerSinkTask{id=jdbc-sink-2-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted [org.apache.kafka.connect.runtime.WorkerTask]
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:611)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:244)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.connect.errors.ConnectException: PK mode for table 'kafka_customers' is RECORD_KEY with configured PK fields [id], but record key schema does not contain field: id
at io.confluent.connect.jdbc.sink.metadata.FieldsMetadata.extractRecordKeyPk(FieldsMetadata.java:208)
at io.confluent.connect.jdbc.sink.metadata.FieldsMetadata.extract(FieldsMetadata.java:97)
at io.confluent.connect.jdbc.sink.metadata.FieldsMetadata.extract(FieldsMetadata.java:63)
at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:114)
at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:66)
at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:74)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
... 10 more
2023-01-29 14:57:13,263 INFO || Stopping task [io.confluent.connect.jdbc.sink.JdbcSinkTask]
Oracle is known as a case-insensitive system, so

select id from tab

works even if the column name is ID. But most applications quote identifiers, so the configuration "pk.fields": "id" will lead to something like

select "id" from tab

which will trigger the error

record key schema does not contain field: id

Fix the case: the key schema declares the field as ID, so use "pk.fields": "ID".
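For reference, a minimal corrected sink config, identical to the one in the question except that pk.fields now matches the uppercase field name in the key schema:

{
"name": "jdbc-sink-2",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"topics": "CUSTOMERS",
"table.name.format": "kafka_customers",
"connection.url": "jdbc:oracle:thin:@dbz_oracle21:1521/orclpdb1",
"connection.user": "c##sinkuser",
"connection.password": "sinkpw",
"auto.create": true,
"auto.evolve": true,
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"pk.fields": "ID",
"insert.mode": "insert",
"pk.mode": "record_key"
}
}

With pk.fields matching the key schema exactly, the sink should be able to extract ID as the primary key, and since auto.create is enabled it should create kafka_customers if it does not already exist.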