I have a CloudEvent instance built using io.cloudevents.core.v1.CloudEventBuilder, and I gave it a custom extension using builder.withExtension("whatever", "whatever");. The CloudEvent is built and sent. I can see that Kafka contains the corresponding message, and that it has a header "whatever" with the value "whatever". So far so good.
But if I deserialize that message into an io.cloudevents.CloudEvent instance, the header isn't there. I'd expect to find it via io.cloudevents.CloudEventExtensions#getExtensionNames or io.cloudevents.CloudEventExtensions#getExtension, but it's not there or anywhere else.
Looking into io.cloudevents.kafka.CloudEventDeserializer, we can dig down to io.cloudevents.core.message.impl.BaseGenericBinaryMessageReaderImpl#read, which is shown below.
Debugging this method, we find our header there, with key == value == "whatever". The value is not null, so we continue; it is not the "content-type" header, so we move on to the else-if branch; it does not have the "ce_" prefix (isCloudEventsHeader), so we're done. The custom extension is ignored.
Is this expected to happen? Am I overlooking something? If I am able to create a CloudEvent with a custom extension, I'd expect to be able to read it back as well. Is there some special configuration for that, or anything I forgot to enable?
    @Override
    public <T extends CloudEventWriter<V>, V> V read(CloudEventWriterFactory<T, V> writerFactory, CloudEventDataMapper<? extends CloudEventData> mapper) throws CloudEventRWException, IllegalStateException {
        CloudEventWriter<V> visitor = writerFactory.create(this.version);
        // Grab from headers the attributes and extensions
        // This implementation avoids to use visitAttributes and visitExtensions
        // in order to complete the visit in one loop
        this.forEachHeader((key, value) -> {
            if (value == null) {
                return;
            }
            if (isContentTypeHeader(key)) {
                visitor.withContextAttribute(CloudEventV1.DATACONTENTTYPE, toCloudEventsValue(value));
            } else if (isCloudEventsHeader(key)) {
                String name = toCloudEventsKey(key);
                if (name.equals(CloudEventV1.SPECVERSION)) {
                    return;
                }
                visitor.withContextAttribute(name, toCloudEventsValue(value));
            }
        });
TL;DR: even custom attributes must follow the same naming conventions as the 'official' ones like ce_type. The custom attribute 'whatever' is incorrect; it has to be 'ce_whatever' (or cloudevents_whatever, depending on where you use it).
In our project we have multiple producers, and the one producing this specific record was kafka-connect, which was misconfigured. I had misread the specification: even custom extensions have to carry the "ce_" prefix. I thought this rule applied only to the known, CloudEvents-specific headers. So I fixed the kafka-connect configuration and, sure enough, everything is fine.
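
To illustrate why the unprefixed header disappears, here is a minimal sketch of prefix-based header filtering. It is a simplified stand-in, not the SDK's implementation: the class CeHeaderFilterSketch and the method extractCloudEventsAttributes are hypothetical names made up for this example, headers are modeled as a plain String map, and the content-type and specversion handling seen in the real read method is left out.

```java
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;

// Hypothetical, simplified model of the filtering done while reading
// Kafka headers in binary mode. Not the cloudevents SDK code.
class CeHeaderFilterSketch {

    // Prefix that marks a Kafka header as a CloudEvents attribute or extension.
    static final String CE_PREFIX = "ce_";

    // Returns only the headers a prefix-based reader would treat as
    // CloudEvents attributes/extensions, with the prefix stripped.
    static Map<String, String> extractCloudEventsAttributes(Map<String, String> headers) {
        Map<String, String> attributes = new LinkedHashMap<>();
        for (Map.Entry<String, String> header : headers.entrySet()) {
            String key = header.getKey();
            String value = header.getValue();
            if (value == null) {
                continue; // null values are skipped, as in the read method above
            }
            if (key.length() > CE_PREFIX.length() && key.startsWith(CE_PREFIX)) {
                String name = key.substring(CE_PREFIX.length()).toLowerCase(Locale.ROOT);
                attributes.put(name, value);
            }
            // Anything without the prefix is silently ignored.
        }
        return attributes;
    }

    public static void main(String[] args) {
        Map<String, String> headers = new LinkedHashMap<>();
        headers.put("ce_id", "1234");
        headers.put("ce_type", "com.example.demo");
        headers.put("whatever", "whatever");    // misconfigured producer: dropped
        headers.put("ce_whatever", "whatever"); // correct: read back as extension "whatever"
        // prints {id=1234, type=com.example.demo, whatever=whatever}
        System.out.println(extractCloudEventsAttributes(headers));
    }
}
```

The only "whatever" entry that survives comes from the ce_whatever header, which matches what I saw after fixing the producer.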