Problem with metadata using KSQLDB with Schema Registry and AVRO

34 Views Asked by At

I am using KSQLDB, with Schema Registry for the message schema. All the latest version of the cp docker image. The problem is that I have two topics, one for messages and another that is a message aggregator. My intention is to create a Stream from the message topic, then from this stream I want to have a table that aggregates the messages by an ID, this table is supported in the aggregator topic.

CREATE STREAM IF NOT EXISTS `messages` (`message_key_id` VARCHAR KEY) WITH (KAFKA_TOPIC='messages', PARTITIONS=10, REPLICAS=1, VALUE_FORMAT='AVRO', VALUE_SCHEMA_ID='9');
CREATE OR REPLACE TABLE `aggregate` WITH (KAFKA_TOPIC='aggregator', PARTITIONS=10, REPLICAS=1, VALUE_FORMAT='AVRO', VALUE_SCHEMA_ID='10') AS SELECT `message_key_id` AS `message_key_id`, COLLECT_LIST(STRUCT(`message_id`:= `message_id`, `type`:= `type`, `message_timestamp`:= `message_timestamp`, `process`:= `process`, `processed`:= `processed`)) AS `message_list` FROM `messages` GROUP BY `message_key_id` EMIT CHANGES;

When I produce a message in the message topic, the stream takes it correctly and if I do a query on the created table it shows me results. However, upon checking the aggregator topic it is empty. When I review the KSQLDB server logs I realize that it failed to produce the message because the schema was not compatible.

Reviewing the schema I realize that some fields called "connect.name" were added, which are supposed to be added by the avro converter. Reviewing the documentation I find the property connect.meta.data that I understand is what adds this metadata to the message, but I am adding the configuration in the connect compose with the ENV CONNECT_CONNECT_META_DATA and the error keeps happening.

Does anyone know how I could prevent these metadata fields from being added using the docker images or if I’m doing something wrong?

0

There are 0 best solutions below