I have built a Kafka pipeline using MSK, MSK Connect, the Debezium Postgres source connector, and AWS Glue Schema Registry. On the producer side I am able to publish Avro records with their schemas registered in Glue Schema Registry; I am using the AWS Glue Schema Registry library for the integration between Kafka Connect and Glue Schema Registry. Below is my connector configuration for Glue:
# Glue Schema Registry Specific Converters
"key.converter" = "com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter"
"key.converter.schemas.enable" = false
"value.converter"= "com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter"
"value.converter.schemas.enable" = true
"key.converter.region" = "us-east-1"
"key.converter.registry.name" = "<REGISTRY NAME>"
"key.converter.compatibility" = "FULL"
"key.converter.schemaAutoRegistrationEnabled" = true
"key.converter.dataFormat"="AVRO"
"key.converter.avroRecordType"="GENERIC_RECORD"
"key.converter.schemaNameGenerationClass" = "<SCHEMA NAME GENERATION CLASS>"
"value.converter.region" = "us-east-1"
"value.converter.registry.name" = "<REGISTRY NAME>"
"value.converter.compatibility" = "FULL"
"value.converter.schemaAutoRegistrationEnabled" = true
"value.converter.dataFormat"="AVRO"
"value.converter.avroRecordType"="GENERIC_RECORD"
"key.converter.schemaNameGenerationClass" = "<SCHEMA NAME GENERATION CLASS>"
On the consumer side, I have an EC2 instance where I am using the kafka-avro-console-consumer command provided by Confluent to read messages from the Avro topic. I have also added the AWS Glue Schema Registry library JAR to the confluent/share/java/kafka-serde-tools folder so that it is available to the command.
I am using the following command:
kafka-avro-console-consumer --bootstrap-server <bootstrap_server_url> \
--consumer.config client.properties \
--property schema.registry.url=https://glue.us-east-1.amazonaws.com \
--property print.key=true \
--property print.value=true \
--key-deserializer com.amazonaws.services.schemaregistry.deserializers.avro.AWSKafkaAvroDeserializer \
--value-deserializer com.amazonaws.services.schemaregistry.deserializers.avro.AWSKafkaAvroDeserializer \
--topic platform_avro_users --from-beginning
And the consumer config file (client.properties) contains:
security.protocol=SASL_SSL
sasl.mechanism=AWS_MSK_IAM
sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
key.deserializer=com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryKafkaDeserializer
key.deserializer.region=us-east-1
key.deserializer.registry.name=<REGISTRY NAME>
key.deserializer.avroRecordType=GENERIC_RECORD
key.deserializer.schemaNameGenerationClass=<SCHEMA NAME GENERATION CLASS>
value.deserializer=com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryKafkaDeserializer
value.deserializer.region=us-east-1
value.deserializer.registry.name=<REGISTRY NAME>
value.deserializer.avroRecordType=GENERIC_RECORD
value.deserializer.schemaNameGenerationClass=<SCHEMA NAME GENERATION CLASS>
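For comparison, this is roughly the plain-Java consumer I would expect to behave the same way with these settings. It is only a minimal sketch, assuming kafka-clients, aws-msk-iam-auth, and the Glue Schema Registry serde library are on the classpath; the class name, the consumer group id, and the angle-bracket placeholders are mine:

// Minimal sketch of a plain Java consumer using the Glue Schema Registry deserializer.
// Bootstrap servers, registry name, and group id below are placeholders.
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryKafkaDeserializer;
import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants;
import com.amazonaws.services.schemaregistry.utils.AvroRecordType;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class GlueAvroConsumerCheck {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "<bootstrap_server_url>");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "glue-avro-check");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // Same MSK IAM auth settings as in client.properties
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "AWS_MSK_IAM");
        props.put("sasl.jaas.config", "software.amazon.msk.auth.iam.IAMLoginModule required;");
        props.put("sasl.client.callback.handler.class",
                "software.amazon.msk.auth.iam.IAMClientCallbackHandler");

        // Glue Schema Registry deserializer for key and value, configured through
        // the library's own property constants instead of raw strings
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                GlueSchemaRegistryKafkaDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                GlueSchemaRegistryKafkaDeserializer.class.getName());
        props.put(AWSSchemaRegistryConstants.AWS_REGION, "us-east-1");
        props.put(AWSSchemaRegistryConstants.REGISTRY_NAME, "<REGISTRY NAME>");
        props.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.GENERIC_RECORD.getName());

        try (KafkaConsumer<Object, Object> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("platform_avro_users"));
            while (true) {
                ConsumerRecords<Object, Object> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<Object, Object> record : records) {
                    System.out.println(record.key() + " => " + record.value());
                }
            }
        }
    }
}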
I am expecting the console consumer command to print the messages, but instead I get the following error:
Processed a total of 1 messages
ERROR Unknown error when running consumer: (kafka.tools.ConsoleConsumer$:44)
java.lang.NullPointerException: Cannot invoke "com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryDeserializationFacade.deserialize(com.amazonaws.services.schemaregistry.common.AWSDeserializerInput)" because "this.glueSchemaRegistryDeserializationFacade" is null
at com.amazonaws.services.schemaregistry.deserializers.avro.AWSKafkaAvroDeserializer.deserializeByHeaderVersionByte(AWSKafkaAvroDeserializer.java:149)
at com.amazonaws.services.schemaregistry.deserializers.avro.AWSKafkaAvroDeserializer.deserialize(AWSKafkaAvroDeserializer.java:114)
at io.confluent.kafka.formatter.AvroMessageFormatter$AvroMessageDeserializer.deserializeKey(AvroMessageFormatter.java:125)
at io.confluent.kafka.formatter.SchemaMessageFormatter.writeTo(SchemaMessageFormatter.java:157)
at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:116)
at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:76)
at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:53)
at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)
It looks like the GlueSchemaRegistryDeserializationFacade isn't being initialized, so the records cannot be deserialized. I am not sure whether I am adding the deserializer JAR in the right place. Is there a way to fix this issue and read Avro records when using Glue Schema Registry?