Does CloudEventDeserializer deserialize custom extensions (kafka headers)?

220 Views Asked by At

I have cloudevent instance build using io.cloudevents.core.v1.CloudEventBuilder builder and I gave it custom extension using method builder.withExtension("whatever", "whatever");. Cloudevent is built, sent. I can see, that in kafka, there is respective message, and it has header "whatever" having value "whatever". So far so good.

But if I deserialize given message into io.cloudevents.CloudEvent instance, the header isn't 'there'. I'd expect it in io.cloudevents.CloudEventExtensions#getExtensionNames resp. io.cloudevents.CloudEventExtensions#getExtension but it's not there or anywhere else.

Looking into io.cloudevents.kafka.CloudEventDeserializer we can dig deep and reach io.cloudevents.core.message.impl.BaseGenericBinaryMessageReaderImpl#read, which is shown below.

Debugging this method, we will find here our header, key==value=="whatever". So value is not null, we continue, it's not "content-type" header so we proceed to else branch, and it does not have "ce_" prefix (isCloudEventsHeader), and we're done. Custom extension ignored.

This is expected to happen? Am I overlooking smth? I'd expect, that if I am able to create Cloudevent with custom extension, I'd like to read it as well. Is there some special configuration for that or anything I forgot to enable?

    @Override
    public <T extends CloudEventWriter<V>, V> V read(CloudEventWriterFactory<T, V> writerFactory, CloudEventDataMapper<? extends CloudEventData> mapper) throws CloudEventRWException, IllegalStateException {
        CloudEventWriter<V> visitor = writerFactory.create(this.version);

        // Grab from headers the attributes and extensions
        // This implementation avoids to use visitAttributes and visitExtensions
        // in order to complete the visit in one loop
        this.forEachHeader((key, value) -> {
            if (value == null) {
                return;
            }
            if (isContentTypeHeader(key)) {
                visitor.withContextAttribute(CloudEventV1.DATACONTENTTYPE, toCloudEventsValue(value));
            } else if (isCloudEventsHeader(key)) {
                String name = toCloudEventsKey(key);
                if (name.equals(CloudEventV1.SPECVERSION)) {
                    return;
                }
                visitor.withContextAttribute(name, toCloudEventsValue(value));
            }
        });
1

There are 1 best solutions below

0
Martin Mucha On

TLDR: even custom attributes must follow the same naming conventions as 'official' ones like ce_type. Custom attribute 'whatever' is incorrect, it has to be 'ce_whatever' (or cloudevens_whatever, based on where you use it).

In our project we have multiple producers and the one producing this specific record was kafka-connect, which was misconfigured. I misread specification, that even custom extensions has to have "ce_" prefix. I thought this rule holds only for cloudevents-specific, known headers. So I fixed kafka-connect configuration and sure enough, everything is fine.