How do I convert an Avro object to JSON and back? (when the Avro schema contains a union)


I'm trying to implement the outbox pattern, but I'm running into an issue with Avro and JSON.

I have a Java Quarkus application with an Avro schema containing a union of record types. What I want to do is:

  1. create the class object (message to be sent)
  2. save it to the database in a human readable format such as json
  3. then retrieve it from the database and send it to kafka

This is an anonymised example of my schema:

{
  "namespace": "com.acme.kafka",
  "type": "record",
  "name": "ValidatedUpdate",
  "fields": [
    {
      "name": "update",
      "type": {
        "type": "record",
        "name": "Update",
        "fields": [
          {
            "name": "createDate",
            "type": {
              "type": "long",
              "logicalType": "timestamp-millis"
            }
          },
          {
            "name": "details",
            "type": [
              {
                "name": "OtherUpdateDetails",
                "type": "record",
                "fields": [
                  {
                    "name": "stuff",
                    "type": "string"
                  },
                  {
                    "name": "otherNumber",
                    "type": "int"
                  }
                ]
              },
              {
                "name": "SpecialUpdateDetails",
                "type": "record",
                "fields": [
                  {
                    "name": "stuff",
                    "type": "string"
                  },
                  {
                    "name": "isYes",
                    "type": "boolean"
                  }
                ]
              }
            ]
          }
        ]
      }
    }
  ]
}
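
For reference, I build the message with the classes Avro generates from this schema, roughly like this (assuming a recent Avro version, where the timestamp-millis logical type maps to java.time.Instant):

    ValidatedUpdate message = ValidatedUpdate.newBuilder()
            .setUpdate(Update.newBuilder()
                    .setCreateDate(Instant.now())
                    // the union field can hold either branch; here SpecialUpdateDetails
                    .setDetails(SpecialUpdateDetails.newBuilder()
                            .setStuff("example")
                            .setIsYes(true)
                            .build())
                    .build())
            .build();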

However, when I read the JSON back from the database and map it onto the generated Avro message class with Jackson, everything seems fine at first; but when I then try to send the object to Kafka, the serializer complains that it cannot handle it because the schema contains a union. What is the best way to fix this?
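
The mapping step looks roughly like this (simplified; storedJson is the JSON payload read back from the outbox table, and ValidatedUpdate is the class generated from the schema above):

    ObjectMapper mapper = new ObjectMapper();
    // Compiles and appears to work, but I suspect the union-typed "details" field
    // comes back as a generic map rather than an OtherUpdateDetails or
    // SpecialUpdateDetails instance, which is what the Kafka serializer then rejects.
    ValidatedUpdate message = mapper.readValue(storedJson, ValidatedUpdate.class);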

I've looked into the Jackson dataformat dependency, but I couldn't figure out how to fix it that way. I've checked other Stack Overflow posts, but couldn't find exactly the problem I'm running into, and I've tried rebuilding parts of the Kafka message by hand, but that didn't work either. Thank you all in advance for the help.

1 Answer

Answered by Nan0fire

UPDATE: I've found a solution, based on: https://www.baeldung.com/java-apache-avro

From Avro Class to JSON

    public static <T extends SpecificRecord> String toJson(T avroObject) {
        DatumWriter<T> writer = new SpecificDatumWriter<>(avroObject.getSchema());
        try (ByteArrayOutputStream stream = new ByteArrayOutputStream()) {
            // Avro's JSON encoder tags union branches with their type name,
            // so the round trip back to a SpecificRecord stays unambiguous.
            Encoder jsonEncoder = EncoderFactory.get().jsonEncoder(avroObject.getSchema(), stream);
            writer.write(avroObject, jsonEncoder);
            jsonEncoder.flush();
            return stream.toString(StandardCharsets.UTF_8);
        } catch (IOException e) {
            log.errorf("Serialization error: %s", e.getMessage());
        }
        return null;
    }
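
For example, with validatedUpdate being an instance of the generated ValidatedUpdate class from the question:

    String json = toJson(validatedUpdate);
    // Avro's JSON encoding tags the union branch with its full type name, e.g.
    // "details": {"com.acme.kafka.SpecialUpdateDetails": {"stuff": "example", "isYes": true}}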

From JSON (String) to Avro Class


    public static SpecificRecord fromJson(String json, Schema schema) {
        DatumReader<SpecificRecord> reader = new SpecificDatumReader<>(schema);
        try {
            // The JSON decoder understands the union tagging written by the encoder above
            // and rebuilds the concrete generated record classes.
            Decoder decoder = DecoderFactory.get().jsonDecoder(schema, json);
            return reader.read(null, decoder);
        } catch (IOException e) {
            log.errorf("Deserialization error: %s json: %s", e.getMessage(), json);
        }
        return null;
    }
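
Reading it back before sending to Kafka then looks like this (the generated class exposes its schema via the static getClassSchema() method):

    ValidatedUpdate restored =
            (ValidatedUpdate) fromJson(json, ValidatedUpdate.getClassSchema());
    // restored.getUpdate().getDetails() now holds a real OtherUpdateDetails or
    // SpecialUpdateDetails instance, which the Kafka Avro serializer accepts.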

This works for me, but feel free to suggest more elegant solutions.

Imports

import org.apache.avro.Schema;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.avro.specific.SpecificRecord;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;