I am trying to set up a Kafka consumer using Reactor Kafka. The producer is integrated with the Kafka Schema Registry.
@Value("${spring.kafka.schemaRegistryUrls}")
private String schemaRegistryEnvVarValue;

@Bean
public ReceiverOptions<String, MyProto> kafkaReceiverOptionsFloor(
        KafkaProperties kafkaProperties) {
    final Map<String, Object> kafkaConsumerProperties =
            kafkaProperties.buildConsumerProperties();
    for (Map.Entry<String, KafkaProperties.Consumer> entry :
            kafkaConsumerPropertiesConfig.getConsumer().entrySet()) {
        if (kafkaTopics.contains(entry.getKey())) {
            kafkaConsumerProperties.putAll(entry.getValue().buildProperties());
        }
    }
    kafkaConsumerProperties.put("schema.registry.url", schemaRegistryEnvVarValue);
    final ReceiverOptions<String, MyProto> basicReceiverOptions =
            ReceiverOptions.<String, MyProto>create(kafkaConsumerProperties)
                    .withValueDeserializer(new MyProtoDeserializer())
                    // disabling auto commit, since we are managing committing
                    // once record is processed
                    .commitInterval(Duration.ZERO)
                    .commitBatchSize(0);
    kafkaConsumerProperties.forEach((k, v) -> log.debug("k2 {} v2 {}", k, v));
    return basicReceiverOptions
            .subscription(kafkaTopicsFloor)
            .addAssignListener(partitions -> log.debug("onPartitionsAssigned {}", partitions))
            .addRevokeListener(partitions -> log.debug("onPartitionsRevoked {}", partitions));
}

@Bean
public ReactiveKafkaConsumerTemplate<String, MyProto> reactiveKafkaConsumerTemplate(
        ReceiverOptions<String, MyProto> kafkaReceiverOptions) {
    return new ReactiveKafkaConsumerTemplate<>(kafkaReceiverOptions);
}
I am getting the exception "Protocol message contained an invalid tag (zero)". The same deserializer is able to parse messages in my unit tests (without Schema Registry).
It looks like the Schema Registry is not being used. What am I doing wrong here?
The deserializer looks like this:
@Slf4j
public class MyProtoDeserializer implements Deserializer<MyProto> {
    public MyProtoDeserializer() {}

    /**
     * Deserializes the data to my_proto from byte array.
     *
     * @param topic
     * @param data
     * @return
     */
    @Override
    public MyProto deserialize(final String topic, final byte[] data) {
        if (data == null) {
            return null;
        }
        // TODO: Use schemaregistry and kpow
        try {
            return MyProto.getDefaultInstance()
                    .getParserForType()
                    .parseFrom(data);
        } catch (Exception ex) {
            log.debug("Exception in MyProto parse {}", ex.getMessage());
            return MyProto.getDefaultInstance();
        }
    }
}
Reactor isn't the issue.
schema.registry.url is only a property of the Confluent Deserializer class. You are not implementing the configure function in your Deserializer, so you are ignoring that property. Similarly, calling parseFrom directly doesn't use any HTTP client to interact with a Registry.
Import the library rather than writing your own:
https://mvnrepository.com/artifact/io.confluent/kafka-protobuf-serializer/7.4.0
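As for why the error mentions a zero tag: records written by a Schema Registry aware serializer are not bare Protobuf. In Confluent's wire format the payload is preceded by a magic byte with value 0 and a 4-byte big-endian schema ID (for Protobuf, followed by message indexes). Handing those bytes straight to parseFrom makes the parser read that leading 0 as a field tag. The following is only a minimal sketch for inspecting that header; the class name, method names, and sample bytes are made up for illustration, and it is not a replacement for the Confluent deserializer.

```java
import java.nio.ByteBuffer;

public class ConfluentFrameInspector {

    /** Confluent's framing starts with a single magic byte whose value is 0. */
    public static final byte MAGIC_BYTE = 0x0;

    /** Returns true when the record looks like a Schema Registry framed payload. */
    public static boolean isFramed(byte[] data) {
        // 1 magic byte + 4 bytes of schema ID is the minimum header size.
        return data != null && data.length >= 5 && data[0] == MAGIC_BYTE;
    }

    /** Reads the 4-byte big-endian schema ID that follows the magic byte. */
    public static int schemaId(byte[] data) {
        if (!isFramed(data)) {
            throw new IllegalArgumentException("Not a Schema Registry framed record");
        }
        // ByteBuffer uses big-endian byte order by default.
        return ByteBuffer.wrap(data, 1, 4).getInt();
    }

    public static void main(String[] args) {
        // Hypothetical record: magic byte 0, schema ID 42, message index 0,
        // then the actual Protobuf bytes (field 1, length 3, "abc").
        byte[] record = {0, 0, 0, 0, 42, 0, 0x0a, 0x03, 'a', 'b', 'c'};
        System.out.println("framed=" + isFramed(record) + " schemaId=" + schemaId(record));
    }
}
```

This would also explain why the unit tests pass: they presumably feed the deserializer bytes produced by plain toByteArray(), which carry no such header.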
Also, Spring Boot can auto-configure that property for you through its additional Kafka properties:
ref https://docs.spring.io/spring-boot/docs/current/reference/html/messaging.html#messaging.kafka.additional-properties
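Putting the advice above together, here is a rough sketch of the consumer properties that would hand value deserialization to the Confluent class. The helper class, the registry URL, and the com.example.MyProto class name are placeholders. The keys are plain strings so the snippet compiles without the Confluent jar, which is still needed on the classpath at runtime.

```java
import java.util.HashMap;
import java.util.Map;

public class ProtobufConsumerProps {

    /**
     * Builds consumer properties that delegate value deserialization to
     * Confluent's KafkaProtobufDeserializer instead of a hand-written class.
     */
    public static Map<String, Object> build(String schemaRegistryUrl, String protoClassName) {
        Map<String, Object> props = new HashMap<>();
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer");
        // Read by the Confluent deserializer in its configure() step.
        props.put("schema.registry.url", schemaRegistryUrl);
        // Asks the deserializer for the generated class rather than a DynamicMessage.
        props.put("specific.protobuf.value.type", protoClassName);
        return props;
    }

    public static void main(String[] args) {
        // Placeholder values; substitute your own registry URL and generated class.
        build("http://localhost:8081", "com.example.MyProto")
                .forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

In the question's setup these entries would be merged into kafkaConsumerProperties before ReceiverOptions.create(...), and the withValueDeserializer(new MyProtoDeserializer()) call would be dropped so that the Kafka client instantiates and configures the class named in value.deserializer. With Spring Boot, the same keys can likely be supplied as additional properties, for example spring.kafka.consumer.properties[schema.registry.url], which buildConsumerProperties() should then pick up.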