Create Kafka Struct based on a case class and schema


I have many case classes/models for which I need to create a Kafka Struct automatically. The code below does this, but I have to write it by hand for every case class, and each one can have different fields. Is there a way to write a common method that accepts the case class, the schema, and the record, and generates the Kafka Struct from them?

Here, xyz is the case class/model; it has the fields hitDateTime, marketplaceId, hitYear, and hitMonth.

  override val KAFKA_SCHEMA: Schema = xyz.KAFKA_SCHEMA
  override def buildKafkaStruct(record: Any): Struct = {
    val parsedRecord = record.asInstanceOf[xyz]
    val structRecord = new Struct(KAFKA_SCHEMA)
      .put("time", parsedRecord.time)
      .put("marketplace", parsedRecord.marketplace)

    structRecord
  }
}

There is 1 best solution below

Nams On

You can build the structs generically with reflection, with macros, or with a generic-programming library such as Shapeless. It is not straightforward, but it can definitely be done.
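As a rough illustration of the generic approach, here is a minimal sketch that avoids macros entirely and relies on the fact that every case class is a `Product`. It assumes Scala 2.13 or later (for `productElementNames`) and that the schema field names match the case class field names exactly. The names `StructBuilder`, `toStruct`, `Demo`, and the hand-written schema are hypothetical and only for illustration; nested case classes and `Option` fields are not handled.

```scala
import org.apache.kafka.connect.data.{Schema, SchemaBuilder, Struct}
import scala.jdk.CollectionConverters._

// Example model, using the fields named in the question.
case class Xyz(hitDateTime: String, marketplaceId: String, hitYear: Int, hitMonth: Int)

object StructBuilder {
  // Copies every schema field that has a same-named case class field into a Struct.
  // Assumption: field names in the schema and in the case class are identical.
  def toStruct(record: Product, schema: Schema): Struct = {
    // Pair each field name with its value (productElementNames needs Scala 2.13+).
    val values: Map[String, Any] =
      record.productElementNames.zip(record.productIterator).toMap

    val struct = new Struct(schema)
    schema.fields().asScala.foreach { field =>
      // Struct.put validates the value against the field's schema type,
      // so a mismatch fails fast with a DataException.
      values.get(field.name()).foreach(v => struct.put(field, v.asInstanceOf[AnyRef]))
    }
    struct
  }
}

object Demo {
  // Hypothetical schema written by hand for the example model.
  val xyzSchema: Schema = SchemaBuilder.struct().name("Xyz")
    .field("hitDateTime", Schema.STRING_SCHEMA)
    .field("marketplaceId", Schema.STRING_SCHEMA)
    .field("hitYear", Schema.INT32_SCHEMA)
    .field("hitMonth", Schema.INT32_SCHEMA)
    .build()

  val struct: Struct =
    StructBuilder.toStruct(Xyz("2020-09-13T21:30:00", "001", 2020, 9), xyzSchema)
}
```

With something like this, `buildKafkaStruct` could delegate to `StructBuilder.toStruct(record.asInstanceOf[Product], KAFKA_SCHEMA)` for any case class. The trade-off is that mismatches between the case class and the schema only show up at runtime, which is what macro or Shapeless based derivation would catch at compile time.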

Another option is to use Avro together with Kafka. With Avro, you can convert your case classes to Avro records and back. Libraries such as Avro4s can automatically derive the Avro schema and the Avro binary encoding for a case class.

import java.io.ByteArrayOutputStream
import com.sksamuel.avro4s.{AvroSchema, AvroOutputStream}

case class Xyz(hitDateTime: String, marketplaceId: String, hitYear: Int, hitMonth: Int)

val schema = AvroSchema[Xyz]
println(schema)  // You can review your Avro schema here

val xyzRecord = Xyz("2020-09-13T21:30:00", "001", 2020, 9)

val baos = new ByteArrayOutputStream()
val output = AvroOutputStream.binary[Xyz].to(baos).build(schema)
output.write(xyzRecord)
output.close()

val avroBinaryDataForXyz = baos.toByteArray() // This is the Avro data you can send to Kafka
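
To show the "and back" direction as well, here is a hedged sketch of a full round trip. It assumes avro4s 3.x, where both stream builders take the schema in `build(schema)`; other versions may differ slightly. The object name `AvroRoundTrip` and the `encode`/`decode` helpers are hypothetical.

```scala
import java.io.ByteArrayOutputStream
import com.sksamuel.avro4s.{AvroInputStream, AvroOutputStream, AvroSchema}

case class Xyz(hitDateTime: String, marketplaceId: String, hitYear: Int, hitMonth: Int)

object AvroRoundTrip {
  // Schema derived from the case class at compile time.
  val schema = AvroSchema[Xyz]

  // Serializes one record to Avro binary (no embedded schema).
  def encode(record: Xyz): Array[Byte] = {
    val baos = new ByteArrayOutputStream()
    val output = AvroOutputStream.binary[Xyz].to(baos).build(schema)
    output.write(record)
    output.close()
    baos.toByteArray
  }

  // Reads every record found in the byte array back into case classes.
  // The writer schema must be supplied because binary output does not carry it.
  def decode(bytes: Array[Byte]): List[Xyz] = {
    val input = AvroInputStream.binary[Xyz].from(bytes).build(schema)
    try input.iterator.toList
    finally input.close()
  }
}
```

Because the binary format does not embed the schema, the consumer has to know the writer schema, which is why Avro on Kafka is commonly paired with a schema registry.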