I'm currently encountering an issue with Quarkus while trying to connect to Kafka using an Avro schema that has already been added to the Confluent Avro Schemas under the name default.avro. Here's a breakdown of the problem:
Problem Description
When I attempt to produce a message using the default Avro schema created in the Confluent Avro Schemas, Quarkus fails to send it and nacks the message with a SerializationException (see the error from the logs further down).
Configuration Details
Below is the relevant configuration excerpt from application.properties:
# Kafka Configuration in application.properties
mp.messaging.outgoing.logistics-events-example-out.topic=logistics.events.eta.example.test
mp.messaging.outgoing.logistics-events-example-out.transforms=SetSchemaMetadata
mp.messaging.outgoing.logistics-events-example-out.transforms.SetSchemaMetadata.type=org.apache.kafka.connect.transforms.SetSchemaMetadata$Value
mp.messaging.outgoing.logistics-events-example-out.transforms.SetSchemaMetadata.schema.name=default.avro
mp.messaging.outgoing.logistics-events-example-out.transforms.SetSchemaMetadata.schema.version=1
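To make clear what I think this configuration amounts to: as far as I understand, SmallRye Reactive Messaging treats everything under the channel prefix as channel attributes and hands the ones it does not recognise to the Kafka producer. SetSchemaMetadata is documented as a Kafka Connect transform, so I'm not sure the transforms.* keys have any effect on a producer. Below is a minimal plain-Java sketch of that prefix stripping; the class and method names are made up for illustration and are not part of Quarkus or SmallRye.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ChannelConfigSketch {
    // Channel prefix taken from the application.properties excerpt above.
    static final String PREFIX = "mp.messaging.outgoing.logistics-events-example-out.";

    // Hypothetical helper: returns the entries under PREFIX with the prefix removed,
    // i.e. the attribute names as the channel (and, I assume, the producer) would see them.
    static Map<String, String> stripChannelPrefix(Map<String, String> props) {
        Map<String, String> attributes = new LinkedHashMap<>();
        for (Map.Entry<String, String> e : props.entrySet()) {
            if (e.getKey().startsWith(PREFIX)) {
                attributes.put(e.getKey().substring(PREFIX.length()), e.getValue());
            }
        }
        return attributes;
    }

    public static void main(String[] args) {
        Map<String, String> props = new LinkedHashMap<>();
        props.put(PREFIX + "topic", "logistics.events.eta.example.test");
        props.put(PREFIX + "transforms", "SetSchemaMetadata");
        props.put(PREFIX + "transforms.SetSchemaMetadata.schema.name", "default.avro");
        props.put(PREFIX + "transforms.SetSchemaMetadata.schema.version", "1");

        // Prints each attribute without the channel prefix.
        stripChannelPrefix(props).forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```

If that assumption holds, the producer only ever sees keys such as `transforms.SetSchemaMetadata.schema.name`, which is why I doubt they influence which schema the Avro serializer looks up.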
Dependencies
Here are the dependencies listed in the pom.xml file:
<!-- AVRO -->
<!-- the extension -->
<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-confluent-registry-avro</artifactId>
</dependency>
<!-- Confluent registry libraries use Jakarta REST client -->
<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-rest-client-reactive</artifactId>
</dependency>
<dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-avro-serializer</artifactId>
    <version>7.3.0</version>
    <exclusions>
        <exclusion>
            <groupId>jakarta.ws.rs</groupId>
            <artifactId>jakarta.ws.rs-api</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-schema-registry-client</artifactId>
    <version>7.3.0</version>
</dependency>
Additional Information
The configuration seems correct to me, but despite this, the connection to the default Avro schema from Confluent Avro Schemas isn't working as intended.
Any insights or suggestions on how to resolve this issue would be greatly appreciated!
Error from the logs:
sampled=false] io.smallrye.reactive.messaging.kafka.impl.KafkaSink lambda$writeMessageToKafka$4 SRMSG18212: Message org.eclipse.microprofile.reactive.messaging.Message$5@32c7bed1 from channel 'test-events-example-out' was not sent to Kafka topic 'logistics.events.eta.example.test' - nacking message: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema{"type":"record","name":"ExampleChangedEvent","namespace":"Example","fields":[{"name":"countryCode","type":{"type":"string","avro.java.string":"String"}},{"name":"occurredOn","type":{"type":"string","avro.java.string":"String"}}]}
at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.toKafkaException(AbstractKafkaSchemaSerDe.java:253)
at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:168)
at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:61)
at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62)
at io.smallrye.reactive.messaging.kafka.fault.SerializerWrapper.lambda$serialize$1(SerializerWrapper.java:56)
at io.smallrye.reactive.messaging.kafka.fault.SerializerWrapper.wrapSerialize(SerializerWrapper.java:81)
at io.smallrye.reactive.messaging.kafka.fault.SerializerWrapper.serialize(SerializerWrapper.java:56)
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:1000)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:947)
at io.smallrye.reactive.messaging.kafka.impl.ReactiveKafkaProducer.lambda$send$4(ReactiveKafkaProducer.java:164)
at io.smallrye.context.impl.wrappers.SlowContextualConsumer.accept(SlowContextualConsumer.java:21)
at io.smallrye.mutiny.operators.uni.builders.UniCreateWithEmitter.subscribe(UniCreateWithEmitter.java:22)
at io.smallrye.mutiny.operators.AbstractUni.subscribe(AbstractUni.java:36)
at io.smallrye.mutiny.operators.uni.UniOnItemTransformToUni$UniOnItemTransformToUniProcessor.performInnerSubscription(UniOnItemTransformToUni.java:81)
at io.smallrye.mutiny.operators.uni.UniOnItemTransformToUni$UniOnItemTransformToUniProcessor.onItem(UniOnItemTransformToUni.java:57)
at io.smallrye.mutiny.operators.uni.UniOperatorProcessor.onItem(UniOperatorProcessor.java:47)
at io.smallrye.mutiny.operators.uni.UniMemoizeOp.forwardTo(UniMemoizeOp.java:123)
at io.smallrye.mutiny.operators.uni.UniMemoizeOp.subscribe(UniMemoizeOp.java:67)
at io.smallrye.mutiny.operators.AbstractUni.subscribe(AbstractUni.java:36)
at io.smallrye.mutiny.operators.uni.UniRunSubscribeOn.lambda$subscribe$0(UniRunSubscribeOn.java:27)
at [email protected]/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
at [email protected]/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
at [email protected]/java.lang.Thread.runWith(Thread.java:1596)
at [email protected]/java.lang.Thread.run(Thread.java:1583)
at org.graalvm.nativeimage.builder/com.oracle.svm.core.thread.PlatformThreads.threadStartRoutine(PlatformThreads.java:833)
at org.graalvm.nativeimage.builder/com.oracle.svm.core.posix.thread.PosixPlatformThreads.pthreadStartRoutine(PosixPlatformThreads.java:211)
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Subject 'example.name-value' not found.; error code: 40401
at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:302)
at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:372)
at io.confluent.kafka.schemaregistry.client.rest.RestService.lookUpSubjectVersion(RestService.java:463)
at io.confluent.kafka.schemaregistry.client.rest.RestService.lookUpSubjectVersion(RestService.java:448)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getIdFromRegistry(CachedSchemaRegistryClient.java:350)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getId(CachedSchemaRegistryClient.java:565)
at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:137)
... 24 more
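One thing I notice in the `Caused by` line is that the serializer looks up a subject ending in `-value`, not a schema called default.avro. As far as I understand, the Confluent serializer derives the subject from the topic name by default (TopicNameStrategy), or from the record's full name when RecordNameStrategy is configured. Here is a minimal plain-Java sketch of those two naming rules; the class and method names are hypothetical and only mirror the rules, they are not the Confluent classes.

```java
public class SubjectNameSketch {
    // Mirrors the rule of the default TopicNameStrategy:
    // "<topic>-key" for record keys, "<topic>-value" for record values.
    static String topicNameSubject(String topic, boolean isKey) {
        return topic + (isKey ? "-key" : "-value");
    }

    // Mirrors the rule of RecordNameStrategy: the fully qualified Avro record name.
    static String recordNameSubject(String namespace, String name) {
        return (namespace == null || namespace.isEmpty()) ? name : namespace + "." + name;
    }

    public static void main(String[] args) {
        // Topic name from my configuration; record name and namespace from the schema in the log.
        System.out.println(topicNameSubject("logistics.events.eta.example.test", false));
        // prints "logistics.events.eta.example.test-value"
        System.out.println(recordNameSubject("Example", "ExampleChangedEvent"));
        // prints "Example.ExampleChangedEvent"
    }
}
```

Neither rule would produce a subject named default.avro, so I suspect the schema needs to exist under one of these derived subjects, but I have not confirmed that.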
I expected Quarkus to connect to Kafka using the specified Avro schema (default.avro) from the Confluent Avro Schemas without any issues. The configuration seemed correct, and I anticipated that the connection would be established successfully.
Where I got the information for the application.properties:
https://docs.confluent.io/platform/current/connect/transforms/setschemametadata.html
"transforms": "SetSchemaMetadata",
"transforms.SetSchemaMetadata.type": "org.apache.kafka.connect.transforms.SetSchemaMetadata$Value",
"transforms.SetSchemaMetadata.schema.name": "order-value",
"transforms.SetSchemaMetadata.schema.version": "2"