Kafka DataException: Converting byte[] to Kafka Connect data failed due to serialization error of topic


I am using Confluent to create a POC for a project. I created a schema for the topic and added a Redshift sink connector for it, so that when I send an HTTP request to the Kafka cluster, data matching the schema should be inserted into the Redshift database, into the table for that topic. I cannot figure out why I am getting a JSON Schema converter error when the request body is JSON and the Redshift connector is configured to use JSON_SR.
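In case the converter choice matters: my understanding is that the connector's JSON_SR input format corresponds to the io.confluent.connect.json.JsonSchemaConverter that shows up in the stack trace below. On a self-managed Connect worker I believe the equivalent value-converter settings would look roughly like this sketch (the registry URL is a placeholder, not my actual setup):

{
  "value.converter": "io.confluent.connect.json.JsonSchemaConverter",
  "value.converter.schema.registry.url": "http://schema-registry:8081"
}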

I am new to Kafka, so please bear with me if I have made fundamental mistakes in the configuration of the pipeline described above.

Any idea how to resolve this error?

Let me know if additional information is required.

Schema: (screenshot did not upload)

HTTP request via Postman: (screenshot did not upload; a rough reconstruction follows)

The error I got in the Redshift connector sink is shown after that.
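As far as I can reconstruct it without the screenshot, the request was shaped like a Confluent REST Proxy v2 produce call (a POST to /topics/stripe-connector-2 with Content-Type: application/vnd.kafka.json.v2+json); the body fields below are illustrative, not my exact payload:

{
  "records": [
    { "value": { "id": "evt_123", "amount": 100 } }
  ]
}

If I understand the docs correctly, this json embedded format writes the raw JSON bytes to the topic without the Schema Registry framing (magic byte plus schema id), which may be why the converter complains about an unknown magic byte.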

[
  {
    "key": "__connect.errors.topic",
    "value": "stripe-connector-2"
  },
  {
    "key": "__connect.errors.partition",
    "value": "4"
  },
  {
    "key": "__connect.errors.offset",
    "value": "0"
  },
  {
    "key": "__connect.errors.connector.name",
    "value": "lcc-2319gq"
  },
  {
    "key": "__connect.errors.task.id",
    "value": "0"
  },
  {
    "key": "__connect.errors.stage",
    "value": "VALUE_CONVERTER"
  },
  {
    "key": "__connect.errors.class.name",
    "value": "io.confluent.connect.json.JsonSchemaConverter"
  },
  {
    "key": "__connect.errors.exception.class.name",
    "value": "org.apache.kafka.connect.errors.DataException"
  },
  {
    "key": "__connect.errors.exception.message",
    "value": "Converting byte[] to Kafka Connect data failed due to serialization error of topic stripe-connector-2: "
  },
  {
    "key": "__connect.errors.exception.stacktrace",
    "value": "org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error of topic stripe-connector-2: \n\tat io.confluent.connect.json.JsonSchemaConverter.toConnectData(JsonSchemaConverter.java:133)\n\tat org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:88)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$5(WorkerSinkTask.java:528)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:190)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:224)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:166)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:528)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:503)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:339)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:238)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:207)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:227)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:282)\n\tat org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:183)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:829)\nCaused by: org.apache.kafka.common.errors.SerializationException: Error deserializing JSON message for id -1\n\tat io.confluent.kafka.serializers.json.AbstractKafkaJsonSchemaDeserializer.deserialize(AbstractKafkaJsonSchemaDeserializer.java:192)\n\tat io.confluent.kafka.serializers.json.AbstractKafkaJsonSchemaDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaJsonSchemaDeserializer.java:267)\n\tat io.confluent.connect.json.JsonSchemaConverter$Deserializer.deserialize(JsonSchemaConverter.java:179)\n\tat io.confluent.connect.json.JsonSchemaConverter.toConnectData(JsonSchemaConverter.java:116)\n\t... 18 more\nCaused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!\n\tat io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.getByteBuffer(AbstractKafkaSchemaSerDe.java:252)\n\tat io.confluent.kafka.serializers.json.AbstractKafkaJsonSchemaDeserializer.deserialize(AbstractKafkaJsonSchemaDeserializer.java:116)\n\t... 21 more\n"
  }
]