I'm trying to send data to Kafka with Avro, but the "messages" remain empty. I tried adding the schema to Kafka manually, but that made no difference. I don't see any errors in my application's logs, in the schema-registry logs, or in Kafka's logs. Can you tell me what I am doing wrong?

import java.util.UUID
import scala.collection.JavaConverters._ // needed for .asJava below
import org.apache.kafka.clients.producer.{ KafkaProducer, ProducerRecord, RecordMetadata }
import com.sksamuel.avro4s.{ AvroSchema, Record, RecordFormat, SchemaFor }

case class User(name: String, age: Int)

val connectConfig: Map[String, Object] =
  Map(
    "bootstrap.servers"   -> "localhost:9092",
    "client.id"           -> s"clientID${UUID.randomUUID().toString}",
    "key.serializer"      -> "org.apache.kafka.common.serialization.StringSerializer",
    "value.serializer"    -> "io.confluent.kafka.serializers.KafkaAvroSerializer",
    "schema.registry.url" -> "localhost:8081"
  )

implicit val producer = new KafkaProducer[String, Record](connectConfig.asJava)

def sendAvro(msg: DIL): Future[RecordMetadata] = {
  val newUser = User("Joe", 42)
  val schema = AvroSchema[User]
  implicit val schemaFor = SchemaFor[User]
  val format = RecordFormat[User]
  // Wrap the Avro record (no key) and hand it to the producer; send() is asynchronous
  val record = new ProducerRecord[String, Record]("kafkaLogTopic", format.to(newUser))
  producer.send(record)
}

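The Kafka producer sends data asynchronously and in batches, and your single record is smaller than the default batch size, so send() returns before the record has actually been written to the topic.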
Use .send(data).get() or producer.flush() to immediately wait on one record to be sent. Otherwise, you should define a thread on scala.sys.ShutdownHookThread to flush the producer.
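
To see the effect without a running broker, here is a minimal sketch that uses MockProducer from kafka-clients as a stand-in for the real KafkaProducer. It is an illustration under assumptions, not your exact setup: the object name FlushSketch and the method demo are hypothetical, the value is a plain String instead of an Avro Record, and autoComplete = false is used so that a sent record stays pending until flush() is called, roughly like a record sitting in the producer's buffer.

```scala
import org.apache.kafka.clients.producer.{ MockProducer, ProducerRecord }
import org.apache.kafka.common.serialization.StringSerializer

// Hypothetical demo object; MockProducer stands in for KafkaProducer.
object FlushSketch {

  // Returns (done right after send, done after flush).
  def demo(): (Boolean, Boolean) = {
    // autoComplete = false: sends are not completed until flush() (or completeNext()).
    val producer =
      new MockProducer[String, String](false, new StringSerializer, new StringSerializer)

    // send() only queues the record and returns a java.util.concurrent.Future.
    val pending = producer.send(new ProducerRecord[String, String]("kafkaLogTopic", "Joe"))
    val doneAfterSend = pending.isDone

    // flush() blocks until every queued record has been completed.
    producer.flush()
    val doneAfterFlush = pending.isDone

    producer.close()
    (doneAfterSend, doneAfterFlush)
  }

  def main(args: Array[String]): Unit = {
    val (afterSend, afterFlush) = demo()
    println(s"done right after send: $afterSend")
    println(s"done after flush: $afterFlush")
  }
}
```

With the real producer the same idea applies: call producer.flush() (or producer.send(record).get()) before the program exits, or register a hook along the lines of scala.sys.ShutdownHookThread { producer.flush(); producer.close() } so that buffered records are sent on shutdown.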