Data not sent to Kafka with Avro serialization


I'm trying to send data to Kafka with Avro serialization, but the messages never show up in the topic. I tried registering the schema manually, but that did not help either. I don't see any errors in my application's logs, in the schema-registry logs, or in Kafka's logs. Can you tell me what I am doing wrong?

import java.util.UUID
import java.util.concurrent.Future

import org.apache.kafka.clients.producer.{ KafkaProducer, ProducerRecord, RecordMetadata }
import com.sksamuel.avro4s.{ AvroSchema, Record, RecordFormat, SchemaFor }

import scala.jdk.CollectionConverters._

case class User(name: String, age: Int)

val connectConfig: Map[String, Object] =
  Map(
    "bootstrap.servers"   -> "localhost:9092",
    "client.id"           -> s"clientID${UUID.randomUUID().toString}",
    "key.serializer"      -> "org.apache.kafka.common.serialization.StringSerializer",
    "value.serializer"    -> "io.confluent.kafka.serializers.KafkaAvroSerializer",
    "schema.registry.url" -> "localhost:8081"
  )

implicit val producer = new KafkaProducer[String, Record](connectConfig.asJava)

def sendAvro(): Future[RecordMetadata] = {
    val newUser = User("Joe", 42)
    val schema  = AvroSchema[User]
    implicit val schemaFor = SchemaFor[User]
    val format = RecordFormat[User]

    val record = new ProducerRecord[String, Record]("kafkaLogTopic", format.to(newUser))
    producer.send(record)
}

1 Answer

OneCricketeer:

The Kafka producer buffers records and sends them in batches; a single record smaller than the default batch size can sit in the buffer, and if your application exits before that buffer is flushed, the record is never sent.

Use producer.send(record).get() to block until that one record is acknowledged, or call producer.flush() to push everything buffered so far to the broker immediately.
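For example, a minimal sketch of both options, reusing the producer and record from the question's code:

// Option 1: block until the broker acknowledges this single record.
// send() returns a java.util.concurrent.Future, so get() waits for the RecordMetadata.
val metadata: RecordMetadata = producer.send(record).get()
println(s"Written to ${metadata.topic()}-${metadata.partition()} at offset ${metadata.offset()}")

// Option 2: keep sends asynchronous, but force the buffered records onto the wire now.
producer.send(record)
producer.flush()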

Otherwise, you should register a shutdown hook (a scala.sys.ShutdownHookThread, created via sys.addShutdownHook) to flush and close the producer before the application exits.
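
A minimal sketch of that, assuming the same producer as above:

// Flush and close the producer when the JVM shuts down so buffered records are not lost.
// sys.addShutdownHook returns a scala.sys.ShutdownHookThread.
val hook = sys.addShutdownHook {
  producer.flush()
  producer.close()
}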