Problem with TimestampConverter from Confluent


I'm new to Kafka and I'm struggling with a timestamp field in my JDBC sink connector.

To make a long story short, I'm using Docker to test a PostgreSQL source connector that copies the data of one table, and then a JDBC sink connector that inserts this data into another Postgres database, into a table that already exists and has different column names. I think I got the rest of the configuration right, but for some reason the TimestampConverter is not working, so a bigint is being passed in the query.

My sink connector:

confluentinc/kafka-connect-jdbc:10.7.4

name = collaborator-sink-postgres-connector
connector.class = io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max = 1
topics = collab-service.public.parametros
connection.url = host
connection.user = user
connection.password = password
database = collaborator_suite
auto.create = false
table.options = CREATE_IF_NOT_EXISTS    
transforms = timestampConverter,extractValue,replaceField,dropKey
transforms.timestampConverter.type = org.apache.kafka.connect.transforms.TimestampConverter$Value
transforms.dropKey.type = io.confluent.connect.transforms.Drop$Key
transforms.extractValue.type=org.apache.kafka.connect.transforms.ExtractField$Value
transforms.replaceField.type = org.apache.kafka.connect.transforms.ReplaceField$Value
transforms.dropKey.schema.behavior=retain

transforms.timestampConverter.field = para_dt_cadastro
transforms.timestampConverter.format=yyyy-MM-dd HH:mm:ss.SSS
transforms.timestampConverter.unix.precision= microseconds
transforms.timestampConverter.target.type = Timestamp

transforms.replaceField.renames = para_cd_id:pasi_cd_id, para_tx_dominio:pasi_tx_dominio, para_tx_descricao:pasi_tx_descricao, para_tx_valor:pasi_tx_valor, para_tx_tipo:pasi_tx_tipo, para_dt_ult_alt:pasi_dt_ult_alt, para_dt_cadastro:pasi_dt_cadastro, usua_cd_id_cadastro:usua_cd_id_cadastro, usua_cd_id_ult_alt:usua_cd_id_ult_alt, para_tx_sistema:pasi_tx_sistema
db.timezone = UTC
pk.fields = pasi_cd_id
table.name.format = public.parametro_sistema
transforms.extractValue.field=after
insert.field = pasi_cd_id, pasi_tx_dominio, pasi_tx_descricao, pasi_tx_valor, pasi_tx_tipo, pasi_dt_ult_alt, pasi_dt_cadastro, usua_cd_id_cadastro, usua_cd_id_ult_alt, pasi_tx_sistema
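
To make the order of the transform chain easier to reason about, here is a toy Python model of it. It is only a sketch, not Kafka Connect code: the functions `convert_timestamp`, `extract_field`, `rename_fields` and `run_chain` are hypothetical stand-ins, and it assumes that a converter which only looks at top-level fields leaves the record unchanged when the field is still nested under `after`. Under that assumption, the order listed in `transforms` would leave the value as an integer, while extracting `after` first would not.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

def convert_timestamp(record, field):
    """Toy stand-in for a timestamp converter: only looks at top-level fields."""
    if field in record and isinstance(record[field], int):
        record = dict(record)  # copy so the input record is not mutated
        record[field] = EPOCH + timedelta(microseconds=record[field])
    return record

def extract_field(record, field):
    """Toy stand-in for extracting a nested struct (here: 'after')."""
    return record[field]

def rename_fields(record, renames):
    """Toy stand-in for renaming fields; unknown names pass through."""
    return {renames.get(name, name): value for name, value in record.items()}

def run_chain(record, steps):
    """Apply each step in list order, as a transform chain would."""
    for step in steps:
        record = step(record)
    return record

# Trimmed-down version of the message envelope from the question.
envelope = {"after": {"para_cd_id": 2, "para_dt_cadastro": 1669047144317563}}
renames = {"para_cd_id": "pasi_cd_id", "para_dt_cadastro": "pasi_dt_cadastro"}

convert = lambda r: convert_timestamp(r, "para_dt_cadastro")
extract = lambda r: extract_field(r, "after")
rename = lambda r: rename_fields(r, renames)

# Order as written in the config: converter first, then extract, then rename.
as_configured = run_chain(envelope, [convert, extract, rename])
# Alternative order: extract first, so the field is top-level when converted.
reordered = run_chain(envelope, [extract, convert, rename])

print(type(as_configured["pasi_dt_cadastro"]).__name__)  # int
print(type(reordered["pasi_dt_cadastro"]).__name__)      # datetime
```

Whether the real TimestampConverter behaves like this toy converter is something to verify against the Kafka Connect documentation for the version in use.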

The error message on my Kafka Connect Docker console:

The timestamp fields in the INSERT are pasi_dt_cadastro and pasi_dt_ult_alt; both are sent as 1669047144317563.

Caused by: java.sql.SQLException: Exception chain:
2024-01-04 14:11:50 java.sql.BatchUpdateException: Batch entry 0 INSERT INTO "public"."parametro_sistema" ("pasi_cd_id","pasi_tx_dominio","pasi_tx_descricao","pasi_tx_valor","pasi_tx_tipo","pasi_dt_cadastro","pasi_dt_ult_alt","usua_cd_id_cadastro","usua_cd_id_ult_alt","pasi_tx_sistema") VALUES (1,'SISTEMA_CLOUD_BUCKETNAME','Variável para referenciar o nome do bucket na nuvem do Google','r2d2-neki','R2D2',1669047144317563,1669047144317563,NULL,NULL,'SISTEMA') was aborted: ERROR: column "pasi_dt_cadastro" is of type timestamp without time zone but expression is of type bigint
2024-01-04 14:11:50 Hint: You will need to rewrite or cast the expression.

The part of the log where the sink connector tries to convert the timestamp field:

transforms.timestampConverter.field = para_dt_cadastro
2024-01-04 14:11:36     transforms.timestampConverter.format = yyyy-MM-dd HH:mm:ss.SSS
2024-01-04 14:11:36     transforms.timestampConverter.negate = false
2024-01-04 14:11:36     transforms.timestampConverter.predicate = 
2024-01-04 14:11:36     transforms.timestampConverter.target.type = Timestamp
2024-01-04 14:11:36     transforms.timestampConverter.type = class org.apache.kafka.connect.transforms.TimestampConverter$Value
2024-01-04 14:11:36     value.converter = null

My timestamp field inside my message:

 "after": {
                    "para_cd_id": 2,
                    "para_tx_dominio": "SISTEMA_CLOUD_PROJECTID",
                    "para_tx_descricao": "Variável para referenciar o id do projeto na nuvem do Google",
                    "para_tx_valor": "collaborator-364516",
                    "para_tx_tipo": "SISTEMA",
                    "para_dt_cadastro": 1669047144317563,
                    "para_dt_ult_alt": 1669047144317563,
                    "usua_cd_id_cadastro": null,
                    "usua_cd_id_ult_alt": null,
                    "para_tx_sistema": "R2D2"
                },
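
As a sanity check on what this number represents, the value can be decoded by hand. The sketch below assumes it is a count of microseconds since the Unix epoch in UTC, which matches the `unix.precision` setting in the config but is not confirmed anywhere in the logs; `from_epoch` is a hypothetical helper written for this example. Reading the same number as milliseconds gives a date far beyond what Python's `datetime` can represent, so the unit clearly matters.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)
RAW_VALUE = 1669047144317563  # para_dt_cadastro from the message above

def from_epoch(value, unit):
    """Interpret an integer as an offset from the Unix epoch in the given unit."""
    return EPOCH + timedelta(**{unit: value})

# Assumption: the value is in microseconds.
decoded = from_epoch(RAW_VALUE, "microseconds")
print(decoded.isoformat())  # 2022-11-21T16:12:24.317563+00:00

# Reading the same number as milliseconds overflows datetime's year range.
try:
    from_epoch(RAW_VALUE, "milliseconds")
    millis_is_plausible = True
except OverflowError:
    millis_is_plausible = False
print(millis_is_plausible)  # False
```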

I'd welcome any help with the timestamp problem, and feel free to point out anything else that looks wrong in the rest of the configuration.
