Kafka Type headers not removed by producer/consumer

45 Views Asked by At

application.properties

spring.cloud.function.definition=test
spring.kafka.bootstrap-servers=server-name-comes-here
spring.cloud.stream.kafka.binder.producerProperties.spring.json.add.type.headers=false
spring.cloud.stream.kafka.binder.consumerProperties.spring.json.use.type.headers=false
spring.cloud.stream.bindings.test-in-0.destination=${input.topic.name}
spring.cloud.stream.bindings.test-in-0.group=group

spring.cloud.stream.bindings.test-out-vne.destination=${output.topic.name}
spring.cloud.stream.bindings.test-out-vne.destination.transacted=true

Output headers:

[
  {
    "key": "jmsId",
    "stringValue": "1234"
  },
  {
    "key": "target-protocol",
    "stringValue": "kafka"
  },
  {
    "key": "UUID",
    "stringValue": "12334"
  },
  {
    "key": "contentType",
    "stringValue": "application/json"
  },
  {
    "key": "spring_json_header_types",
    "stringValue": "{\"jmsId\":\"java.lang.String\",\"UUID\":\"java.lang.String\",\"contentType\":\"java.lang.String\",\"target-protocol\":\"java.lang.String\"}"
  }
]

I tried with ADD_TYPE_INFO_HEADERS - false in Producer and USE_TYPE_INFO_HEADERS-false in comsumer but still output header comes with type info, pls advise how to get headers as below,

{
"jmsId": "1234",
"UUID": "12334"
}

I use ByteArrayJsonConverter in Producer config, Payload works fine but headers are having additional type info, need to remove the type info and need format as below

Expecting output:

{
"jmsId": "1234",
"UUID": "12334"
}
1

There are 1 best solutions below

0
sobychacko On

If you consume from that topic (from your outbound topic) in a Spring Cloud Stream application, you will not be able to see those headers unless you add a custom header mapper. Although, you set the add_type_header to false, the headers are still written to the topic as spring_json_header_types. That flag is for the JSON serializer to add the TYPE_ID header.

If you do not want the spring_json_header_types to go out as part of the Kafka topic record, you need to provide a custom header mapper in your application as below.

@Bean
public KafkaHeaderMapper kafkaBinderHeaderMapper() {
  BinderHeaderMapper headerMapper = new BinderHeaderMapper();
  headerMapper.setMapAllStringsOut(true);
  return headerMapper;
}

Notice that we set the mapAllStringsOut property to true, so that we map out any String values and since no values will be present, the spring_json_header_types header will not be sent to the Kafka topic.