ksqlDB / Kafka Streams: splitting topic messages and publishing them to different topics as-is, by condition


I have a topic, let's say "topic_soure". Messages are in JSON format.

The top-level fields are the same for all messages, but the 'data' field may hold different models.

I do not know exactly what the 'data' model might be, but every 'data' model has a 'priority' field at its top level.

Examples:

{
    "id": "39f8582e-91e1-41b0-8590-f83f0a4b756a",
    "time": "2023-12-14T10:45:36.913Z",
    "data": {
        "priority": 1,
        "sub_data":{
            "id":"some id",
            "value": "some value"
        }
    }
}
{
    "id": "39f8582e-91e1-41b0-8590-f83f0a4b756a",
    "time": "2023-12-14T10:45:36.913Z",
    "data": {
        "priority": 2,
        "related_data":{
            "id":"some id2",
            "value": "some value2"
        }
    }
}

As I understand it, Avro and Schema Registry are not available for use in my environment.

I need to split 'topic_soure' into two topics by the field 'priority'.

To solve the issue, I create a stream backed by the topic 'topic_soure':

create stream soure_stream ( \
   id              varchar,   \
   time            varchar,   \
   data            varchar    \
) with (key_format='kafka', kafka_topic = 'topic_soure', value_format='json');

Then I create two streams with sink topics, one for each priority:

create stream soure_stream_priority_1 \
with (kafka_topic = 'topic_soure_priority_1', format='JSON')\
as select * from soure_stream \
where extractjsonfield(data, '$.priority') = '1';

create stream soure_stream_priority_2 \
with (kafka_topic = 'topic_soure_priority_2', format='JSON')\
as select * from soure_stream \
where extractjsonfield(data, '$.priority') = '2';

It works well.

But in the result topics the message model changes:

{
  "ID": "39f8582e-91e1-41b0-8590-f83f0a4b756a",
  "TIME": "2023-12-14T10:45:36.913Z",
  "DATA": "{\"priority\":1,\"sub_data\":{\"id\":\"some id\",\"value\":\"some value\"}}"
}

The 'data' field becomes an escaped JSON string. I need the message to pass through unchanged, as-is.

Is there any way to convert the 'data' field back to its original format? Or can I solve this problem in another way using ksqlDB / Kafka Streams?


2 Answers

Answer from Oussama ZAGHDOUD

If you are looking to solve this with the Kafka Streams Java API, it can be done like this:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.json.JSONObject;

final StreamsBuilder streamsBuilder = new StreamsBuilder();

// Consume the source topic as plain strings so the payload is never re-serialized.
final KStream<String, String> initialStream =
        streamsBuilder.stream("topic_soure", Consumed.with(Serdes.String(), Serdes.String()));

// Split the stream on the 'priority' field nested inside 'data'.
final KStream<String, String> dataWithPriority1 =
        initialStream.filter((key, value) -> isMessagePriorityEqualTo(value, 1));
final KStream<String, String> dataWithPriority2 =
        initialStream.filter((key, value) -> isMessagePriorityEqualTo(value, 2));

// Write each branch to its own output topic (the topics should already exist).
dataWithPriority1.to("priority-one-output-topic");
dataWithPriority2.to("priority-two-output-topic");


private boolean isMessagePriorityEqualTo(final String message, final int expectedPriority) {
    // Parse the record value as JSON and read data.priority directly.
    final JSONObject messageAsJson = new JSONObject(message);
    final JSONObject data = messageAsJson.getJSONObject("data");
    return data.getInt("priority") == expectedPriority;
}

Steps:

  • Consume from the input topic.
  • Create the two filtered streams you need.
  • Send the data from each stream to its dedicated output topic (which should be created beforehand).

This way you forward the same data (records) exactly as you received them.
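
To actually run it, the topology above still has to be built and started. A minimal sketch, assuming a broker at localhost:9092 and a hypothetical application id ('streamsBuilder' is the builder the topology above was defined on):

import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;

final Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "priority-splitter");   // hypothetical application id
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // assumed broker address
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

// streamsBuilder is the StreamsBuilder the filtering topology above was defined on.
final KafkaStreams streams = new KafkaStreams(streamsBuilder.build(), props);
streams.start();

// Close the streams application cleanly on JVM shutdown.
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));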

Note: checking whether the priority is equal to 1 or 2 is your own logic, and you can decide how best to implement it. Here I used JSONObject to read the value as JSON, but you can use anything else.
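
For example, the same isMessagePriorityEqualTo check could be written with Jackson instead of org.json (an assumption: jackson-databind is on the classpath; any JSON library works):

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

private static final ObjectMapper MAPPER = new ObjectMapper();

private boolean isMessagePriorityEqualTo(final String message, final int expectedPriority) {
    try {
        // Navigate to data.priority; path() returns a "missing" node instead of throwing.
        final JsonNode priority = MAPPER.readTree(message).path("data").path("priority");
        return priority.isInt() && priority.asInt() == expectedPriority;
    } catch (final JsonProcessingException e) {
        // Malformed JSON: treat the record as not matching instead of failing the task.
        return false;
    }
}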

Answer from DmitrySpb

Using the JSON value format is not a good idea here: ksqlDB parses the JSON value into columns and re-serializes it when writing to the sink topic, which is what turns the nested 'data' object into an escaped string. Reading the whole value as a single string with the KAFKA format lets the record pass through unchanged.

Working solution

create stream soure_stream(message string) \
with (key_format='kafka', kafka_topic = 'topic_soure', value_format='kafka');

create stream soure_stream_priority_1 \
with (kafka_topic = 'topic_soure_priority_1', format='kafka')\
as select * from soure_stream\
where extractjsonfield(message, '$.data.priority') = '1';

create stream soure_stream_priority_2 \
with (kafka_topic = 'topic_soure_priority_2', format='kafka')\
as select * from soure_stream\
where extractjsonfield(message, '$.data.priority') = '2';
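
Either way, one way to confirm that the records really do pass through untouched is to read a sink topic back with a plain consumer and print the raw value. A minimal sketch, assuming a broker at localhost:9092 and the topic names above (the group id is made up):

import java.time.Duration;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

final Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // assumed broker address
props.put(ConsumerConfig.GROUP_ID_CONFIG, "priority-split-check");     // hypothetical group id
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
    consumer.subscribe(List.of("topic_soure_priority_1"));
    for (final ConsumerRecord<String, String> record : consumer.poll(Duration.ofSeconds(5))) {
        // The value should be the original JSON, with 'data' still a nested object.
        System.out.println(record.value());
    }
}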